Bound how long messages are saved for timestamp matching
We have seen log files with lots of data but no corresponding
timestamps. This can happen when timestamp logging is disabled for a
channel, or when a large number of messages are never delivered.
In this case, TimestampMapper accumulates all the data in the hope
that a matching timestamp will show up at some point in the future.
Eventually this is futile and just results in a memory explosion; I've
seen logs exhaust 64 GB of RAM, which is pretty ridiculous.
Instead, bound how long we save these messages using the channel's TTL
plus an expected network delay. This is a bit scary: if we throw the
data out too early, we'll declare an early end of the log, and someone
might not notice. Still, that seems like a safer failure mode right now
than producing unreadable logs.
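
Roughly, the eviction rule looks like this (a minimal standalone
sketch; Message, EvictStaleMessages, and the parameter names here are
simplified stand-ins for the real per-channel state in
logfile_utils.cc):

    #include <chrono>
    #include <deque>

    namespace chrono = std::chrono;

    // Simplified stand-in for the queued message type; the real code
    // tracks monotonic send times per channel in TimestampMapper.
    struct Message {
      chrono::nanoseconds timestamp;  // Send time on the source node.
    };

    // Drop messages older than (TTL + assumed max network delay)
    // relative to the newest message popped, but always keep at least
    // one message to handle reboots and duplicate sends. A TTL of 0
    // marks a reliable channel, which is never trimmed.
    void EvictStaleMessages(std::deque<Message> *messages,
                            chrono::nanoseconds time_to_live,
                            chrono::nanoseconds max_network_delay,
                            chrono::nanoseconds last_popped_message_time) {
      if (time_to_live <= chrono::nanoseconds(0)) {
        return;
      }
      while (messages->size() > 1u &&
             messages->front().timestamp + time_to_live + max_network_delay <
                 last_popped_message_time) {
        messages->pop_front();
      }
    }

If the oldest queued message plus the TTL and the assumed network delay
is still older than the last popped message, its matching timestamp can
never arrive, so it is safe to drop.
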
Change-Id: I4e0d93e77e5ae3b3b2ee1e62e829009cd56b09be
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9c7fe5d..9059eb3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -36,6 +36,13 @@
"Max time to let data sit in the queue before flushing in seconds.");
DEFINE_double(
+ max_network_delay, 1.0,
+ "Max time to assume a message takes to cross the network before we are "
+ "willing to drop it from our buffers and assume it didn't make it. "
+ "Increasing this number can increase memory usage depending on the packet "
+ "loss of your network or if the timestamps aren't logged for a message.");
+
+DEFINE_double(
max_out_of_order, -1,
"If set, this overrides the max out of order duration for a log file.");
@@ -1083,6 +1090,12 @@
(my_node != node);
node_data->any_delivered = node_data->any_delivered ||
node_data->channels[channel_index].delivered;
+ if (node_data->channels[channel_index].delivered) {
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, node);
+ node_data->channels[channel_index].time_to_live =
+ chrono::nanoseconds(connection->time_to_live());
+ }
++channel_index;
}
}
@@ -1264,6 +1277,7 @@
void TimestampMapper::PopFront() {
CHECK(first_message_ != FirstMessage::kNeedsUpdate);
+ last_popped_message_time_ = Front()->monotonic_event_time;
first_message_ = FirstMessage::kNeedsUpdate;
matched_messages_.pop_front();
@@ -1347,8 +1361,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(result.data->realtime_sent_time,
- realtime_remote_time)
+ CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
// are done. And all the data is in order.
@@ -1423,10 +1436,38 @@
if (!node_data.any_delivered) continue;
if (!node_data.save_for_peer) continue;
if (node_data.channels[m->channel_index].delivered) {
- // TODO(austin): This copies the data... Probably not worth stressing
- // about yet.
- // TODO(austin): Bound how big this can get. We tend not to send
- // massive data, so we can probably ignore this for a bit.
+ // If we have data but no timestamps (logs where the timestamps never
+ // got logged are a classic example), this queue can grow indefinitely.
+ // We don't need to keep anything older than the last message returned.
+
+ // We have the time on the source node.
+ // We care to wait until we have the time on the destination node.
+ std::deque<Message> &messages =
+ node_data.channels[m->channel_index].messages;
+ // Max delay over the network is the TTL, so take the queue time and
+ // add the TTL to it. Never drop reliable messages (TTL == 0) until
+ // someone can come up with a good reason to drop those too.
+ if (node_data.channels[m->channel_index].time_to_live >
+ chrono::nanoseconds(0)) {
+ // We need to make *some* assumptions about network delay for this to
+ // work. We want to only look at the RX side. This means we need to
+ // track the last time a message was popped from any channel from the
+ // node sending this message, and compare that to the max time we expect
+ // that a message will take to be delivered across the network. This
+ // assumes that messages are popped in time order as a proxy for
+ // measuring the distributed time at this layer.
+ //
+ // Leave at least 1 message in here so we can handle reboots and
+ // messages getting sent twice.
+ while (messages.size() > 1u &&
+ messages.begin()->timestamp +
+ node_data.channels[m->channel_index].time_to_live +
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_max_network_delay)) <
+ last_popped_message_time_) {
+ messages.pop_front();
+ }
+ }
node_data.channels[m->channel_index].messages.emplace_back(*m);
}
}
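
Usage note: since the delay bound is a gflag, a replay over a
known-slow or lossy network can widen it, e.g. by passing
--max_network_delay=2.0 to whichever binary links in logfile_utils.cc
(the connection's TTL is always added on top). Larger values trade
memory for tolerance of late deliveries.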