Fix AOS support for realtime replay

This patch fixes the use-case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.

Includes a test running a ShmEventLoop against a single-node logfile,
and running a multi-node replay into a single EventLoop in
simulation (the choice of single vs multi node here is arbitrary--it
should work with both single and multi node configs in both
simulation and shm).

This has a few caveats:
* Doesn't replay remote timestamps currently.
* Doesn't correct for implied changes in node<->node offsets due to
  changes in the clock.
* Had to add a flag to choose how to manage fetcher behavior for
  messages from before the start of the log.

Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..a4d6cd0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
 #include "aos/network/multinode_timestamp_filter.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/remote_message_schema.h"
@@ -46,6 +47,12 @@
     end_time, "",
     "If set, end at this point in time in the log on the realtime clock.");
 
+DEFINE_bool(drop_realtime_messages_before_start, false,
+            "If set, will drop any messages sent before the start of the "
+            "logfile in realtime replay. Setting this guarantees consistency "
+            "in timing with the original logfile, but means that you lose "
+            "access to fetched low-frequency messages.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +169,11 @@
 
   ~EventNotifier() { event_timer_->Disable(); }
 
+  // Sets the clock offset for realtime playback.
+  void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+    clock_offset_ = clock_offset;
+  }
+
   // Returns the event trigger time.
   realtime_clock::time_point realtime_event_time() const {
     return realtime_event_time_;
@@ -189,7 +201,7 @@
       // Whops, time went backwards.  Just do it now.
       HandleTime();
     } else {
-      event_timer_->Setup(candidate_monotonic);
+      event_timer_->Setup(candidate_monotonic + clock_offset_);
     }
   }
 
@@ -208,6 +220,8 @@
   const realtime_clock::time_point realtime_event_time_ =
       realtime_clock::min_time;
 
+  std::chrono::nanoseconds clock_offset_{0};
+
   bool called_ = false;
 };
 
@@ -325,9 +339,7 @@
   }
 
   if (!configuration::MultiNode(configuration())) {
-    states_.emplace_back(std::make_unique<State>(
-        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
-        nullptr));
+    states_.resize(1);
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -494,7 +506,7 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        node);
+        filters_.get(), node);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -661,8 +673,58 @@
   return nullptr;
 }
 
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
 void LogReader::Register(EventLoop *event_loop) {
-  Register(event_loop, event_loop->node());
+  filters_ =
+      std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+          event_loop->configuration(), logged_configuration(),
+          log_files_[0].boots, FLAGS_skip_order_validation,
+          chrono::duration_cast<chrono::nanoseconds>(
+              chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+
+  std::vector<TimestampMapper *> timestamp_mappers;
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    std::vector<LogParts> filtered_parts = FilterPartsForNode(
+        log_files_, node != nullptr ? node->name()->string_view() : "");
+
+    states_[node_index] = std::make_unique<State>(
+        filtered_parts.size() == 0u
+            ? nullptr
+            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+        filters_.get(), node);
+    State *state = states_[node_index].get();
+
+    state->SetChannelCount(logged_configuration()->channels()->size());
+    timestamp_mappers.emplace_back(state->timestamp_mapper());
+  }
+
+  filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+    for (const Node *other_node : configuration::GetNodes(configuration())) {
+      const size_t other_node_index =
+          configuration::GetNodeIndex(configuration(), other_node);
+      State *other_state = states_[other_node_index].get();
+      if (other_state != state) {
+        state->AddPeer(other_state);
+      }
+    }
+  }
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    if (node == nullptr || node->name()->string_view() ==
+                               event_loop->node()->name()->string_view()) {
+      Register(event_loop, event_loop->node());
+    } else {
+      Register(nullptr, node);
+    }
+  }
 }
 
 void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -674,7 +736,10 @@
   if (state->OldestMessageTime() == BootTimestamp::max_time()) {
     return;
   }
-  ++live_nodes_;
+
+  if (event_loop != nullptr) {
+    ++live_nodes_;
+  }
 
   if (event_loop_factory_ != nullptr) {
     event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +752,14 @@
 }
 
 void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
-  if (event_loop) {
+  if (event_loop != nullptr) {
     CHECK(event_loop->configuration() == configuration());
   }
 
   State *state =
       states_[configuration::GetNodeIndex(configuration(), node)].get();
 
-  if (!event_loop) {
+  if (event_loop == nullptr) {
     state->ClearTimeFlags();
   }
 
@@ -703,7 +768,7 @@
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  if (event_loop) {
+  if (event_loop != nullptr) {
     event_loop->SkipTimingReport();
     event_loop->SkipAosLog();
   }
@@ -716,10 +781,10 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
     State *source_state = nullptr;
+
     if (!configuration::ChannelIsSendableOnNode(channel, node) &&
         configuration::ChannelIsReadableOnNode(channel, node)) {
       const Node *source_node = configuration::GetNode(
@@ -741,7 +806,10 @@
     state->SetChannel(
         logged_channel_index,
         configuration::ChannelIndex(configuration(), channel),
-        event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+        event_loop && logged &&
+                configuration::ChannelIsReadableOnNode(channel, node)
+            ? event_loop->MakeRawSender(channel)
+            : nullptr,
         filter, is_forwarded, source_state);
 
     if (is_forwarded && logged) {
@@ -758,10 +826,12 @@
               states_[configuration::GetNodeIndex(
                           configuration(), connection->name()->string_view())]
                   .get();
-          destination_state->SetRemoteTimestampSender(
-              logged_channel_index,
-              event_loop ? state->RemoteTimestampSender(channel, connection)
-                         : nullptr);
+          if (destination_state) {
+            destination_state->SetRemoteTimestampSender(
+                logged_channel_index,
+                event_loop ? state->RemoteTimestampSender(channel, connection)
+                           : nullptr);
+          }
         }
       }
     }
@@ -775,14 +845,11 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
-    VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
-            << "at " << state->event_loop()->context().monotonic_event_time
-            << " now " << state->monotonic_now();
     if (state->OldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (exit_on_finish_ && live_nodes_ == 0) {
-        event_loop_factory_->Exit();
+        CHECK_NOTNULL(event_loop_factory_)->Exit();
       }
       return;
     }
@@ -794,32 +861,37 @@
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
-    if (!FLAGS_skip_order_validation) {
-      CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
-          << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
-          << monotonic_now << " trying to send "
-          << timestamped_message.monotonic_event_time << " failure "
-          << state->DebugString();
-    } else if (BootTimestamp{.boot = state->boot_count(),
-                             .time = monotonic_now} !=
-               timestamped_message.monotonic_event_time) {
-      LOG(WARNING) << "Check failed: monotonic_now == "
-                      "timestamped_message.monotonic_event_time) ("
-                   << monotonic_now << " vs. "
-                   << timestamped_message.monotonic_event_time
-                   << "): " << FlatbufferToJson(state->event_loop()->node())
-                   << " Now " << monotonic_now << " trying to send "
-                   << timestamped_message.monotonic_event_time << " failure "
-                   << state->DebugString();
+    if (event_loop_factory_ != nullptr) {
+      // Only enforce exact timing in simulation.
+      if (!FLAGS_skip_order_validation) {
+        CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+            << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+            << monotonic_now << " trying to send "
+            << timestamped_message.monotonic_event_time << " failure "
+            << state->DebugString();
+      } else if (BootTimestamp{.boot = state->boot_count(),
+                               .time = monotonic_now} !=
+                 timestamped_message.monotonic_event_time) {
+        LOG(WARNING) << "Check failed: monotonic_now == "
+                        "timestamped_message.monotonic_event_time) ("
+                     << monotonic_now << " vs. "
+                     << timestamped_message.monotonic_event_time
+                     << "): " << FlatbufferToJson(state->event_loop()->node())
+                     << " Now " << monotonic_now << " trying to send "
+                     << timestamped_message.monotonic_event_time << " failure "
+                     << state->DebugString();
+      }
     }
 
     if (timestamped_message.monotonic_event_time.time >
             state->monotonic_start_time(
                 timestamped_message.monotonic_event_time.boot) ||
-        event_loop_factory_ != nullptr) {
+        event_loop_factory_ != nullptr ||
+        !FLAGS_drop_realtime_messages_before_start) {
       if (timestamped_message.data != nullptr && !state->found_last_message()) {
         if (timestamped_message.monotonic_remote_time !=
-            BootTimestamp::min_time()) {
+                BootTimestamp::min_time() &&
+            !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
@@ -890,7 +962,8 @@
                                  timestamped_message.realtime_event_time);
 
         VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
-                << timestamped_message.monotonic_event_time;
+                << timestamped_message.monotonic_event_time << " "
+                << state->DebugString();
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
         state->Send(std::move(timestamped_message));
@@ -982,13 +1055,13 @@
         }
       }
     } else {
-      LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << timestamped_message.monotonic_event_time.time
-                          .time_since_epoch()
-                          .count()
-                   << " start "
-                   << monotonic_start_time().time_since_epoch().count() << " "
-                   << *timestamped_message.data;
+      LOG(WARNING)
+          << "Not sending data from before the start of the log file. "
+          << timestamped_message.monotonic_event_time.time.time_since_epoch()
+                 .count()
+          << " start "
+          << monotonic_start_time(state->node()).time_since_epoch().count()
+          << " timestamped_message.data is null";
     }
 
     const BootTimestamp next_time = state->OldestMessageTime();
@@ -1002,18 +1075,26 @@
         state->NotifyLogfileEnd();
         return;
       }
-      VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
-              << "wakeup for " << next_time.time << "("
-              << state->ToDistributedClock(next_time.time)
-              << " distributed), now is " << state->monotonic_now();
+      if (event_loop_factory_ != nullptr) {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << "("
+                << state->ToDistributedClock(next_time.time)
+                << " distributed), now is " << state->monotonic_now();
+      } else {
+        VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+                << "wakeup for " << next_time.time << ", now is "
+                << state->monotonic_now();
+      }
       state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "No next message, scheduling shutdown";
       state->NotifyLogfileEnd();
       // Set a timer up immediately after now to die. If we don't do this,
-      // then the senders waiting on the message we just read will never get
+      // then the watchers waiting on the message we just read will never get
       // called.
+      // Doesn't apply to single-EventLoop replay since the watchers in question
+      // are not under our control.
       if (event_loop_factory_ != nullptr) {
         state->Setup(monotonic_now + event_loop_factory_->send_delay() +
                      std::chrono::nanoseconds(1));
@@ -1037,6 +1118,7 @@
     event_loop->OnRun([state]() {
       BootTimestamp next_time = state->OldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
+      state->SetClockOffset();
       state->Setup(next_time.time);
       state->SetupStartupTimer();
     });
@@ -1451,9 +1533,13 @@
   return remapped_channel;
 }
 
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
-                        const Node *node)
-    : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+    std::unique_ptr<TimestampMapper> timestamp_mapper,
+    message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+    const Node *node)
+    : timestamp_mapper_(std::move(timestamp_mapper)),
+      node_(node),
+      multinode_filters_(multinode_filters) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1572,6 +1658,23 @@
              source_state->boot_count());
   }
 
+  if (event_loop_factory_ != nullptr &&
+      channel_source_state_[timestamped_message.channel_index] != nullptr &&
+      multinode_filters_ != nullptr) {
+    // Sanity check that we are using consistent boot uuids.
+    State *source_state =
+        channel_source_state_[timestamped_message.channel_index];
+    CHECK_EQ(multinode_filters_->boot_uuid(
+                 configuration::GetNodeIndex(event_loop_->configuration(),
+                                             source_state->node()),
+                 timestamped_message.monotonic_remote_time.boot),
+             CHECK_NOTNULL(
+                 CHECK_NOTNULL(
+                     channel_source_state_[timestamped_message.channel_index])
+                     ->event_loop_)
+                 ->boot_uuid());
+  }
+
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const auto err = sender->Send(
@@ -1580,9 +1683,13 @@
       timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time, remote_queue_index,
       (channel_source_state_[timestamped_message.channel_index] != nullptr
-           ? CHECK_NOTNULL(
-                 channel_source_state_[timestamped_message.channel_index])
-                 ->event_loop_->boot_uuid()
+           ? CHECK_NOTNULL(multinode_filters_)
+                 ->boot_uuid(configuration::GetNodeIndex(
+                                 event_loop_->configuration(),
+                                 channel_source_state_[timestamped_message
+                                                           .channel_index]
+                                     ->node()),
+                             timestamped_message.monotonic_remote_time.boot)
            : event_loop_->boot_uuid()));
   if (err != RawSender::Error::kOk) return false;
 
@@ -1629,6 +1736,11 @@
     // map.
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
+    // TODO(james): Currently, If running replay against a single event loop,
+    // remote timestamps will not get replayed because this code-path only
+    // gets triggered on the event loop that receives the forwarded message
+    // that the timestamps correspond to. This code, as written, also doesn't
+    // correctly handle a non-zero clock_offset for the *_remote_time fields.
     State *source_state =
         CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
 
@@ -1934,5 +2046,20 @@
   }
 }
 
+void LogReader::State::SetClockOffset() {
+  if (node_event_loop_factory_ == nullptr) {
+    // If not running with simulated event loop, set the monotonic clock
+    // offset.
+    clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
+
+    if (start_event_notifier_) {
+      start_event_notifier_->SetClockOffset(clock_offset_);
+    }
+    if (end_event_notifier_) {
+      end_event_notifier_->SetClockOffset(clock_offset_);
+    }
+  }
+}
+
 }  // namespace logger
 }  // namespace aos