Handle log files not starting at the same time.

The monotonic clocks were assumed to be in sync.  That isn't realistic.
This assumption leaked into how we kept the queues primed, and how the
event loop was initialized.

This still isn't enough to actually replay in sync.  We are assuming that
the realtime clocks are in sync and that the monotonic clocks don't drift
relative to each other.  That'll be good enough to get started, but not
for long.
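
A sketch of the clock model this leans on (names simplified; not the real
API): with synchronized realtime clocks and drift-free monotonic clocks,
each node's boot time is its realtime start minus its monotonic start, and
lining the nodes up reduces to:

  // Hypothetical distillation of the alignment now done in
  // LogReader::Register(); illustrative only.
  #include <algorithm>
  #include <chrono>
  #include <vector>

  struct NodeStart {
    std::chrono::nanoseconds realtime_start;   // realtime clock at log start
    std::chrono::nanoseconds monotonic_start;  // monotonic clock at log start
  };

  // With synchronized realtime clocks, realtime - monotonic is when the
  // node booted.
  std::chrono::nanoseconds BootTime(const NodeStart &n) {
    return n.realtime_start - n.monotonic_start;
  }

  std::chrono::nanoseconds EarliestBoot(const std::vector<NodeStart> &nodes) {
    std::chrono::nanoseconds earliest = std::chrono::nanoseconds::max();
    for (const NodeStart &n : nodes) {
      earliest = std::min(earliest, BootTime(n));
    }
    return earliest;
  }

  // Each node's monotonic clock then gets wound to
  // (earliest_boot - its boot time), so a node that booted later starts with
  // a negative monotonic time and all the realtime clocks agree.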

Change-Id: Ic18e31598f1a76edee0b0d5a2d7936deee1fbfec
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 144890a..03b89d1 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -271,9 +271,11 @@
   // And copy the config so we have it forever.
   configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
 
-  max_out_of_order_duration_ = std::chrono::nanoseconds(
-      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-          ->max_out_of_order_duration());
+  max_out_of_order_duration_ =
+      std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+
+  VLOG(1) << "Opened " << filename << " as node "
+          << FlatbufferToJson(log_file_header()->node());
 }
 
 std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
@@ -289,8 +291,7 @@
       chrono::nanoseconds(result.message().monotonic_sent_time()));
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  VLOG(1) << "Read from " << filename().substr(130) << " data "
-          << FlatbufferToJson(result);
+  VLOG(1) << "Read from " << filename() << " data " << FlatbufferToJson(result);
   return std::move(result);
 }
 
@@ -361,16 +362,56 @@
 }
 
 bool SplitMessageReader::QueueMessages(
-    monotonic_clock::time_point oldest_message_time) {
+    monotonic_clock::time_point last_dequeued_time) {
   // TODO(austin): Once we are happy that everything works, read a 256kb chunk
   // to reduce the need to re-heap down below.
+
+  // Special case no more data.  Otherwise we blow up on the CHECK statement
+  // confirming that we have enough data queued.
+  if (at_end_) {
+    return false;
+  }
+
+  // If this isn't the first time around, confirm that we had enough data queued
+  // to follow the contract.
+  if (time_to_queue_ != monotonic_clock::min_time) {
+    CHECK_LE(last_dequeued_time,
+             newest_timestamp() - max_out_of_order_duration())
+        << " node " << FlatbufferToJson(node()) << " on " << this;
+
+    // Bail if there is enough data already queued.
+    if (last_dequeued_time < time_to_queue_) {
+      VLOG(1) << "All up to date on " << this << ", dequeued "
+              << last_dequeued_time << " queue time " << time_to_queue_;
+      return true;
+    }
+  } else {
+    // Startup takes a special dance.  We want to queue up until the start time,
+    // but we then want to find the next message to read.  The conservative
+    // answer is to immediately trigger a second requeue to get things moving.
+    time_to_queue_ = monotonic_start_time();
+    QueueMessages(time_to_queue_);
+  }
+
+  // If we are asked to queue, queue for at least max_out_of_order_duration past
+  // the last known time in the log file (i.e. the newest timestamp read).  As
+  // long as we requeue exactly when time_to_queue_ is dequeued and go no
+  // further, we are safe.  And since we pop in order, that works.
+  //
+  // Special case the start of the log file.  There should be at most 1 message
+  // from each channel right at the start, so always force that first chunk to
+  // be read.
+  time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
+  VLOG(1) << "Queueing, going until " << time_to_queue_ << " " << filename();
+
+  bool was_emplaced = false;
   while (true) {
-    // Don't queue if we have enough data already.
-    // When a log file starts, there should be a message from each channel.
-    // Those messages might be very old. Make sure to read a chunk past the
-    // starting time.
-    if (queued_messages_ > 0 &&
-        message_reader_->queue_data_time() > oldest_message_time) {
+    // Stop if we have enough.
+    if (newest_timestamp() >
+            time_to_queue_ + max_out_of_order_duration() &&
+        was_emplaced) {
+      VLOG(1) << "Done queueing on " << this << ", queued to "
+              << newest_timestamp() << " with requeue time " << time_to_queue_;
       return true;
     }
 
@@ -378,12 +419,24 @@
             message_reader_->ReadMessage()) {
       const MessageHeader &header = msg.value().message();
 
-      const int channel_index = header.channel_index();
-      channels_to_write_[channel_index]->emplace_back(std::move(msg.value()));
+      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+          chrono::nanoseconds(header.monotonic_sent_time()));
 
-      ++queued_messages_;
+      VLOG(1) << "Queued " << this << " " << filename()
+              << " ttq: " << time_to_queue_ << " now "
+              << newest_timestamp() << " start time "
+              << monotonic_start_time() << " " << FlatbufferToJson(&header);
+
+      const int channel_index = header.channel_index();
+      was_emplaced = channels_to_write_[channel_index]->emplace_back(
+          std::move(msg.value()));
+      if (was_emplaced) {
+        newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+      }
     } else {
       if (!NextLogFile()) {
+        VLOG(1) << "End of log file.";
+        at_end_ = true;
         return false;
       }
     }
@@ -398,26 +451,41 @@
   const Channel *const channel =
       configuration()->channels()->Get(channel_index);
 
+  VLOG(1) << "  Configuring merger " << this << " for channel " << channel_index
+          << " "
+          << configuration::CleanedChannelToString(
+                 configuration()->channels()->Get(channel_index));
+
   MessageHeaderQueue *message_header_queue = nullptr;
 
   // Figure out if this log file is from our point of view, or the other node's
   // point of view.
   if (node() == reinterpreted_target_node) {
-    if (channels_to_write_[channel_index] != nullptr) {
-      // We already have deduced which is the right channel.  Use
-      // channels_to_write_ here.
-      message_header_queue = channels_to_write_[channel_index];
+    VLOG(1) << "    Replaying as logged node " << filename();
+
+    if (configuration::ChannelIsSendableOnNode(channel, node())) {
+      VLOG(1) << "      Data on node";
+      message_header_queue = &(channels_[channel_index].data);
+    } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
+      VLOG(1) << "      Timestamps on node";
+      message_header_queue =
+          &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
+              configuration(), node())]);
     } else {
-      // This means this is data from another node, and will be ignored.
+      VLOG(1) << "      Dropping";
     }
   } else {
+    VLOG(1) << "    Replaying as other node " << filename();
     // We are replaying from another node's point of view.  The only interesting
-    // data is data that is forwarded to our node, ie was sent on the other
-    // node.
-    if (configuration::ChannelIsSendableOnNode(channel, node())) {
+    // data is data that is sent from our node and received on theirs.
+    if (configuration::ChannelIsReadableOnNode(channel,
+                                               reinterpreted_target_node) &&
+        configuration::ChannelIsSendableOnNode(channel, node())) {
+      VLOG(1) << "      Readable on target node";
       // Data from another node.
       message_header_queue = &(channels_[channel_index].data);
     } else {
+      VLOG(1) << "      Dropping";
       // This is either not sendable on the other node, or is a timestamp and
       // therefore not interesting.
     }
@@ -435,12 +503,15 @@
            FlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldest(int channel_index) {
   CHECK_GT(channels_[channel_index].data.size(), 0u);
-  const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
-      channels_[channel_index].data.front_timestamp();
+  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+      timestamp = channels_[channel_index].data.front_timestamp();
   FlatbufferVector<MessageHeader> front =
       std::move(channels_[channel_index].data.front());
   channels_[channel_index].data.pop_front();
-  --queued_messages_;
+
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+
+  QueueMessages(std::get<0>(timestamp));
 
   return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                          std::move(front));
@@ -450,18 +521,21 @@
            FlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldest(int channel, int node_index) {
   CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
-  const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
-      channels_[channel].timestamps[node_index].front_timestamp();
+  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+      timestamp = channels_[channel].timestamps[node_index].front_timestamp();
   FlatbufferVector<MessageHeader> front =
       std::move(channels_[channel].timestamps[node_index].front());
   channels_[channel].timestamps[node_index].pop_front();
-  --queued_messages_;
+
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+
+  QueueMessages(std::get<0>(timestamp));
 
   return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                          std::move(front));
 }
 
-void SplitMessageReader::MessageHeaderQueue::emplace_back(
+bool SplitMessageReader::MessageHeaderQueue::emplace_back(
     FlatbufferVector<MessageHeader> &&msg) {
   CHECK(split_reader != nullptr);
 
@@ -469,7 +543,7 @@
   // the message.  This happens when a log file from another node is replayed,
   // and the timestamp mergers downstream just don't care.
   if (timestamp_merger == nullptr) {
-    return;
+    return false;
   }
 
   CHECK(timestamps != msg.message().has_data())
@@ -486,6 +560,8 @@
       timestamp_merger->Update(split_reader, front_timestamp());
     }
   }
+
+  return true;
 }
 
 void SplitMessageReader::MessageHeaderQueue::pop_front() {
@@ -550,6 +626,8 @@
                       : -1),
       channel_merger_(channel_merger) {
   // Tell the readers we care so they know who to notify.
+  VLOG(1) << "Configuring channel " << channel_index << " target node "
+          << FlatbufferToJson(target_node);
   for (SplitMessageReader *reader : split_message_readers_) {
     reader->SetTimestampMerger(this, channel_index, target_node);
   }
@@ -563,7 +641,8 @@
 }
 
 void TimestampMerger::PushMessageHeap(
-    std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+        timestamp,
     SplitMessageReader *split_message_reader) {
   DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
                       [split_message_reader](
@@ -587,8 +666,26 @@
   }
 }
 
+std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+TimestampMerger::oldest_message() const {
+  CHECK_GT(message_heap_.size(), 0u);
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_message_reader = message_heap_.front();
+  return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+TimestampMerger::oldest_timestamp() const {
+  CHECK_GT(timestamp_heap_.size(), 0u);
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_message_reader = timestamp_heap_.front();
+  return std::get<2>(oldest_message_reader)
+      ->oldest_message(channel_index_, node_index_);
+}
+
 void TimestampMerger::PushTimestampHeap(
-    std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+        timestamp,
     SplitMessageReader *split_message_reader) {
   DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
                       [split_message_reader](
@@ -642,8 +739,9 @@
     std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
         next_oldest_message_reader = message_heap_.front();
 
-    std::tuple<monotonic_clock::time_point, uint32_t> next_oldest_message_time =
-        std::get<2>(next_oldest_message_reader)->oldest_message(channel_index_);
+    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+        next_oldest_message_time = std::get<2>(next_oldest_message_reader)
+                                       ->oldest_message(channel_index_);
 
     if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
         std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
@@ -733,33 +831,52 @@
             std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
 
     while (true) {
-      // Ok, now try grabbing data until we find one which matches.
+      {
+        // Ok, now try grabbing data until we find one which matches.
+        std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+            oldest_message_ref = oldest_message();
+
+        // Time at which the message was sent (this message is written from the
+        // sending node's perspective).
+        monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
+            std::get<2>(oldest_message_ref)->monotonic_sent_time()));
+
+        if (remote_monotonic_time < remote_timestamp_monotonic_time) {
+          LOG(INFO) << "Undelivered message, skipping.  Remote time is "
+                    << remote_monotonic_time << " timestamp is "
+                    << remote_timestamp_monotonic_time << " on channel "
+                    << channel_index_;
+          PopMessageHeap();
+          continue;
+        } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
+          LOG(INFO) << "Data not found.  Remote time should be "
+                    << remote_timestamp_monotonic_time << " on channel "
+                    << channel_index_;
+          return std::make_tuple(timestamp,
+                                 std::move(std::get<2>(oldest_timestamp)));
+        }
+
+        timestamp.monotonic_remote_time = remote_monotonic_time;
+      }
+
       std::tuple<monotonic_clock::time_point, uint32_t,
                  FlatbufferVector<MessageHeader>>
           oldest_message = PopMessageHeap();
 
-      // Time at which the message was sent (this message is written from the
-      // sending node's perspective.
-      monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
-          std::get<2>(oldest_message).message().monotonic_sent_time()));
-
-      if (remote_monotonic_time < remote_timestamp_monotonic_time) {
-        LOG(INFO) << "Undelivered message, skipping.  Remote time is "
-                  << remote_monotonic_time << " timestamp is "
-                  << remote_timestamp_monotonic_time << " on channel "
-                  << channel_index_;
-        continue;
-      }
-
-      timestamp.monotonic_remote_time = remote_monotonic_time;
       timestamp.realtime_remote_time =
           realtime_clock::time_point(chrono::nanoseconds(
               std::get<2>(oldest_message).message().realtime_sent_time()));
       timestamp.remote_queue_index =
           std::get<2>(oldest_message).message().queue_index();
 
-      CHECK_EQ(remote_monotonic_time, remote_timestamp_monotonic_time);
-      CHECK_EQ(timestamp.remote_queue_index, std::get<1>(oldest_timestamp));
+      CHECK_EQ(timestamp.monotonic_remote_time,
+               remote_timestamp_monotonic_time);
+
+      CHECK_EQ(timestamp.remote_queue_index,
+               std::get<2>(oldest_timestamp).message().remote_queue_index())
+          << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
+          << " data "
+          << FlatbufferToJson(&std::get<2>(oldest_message).message());
 
       return std::make_tuple(timestamp, std::get<2>(oldest_message));
     }
@@ -833,10 +950,14 @@
         if (!found_node) {
           found_node = true;
           log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+          VLOG(1) << "Found log file " << reader->filename() << " with node "
+                  << FlatbufferToJson(reader->node()) << " start_time "
+                  << monotonic_start_time();
         } else {
           // And then make sure all the other files have matching headers.
-          CHECK(
-              CompareFlatBuffer(log_file_header(), reader->log_file_header()));
+          CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
+              << ": " << FlatbufferToJson(log_file_header()) << " reader "
+              << FlatbufferToJson(reader->log_file_header());
         }
       }
     }
@@ -859,18 +980,10 @@
   }
 
   // And prime everything.
-  size_t split_message_reader_index = 0;
   for (std::unique_ptr<SplitMessageReader> &split_message_reader :
        split_message_readers_) {
-    if (split_message_reader->QueueMessages(
-            split_message_reader->monotonic_start_time())) {
-      split_message_reader_heap_.push_back(std::make_pair(
-          split_message_reader->queue_data_time(), split_message_reader_index));
-
-      std::push_heap(split_message_reader_heap_.begin(),
-                     split_message_reader_heap_.end(), ChannelHeapCompare);
-    }
-    ++split_message_reader_index;
+    split_message_reader->QueueMessages(
+        split_message_reader->monotonic_start_time());
   }
 
   node_ = configuration::GetNodeOrDie(configuration(), target_node);
@@ -920,51 +1033,138 @@
 
   TimestampMerger *merger = &timestamp_mergers_[channel_index];
 
-  // Merger auto-pushes from here, but doesn't fetch anything new from the log
-  // file.
+  // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
       message = merger->PopOldest();
 
-  QueueMessages(OldestMessage());
-
   return std::make_tuple(std::get<0>(message), channel_index,
                          std::move(std::get<1>(message)));
 }
 
-void ChannelMerger::QueueMessages(
-    monotonic_clock::time_point oldest_message_time) {
-  // Pop and re-queue readers until they are all caught up.
-  while (true) {
-    if (split_message_reader_heap_.size() == 0) {
-      return;
+std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
+  std::stringstream ss;
+  for (size_t i = 0; i < data_.size(); ++i) {
+    if (timestamps) {
+      ss << "        timestamp: ";
+    } else {
+      ss << "        msg: ";
     }
-    std::pair<monotonic_clock::time_point, int> oldest_channel_data =
-        split_message_reader_heap_.front();
-
-    // No work to do, bail.
-    if (oldest_channel_data.first > oldest_message_time) {
-      return;
+    ss << monotonic_clock::time_point(std::chrono::nanoseconds(
+              data_[i].message().monotonic_sent_time()))
+       << " ("
+       << realtime_clock::time_point(
+              std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+       << ") " << data_[i].message().queue_index();
+    if (timestamps) {
+      ss << "  <- remote "
+         << monotonic_clock::time_point(std::chrono::nanoseconds(
+                data_[i].message().monotonic_remote_time()))
+         << " ("
+         << realtime_clock::time_point(std::chrono::nanoseconds(
+                data_[i].message().realtime_remote_time()))
+         << ")";
     }
+    ss << "\n";
+  }
 
-    // Drop it off the heap.
-    std::pop_heap(split_message_reader_heap_.begin(),
-                  split_message_reader_heap_.end(), &ChannelHeapCompare);
-    split_message_reader_heap_.pop_back();
+  return ss.str();
+}
 
-    // And if there is data left in the log file, push it back on the heap with
-    // the updated time.
-    const int split_message_reader_index = oldest_channel_data.second;
-    if (split_message_readers_[split_message_reader_index]->QueueMessages(
-            oldest_message_time)) {
-      split_message_reader_heap_.push_back(std::make_pair(
-          split_message_readers_[split_message_reader_index]->queue_data_time(),
-          split_message_reader_index));
+std::string SplitMessageReader::DebugString(int channel) const {
+  std::stringstream ss;
+  ss << "[\n";
+  ss << channels_[channel].data.DebugString();
+  ss << "      ]";
+  return ss.str();
+}
 
-      std::push_heap(split_message_reader_heap_.begin(),
-                     split_message_reader_heap_.end(), ChannelHeapCompare);
+std::string SplitMessageReader::DebugString(int channel, int node_index) const {
+  std::stringstream ss;
+  ss << "[\n";
+  ss << channels_[channel].timestamps[node_index].DebugString();
+  ss << "      ]";
+  return ss.str();
+}
+
+std::string TimestampMerger::DebugString() const {
+  std::stringstream ss;
+
+  if (timestamp_heap_.size() > 0) {
+    ss << "    timestamp_heap {\n";
+    std::vector<
+        std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+        timestamp_heap = timestamp_heap_;
+    while (timestamp_heap.size() > 0u) {
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+          oldest_timestamp_reader = timestamp_heap.front();
+
+      ss << "      " << std::get<2>(oldest_timestamp_reader) << " "
+         << std::get<0>(oldest_timestamp_reader) << " queue_index ("
+         << std::get<1>(oldest_timestamp_reader) << ") ttq "
+         << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
+         << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
+         << std::get<2>(oldest_timestamp_reader)
+                ->DebugString(channel_index_, node_index_)
+         << "\n";
+
+      std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
+                    &SplitMessageReaderHeapCompare);
+      timestamp_heap.pop_back();
+    }
+    ss << "    }\n";
+  }
+
+  ss << "    message_heap {\n";
+  {
+    std::vector<
+        std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+        message_heap = message_heap_;
+    while (message_heap.size() > 0u) {
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+          oldest_message_reader = message_heap.front();
+
+      ss << "      " << std::get<2>(oldest_message_reader) << " "
+         << std::get<0>(oldest_message_reader) << " queue_index ("
+         << std::get<1>(oldest_message_reader) << ") ttq "
+         << std::get<2>(oldest_message_reader)->time_to_queue() << " "
+         << std::get<2>(oldest_message_reader)->filename() << " -> "
+         << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
+         << "\n";
+
+      std::pop_heap(message_heap.begin(), message_heap.end(),
+                    &SplitMessageReaderHeapCompare);
+      message_heap.pop_back();
     }
   }
+  ss << "    }";
+
+  return ss.str();
+}
+
+std::string ChannelMerger::DebugString() const {
+  std::stringstream ss;
+  ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
+     << "\n";
+  ss << "channel_heap {\n";
+  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+      channel_heap_;
+  while (channel_heap.size() > 0u) {
+    std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
+    ss << "  " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
+       << configuration::CleanedChannelToString(
+              configuration()->channels()->Get(std::get<1>(channel)))
+       << "\n";
+
+    ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
+
+    std::pop_heap(channel_heap.begin(), channel_heap.end(),
+                  &ChannelHeapCompare);
+    channel_heap.pop_back();
+  }
+  ss << "}";
+
+  return ss.str();
 }
 
 }  // namespace logger
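
The queueing contract above is easier to see with the flatbuffer plumbing
stripped away.  A minimal sketch, assuming simplified types (the names below
are illustrative, not the real class):

  #include <cstdint>

  using Nanos = int64_t;

  struct QueueState {
    Nanos time_to_queue;     // dequeue time that must trigger the next requeue
    Nanos newest_timestamp;  // newest message read and actually queued
    Nanos max_out_of_order;  // sorting window promised by the log file header
  };

  // A dequeue at time t is safe once everything through
  // (newest_timestamp - max_out_of_order) has been queued: nothing older
  // than that can still appear later in the file.
  bool SafeToDequeue(const QueueState &s, Nanos t) {
    return t <= s.newest_timestamp - s.max_out_of_order;
  }

  // QueueMessages() keeps reading until a full sorting window past the
  // requeue target is covered (and at least one message was emplaced).
  bool DoneQueueing(const QueueState &s) {
    return s.newest_timestamp > s.time_to_queue + s.max_out_of_order;
  }
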
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9a849b2..e5e0175 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -163,6 +163,7 @@
     return max_out_of_order_duration_;
   }
 
+  // Returns the newest timestamp read out of the log file.
   monotonic_clock::time_point newest_timestamp() const {
     return newest_timestamp_;
   }
@@ -218,15 +219,15 @@
 
   // Returns the (timestamp, queue_index) for the oldest message in a channel, or
   // max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
-      int channel) {
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message(int channel) {
     return channels_[channel].data.front_timestamp();
   }
 
   // Returns the (timestamp, queue_index) for the oldest delivery time in a
   // channel, or max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
-      int channel, int destination_node) {
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message(int channel, int destination_node) {
     return channels_[channel].timestamps[destination_node].front_timestamp();
   }
 
@@ -278,18 +279,29 @@
   // Returns the timestamp of the newest message read from the log file.
   monotonic_clock::time_point newest_timestamp() const {
-    return message_reader_->newest_timestamp();
-  }
-  monotonic_clock::time_point queue_data_time() const {
-    return message_reader_->queue_data_time();
+    return newest_timestamp_;
   }
 
+  // Returns the next time to trigger a requeue.
+  monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
+
+  // Returns the minimum amount of data needed to queue up for sorting before
+  // we are guaranteed to not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration() const {
+    return message_reader_->max_out_of_order_duration();
+  }
+
+  std::string_view filename() const { return message_reader_->filename(); }
 
   // Adds more messages to the sorted list.  This reads enough data such that
   // oldest_message_time can be replayed safely.  Returns false if the log file
   // has all been read.
   bool QueueMessages(monotonic_clock::time_point oldest_message_time);
 
+  // Returns debug strings for a channel, and timestamps for a node.
+  std::string DebugString(int channel) const;
+  std::string DebugString(int channel, int node_index) const;
+
  private:
   // TODO(austin): Need to copy or refcount the message instead of running
   // multiple copies of the reader.  Or maybe have an "as_node" index and hide it
@@ -320,8 +332,9 @@
       return data_.front();
     }
 
-    // Adds a message to the back of the queue.
-    void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+    // Adds a message to the back of the queue. Returns true if it was actually
+    // emplaced.
+    bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
 
     // Drops the front message.  Invalidates the front() reference.
     void pop_front();
@@ -329,13 +342,18 @@
     // The size of the queue.
     size_t size() { return data_.size(); }
 
+    // Returns a debug string with info about each message in the queue.
+    std::string DebugString() const;
+
     // Returns the (timestamp, queue_index) for the oldest message.
-    const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+    const std::tuple<monotonic_clock::time_point, uint32_t,
+                     const MessageHeader *>
+    front_timestamp() {
       CHECK_GT(data_.size(), 0u);
       return std::make_tuple(
           monotonic_clock::time_point(std::chrono::nanoseconds(
               front().message().monotonic_sent_time())),
-          front().message().queue_index());
+          front().message().queue_index(), &front().message());
     }
 
     // Pointer to the timestamp merger for this queue if available.
@@ -365,8 +383,16 @@
   // Precompute this here for efficiency.
   std::vector<MessageHeaderQueue *> channels_to_write_;
 
-  // Number of messages queued.
-  size_t queued_messages_ = 0;
+  monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
+
+  // Latches true when we hit the end of the last log file and there is no sense
+  // poking it further.
+  bool at_end_ = false;
+
+  // Timestamp of the newest message that was read and actually queued.  We want
+  // to track this independently from the log file because we need the
+  // timestamps here to be timestamps of messages that are queued.
+  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
 class ChannelMerger;
@@ -396,7 +422,8 @@
   // the reader.
   void UpdateTimestamp(
       SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          oldest_message_time) {
     PushTimestampHeap(oldest_message_time, split_message_reader);
   }
   // Pushes SplitMessageReader onto the message heap.  This should only be
@@ -404,11 +431,14 @@
   // reader.
   void Update(
       SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          oldest_message_time) {
     PushMessageHeap(oldest_message_time, split_message_reader);
   }
 
-  // Returns the oldest combined timestamp and data for this channel.
+  // Returns the oldest combined timestamp and data for this channel.  If there
+  // isn't a matching piece of data, returns only the timestamp with no data.
+  // The caller can determine the appropriate action to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
   // Tracks if the channel merger has pushed this onto its heap or not.
@@ -417,13 +447,18 @@
   // called by the channel merger.
   void set_pushed(bool pushed) { pushed_ = pushed; }
 
+  // Returns a debug string with the heaps printed out.
+  std::string DebugString() const;
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          timestamp,
       SplitMessageReader *split_message_reader);
   void PushTimestampHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          timestamp,
       SplitMessageReader *split_message_reader);
 
   // Pops a message from the message heap.  This automatically triggers the
@@ -431,6 +466,11 @@
   std::tuple<monotonic_clock::time_point, uint32_t,
              FlatbufferVector<MessageHeader>>
   PopMessageHeap();
+
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message() const;
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_timestamp() const;
   // Pops a message from the timestamp heap.  This automatically triggers the
   // split message reader to re-fetch any new data.
   std::tuple<monotonic_clock::time_point, uint32_t,
@@ -506,11 +546,11 @@
   }
 
   // Returns the start times for the configured node's log files.
-  monotonic_clock::time_point monotonic_start_time() {
+  monotonic_clock::time_point monotonic_start_time() const {
     return monotonic_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
   }
-  realtime_clock::time_point realtime_start_time() {
+  realtime_clock::time_point realtime_start_time() const {
     return realtime_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
   }
@@ -524,11 +564,11 @@
     PushChannelHeap(timestamp, channel_index);
   }
 
- private:
-  // Queues messages from each SplitMessageReader until enough data is queued
-  // such that we can guarentee all sorting has happened.
-  void QueueMessages(monotonic_clock::time_point oldest_message_time);
+  // Returns a debug string with all the heaps in it.  Generally only useful for
+  // debugging what went wrong.
+  std::string DebugString() const;
 
+ private:
   // Pushes the timestamp for new data on the provided channel.
   void PushChannelHeap(monotonic_clock::time_point timestamp,
                        int channel_index);
@@ -545,11 +585,6 @@
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
 
-  // This holds a heap of split_message_readers sorted by the time at which they
-  // need to have QueueMessages called on them.
-  std::vector<std::pair<monotonic_clock::time_point, int>>
-      split_message_reader_heap_;
-
   // Configured node.
   const Node *node_;
 
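
The per-channel storage those accessors sit on is easiest to picture in a
distilled form.  A hypothetical simplified shape (the real class stores
FlatbufferVector<MessageHeader> entries plus the merger bookkeeping):

  #include <deque>
  #include <vector>

  struct Message { /* flatbuffer payload elided */ };

  // Rough shape of SplitMessageReader's per-channel queues.
  struct ChannelData {
    std::deque<Message> data;                     // data sent on this node
    std::vector<std::deque<Message>> timestamps;  // one queue per target node
  };

  // One ChannelData per channel in the configuration:
  //   std::vector<ChannelData> channels_;
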
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 84e1f43..c7f64ae 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -96,7 +96,8 @@
     realtime_start_time_ = event_loop_->realtime_now();
     last_synchronized_time_ = monotonic_start_time_;
 
-    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+              << " start_time " << monotonic_start_time_;
 
     WriteHeader();
 
@@ -105,6 +106,10 @@
   });
 }
 
+// TODO(austin): Set the remote start time to the first time we see a remote
+// message when we are logging those messages separately?  Need to signal what
+// to do, or how to get a good timestamp.
+
 void Logger::WriteHeader() {
   for (const Node *node : log_namer_->nodes()) {
     WriteHeader(node);
@@ -224,12 +229,11 @@
             fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                                f.channel_index, f.log_type));
 
-            VLOG(1) << "Writing data as node "
+            VLOG(2) << "Writing data as node "
                     << FlatbufferToJson(event_loop_->node()) << " for channel "
                     << configuration::CleanedChannelToString(
                            f.fetcher->channel())
-                    << " to " << f.writer->filename()
-                    << " data "
+                    << " to " << f.writer->filename() << " data "
                     << FlatbufferToJson(
                            flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                                fbb.GetBufferPointer()));
@@ -248,12 +252,11 @@
                                                f.channel_index,
                                                LogType::kLogDeliveryTimeOnly));
 
-            VLOG(1) << "Writing timestamps as node "
+            VLOG(2) << "Writing timestamps as node "
                     << FlatbufferToJson(event_loop_->node()) << " for channel "
                     << configuration::CleanedChannelToString(
                            f.fetcher->channel())
-                    << " to " << f.timestamp_writer->filename()
-                    << " timestamp "
+                    << " to " << f.timestamp_writer->filename() << " timestamp "
                     << FlatbufferToJson(
                            flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                                fbb.GetBufferPointer()));
@@ -342,10 +345,6 @@
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  // We want to start the log file at the last start time of the log files from
-  // all the nodes.  Compute how long each node's simulation needs to run to
-  // move time to this point.
-  monotonic_clock::duration run_time = monotonic_clock::duration(0);
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     auto it = channel_mergers_.insert(std::make_pair(node, State{}));
@@ -360,15 +359,50 @@
         event_loop_factory->MakeEventLoop("log_reader", node);
 
     Register(state->event_loop_unique_ptr.get());
+  }
 
-    const monotonic_clock::duration startup_time =
-        state->channel_merger->monotonic_start_time() -
-        state->event_loop->monotonic_now();
-    if (startup_time > run_time) {
-      run_time = startup_time;
+  // Basic idea is that we want to
+  //   1) Find the node which booted first.
+  //   2) Setup the clocks so that each clock is at the time it would be at when
+  //      the first node booted.
+
+  realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
+  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
+    State *state = &(state_pair.second);
+
+    const realtime_clock::time_point boot_time =
+        state->channel_merger->realtime_start_time() -
+        state->channel_merger->monotonic_start_time().time_since_epoch();
+
+    if (boot_time < earliest_boot_time) {
+      earliest_boot_time = boot_time;
     }
   }
 
+  // We want to start replaying at the latest start time of the log files from
+  // all the nodes.  Compute how long each node's simulation needs to run to
+  // move time to this point.
+  monotonic_clock::duration run_time = monotonic_clock::duration(0);
+
+  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
+    State *state = &(state_pair.second);
+
+    const realtime_clock::time_point boot_time =
+        state->channel_merger->realtime_start_time() -
+        state->channel_merger->monotonic_start_time().time_since_epoch();
+
+    // And start each node's clocks so the realtime clocks line up for the start
+    // times.  This is good enough to get started, but not fully correct.
+    state->node_event_loop_factory->SetMonotonicNow(
+        monotonic_clock::time_point(earliest_boot_time - boot_time));
+    state->node_event_loop_factory->SetRealtimeOffset(
+        state->channel_merger->monotonic_start_time(),
+        state->channel_merger->realtime_start_time());
+    run_time =
+        std::max(run_time, state->channel_merger->monotonic_start_time() -
+                               state->node_event_loop_factory->monotonic_now());
+  }
+
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
   // nodes, and also replayed on the other nodes.  This may not satisfy all our
@@ -390,7 +424,14 @@
     }
   }
 
+  // While we are starting the system up, we might be relying on matching data
+  // to timestamps on log files where the timestamp log file starts before the
+  // data.  In this case, it is reasonable to expect missing data.
+  ignore_missing_data_ = true;
   event_loop_factory_->RunFor(run_time);
+  // Now that we are running for real, missing data means that the log file is
+  // corrupted or something else went wrong.
+  ignore_missing_data_ = false;
 }
 
 void LogReader::Register(EventLoop *event_loop) {
@@ -443,7 +484,7 @@
     if (channel_timestamp.monotonic_event_time >
             state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
-      if (!FLAGS_skip_missing_forwarding_entries ||
+      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 5e3ca52..e0350bb 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -395,6 +395,11 @@
 
   const Configuration *remapped_configuration_ = nullptr;
   const Configuration *replay_configuration_ = nullptr;
+
+  // If true, the replay timer will ignore any missing data.  This is used
+  // during startup when we are bootstrapping everything and trying to get to
+  // the start of all the log files.
+  bool ignore_missing_data_ = false;
 };
 
 }  // namespace logger
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 74e9f65..5288e9b 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -22,8 +22,13 @@
 
   aos::ShmEventLoop event_loop(&config.message());
 
-  aos::logger::DetachedBufferWriter writer(aos::logging::GetLogName("fbs_log"));
-  aos::logger::Logger logger(&writer, &event_loop);
+  std::unique_ptr<aos::logger::LogNamer> log_namer =
+      std::make_unique<aos::logger::MultiNodeLogNamer>(
+          aos::logging::GetLogName("fbs_log"), event_loop.configuration(),
+          event_loop.node());
+
+  aos::logger::Logger logger(std::move(log_namer), &event_loop,
+                             std::chrono::milliseconds(100));
 
   event_loop.Run();
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3a969d4..9f2969e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -210,26 +210,17 @@
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
         event_loop_factory_(&config_.message()),
-        ping_event_loop_(event_loop_factory_.MakeEventLoop(
-            "ping", configuration::GetNode(event_loop_factory_.configuration(),
-                                           "pi1"))),
-        ping_(ping_event_loop_.get()),
-        pong_event_loop_(event_loop_factory_.MakeEventLoop(
-            "pong", configuration::GetNode(event_loop_factory_.configuration(),
-                                           "pi2"))),
-        pong_(pong_event_loop_.get()) {}
+        pi1_(
+            configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi2_(configuration::GetNode(event_loop_factory_.configuration(),
+                                    "pi2")) {}
 
   // Config and factory.
   aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
   SimulatedEventLoopFactory event_loop_factory_;
 
-  // Event loop and app for Ping
-  std::unique_ptr<EventLoop> ping_event_loop_;
-  Ping ping_;
-
-  // Event loop and app for Pong
-  std::unique_ptr<EventLoop> pong_event_loop_;
-  Pong pong_;
+  const Node *pi1_;
+  const Node *pi2_;
 };
 
 // Counts the number of messages on a channel (returns channel, count) for every
@@ -292,8 +283,8 @@
   });
 }
 
-// Tests that we can write and read multi-node log files correctly.
-TEST_F(MultinodeLoggerTest, MultiNode) {
+// Tests that we can write and read simple multi-node log files.
+TEST_F(MultinodeLoggerTest, SimpleMultiNode) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
   const ::std::string logfile_base = tmpdir + "/multi_logfile";
   const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
@@ -310,20 +301,22 @@
             << logfile3;
 
   {
-    const Node *pi1 =
-        configuration::GetNode(event_loop_factory_.configuration(), "pi1");
-    const Node *pi2 =
-        configuration::GetNode(event_loop_factory_.configuration(), "pi2");
+    std::unique_ptr<EventLoop> ping_event_loop =
+        event_loop_factory_.MakeEventLoop("ping", pi1_);
+    Ping ping(ping_event_loop.get());
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong", pi2_);
+    Pong pong(pong_event_loop.get());
 
     std::unique_ptr<EventLoop> pi1_logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger", pi1);
+        event_loop_factory_.MakeEventLoop("logger", pi1_);
     std::unique_ptr<LogNamer> pi1_log_namer =
         std::make_unique<MultiNodeLogNamer>(
             logfile_base, pi1_logger_event_loop->configuration(),
             pi1_logger_event_loop->node());
 
     std::unique_ptr<EventLoop> pi2_logger_event_loop =
-        event_loop_factory_.MakeEventLoop("logger", pi2);
+        event_loop_factory_.MakeEventLoop("logger", pi2_);
     std::unique_ptr<LogNamer> pi2_log_namer =
         std::make_unique<MultiNodeLogNamer>(
             logfile_base, pi2_logger_event_loop->configuration(),
@@ -333,6 +326,7 @@
 
     Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
                       std::chrono::milliseconds(100));
+
     Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
                       std::chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
@@ -349,7 +343,7 @@
 
     // Timing reports, pings
     EXPECT_THAT(CountChannelsData(logfile1),
-                ::testing::ElementsAre(::testing::Pair(1, 60),
+                ::testing::ElementsAre(::testing::Pair(1, 40),
                                        ::testing::Pair(4, 2001)));
     // Timestamps for pong
     EXPECT_THAT(CountChannelsTimestamp(logfile1),
@@ -363,16 +357,15 @@
 
     // Timing reports and pongs.
     EXPECT_THAT(CountChannelsData(logfile3),
-                ::testing::ElementsAre(::testing::Pair(3, 60),
+                ::testing::ElementsAre(::testing::Pair(3, 40),
                                        ::testing::Pair(5, 2001)));
     // And ping timestamps.
     EXPECT_THAT(CountChannelsTimestamp(logfile3),
                 ::testing::ElementsAre(::testing::Pair(4, 2001)));
   }
 
-  LogReader reader({std::vector<std::string>{logfile1},
-                    std::vector<std::string>{logfile2},
-                    std::vector<std::string>{logfile3}});
+  LogReader reader(
+      {std::vector<std::string>{logfile1}, std::vector<std::string>{logfile3}});
 
   SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -512,9 +505,294 @@
   reader.Deregister();
 }
 
+// Tests that we can read log files that don't start at the same monotonic
+// time.
+TEST_F(MultinodeLoggerTest, StaggeredStart) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile_base = tmpdir + "/multi_logfile";
+  const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
+  const ::std::string logfile2 =
+      logfile_base + "_pi2_data/test/aos.examples.Pong.bfbs";
+  const ::std::string logfile3 = logfile_base + "_pi2_data.bfbs";
+
+  // Remove them.
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+  unlink(logfile3.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " and " << logfile3;
+
+  {
+    std::unique_ptr<EventLoop> ping_event_loop =
+        event_loop_factory_.MakeEventLoop("ping", pi1_);
+    Ping ping(ping_event_loop.get());
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong", pi2_);
+    Pong pong(pong_event_loop.get());
+
+    std::unique_ptr<EventLoop> pi1_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi1_);
+    std::unique_ptr<LogNamer> pi1_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi1_logger_event_loop->configuration(),
+            pi1_logger_event_loop->node());
+
+    std::unique_ptr<EventLoop> pi2_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi2_);
+    std::unique_ptr<LogNamer> pi2_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi2_logger_event_loop->configuration(),
+            pi2_logger_event_loop->node());
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
+
+    event_loop_factory_.RunFor(chrono::milliseconds(200));
+
+    Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(
+      {std::vector<std::string>{logfile1}, std::vector<std::string>{logfile3}});
+
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_ping_count = 30;
+  int pi2_ping_count = 30;
+  int pi1_pong_count = 30;
+  int pi2_pong_count = 30;
+
+  // Confirm that the ping value matches.
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+
+        ++pi1_ping_count;
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+
+        ++pi2_ping_count;
+      });
+
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pi1_ping_count,
+                &pi1_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+        ++pi1_pong_count;
+        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_event_loop, &pi2_ping_count,
+                &pi2_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+        ++pi2_pong_count;
+        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+      });
+
+  log_reader_factory.Run();
+  EXPECT_EQ(pi1_ping_count, 2030);
+  EXPECT_EQ(pi2_ping_count, 2030);
+  EXPECT_EQ(pi1_pong_count, 2030);
+  EXPECT_EQ(pi2_pong_count, 2030);
+
+  reader.Deregister();
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.
 
+// Tests that we can read log files where the monotonic clocks don't match.
+TEST_F(MultinodeLoggerTest, MismatchingTimeStart) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile_base = tmpdir + "/multi_logfile";
+  const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
+  const ::std::string logfile2 =
+      logfile_base + "_pi2_data/test/aos.examples.Pong.bfbs";
+  const ::std::string logfile3 = logfile_base + "_pi2_data.bfbs";
+
+  // Remove them.
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+  unlink(logfile3.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " and " << logfile3;
+
+  {
+    NodeEventLoopFactory *pi2 =
+        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
+
+    pi2->SetMonotonicNow(pi2->monotonic_now() + std::chrono::seconds(1000));
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
+
+    std::unique_ptr<EventLoop> ping_event_loop =
+        event_loop_factory_.MakeEventLoop("ping", pi1_);
+    Ping ping(ping_event_loop.get());
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong", pi2_);
+    Pong pong(pong_event_loop.get());
+
+    std::unique_ptr<EventLoop> pi1_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi1_);
+    std::unique_ptr<LogNamer> pi1_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi1_logger_event_loop->configuration(),
+            pi1_logger_event_loop->node());
+
+    std::unique_ptr<EventLoop> pi2_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi2_);
+    std::unique_ptr<LogNamer> pi2_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi2_logger_event_loop->configuration(),
+            pi2_logger_event_loop->node());
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
+
+    event_loop_factory_.RunFor(chrono::milliseconds(200));
+
+    Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(
+      {std::vector<std::string>{logfile1}, std::vector<std::string>{logfile3}});
+
+  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  LOG(INFO) << "Done registering (pi1) "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
+            << " "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->realtime_now();
+  LOG(INFO) << "Done registering (pi2) "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now()
+            << " "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->realtime_now();
+
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_ping_count = 30;
+  int pi2_ping_count = 30;
+  int pi1_pong_count = 30;
+  int pi2_pong_count = 30;
+
+  // Confirm that the ping value matches.
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+
+        ++pi1_ping_count;
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+
+        ++pi2_ping_count;
+      });
+
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pi1_ping_count,
+                &pi1_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+        ++pi1_pong_count;
+        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_event_loop, &pi2_ping_count,
+                &pi2_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+        ++pi2_pong_count;
+        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+      });
+
+  log_reader_factory.Run();
+  EXPECT_EQ(pi1_ping_count, 2030);
+  EXPECT_EQ(pi2_ping_count, 2030);
+  EXPECT_EQ(pi1_pong_count, 2030);
+  EXPECT_EQ(pi2_pong_count, 2030);
+
+  reader.Deregister();
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 21b241a..9dd3d1f 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -154,6 +154,14 @@
   inline distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const;
 
+  // Note: use this very very carefully.  It can cause massive problems.  This
+  // needs to go away once we properly handle time drifting between nodes.
+  void SetMonotonicNow(monotonic_clock::time_point monotonic_now) {
+    monotonic_clock::duration offset = (monotonic_now - this->monotonic_now());
+    monotonic_offset_ += offset;
+    realtime_offset_ -= offset;
+  }
+
  private:
   friend class SimulatedEventLoopFactory;
   NodeEventLoopFactory(
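
Why the two offsets move in opposite directions: if realtime is derived as
monotonic plus an offset, jumping the monotonic clock forward must pull the
realtime offset back by the same amount so the wall clock doesn't move.  A
toy model (not the factory's real internals) makes that concrete:

  #include <cstdint>

  struct NodeClock {
    int64_t distributed = 0;       // shared simulation time, ns
    int64_t monotonic_offset = 0;  // monotonic = distributed + monotonic_offset
    int64_t realtime_offset = 0;   // realtime  = monotonic + realtime_offset

    int64_t monotonic_now() const { return distributed + monotonic_offset; }
    int64_t realtime_now() const { return monotonic_now() + realtime_offset; }

    void SetMonotonicNow(int64_t target) {
      const int64_t delta = target - monotonic_now();
      monotonic_offset += delta;  // jump the monotonic clock...
      realtime_offset -= delta;   // ...without moving the realtime clock.
    }
  };

  int main() {
    NodeClock c;
    c.realtime_offset = 1'000'000'000;
    const int64_t rt_before = c.realtime_now();
    c.SetMonotonicNow(c.monotonic_now() + 5'000'000'000);
    // Realtime is unaffected by the monotonic jump.
    return c.realtime_now() == rt_before ? 0 : 1;
  }
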
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 10ce37d..6b03b2f 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -109,10 +109,14 @@
 
   // Pre-build up event loops for every node.  They are pretty cheap anyways.
   for (const Node *node : simulated_event_loop_factory->nodes()) {
-    CHECK(event_loop_map_
-              .insert({node, simulated_event_loop_factory->MakeEventLoop(
-                                 "message_bridge", node)})
-              .second);
+    auto it = event_loop_map_.insert(
+        {node,
+         simulated_event_loop_factory->MakeEventLoop("message_bridge", node)});
+
+    CHECK(it.second);
+
+    it.first->second->SkipTimingReport();
+    it.first->second->SkipAosLog();
   }
 
   for (const Channel *channel :