Sort messages between nodes properly

We used to assume the realtime clocks were in sync.  This isn't
realistic.  Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
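
Roughly, and as a sketch of the idea rather than exactly what
ClippedAverageFilter computes: if the latency is close to symmetric
in the two directions, one sample each way separates the two
unknowns.

  fwd_sample = t_rx_b - t_tx_a =  offset_ab + latency
  rev_sample = t_rx_a - t_tx_b = -offset_ab + latency

so offset_ab ~= (fwd_sample - rev_sample) / 2 and
latency ~= (fwd_sample + rev_sample) / 2.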

Since time is no longer exactly linear once these adjustments are
applied, we need to redo how events are scheduled.  Event times can't
be converted to the distributed_clock once; they now need to be
converted every time they are compared between nodes.
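
With more than two nodes, the pairwise samples get stitched together
with a small least squares problem: each connected pair contributes
an (offset_a - offset_b) = sample row, a sum-to-zero row pins down
the overall solution, and the solve maps each node's monotonic clock
onto the distributed clock.  Schematically,

  distributed = monotonic + offset(node)

where offset(node) is re-solved from the latest samples as the log is
replayed, which is why the conversion can't be done once up front.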

Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 2b3df8f..2e63f45 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -16,6 +16,7 @@
     srcs = [
         "logfile_utils.cc",
         "logger.cc",
+        "logger_math.cc",
     ],
     hdrs = [
         "logfile_utils.h",
@@ -28,10 +29,12 @@
         "//aos/events:event_loop",
         "//aos/events:simulated_event_loop",
         "//aos/network:team_number",
+        "//aos/network:timestamp_filter",
         "//aos/time",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_google_absl//absl/container:inlined_vector",
         "@com_google_absl//absl/strings",
+        "@org_tuxfamily_eigen//:eigen",
     ],
 )
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 03b89d1..44381f3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -291,7 +291,7 @@
       chrono::nanoseconds(result.message().monotonic_sent_time()));
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  VLOG(1) << "Read from " << filename() << " data " << FlatbufferToJson(result);
+  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
   return std::move(result);
 }
 
@@ -435,8 +435,14 @@
       }
     } else {
       if (!NextLogFile()) {
-        VLOG(1) << "End of log file.";
+        VLOG(1) << "End of log file " << filenames_.back();
         at_end_ = true;
+        for (MessageHeaderQueue *queue : channels_to_write_) {
+          if (queue == nullptr || queue->timestamp_merger == nullptr) {
+            continue;
+          }
+          queue->timestamp_merger->NoticeAtEnd();
+        }
         return false;
       }
     }
@@ -800,15 +806,40 @@
   return oldest_timestamp;
 }
 
+TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
+  if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
+    return TimestampMerger::DeliveryTimestamp{};
+  }
+
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_timestamp_reader = timestamp_heap_.front();
+
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
+                             ->oldest_message(channel_index_, node_index_);
+
+  TimestampMerger::DeliveryTimestamp timestamp;
+  timestamp.monotonic_event_time =
+      monotonic_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->monotonic_sent_time()));
+  timestamp.realtime_event_time = realtime_clock::time_point(
+      chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
+
+  timestamp.monotonic_remote_time =
+      monotonic_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->monotonic_remote_time()));
+  timestamp.realtime_remote_time =
+      realtime_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->realtime_remote_time()));
+
+  timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
+  return timestamp;
+}
+
 std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
 TimestampMerger::PopOldest() {
   if (has_timestamps_) {
-    CHECK_GT(message_heap_.size(), 0u)
-        << ": Missing data from source node, no data available to match "
-           "timestamp on "
-        << configuration::CleanedChannelToString(
-               configuration_->channels()->Get(channel_index_));
-
+    // Read the timestamps.
     std::tuple<monotonic_clock::time_point, uint32_t,
                FlatbufferVector<MessageHeader>>
         oldest_timestamp = PopTimestampHeap();
@@ -830,6 +861,15 @@
         chrono::nanoseconds(
             std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
 
+    // See if we have any data.  If not, pass the problem up the chain.
+    if (message_heap_.size() == 0u) {
+      VLOG(1) << "No data to match timestamp on "
+              << configuration::CleanedChannelToString(
+                     configuration_->channels()->Get(channel_index_));
+      return std::make_tuple(timestamp,
+                             std::move(std::get<2>(oldest_timestamp)));
+    }
+
     while (true) {
       {
         // Ok, now try grabbing data until we find one which matches.
@@ -842,16 +882,16 @@
             std::get<2>(oldest_message_ref)->monotonic_sent_time()));
 
         if (remote_monotonic_time < remote_timestamp_monotonic_time) {
-          LOG(INFO) << "Undelivered message, skipping.  Remote time is "
-                    << remote_monotonic_time << " timestamp is "
-                    << remote_timestamp_monotonic_time << " on channel "
-                    << channel_index_;
+          VLOG(1) << "Undelivered message, skipping.  Remote time is "
+                  << remote_monotonic_time << " timestamp is "
+                  << remote_timestamp_monotonic_time << " on channel "
+                  << channel_index_;
           PopMessageHeap();
           continue;
         } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
-          LOG(INFO) << "Data not found.  Remote time should be "
-                    << remote_timestamp_monotonic_time << " on channel "
-                    << channel_index_;
+          VLOG(1) << "Data not found.  Remote time should be "
+                  << remote_timestamp_monotonic_time << " on channel "
+                  << channel_index_;
           return std::make_tuple(timestamp,
                                  std::move(std::get<2>(oldest_timestamp)));
         }
@@ -902,6 +942,8 @@
   }
 }
 
+void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
+
 namespace {
 std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
     const std::vector<std::vector<std::string>> &filenames) {
@@ -997,6 +1039,18 @@
   return channel_heap_.front().first;
 }
 
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
+  if (timestamp_heap_.size() == 0u) {
+    return TimestampMerger::DeliveryTimestamp{};
+  }
+  return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
+}
+
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
+    int channel) const {
+  return timestamp_mergers_[channel].OldestTimestamp();
+}
+
 void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
                                     int channel_index) {
   // Pop and recreate the heap if it has already been pushed.  And since we are
@@ -1009,6 +1063,16 @@
         }));
     std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                    ChannelHeapCompare);
+
+    if (timestamp_mergers_[channel_index].has_timestamps()) {
+      timestamp_heap_.erase(std::find_if(
+          timestamp_heap_.begin(), timestamp_heap_.end(),
+          [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
+            return x.second == channel_index;
+          }));
+      std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                     ChannelHeapCompare);
+    }
   }
 
   channel_heap_.push_back(std::make_pair(timestamp, channel_index));
@@ -1017,22 +1081,40 @@
   // put the oldest message first.
   std::push_heap(channel_heap_.begin(), channel_heap_.end(),
                  ChannelHeapCompare);
+
+  if (timestamp_mergers_[channel_index].has_timestamps()) {
+    timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
+    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                   ChannelHeapCompare);
+  }
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
            FlatbufferVector<MessageHeader>>
 ChannelMerger::PopOldest() {
-  CHECK(channel_heap_.size() > 0);
+  CHECK_GT(channel_heap_.size(), 0u);
   std::pair<monotonic_clock::time_point, int> oldest_channel_data =
       channel_heap_.front();
   int channel_index = oldest_channel_data.second;
   std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
                 &ChannelHeapCompare);
   channel_heap_.pop_back();
+
   timestamp_mergers_[channel_index].set_pushed(false);
 
   TimestampMerger *merger = &timestamp_mergers_[channel_index];
 
+  if (merger->has_timestamps()) {
+    CHECK_GT(timestamp_heap_.size(), 0u);
+    std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
+        timestamp_heap_.front();
+    CHECK(oldest_timestamp_data == oldest_channel_data)
+        << ": Timestamp heap out of sync.";
+    std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                  &ChannelHeapCompare);
+    timestamp_heap_.pop_back();
+  }
+
   // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index e5e0175..4ab4dca 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -302,6 +302,10 @@
   std::string DebugString(int channel) const;
   std::string DebugString(int channel, int node_index) const;
 
+  // Returns true if all the messages have been queued from the last log file
+  // in the list of log file chunks.
+  bool at_end() const { return at_end_; }
+
  private:
   // TODO(austin): Need to copy or refcount the message instead of running
   // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
@@ -441,6 +445,9 @@
   // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
+  // Returns the oldest forwarding timestamp.
+  DeliveryTimestamp OldestTimestamp() const;
+
   // Tracks if the channel merger has pushed this onto its heap or not.
   bool pushed() { return pushed_; }
   // Sets if this has been pushed to the channel merger heap.  Should only be
@@ -450,6 +457,13 @@
   // Returns a debug string with the heaps printed out.
   std::string DebugString() const;
 
+  // Returns true if we have timestamps.
+  bool has_timestamps() const { return has_timestamps_; }
+
+  // Records that one of the log files ran out of data.  This should only be
+  // called by a SplitMessageReader.
+  void NoticeAtEnd();
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
@@ -536,6 +550,12 @@
              FlatbufferVector<MessageHeader>>
   PopOldest();
 
+  // Returns the oldest timestamp in the timestamp heap.
+  TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
+  // Returns the oldest timestamp in the timestamp heap for a specific channel.
+  TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
+      int channel) const;
+
   // Returns the config for this set of log files.
   const Configuration *configuration() const {
     return log_file_header()->configuration();
@@ -568,6 +588,15 @@
   // debugging what went wrong.
   std::string DebugString() const;
 
+  // Returns true if one of the log files has finished reading everything.  When
+  // log file chunks are involved, this means that the last chunk in a log file
+  // has been read.  It is acceptable to be missing data at this point in time.
+  bool at_end() const { return at_end_; }
+
+  // Marks that one of the log files is at the end.  This should only be called
+  // by timestamp mergers.
+  void NoticeAtEnd() { at_end_ = true; }
+
  private:
   // Pushes the timestamp for new data on the provided channel.
   void PushChannelHeap(monotonic_clock::time_point timestamp,
@@ -584,10 +613,15 @@
 
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+  // A heap of just the timestamp channel readers and timestamps for the oldest
+  // data in each.
+  std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
 
   // Configured node.
   const Node *node_;
 
+  bool at_end_ = false;
+
   // Cached copy of the list of nodes.
   std::vector<const Node *> nodes_;
 };
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c7f64ae..57c2c2c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -7,6 +7,7 @@
 #include <sys/uio.h>
 #include <vector>
 
+#include "Eigen/Dense"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
@@ -19,6 +20,11 @@
             "If true, drop any forwarding entries with missing data.  If "
             "false, CHECK.");
 
+DEFINE_bool(timestamps_to_csv, false,
+            "If true, write all the time synchronization information to a set "
+            "of CSV files in /tmp/.  This should only be needed when debugging "
+            "time synchronization.");
+
 namespace aos {
 namespace logger {
 
@@ -109,12 +115,12 @@
 // TODO(austin): Set the remote start time to the first time we see a remote
 // message when we are logging those messages separate?  Need to signal what to
 // do, or how to get a good timestamp.
-
 void Logger::WriteHeader() {
   for (const Node *node : log_namer_->nodes()) {
     WriteHeader(node);
   }
 }
+
 void Logger::WriteHeader(const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
@@ -294,14 +300,21 @@
   MakeRemappedConfig();
 
   if (!configuration::MultiNode(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
-    State *state = &(it.first->second);
+    states_.emplace_back(std::make_unique<State>());
+    State *state = states_[0].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+  } else {
+    states_.resize(configuration()->nodes()->size());
   }
 }
 
-LogReader::~LogReader() { Deregister(); }
+LogReader::~LogReader() {
+  Deregister();
+  if (offset_fp_ != nullptr) {
+    fclose(offset_fp_);
+  }
+}
 
 const Configuration *LogReader::logged_configuration() const {
   return log_file_header_.message().configuration();
@@ -324,17 +336,19 @@
 }
 
 monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->monotonic_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->monotonic_start_time();
 }
 
 realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->realtime_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -347,9 +361,10 @@
   event_loop_factory_ = event_loop_factory;
 
   for (const Node *node : configuration::GetNodes(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(node, State{}));
-
-    State *state = &(it.first->second);
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    states_[node_index] = std::make_unique<State>();
+    State *state = states_[node_index].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
 
@@ -361,47 +376,169 @@
     Register(state->event_loop_unique_ptr.get());
   }
 
-  // Basic idea is that we want to
-  //   1) Find the node which booted first.
-  //   2) Setup the clocks so that each clock is at the time it would be at when
-  //      the first node booted.
+  // We now need to seed our per-node time offsets and get everything set up
+  // to run.
+  const size_t num_nodes = !configuration::MultiNode(logged_configuration())
+                               ? 1u
+                               : logged_configuration()->nodes()->size();
 
-  realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
+  // It is easiest to solve for per-node offsets with a matrix rather than
+  // trying to solve the equations by hand.  So let's get after it.
+  //
+  // Now, build up the map matrix.
+  //
+  // sample_matrix_ = map_matrix_ * offset_matrix_
+  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
 
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
+  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
+  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
 
-    if (boot_time < earliest_boot_time) {
-      earliest_boot_time = boot_time;
+  // And the base offset matrix, which will be a copy of the initial offset
+  // matrix.
+  base_offset_matrix_ =
+      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
+          num_nodes);
+
+  // All offsets should sum to 0.  Add that as the first constraint in our least
+  // squares.
+  map_matrix_.row(0).setOnes();
+
+  {
+    // Now, add the a - b -> sample elements.
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      // +a
+      map_matrix_(i, node_a_index) = 1.0;
+      // -b
+      map_matrix_(i, node_b_index) = -1.0;
+
+      // -> sample
+      filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+
+      ++i;
     }
   }
 
+  // The rank of the map matrix tells us whether all the nodes are in
+  // communication with each other, i.e. whether the offsets are observable.
+  const size_t connected_nodes =
+      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
+          map_matrix_)
+          .rank();
+
+  // We don't need to support isolated nodes until someone has a real use case.
+  CHECK_EQ(connected_nodes, num_nodes)
+      << ": There is a node which isn't communicating with the rest.";
+
+  // Now, iterate through all the timestamps from all the nodes and seed
+  // everything.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  // Make sure all the samples have been seeded.
+  for (int i = 1; i < sample_matrix_.rows(); ++i) {
+    // The seeding logic is pretty basic right now because we don't have great
+    // use cases yet.  It wants to see data from every node.  Blow up for now,
+    // and once we have a reason to do something different, update this logic.
+    // Maybe read further in the log file?  Or seed off the realtime time?
+    CHECK_NE(sample_matrix_(i, 0), 0.0)
+        << ": Sample " << i << " is not seeded.";
+  }
+
+  // And solve.
+  offset_matrix_ = SolveOffsets();
+
+  // Save off the base offsets so we can work in deltas from here out.  That
+  // will significantly simplify the numerical precision problems.
+  for (size_t i = 0; i < num_nodes; ++i) {
+    base_offset_matrix_(i, 0) =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(offset_matrix_(i, 0)));
+  }
+
+  {
+    // Shift everything so we should never (reasonably) require the
+    // distributed clock to jump backwards in time.  This makes it so the
+    // boot time of the node which has been up the longest essentially
+    // starts matching the distributed clock.
+    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
+    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+      base_offset_matrix_(i, 0) += offset;
+    }
+  }
+
+  {
+    // Re-compute the samples and setup all the filters so that they
+    // subtract this base offset.
+
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
+
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
+                                    base_offset_matrix_(node_b_index));
+
+      ++i;
+    }
+  }
+
+  // Iterate through all the timestamps again now that the base offsets are
+  // set to something sane.  This will seed everything with an accurate
+  // initial offset.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  UpdateOffsets();
+
   // We want to start the log file at the last start time of the log files from
   // all the nodes.  Compute how long each node's simulation needs to run to
   // move time to this point.
-  monotonic_clock::duration run_time = monotonic_clock::duration(0);
+  distributed_clock::time_point start_time = distributed_clock::min_time;
 
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
-
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
-
-    // And start each node's clocks so the realtime clocks line up for the start
-    // times.  This will let us start using it, but isn't good enough.
-    state->node_event_loop_factory->SetMonotonicNow(
-        monotonic_clock::time_point(earliest_boot_time - boot_time));
+  for (std::unique_ptr<State> &state : states_) {
+    // Set up the realtime clock to have something sane in it now.
     state->node_event_loop_factory->SetRealtimeOffset(
         state->channel_merger->monotonic_start_time(),
         state->channel_merger->realtime_start_time());
-    run_time =
-        std::max(run_time, state->channel_merger->monotonic_start_time() -
-                               state->node_event_loop_factory->monotonic_now());
+    // And, now that the offsets are set, start computing the start time on
+    // the distributed clock.
+    start_time = std::max(start_time,
+                          state->node_event_loop_factory->ToDistributedClock(
+                              state->channel_merger->monotonic_start_time()));
   }
+  CHECK_GE(start_time, distributed_clock::epoch());
 
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
@@ -413,9 +550,8 @@
       const Node *node = configuration::GetNode(
           configuration(), channel->source_node()->string_view());
 
-      auto state_pair = channel_mergers_.find(node);
-      CHECK(state_pair != channel_mergers_.end());
-      State *state = &(state_pair->second);
+      State *state =
+          states_[configuration::GetNodeIndex(configuration(), node)].get();
 
       const Channel *remapped_channel =
           RemapChannel(state->event_loop, channel);
@@ -428,16 +564,90 @@
   // to timestamps on log files where the timestamp log file starts before the
   // data.  In this case, it is reasonable to expect missing data.
   ignore_missing_data_ = true;
-  event_loop_factory_->RunFor(run_time);
+  event_loop_factory_->RunFor(start_time.time_since_epoch());
   // Now that we are running for real, missing data means that the log file is
   // corrupted or went wrong.
   ignore_missing_data_ = false;
 }
 
+void LogReader::UpdateOffsets() {
+  // TODO(austin): Evaluate less accurate inverses.  We might be able to
+  // do some tricks to keep the accuracy up.
+  offset_matrix_ = SolveOffsets();
+
+  size_t node_index = 0;
+  for (std::unique_ptr<State> &state : states_) {
+    state->node_event_loop_factory->SetDistributedOffset(offset(node_index));
+    ++node_index;
+  }
+}
+
+std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+    const Node *node_a, const Node *node_b) {
+  CHECK_NE(node_a, node_b);
+  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
+  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
+
+  if (node_a > node_b) {
+    return std::make_tuple(std::get<0>(GetFilter(node_b, node_a)), false);
+  }
+
+  auto tuple = std::make_tuple(node_a, node_b);
+
+  auto it = filters_.find(tuple);
+
+  if (it == filters_.end()) {
+    auto &x = filters_
+                  .insert(std::make_pair(
+                      tuple, message_bridge::ClippedAverageFilter()))
+                  .first->second;
+    if (FLAGS_timestamps_to_csv) {
+      std::string fwd_name =
+          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
+                       node_b->name()->string_view(), ".csv");
+      x.fwd_fp = fopen(fwd_name.c_str(), "w");
+      std::string rev_name =
+          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
+                       node_a->name()->string_view(), ".csv");
+      x.rev_fp = fopen(rev_name.c_str(), "w");
+    }
+
+    return std::make_tuple(&x, true);
+  } else {
+    return std::make_tuple(&(it->second), true);
+  }
+}
+
+bool LogReader::State::MaybeUpdateTimestamp(
+    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+    int channel_index) {
+  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
+    return false;
+  }
+
+  // Got a forwarding timestamp!
+  CHECK(std::get<0>(filters[channel_index]) != nullptr);
+
+  // Call the correct method depending on whether this is the forward or
+  // reverse direction.
+  if (std::get<1>(filters[channel_index])) {
+    std::get<0>(filters[channel_index])
+        ->FwdSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  } else {
+    std::get<0>(filters[channel_index])
+        ->RevSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  }
+  return true;
+}
+
 void LogReader::Register(EventLoop *event_loop) {
-  auto state_pair = channel_mergers_.find(event_loop->node());
-  CHECK(state_pair != channel_mergers_.end());
-  State *state = &(state_pair->second);
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
+          .get();
 
   state->event_loop = event_loop;
 
@@ -450,12 +660,29 @@
   state->channel_merger->SetNode(event_loop->node());
 
   state->channels.resize(logged_configuration()->channels()->size());
+  state->filters.resize(state->channels.size());
+
+  state->channel_target_event_loop_factory.resize(state->channels.size());
 
   for (size_t i = 0; i < state->channels.size(); ++i) {
     const Channel *channel =
         RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
     state->channels[i] = event_loop->MakeRawSender(channel);
+
+    state->filters[i] = std::make_tuple(nullptr, false);
+
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
+        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
+      const Node *target_node = configuration::GetNode(
+          event_loop->configuration(), channel->source_node()->string_view());
+      state->filters[i] = GetFilter(event_loop->node(), target_node);
+
+      if (event_loop_factory_ != nullptr) {
+        state->channel_target_event_loop_factory[i] =
+            event_loop_factory_->GetNodeEventLoopFactory(target_node);
+      }
+    }
   }
 
   state->timer_handler = event_loop->AddTimer([this, state]() {
@@ -466,6 +693,7 @@
       }
       return;
     }
+    bool update_offsets = false;
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
@@ -477,20 +705,60 @@
     const monotonic_clock::time_point monotonic_now =
         state->event_loop->context().monotonic_event_time;
     CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
-        << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send "
-        << channel_timestamp.monotonic_event_time.time_since_epoch().count();
+        << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+        << monotonic_now << " trying to send "
+        << channel_timestamp.monotonic_event_time << " failure "
+        << state->channel_merger->DebugString();
 
     if (channel_timestamp.monotonic_event_time >
             state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
-      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
+      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
+           !state->channel_merger->at_end()) ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
                "not matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
+        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+          // Confirm that the message was sent on the sending node before the
+          // destination node (this node).  As a proxy, do this by making sure
+          // that time on the source node is past when the message was sent.
+          CHECK_LT(channel_timestamp.monotonic_remote_time,
+                   state->channel_target_event_loop_factory[channel_index]
+                       ->monotonic_now());
+
+          update_offsets = true;
+
+          if (FLAGS_timestamps_to_csv) {
+            if (offset_fp_ == nullptr) {
+              offset_fp_ = fopen("/tmp/offsets.csv", "w");
+              fprintf(
+                  offset_fp_,
+                  "# time_since_start, offset node 0, offset node 1, ...\n");
+              first_time_ = channel_timestamp.realtime_event_time;
+            }
+
+            fprintf(offset_fp_, "%.9f",
+                    std::chrono::duration_cast<std::chrono::duration<double>>(
+                        channel_timestamp.realtime_event_time - first_time_)
+                        .count());
+            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+              fprintf(
+                  offset_fp_, ", %.9f",
+                  offset_matrix_(i, 0) +
+                      std::chrono::duration_cast<std::chrono::duration<double>>(
+                          base_offset_matrix_(i, 0))
+                          .count());
+            }
+            fprintf(offset_fp_, "\n");
+          }
+
+        } else {
+          CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
+        }
+
         // If we have access to the factory, use it to fix the realtime time.
         if (state->node_event_loop_factory != nullptr) {
           state->node_event_loop_factory->SetRealtimeOffset(
@@ -504,7 +772,16 @@
             channel_timestamp.monotonic_remote_time,
             channel_timestamp.realtime_remote_time,
             channel_timestamp.remote_queue_index);
+      } else if (state->channel_merger->at_end()) {
+        // We are at the end of the log file and found missing data.  Finish
+        // reading the rest of the log file and call it quits.  We don't want to
+        // replay partial data.
+        while (state->channel_merger->OldestMessage() !=
+               monotonic_clock::max_time) {
+          state->channel_merger->PopOldest();
+        }
       }
+
     } else {
       LOG(WARNING)
           << "Not sending data from before the start of the log file. "
@@ -526,6 +803,13 @@
                                     std::chrono::nanoseconds(1));
       }
     }
+
+    // Once we make this call, the current time changes.  So do everything
+    // that involves time before making it.  That especially includes sending
+    // the message.
+    if (update_offsets) {
+      UpdateOffsets();
+    }
   });
 
   ++live_nodes_;
@@ -540,10 +824,7 @@
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
-  for (const Node *node : Nodes()) {
-    auto state_pair = channel_mergers_.find(node);
-    CHECK(state_pair != channel_mergers_.end());
-    State *state = &(state_pair->second);
+  for (std::unique_ptr<State> &state : states_) {
     for (size_t i = 0; i < state->channels.size(); ++i) {
       state->channels[i].reset();
     }
@@ -578,8 +859,8 @@
 }
 
 void LogReader::MakeRemappedConfig() {
-  for (std::pair<const Node *const, State> &state : channel_mergers_) {
-    CHECK(!state.second.event_loop)
+  for (std::unique_ptr<State> &state : states_) {
+    CHECK(!state->event_loop)
         << ": Can't change the mapping after the events are scheduled.";
   }
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e0350bb..34dbd24 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -1,15 +1,19 @@
 #ifndef AOS_EVENTS_LOGGER_H_
 #define AOS_EVENTS_LOGGER_H_
 
+#include <chrono>
 #include <deque>
 #include <string_view>
 #include <vector>
 
+#include "Eigen/Dense"
+#include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
 
@@ -174,6 +178,7 @@
       data_writers_;
 };
 
+
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rarely on a channel and would affect execution.
@@ -359,6 +364,8 @@
   // the header is likely distracting.
   FlatbufferVector<LogFileHeader> log_file_header_;
 
+  Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+
   // State per node.
   struct State {
     // Log file.
@@ -373,10 +380,68 @@
     EventLoop *event_loop = nullptr;
     // And timer used to send messages.
     TimerHandler *timer_handler;
+
+    // Updates the timestamp filter with the timestamp.  Returns true if the
+    // provided timestamp was a forwarding timestamp and was used, and false
+    // otherwise.
+    bool MaybeUpdateTimestamp(
+        const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+        int channel_index);
+
+    // Filters (or nullptr if it isn't a forwarded channel) for each channel.
+    // This corresponds to the object which is shared among all the channels
+    // going between 2 nodes.  The second element in the tuple indicates if this
+    // is the primary direction or not.
+    std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
+        filters;
+
+    // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
+    // channel) which correspond to the originating node.
+    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
   };
 
-  // Map of nodes to States used to hold all the state for all the nodes.
-  std::map<const Node *, State> channel_mergers_;
+  // Node index -> State.
+  std::vector<std::unique_ptr<State>> states_;
+
+  // Creates the requested filter if it doesn't exist, regardless of whether
+  // these nodes can actually communicate directly.  The second return value
+  // reports if this is the primary direction or not.
+  std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
+      const Node *node_a, const Node *node_b);
+
+  // FILE to write offsets to (if populated).
+  FILE *offset_fp_ = nullptr;
+  // Timestamp of the first piece of data used for the horizontal axis on the
+  // plot.
+  aos::realtime_clock::time_point first_time_;
+
+  // List of filters for a connection.  The pointer to the first node will be
+  // less than the second node.
+  std::map<std::tuple<const Node *, const Node *>,
+           message_bridge::ClippedAverageFilter>
+      filters_;
+
+  // Returns the offset from the monotonic clock for a node to the distributed
+  // clock.  distributed = monotonic + offset;
+  std::chrono::nanoseconds offset(int node_index) const {
+    return -std::chrono::duration_cast<std::chrono::nanoseconds>(
+               std::chrono::duration<double>(offset_matrix_(node_index))) -
+           base_offset_matrix_(node_index);
+  }
+
+  // Updates the offset matrix solution and sets the per-node distributed
+  // offsets in the factory.
+  void UpdateOffsets();
+
+  // sample_matrix_ = map_matrix_ * offset_matrix_
+  Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+
+  // Base offsets.  The actual offset is the sum of this and the offset matrix.
+  // This removes some of the dynamic range challenges from the double above.
+  Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
+      base_offset_matrix_;
 
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
new file mode 100644
index 0000000..313894b
--- /dev/null
+++ b/aos/events/logging/logger_math.cc
@@ -0,0 +1,16 @@
+#include "aos/events/logging/logger.h"
+
+#include "Eigen/Dense"
+
+namespace aos {
+namespace logger {
+
+// This is slow to compile, so we put it in a separate file.  That buys more
+// build parallelism and fewer rebuilds when the rest of the logger changes.
+Eigen::Matrix<double, Eigen::Dynamic, 1> LogReader::SolveOffsets() {
+  return map_matrix_.bdcSvd(Eigen::ComputeThinU | Eigen::ComputeThinV)
+      .solve(sample_matrix_);
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9f2969e..9e69ae4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -325,10 +325,10 @@
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
     Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
 
     Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -547,12 +547,12 @@
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
     Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
 
     event_loop_factory_.RunFor(chrono::milliseconds(200));
 
     Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -638,12 +638,11 @@
 
   reader.Deregister();
 }
-// TODO(austin): We can write a test which recreates a logfile and confirms that
-// we get it back.  That is the ultimate test.
 
-// Tests that we can read log files where the monotonic clocks don't match
-// correctly.
-TEST_F(MultinodeLoggerTest, MissmatchingTimeStart) {
+// Tests that we can read log files where the monotonic clocks drift and don't
+// match correctly.  While we are here, also test that logs with different
+// ending times are readable.
+TEST_F(MultinodeLoggerTest, MismatchedClocks) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
   const ::std::string logfile_base = tmpdir + "/multi_logfile";
   const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
@@ -659,47 +658,76 @@
   LOG(INFO) << "Logging data to " << logfile1 << " and " << logfile3;
 
   {
-   NodeEventLoopFactory *pi2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_);
-   LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-             << pi2->realtime_now() << " distributed "
-             << pi2->ToDistributedClock(pi2->monotonic_now());
+    NodeEventLoopFactory *pi2 =
+        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
 
-   pi2->SetMonotonicNow(pi2->monotonic_now() + std::chrono::seconds(1000));
-   LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-             << pi2->realtime_now() << " distributed "
-             << pi2->ToDistributedClock(pi2->monotonic_now());
+    const chrono::nanoseconds initial_pi2_offset = -chrono::seconds(1000);
+    chrono::nanoseconds pi2_offset = initial_pi2_offset;
 
-   std::unique_ptr<EventLoop> ping_event_loop =
-       event_loop_factory_.MakeEventLoop("ping", pi1_);
-   Ping ping(ping_event_loop.get());
-   std::unique_ptr<EventLoop> pong_event_loop =
-       event_loop_factory_.MakeEventLoop("pong", pi2_);
-   Pong pong(pong_event_loop.get());
+    pi2->SetDistributedOffset(pi2_offset);
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
 
-   std::unique_ptr<EventLoop> pi1_logger_event_loop =
-       event_loop_factory_.MakeEventLoop("logger", pi1_);
-   std::unique_ptr<LogNamer> pi1_log_namer =
-       std::make_unique<MultiNodeLogNamer>(
-           logfile_base, pi1_logger_event_loop->configuration(),
-           pi1_logger_event_loop->node());
+    std::unique_ptr<EventLoop> ping_event_loop =
+        event_loop_factory_.MakeEventLoop("ping", pi1_);
+    Ping ping(ping_event_loop.get());
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong", pi2_);
+    Pong pong(pong_event_loop.get());
 
-   std::unique_ptr<EventLoop> pi2_logger_event_loop =
-       event_loop_factory_.MakeEventLoop("logger", pi2_);
-   std::unique_ptr<LogNamer> pi2_log_namer =
-       std::make_unique<MultiNodeLogNamer>(
-           logfile_base, pi2_logger_event_loop->configuration(),
-           pi2_logger_event_loop->node());
+    std::unique_ptr<EventLoop> pi2_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi2_);
+    std::unique_ptr<LogNamer> pi2_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi2_logger_event_loop->configuration(),
+            pi2_logger_event_loop->node());
 
-   event_loop_factory_.RunFor(chrono::milliseconds(95));
+    for (int i = 0; i < 95; ++i) {
+      pi2_offset += chrono::nanoseconds(200);
+      pi2->SetDistributedOffset(pi2_offset);
+      event_loop_factory_.RunFor(chrono::milliseconds(1));
+    }
 
-   Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                     std::chrono::milliseconds(100));
+    Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+                      chrono::milliseconds(100));
 
-   event_loop_factory_.RunFor(chrono::milliseconds(200));
+    event_loop_factory_.RunFor(chrono::milliseconds(200));
 
-   Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                     std::chrono::milliseconds(100));
-   event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    {
+      // Run pi1's logger for only part of the time.
+      std::unique_ptr<EventLoop> pi1_logger_event_loop =
+          event_loop_factory_.MakeEventLoop("logger", pi1_);
+      std::unique_ptr<LogNamer> pi1_log_namer =
+          std::make_unique<MultiNodeLogNamer>(
+              logfile_base, pi1_logger_event_loop->configuration(),
+              pi1_logger_event_loop->node());
+
+      Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+                        chrono::milliseconds(100));
+
+      for (int i = 0; i < 20000; ++i) {
+        pi2_offset += chrono::nanoseconds(200);
+        pi2->SetDistributedOffset(pi2_offset);
+        event_loop_factory_.RunFor(chrono::milliseconds(1));
+      }
+
+      EXPECT_GT(pi2_offset - initial_pi2_offset,
+                event_loop_factory_.send_delay() +
+                    event_loop_factory_.network_delay());
+
+      for (int i = 0; i < 40000; ++i) {
+        pi2_offset -= chrono::nanoseconds(200);
+        pi2->SetDistributedOffset(pi2_offset);
+        event_loop_factory_.RunFor(chrono::milliseconds(1));
+      }
+    }
+
+    // And log a bit more on pi2.
+    event_loop_factory_.RunFor(chrono::milliseconds(400));
   }
 
   LogReader reader(
@@ -785,14 +813,17 @@
       });
 
   log_reader_factory.Run();
-  EXPECT_EQ(pi1_ping_count, 2030);
-  EXPECT_EQ(pi2_ping_count, 2030);
-  EXPECT_EQ(pi1_pong_count, 2030);
-  EXPECT_EQ(pi2_pong_count, 2030);
+  EXPECT_EQ(pi1_ping_count, 6030);
+  EXPECT_EQ(pi2_ping_count, 6030);
+  EXPECT_EQ(pi1_pong_count, 6030);
+  EXPECT_EQ(pi2_pong_count, 6030);
 
   reader.Deregister();
 }
 
+// TODO(austin): We can write a test which recreates a logfile and confirms that
+// we get it back.  That is the ultimate test.
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos