Add multi-node local logging to the logger

The logger is not yet able to forward messages, but it can log messages
that have been forwarded.  Create a log file in a multi-node test and
check that the remote timestamps are recorded correctly.

Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
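
Illustration (not part of the patch): the replay pattern this change enables
is to pin the SimulatedEventLoopFactory to the node recorded in the log file
header, which is what log_cat.cc and the new multi-node test below do.  A
minimal sketch, assuming a hypothetical log file path:

    #include <chrono>
    #include <memory>

    #include "aos/events/logging/logger.h"
    #include "aos/events/simulated_event_loop.h"

    void ReplayOnRecordedNode() {
      // Hypothetical path; the test below writes $TEST_TMPDIR/multi_logfile.bfbs.
      aos::logger::LogReader reader("/tmp/multi_logfile.bfbs");

      // Pin the factory to the node stored in the log file header so forwarded
      // messages replay with their remote timestamps.
      aos::SimulatedEventLoopFactory factory(reader.configuration(),
                                             reader.node());
      reader.Register(&factory);

      std::unique_ptr<aos::EventLoop> event_loop =
          factory.MakeEventLoop("replay");
      // Watchers on event_loop see context.monotonic_remote_time !=
      // context.monotonic_event_time for messages that were forwarded.

      factory.RunFor(std::chrono::seconds(100));
      reader.Deregister();
    }
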
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index e7d9d7c..8d6c2b1 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -8,6 +8,7 @@
 #include "gflags/gflags.h"
 
 DEFINE_string(config, "./config.json", "File path of aos configuration");
+
 int main(int argc, char **argv) {
   aos::InitGoogle(&argc, &argv);
 
@@ -43,13 +44,28 @@
     if (channel->name()->c_str() == channel_name &&
         channel->type()->str().find(message_type) != std::string::npos) {
       event_loop.MakeRawWatcher(
-          channel,
-          [channel](const aos::Context /* &context*/, const void *message) {
-            LOG(INFO) << '(' << channel->type()->c_str() << ") "
-                      << aos::FlatbufferToJson(
-                             channel->schema(),
-                             static_cast<const uint8_t *>(message))
-                      << '\n';
+          channel, [channel](const aos::Context &context, const void *message) {
+            // Print the flatbuffer out to stdout, both to remove the
+            // unnecessary cruft from glog and to allow the user to readily
+            // redirect just the logged output independent of any debugging
+            // information on stderr.
+            if (context.monotonic_remote_time != context.monotonic_event_time) {
+              std::cout << context.realtime_remote_time << " ("
+                        << context.monotonic_remote_time << ") delivered "
+                        << context.realtime_event_time << " ("
+                        << context.monotonic_event_time << "): "
+                        << aos::FlatbufferToJson(
+                               channel->schema(),
+                               static_cast<const uint8_t *>(message))
+                        << '\n';
+            } else {
+              std::cout << context.realtime_event_time << " ("
+                        << context.monotonic_event_time << "): "
+                        << aos::FlatbufferToJson(
+                               channel->schema(),
+                               static_cast<const uint8_t *>(message))
+                        << '\n';
+            }
           });
       found_channels++;
     }
diff --git a/aos/events/BUILD b/aos/events/BUILD
index dfad4ff..e40ab99 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -115,6 +115,16 @@
     deps = [":config"],
 )
 
+aos_config(
+    name = "multinode_pingpong_config",
+    src = "multinode_pingpong.json",
+    flatbuffers = [
+        ":ping_fbs",
+        ":pong_fbs",
+    ],
+    deps = [":config"],
+)
+
 cc_library(
     name = "pong_lib",
     srcs = [
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 94e25c2..80d8798 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -65,7 +65,10 @@
 cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
-    data = ["//aos/events:pingpong_config.json"],
+    data = [
+        "//aos/events:multinode_pingpong_config.json",
+        "//aos/events:pingpong_config.json",
+    ],
     deps = [
         ":logger",
         "//aos/events:ping_lib",
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index a53d337..45f0811 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -28,7 +28,8 @@
   aos::InitGoogle(&argc, &argv);
 
   aos::logger::LogReader reader(FLAGS_logfile);
-  aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration(),
+                                                    reader.node());
   reader.Register(&log_reader_factory);
 
   std::unique_ptr<aos::EventLoop> printer_event_loop =
@@ -44,6 +45,10 @@
     const flatbuffers::string_view type = channel->type()->string_view();
     if (name.find(FLAGS_name) != std::string::npos &&
         type.find(FLAGS_type) != std::string::npos) {
+      if (!aos::configuration::ChannelIsReadableOnNode(
+              channel, printer_event_loop->node())) {
+        continue;
+      }
       LOG(INFO) << "Listening on " << name << " " << type;
 
       CHECK_NOTNULL(channel->schema());
@@ -53,14 +58,27 @@
             // unnecessary cruft from glog and to allow the user to readily
             // redirect just the logged output independent of any debugging
             // information on stderr.
-            std::cout << context.realtime_event_time << " ("
-                      << context.monotonic_event_time << ") "
-                      << channel->name()->c_str() << ' '
-                      << channel->type()->c_str() << ": "
-                      << aos::FlatbufferToJson(
-                             channel->schema(),
-                             static_cast<const uint8_t *>(message))
-                      << '\n';
+            if (context.monotonic_remote_time != context.monotonic_event_time) {
+              std::cout << context.realtime_remote_time << " ("
+                        << context.monotonic_remote_time << ") delivered "
+                        << context.realtime_event_time << " ("
+                        << context.monotonic_event_time << ") "
+                        << channel->name()->c_str() << ' '
+                        << channel->type()->c_str() << ": "
+                        << aos::FlatbufferToJson(
+                               channel->schema(),
+                               static_cast<const uint8_t *>(message))
+                        << '\n';
+            } else {
+              std::cout << context.realtime_event_time << " ("
+                        << context.monotonic_event_time << ") "
+                        << channel->name()->c_str() << ' '
+                        << channel->type()->c_str() << ": "
+                        << aos::FlatbufferToJson(
+                               channel->schema(),
+                               static_cast<const uint8_t *>(message))
+                        << '\n';
+            }
           });
       found_channel = true;
     }
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..6eae9e9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -18,6 +18,9 @@
 
 DEFINE_int32(flush_size, 1000000,
              "Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_bool(skip_missing_forwarding_entries, false,
+            "If true, drop any forwarding entries with missing data.  If "
+            "false, CHECK.");
 
 namespace aos {
 namespace logger {
@@ -86,7 +89,44 @@
       polling_period_(polling_period) {
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     FetcherStruct fs;
-    fs.fetcher = event_loop->MakeRawFetcher(channel);
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
+                                 channel, event_loop_->node()) &&
+                             is_readable;
+
+    const bool log_delivery_times =
+        (event_loop_->node() == nullptr)
+            ? false
+            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                  channel, event_loop_->node(), event_loop_->node());
+
+    if (log_message || log_delivery_times) {
+      fs.fetcher = event_loop->MakeRawFetcher(channel);
+      VLOG(1) << "Logging channel "
+              << configuration::CleanedChannelToString(channel);
+
+      if (log_delivery_times) {
+        if (log_message) {
+          VLOG(1) << "  Logging message and delivery times";
+          fs.log_type = LogType::kLogMessageAndDeliveryTime;
+        } else {
+          VLOG(1) << "  Logging delivery times only";
+          fs.log_type = LogType::kLogDeliveryTimeOnly;
+        }
+      } else {
+        // We don't have a particularly great use case right now for logging a
+        // forwarded message while either not logging its delivery times or
+        // logging them on another node.  Fail rather than produce bad results.
+        CHECK(configuration::ChannelIsSendableOnNode(channel,
+                                                     event_loop_->node()))
+            << ": Logger only knows how to log remote messages with "
+               "forwarding timestamps.";
+        VLOG(1) << "  Logging message only";
+        fs.log_type = LogType::kLogMessage;
+      }
+    }
+
     fs.written = false;
     fetchers_.emplace_back(std::move(fs));
   }
@@ -99,7 +139,9 @@
     // so we can capture the latest message on each channel.  This lets us have
     // non periodic messages with configuration that now get logged.
     for (FetcherStruct &f : fetchers_) {
-      f.written = !f.fetcher->Fetch();
+      if (f.fetcher.get() != nullptr) {
+        f.written = !f.fetcher->Fetch();
+      }
     }
 
     // We need to pick a point in time to declare the log file "started".  This
@@ -122,10 +164,16 @@
       flatbuffers::Offset<flatbuffers::String> string_offset =
           fbb.CreateString(network::GetHostname());
 
+      flatbuffers::Offset<Node> node_offset =
+          CopyFlatBuffer(event_loop_->node(), &fbb);
+      LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+
       aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
       log_file_header_builder.add_name(string_offset);
 
+      log_file_header_builder.add_node(node_offset);
+
       log_file_header_builder.add_configuration(configuration_offset);
       // The worst case theoretical out of order is the polling period times 2.
       // One message could get logged right after the boundary, but be for right
@@ -157,20 +205,46 @@
 
 flatbuffers::Offset<MessageHeader> PackMessage(
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
-    int channel_index) {
-  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
-      fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+    int channel_index, LogType log_type) {
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+  switch (log_type) {
+    case LogType::kLogMessage:
+    case LogType::kLogMessageAndDeliveryTime:
+      data_offset =
+          fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+      break;
+
+    case LogType::kLogDeliveryTimeOnly:
+      break;
+  }
 
   MessageHeader::Builder message_header_builder(*fbb);
   message_header_builder.add_channel_index(channel_index);
+  message_header_builder.add_queue_index(context.queue_index);
   message_header_builder.add_monotonic_sent_time(
       context.monotonic_event_time.time_since_epoch().count());
   message_header_builder.add_realtime_sent_time(
       context.realtime_event_time.time_since_epoch().count());
 
-  message_header_builder.add_queue_index(context.queue_index);
+  switch (log_type) {
+    case LogType::kLogMessage:
+      message_header_builder.add_data(data_offset);
+      break;
 
-  message_header_builder.add_data(data_offset);
+    case LogType::kLogMessageAndDeliveryTime:
+      message_header_builder.add_data(data_offset);
+      [[fallthrough]];
+
+    case LogType::kLogDeliveryTimeOnly:
+      message_header_builder.add_monotonic_remote_time(
+          context.monotonic_remote_time.time_since_epoch().count());
+      message_header_builder.add_realtime_remote_time(
+          context.realtime_remote_time.time_since_epoch().count());
+      message_header_builder.add_remote_queue_index(context.remote_queue_index);
+      break;
+  }
+
   return message_header_builder.Finish();
 }
 
@@ -188,51 +262,46 @@
     size_t channel_index = 0;
     // Write each channel to disk, one at a time.
     for (FetcherStruct &f : fetchers_) {
-      while (true) {
-        if (f.fetcher.get() == nullptr) {
-          if (!f.fetcher->FetchNext()) {
-            VLOG(1) << "No new data on "
-                    << FlatbufferToJson(f.fetcher->channel());
-            break;
-          } else {
-            f.written = false;
+      // Skip any channels which we aren't supposed to log.
+      if (f.fetcher.get() != nullptr) {
+        while (true) {
+          if (f.written) {
+            if (!f.fetcher->FetchNext()) {
+              VLOG(2) << "No new data on "
+                      << configuration::CleanedChannelToString(
+                             f.fetcher->channel());
+              break;
+            } else {
+              f.written = false;
+            }
           }
-        }
 
-        if (f.written) {
-          if (!f.fetcher->FetchNext()) {
-            VLOG(1) << "No new data on "
-                    << FlatbufferToJson(f.fetcher->channel());
-            break;
+          CHECK(!f.written);
+
+          // TODO(james): Write tests to exercise this logic.
+          if (f.fetcher->context().monotonic_event_time <
+              last_synchronized_time_) {
+            // Write!
+            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                               max_header_size_);
+            fbb.ForceDefaults(1);
+
+            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                               channel_index, f.log_type));
+
+            VLOG(2) << "Writing data for channel "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel());
+
+            max_header_size_ = std::max(
+                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+            writer_->QueueSizedFlatbuffer(&fbb);
+
+            f.written = true;
           } else {
-            f.written = false;
+            break;
           }
         }
-
-        CHECK(!f.written);
-
-        // TODO(james): Write tests to exercise this logic.
-        if (f.fetcher->context().monotonic_event_time <
-            last_synchronized_time_) {
-          // Write!
-          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                             max_header_size_);
-          fbb.ForceDefaults(1);
-
-          fbb.FinishSizePrefixed(
-              PackMessage(&fbb, f.fetcher->context(), channel_index));
-
-          VLOG(1) << "Writing data for channel "
-                  << FlatbufferToJson(f.fetcher->channel());
-
-          max_header_size_ = std::max(
-              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-          writer_->QueueSizedFlatbuffer(&fbb);
-
-          f.written = true;
-        } else {
-          break;
-        }
       }
 
       ++channel_index;
@@ -373,11 +442,20 @@
   queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
 }
 
-const Configuration *LogReader::configuration() {
+const Configuration *LogReader::configuration() const {
   return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
       ->configuration();
 }
 
+const Node *LogReader::node() const {
+  return configuration::GetNode(
+      configuration(),
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+          ->node()
+          ->name()
+          ->string_view());
+}
+
 monotonic_clock::time_point LogReader::monotonic_start_time() {
   return monotonic_clock::time_point(std::chrono::nanoseconds(
       flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
@@ -434,20 +512,32 @@
 
     FlatbufferVector<MessageHeader> front = std::move(channel.front());
 
-    CHECK(front.message().data() != nullptr);
+    if (oldest_channel_index.first > monotonic_start_time() ||
+        event_loop_factory_ != nullptr) {
+      if (!FLAGS_skip_missing_forwarding_entries ||
+          front.message().data() != nullptr) {
+        CHECK(front.message().data() != nullptr)
+            << ": Got a message without data.  Forwarding entry which was not "
+               "matched?  Use --skip_missing_forwarding_entries to ignore "
+               "this.";
 
-    if (oldest_channel_index.first > monotonic_start_time()) {
-      // If we have access to the factory, use it to fix the realtime time.
-      if (event_loop_factory_ != nullptr) {
-        event_loop_factory_->SetRealtimeOffset(
+        // If we have access to the factory, use it to fix the realtime time.
+        if (event_loop_factory_ != nullptr) {
+          event_loop_factory_->SetRealtimeOffset(
+              monotonic_clock::time_point(
+                  chrono::nanoseconds(front.message().monotonic_sent_time())),
+              realtime_clock::time_point(
+                  chrono::nanoseconds(front.message().realtime_sent_time())));
+        }
+
+        channel.raw_sender->Send(
+            front.message().data()->Data(), front.message().data()->size(),
             monotonic_clock::time_point(
-                chrono::nanoseconds(front.message().monotonic_sent_time())),
+                chrono::nanoseconds(front.message().monotonic_remote_time())),
             realtime_clock::time_point(
-                chrono::nanoseconds(front.message().realtime_sent_time())));
+                chrono::nanoseconds(front.message().realtime_remote_time())),
+            front.message().remote_queue_index());
       }
-
-      channel.raw_sender->Send(front.message().data()->Data(),
-                               front.message().data()->size());
     } else {
       LOG(WARNING) << "Not sending data from before the start of the log file. "
                    << oldest_channel_index.first.time_since_epoch().count()
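
To summarize the new per-channel classification above (an illustrative
restatement, not code from the patch, assuming the helpers logger.cc calls are
declared in aos/configuration.h):

    #include <optional>

    #include "aos/configuration.h"
    #include "aos/events/logging/logger.h"

    // Rough restatement of the per-channel logic in Logger's constructor.
    // Returns nullopt for channels this node should not log at all.
    std::optional<aos::logger::LogType> ClassifyChannel(
        const aos::Channel *channel, const aos::Node *node) {
      const bool log_message =
          aos::configuration::ChannelMessageIsLoggedOnNode(channel, node) &&
          aos::configuration::ChannelIsReadableOnNode(channel, node);
      const bool log_delivery_times =
          node != nullptr &&
          aos::configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node,
                                                                   node);
      if (!log_message && !log_delivery_times) {
        return std::nullopt;
      }
      if (log_delivery_times) {
        return log_message ? aos::logger::LogType::kLogMessageAndDeliveryTime
                           : aos::logger::LogType::kLogDeliveryTimeOnly;
      }
      // The constructor additionally CHECKs that the channel is sendable on
      // this node, since logging a forwarded message without its delivery
      // times is not supported yet.
      return aos::logger::LogType::kLogMessage;
    }
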
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 3e89ff3..f1af17e 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -31,7 +31,8 @@
   // Name of the device which this log file is for.
   name:string;
 
-  // TODO(austin): Node!
+  // The current node, if known and running in a multi-node configuration.
+  node:Node;
 }
 
 // Table holding a message.
@@ -39,18 +40,28 @@
   // Index into the channel datastructure in the log file header.  This
   // provides the data type.
   channel_index:uint;
-  // Time this message was sent on the monotonic clock in nanoseconds.
+  // Time this message was sent on the monotonic clock in nanoseconds on this
+  // node.
   monotonic_sent_time:long;
-  // Time this message was sent on the realtime clock in nanoseconds.
+  // Time this message was sent on the realtime clock in nanoseconds on this
+  // node.
   realtime_sent_time:long;
   // Index into the ipc queue of this message.  This should start with 0 and
   // always monotonically increment if no messages were ever lost.  It will
   // wrap at a multiple of the queue size.
   queue_index:uint;
-  // TODO(austin): Node.
 
   // TODO(austin): Format?  Compressed?
 
   // The nested flatbuffer.
   data:[ubyte];
+
+  // Time this message was sent on the monotonic clock of the remote node in
+  // nanoseconds.
+  monotonic_remote_time:long = -9223372036854775808;
+  // Time this message was sent on the realtime clock of the remote node in
+  // nanoseconds.
+  realtime_remote_time:long = -9223372036854775808;
+  // Queue index of this message on the remote node.
+  remote_queue_index:uint = 4294967295;
 }
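
Reader-side note (not in the patch): the new remote fields default to int64
min and uint32 max, so entries from single-node logs, or messages that were
never forwarded, read back with those sentinel values.  A minimal sketch,
assuming the flatbuffer-generated header for logger.fbs:

    #include <cstdint>
    #include <limits>

    #include "aos/events/logging/logger_generated.h"

    // True if this log entry carries forwarding information, i.e. its remote
    // timestamp was actually written rather than left at the sentinel default.
    bool IsForwardedEntry(const aos::logger::MessageHeader &header) {
      return header.monotonic_remote_time() !=
             std::numeric_limits<int64_t>::min();
    }
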
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..1e9a218 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -46,10 +46,17 @@
   std::vector<struct iovec> iovec_;
 };
 
-// Packes a message pointed to by the context into a MessageHeader.
-flatbuffers::Offset<MessageHeader> PackMessage(
-    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
-    int channel_index);
+enum class LogType : uint8_t {
+  // The message originated on this node and should be logged here.
+  kLogMessage,
+  // The message originated on another node, but only the delivery times are
+  // logged here.
+  kLogDeliveryTimeOnly,
+  // The message originated on another node. Log it and the delivery times
+  // together.  The message_gateway is responsible for logging any messages
+  // which didn't get delivered.
+  kLogMessageAndDeliveryTime
+};
 
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
@@ -73,6 +80,8 @@
   struct FetcherStruct {
     std::unique_ptr<RawFetcher> fetcher;
     bool written = false;
+
+    LogType log_type;
   };
 
   std::vector<FetcherStruct> fetchers_;
@@ -89,6 +98,11 @@
   size_t max_header_size_ = 0;
 };
 
+// Packs a message pointed to by the context into a MessageHeader.
+flatbuffers::Offset<MessageHeader> PackMessage(
+    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+    int channel_index, LogType log_type);
+
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public:
@@ -106,7 +120,10 @@
   // TODO(austin): Remap channels?
 
   // Returns the configuration from the log file.
-  const Configuration *configuration();
+  const Configuration *configuration() const;
+
+  // Returns the node that this log file was created on.
+  const Node *node() const;
 
   // Returns the starting timestamp for the log file.
   monotonic_clock::time_point monotonic_start_time();
@@ -125,8 +142,8 @@
   // will have to read more data from disk.
   bool MessageAvailable();
 
-  // Returns a span with the data for a message from the log file, excluding the
-  // size.
+  // Returns a span with the data for a message from the log file, excluding
+  // the size.
   absl::Span<const uint8_t> ReadMessage();
 
   // Queues at least max_out_of_order_duration_ messages into channels_.
@@ -144,16 +161,16 @@
   // buffer, then into sender), but none of it is all that expensive.  We can
   // optimize if it is slow later.
   //
-  // As we place the elements in the sorted list of times, keep doing this until
-  // we read a message that is newer than the threshold.
+  // As we place the elements in the sorted list of times, keep doing this
+  // until we read a message that is newer than the threshold.
   //
   // Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
   // small state machine so we can resume), and keep pulling messages back out
   // and sending.
   //
-  // For sorting, we want to use the fact that each channel is sorted, and then
-  // merge sort the channels.  Have a vector of deques, and then hold a sorted
-  // list of pointers to those.
+  // For sorting, we want to use the fact that each channel is sorted, and
+  // then merge sort the channels.  Have a vector of deques, and then hold a
+  // sorted list of pointers to those.
   //
   // TODO(austin): Multithreaded read at some point.  Gotta go faster!
   // Especially if we start compressing.
@@ -183,8 +200,8 @@
     }
   };
 
-  // Minimum amount of data to queue up for sorting before we are guarenteed to
-  // not see data out of order.
+  // Minimum amount of data to queue up for sorting before we are guaranteed
+  // to not see data out of order.
   std::chrono::nanoseconds max_out_of_order_duration_;
 
   // File descriptor for the log file.
@@ -195,8 +212,8 @@
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
 
-  // Vector to read into.  This uses an allocator which doesn't zero initialize
-  // the memory.
+  // Vector to read into.  This uses an allocator which doesn't zero
+  // initialize the memory.
   std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
 
   // Amount of data consumed already in data_.
@@ -223,8 +240,8 @@
   // timestamp.
   std::pair<monotonic_clock::time_point, int> PopOldestChannel();
 
-  // Datastructure to hold the list of messages, cached timestamp for the oldest
-  // message, and sender to send with.
+  // Data structure to hold the list of messages, cached timestamp for the
+  // oldest message, and sender to send with.
   struct ChannelData {
     monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
     std::deque<FlatbufferVector<MessageHeader>> data;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9aae936..02a9a14 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -65,7 +65,8 @@
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
-  // This sends out the fetched messages and advances time to the start of the log file.
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
   reader.Register(&log_reader_factory);
 
   std::unique_ptr<EventLoop> test_event_loop =
@@ -136,6 +137,123 @@
   }
 }
 
+class MultinodeLoggerTest : public ::testing::Test {
+ public:
+  MultinodeLoggerTest()
+      : config_(aos::configuration::ReadConfig(
+            "aos/events/multinode_pingpong_config.json")),
+        event_loop_factory_(&config_.message(), "pi1"),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        ping_(ping_event_loop_.get()) {}
+
+  // Config and factory.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  SimulatedEventLoopFactory event_loop_factory_;
+
+  // Event loop and app for Ping
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+};
+
+// Tests that we can startup at all in a multinode configuration.
+TEST_F(MultinodeLoggerTest, MultiNode) {
+  constexpr chrono::seconds kTimeOffset = chrono::seconds(10000);
+  constexpr uint32_t kQueueIndexOffset = 1024;
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile = tmpdir + "/multi_logfile.bfbs";
+  // Remove it.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong");
+
+    std::unique_ptr<aos::RawSender> pong_sender(
+        pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
+            pong_event_loop->configuration(), "/test", "aos.examples.Pong",
+            pong_event_loop->name(), pong_event_loop->node())));
+
+    // Ok, let's fake a remote node.  We use the fancy raw sender Send
+    // method that message_gateway will use to do that.
+    int pong_count = 0;
+    pong_event_loop->MakeWatcher(
+        "/test", [&pong_event_loop, &pong_count, &pong_sender,
+                  kTimeOffset](const examples::Ping &ping) {
+          flatbuffers::FlatBufferBuilder fbb;
+          examples::Pong::Builder pong_builder(fbb);
+          pong_builder.add_value(ping.value());
+          pong_builder.add_initial_send_time(ping.send_time());
+          fbb.Finish(pong_builder.Finish());
+
+          pong_sender->Send(fbb.GetBufferPointer(), fbb.GetSize(),
+                            pong_event_loop->monotonic_now() + kTimeOffset,
+                            pong_event_loop->realtime_now() + kTimeOffset,
+                            kQueueIndexOffset + pong_count);
+          ++pong_count;
+        });
+
+    DetachedBufferWriter writer(logfile);
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(&writer, logger_event_loop.get(),
+                  std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(logfile);
+
+  // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
+  // messages.  This won't work yet until the log reading code gets
+  // significantly better.
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration(),
+                                               reader.node());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      log_reader_factory.MakeEventLoop("test");
+
+  int ping_count = 10;
+  int pong_count = 10;
+
+  // Confirm that the ping value matches.
+  test_event_loop->MakeWatcher("/test",
+                               [&ping_count](const examples::Ping &ping) {
+                                 EXPECT_EQ(ping.value(), ping_count + 1);
+                                 ++ping_count;
+                               });
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  test_event_loop->MakeWatcher(
+      "/test", [&test_event_loop, &ping_count, &pong_count,
+                kTimeOffset](const examples::Pong &pong) {
+        EXPECT_EQ(test_event_loop->context().remote_queue_index,
+                  pong_count + kQueueIndexOffset);
+        EXPECT_EQ(test_event_loop->context().monotonic_remote_time,
+                  test_event_loop->monotonic_now() + kTimeOffset);
+        EXPECT_EQ(test_event_loop->context().realtime_remote_time,
+                  test_event_loop->realtime_now() + kTimeOffset);
+
+        EXPECT_EQ(pong.value(), pong_count + 1);
+        ++pong_count;
+        EXPECT_EQ(ping_count, pong_count);
+      });
+
+  log_reader_factory.RunFor(std::chrono::seconds(100));
+  EXPECT_EQ(ping_count, 2010);
+  EXPECT_EQ(pong_count, 2010);
+
+  reader.Deregister();
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
new file mode 100644
index 0000000..f0e532e
--- /dev/null
+++ b/aos/events/multinode_pingpong.json
@@ -0,0 +1,161 @@
+{
+  "channels": [
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.timing.Report",
+      "source_node": "pi3",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/test2",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi3",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/test2",
+      "type": "aos.examples.Pong",
+      "source_node": "pi3",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1"
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    },
+    {
+      "name": "pi3",
+      "hostname": "raspberrypi3",
+      "port": 9971
+    }
+  ],
+  "applications": [
+    {
+      "name": "ping2",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test2"
+          }
+        }
+      ]
+    },
+    {
+      "name": "pong2",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test2"
+          }
+        }
+      ]
+    }
+  ]
+}
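
Sanity check on the config above (illustrative only; the helpers are the ones
logger.cc calls, assumed to be declared in aos/configuration.h): the Pong
channel has logger_node pi1 and timestamp_logger_node pi1, so pi1 should log
both the forwarded messages and their delivery times.

    #include "aos/configuration.h"
    #include "glog/logging.h"

    void CheckPi1LogsPong() {
      aos::FlatbufferDetachedBuffer<aos::Configuration> config =
          aos::configuration::ReadConfig(
              "aos/events/multinode_pingpong_config.json");
      const aos::Node *pi1 =
          aos::configuration::GetNode(&config.message(), "pi1");
      const aos::Channel *pong = aos::configuration::GetChannel(
          &config.message(), "/test", "aos.examples.Pong", "", pi1);

      LOG(INFO) << "pi1 logs Pong messages: "
                << aos::configuration::ChannelMessageIsLoggedOnNode(pong, pi1);
      LOG(INFO) << "pi1 logs Pong delivery times: "
                << aos::configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                       pong, pi1, pi1);
    }
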
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index a8e227e..cde538a 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -155,7 +155,11 @@
 ShmEventLoop::ShmEventLoop(const Configuration *configuration)
     : EventLoop(configuration),
       name_(Filename(program_invocation_name)),
-      node_(MaybeMyNode(configuration)) {}
+      node_(MaybeMyNode(configuration)) {
+  if (configuration->has_nodes()) {
+    CHECK(node_ != nullptr) << ": Couldn't find node in config.";
+  }
+}
 
 namespace internal {
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a2740e4..af0c02b 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -748,6 +748,22 @@
                           << "\" in the configuration.";
 }
 
+SimulatedEventLoopFactory::SimulatedEventLoopFactory(
+    const Configuration *configuration, const Node *node)
+    : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
+  CHECK(configuration_->has_nodes())
+      << ": Got a configuration with no nodes and node \""
+      << node->name()->string_view() << "\" was selected.";
+  bool found = false;
+  for (const Node *node : *configuration_->nodes()) {
+    if (node == node_) {
+      found = true;
+      break;
+    }
+  }
+  CHECK(found) << ": node must be a pointer in the configuration.";
+}
+
 SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
 
 ::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index fee3ef0..db0b840 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -54,6 +54,8 @@
   SimulatedEventLoopFactory(const Configuration *configuration);
   SimulatedEventLoopFactory(const Configuration *configuration,
                             std::string_view node_name);
+  SimulatedEventLoopFactory(const Configuration *configuration,
+                            const Node *node);
   ~SimulatedEventLoopFactory();
 
   ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
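
Finally, a minimal sketch of the new node-pointer constructor (illustrative
only): the node must be a pointer into the configuration itself, per the CHECK
added in simulated_event_loop.cc above.

    #include <memory>

    #include "aos/configuration.h"
    #include "aos/events/simulated_event_loop.h"

    void SimulateAsPi2() {
      aos::FlatbufferDetachedBuffer<aos::Configuration> config =
          aos::configuration::ReadConfig(
              "aos/events/multinode_pingpong_config.json");
      // GetNode returns a pointer into config, which is what the new
      // constructor requires.
      const aos::Node *pi2 =
          aos::configuration::GetNode(&config.message(), "pi2");
      aos::SimulatedEventLoopFactory factory(&config.message(), pi2);
      std::unique_ptr<aos::EventLoop> pong_event_loop =
          factory.MakeEventLoop("pong");
    }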