Add multi-node local logging to the logger

The logger is not yet able to forward messages, but it is able to log
messages that have been forwarded.  Add a test that creates a log file
and checks that the timestamps are recorded correctly.

Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9aae936..02a9a14 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -65,7 +65,8 @@
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
-  // This sends out the fetched messages and advances time to the start of the log file.
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
   reader.Register(&log_reader_factory);
 
   std::unique_ptr<EventLoop> test_event_loop =
@@ -136,6 +137,123 @@
   }
 }
 
+class MultinodeLoggerTest : public ::testing::Test {
+ public:
+  MultinodeLoggerTest()  // Members are initialized in declaration order.
+      : config_(aos::configuration::ReadConfig(
+            "aos/events/multinode_pingpong_config.json")),
+        event_loop_factory_(&config_.message(), "pi1"),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+        ping_(ping_event_loop_.get()) {}
+
+  // Multinode ping/pong config and the simulated factory built on it for "pi1".
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  SimulatedEventLoopFactory event_loop_factory_;
+
+  // Event loop named "ping" and the Ping app constructed on top of it.
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+};
+
+// Tests that multinode logging round-trips remote times and queue indices.
+TEST_F(MultinodeLoggerTest, MultiNode) {
+  constexpr chrono::seconds kTimeOffset = chrono::seconds(10000);
+  constexpr uint32_t kQueueIndexOffset = 1024;
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile = tmpdir + "/multi_logfile.bfbs";
+  // Remove a stale log, if any.  NOTE(review): getenv() above is unchecked.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong");
+
+    std::unique_ptr<aos::RawSender> pong_sender(
+        pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
+            pong_event_loop->configuration(), "/test", "aos.examples.Pong",
+            pong_event_loop->name(), pong_event_loop->node())));
+
+    // Fake a remote node.  Pongs go out through the raw sender Send overload
+    // message_gateway will use, with offset remote times and queue indices.
+    int pong_count = 0;
+    pong_event_loop->MakeWatcher(
+        "/test", [&pong_event_loop, &pong_count, &pong_sender,
+                  kTimeOffset](const examples::Ping &ping) {
+          flatbuffers::FlatBufferBuilder fbb;
+          examples::Pong::Builder pong_builder(fbb);
+          pong_builder.add_value(ping.value());
+          pong_builder.add_initial_send_time(ping.send_time());
+          fbb.Finish(pong_builder.Finish());
+
+          pong_sender->Send(fbb.GetBufferPointer(), fbb.GetSize(),
+                            pong_event_loop->monotonic_now() + kTimeOffset,
+                            pong_event_loop->realtime_now() + kTimeOffset,
+                            kQueueIndexOffset + pong_count);
+          ++pong_count;
+        });
+
+    DetachedBufferWriter writer(logfile);
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+    // Run for 95 ms first so that the Logger is created partway into the run.
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(&writer, logger_event_loop.get(),
+                  std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(logfile);
+
+  // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
+  // messages.  This won't work today yet until the log reading code gets
+  // significantly better.
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration(),
+                                               reader.node());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      log_reader_factory.MakeEventLoop("test");
+
+  int ping_count = 10;  // Presumably pings sent before logging; TODO confirm.
+  int pong_count = 10;
+
+  // Confirm that ping values increase by one with each replayed message.
+  test_event_loop->MakeWatcher("/test",
+                               [&ping_count](const examples::Ping &ping) {
+                                 EXPECT_EQ(ping.value(), ping_count + 1);
+                                 ++ping_count;
+                               });
+  // Confirm the logged remote queue index and remote times carry the offsets,
+  // the pong value is sequential, and the ping and pong counts stay matched.
+  test_event_loop->MakeWatcher(
+      "/test", [&test_event_loop, &ping_count, &pong_count,
+                kTimeOffset](const examples::Pong &pong) {
+        EXPECT_EQ(test_event_loop->context().remote_queue_index,
+                  pong_count + kQueueIndexOffset);
+        EXPECT_EQ(test_event_loop->context().monotonic_remote_time,
+                  test_event_loop->monotonic_now() + kTimeOffset);
+        EXPECT_EQ(test_event_loop->context().realtime_remote_time,
+                  test_event_loop->realtime_now() + kTimeOffset);
+
+        EXPECT_EQ(pong.value(), pong_count + 1);
+        ++pong_count;
+        EXPECT_EQ(ping_count, pong_count);
+      });
+
+  log_reader_factory.RunFor(std::chrono::seconds(100));
+  EXPECT_EQ(ping_count, 2010);  // Started at 10, so 2000 pings were replayed.
+  EXPECT_EQ(pong_count, 2010);  // Likewise 2000 pongs.
+
+  reader.Deregister();
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos