Don't queue all future messages in memory in LogReader
When we found a timestamp without data at the end of the log file, we
were dropping the remaining messages by reading to the end of the file.
This actually triggers all the rest of the nodes to queue up their data
until that point, which can cause a lot of memory to be used rather
quickly.
Instead, just record on the state that the last message has been found,
and drop any later messages as they are replayed rather than sending
them. This keeps memory usage down.
Change-Id: I7417d4d5b720ab7986b5a55f9efba5612f962d58
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index a80b0e5..f638d8d 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -706,7 +706,7 @@
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
event_loop_factory_ != nullptr) {
- if (timestamped_message.data != nullptr) {
+ if (timestamped_message.data != nullptr && !state->found_last_message()) {
if (timestamped_message.monotonic_remote_time !=
BootTimestamp::min_time()) {
// Confirm that the message was sent on the sending node before the
@@ -774,34 +774,37 @@
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
- } else if (!ignore_missing_data_ &&
- // When starting up, we can have data which was sent before the
- // log starts, but the timestamp was after the log starts. This
- // is unreasonable to avoid, so ignore the missing data.
- timestamped_message.monotonic_remote_time.time >=
- state->monotonic_remote_start_time(
- timestamped_message.monotonic_remote_time.boot,
- timestamped_message.channel_index) &&
- !FLAGS_skip_missing_forwarding_entries) {
- // We've found a timestamp without data that we expect to have data for.
- // This likely means that we are at the end of the log file. Record it
- // and CHECK that in the rest of the log file, we don't find any more
- // data on that channel. Not all channels will end at the same point in
- // time since they can be in different files.
- VLOG(1) << "Found the last message on channel "
- << timestamped_message.channel_index << ", "
- << configuration::CleanedChannelToString(
- logged_configuration()->channels()->Get(
- timestamped_message.channel_index))
- << " " << timestamped_message;
+ } else if (state->found_last_message() ||
+ (!ignore_missing_data_ &&
+ // When starting up, we can have data which was sent before
+ // the log starts, but the timestamp was after the log
+ // starts. This is unreasonable to avoid, so ignore the
+ // missing data.
+ timestamped_message.monotonic_remote_time.time >=
+ state->monotonic_remote_start_time(
+ timestamped_message.monotonic_remote_time.boot,
+ timestamped_message.channel_index) &&
+ !FLAGS_skip_missing_forwarding_entries)) {
+ if (!state->found_last_message()) {
+ // We've found a timestamp without data that we expect to have data
+ // for. This likely means that we are at the end of the log file.
+ // Record it and CHECK that in the rest of the log file, we don't find
+ // any more data on that channel. Not all channels will end at the
+ // same point in time since they can be in different files.
+ VLOG(1) << "Found the last message on channel "
+ << timestamped_message.channel_index << ", "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
+ << " on node " << MaybeNodeName(state->event_loop()->node())
+ << timestamped_message;
- // The user might be working with log files from 1 node but forgot to
- // configure the infrastructure to log data for a remote channel on that
- // node. That can be very hard to debug, even though the log reader is
- // doing the right thing. At least log a warning in that case and tell
- // the user what is happening so they can either update their config to
- // log the channel or can find a log with the data.
- {
+ // The user might be working with log files from 1 node but forgot to
+ // configure the infrastructure to log data for a remote channel on
+ // that node. That can be very hard to debug, even though the log
+ // reader is doing the right thing. At least log a warning in that
+ // case and tell the user what is happening so they can either update
+ // their config to log the channel or can find a log with the data.
const std::vector<std::string> logger_nodes =
FindLoggerNodes(log_files_);
if (logger_nodes.size()) {
@@ -829,41 +832,32 @@
"from one of the nodes it is logged on.";
}
}
+ // Now that we found the end of one channel, artificially stop the
+ // rest by setting the found_last_message bit. It is confusing when
+ // part of your data gets replayed but not all. The rest of them will
+ // get dropped as they are replayed to keep memory usage down.
+ state->SetFoundLastMessage(true);
+
+ // Record that we've seen a nullptr (missing data) message on this channel.
+ state->set_last_message(timestamped_message.channel_index);
}
- // Vector storing if we've seen a nullptr message or not per channel.
- std::vector<bool> last_message;
- last_message.resize(logged_configuration()->channels()->size(), false);
-
- last_message[timestamped_message.channel_index] = true;
-
- // Now that we found the end of one channel, artificially stop the
- // rest. It is confusing when part of your data gets replayed but not
- // all. Read the rest of the messages and drop them on the floor while
- // doing some basic validation.
- while (state->OldestMessageTime() != BootTimestamp::max_time()) {
- // TODO(austin): This force queues up the rest of the log file for all
- // the other nodes. We should do this through the timer instead to
- // keep memory usage down.
- TimestampedMessage next = state->PopOldest();
- // Make sure that once we have seen the last message on a channel,
- // data doesn't start back up again. If the user wants to play
- // through events like this, they can set
- // --skip_missing_forwarding_entries or ignore_missing_data_.
- CHECK_LT(next.channel_index, last_message.size());
- if (next.data == nullptr) {
- last_message[next.channel_index] = true;
- } else {
- if (last_message[next.channel_index]) {
- LOG(FATAL)
- << "Found missing data in the middle of the log file on "
- "channel "
- << next.channel_index << " "
- << configuration::StrippedChannelToString(
- logged_configuration()->channels()->Get(
- next.channel_index))
- << " " << next << " " << state->DebugString();
- }
+ // Make sure that once we have seen the last message on a channel,
+ // data doesn't start back up again. If the user wants to play
+ // through events like this, they can set
+ // --skip_missing_forwarding_entries or ignore_missing_data_.
+ if (timestamped_message.data == nullptr) {
+ state->set_last_message(timestamped_message.channel_index);
+ } else {
+ if (state->last_message(timestamped_message.channel_index)) {
+ LOG(FATAL) << "Found missing data in the middle of the log file on "
+ "channel "
+ << timestamped_message.channel_index << " "
+ << configuration::StrippedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
+ << " " << timestamped_message << " "
+ << state->DebugString();
}
}
}