Support nodes rebooting

We have log files which span a reboot.  We want to be able to replay the
timeline across that reboot so we can run simulations and everything
else interesting.

This requires a bunch of stuff, unfortunately.

The most visible one is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it.  There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.

This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.

From there, everything else is just a massive amount of plumbing of the
BootTimestamp through everything just short of the user.  Boot UUIDs
were put in TimeConverter so everything related to rebooting is all
nicely together.

Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 83568da..43c0c97 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -22,6 +22,7 @@
 
 using aos::testing::ArtifactPath;
 
+using logger::BootTimestamp;
 using message_bridge::RemoteMessage;
 namespace chrono = ::std::chrono;
 
@@ -140,7 +141,7 @@
 TEST(EventSchedulerTest, ScheduleEvent) {
   int counter = 0;
   EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler;
+  EventScheduler scheduler(0);
   scheduler_scheduler.AddEventScheduler(&scheduler);
 
   scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -158,7 +159,7 @@
 TEST(EventSchedulerTest, DescheduleEvent) {
   int counter = 0;
   EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler;
+  EventScheduler scheduler(0);
   scheduler_scheduler.AddEventScheduler(&scheduler);
 
   auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
@@ -778,15 +779,13 @@
   message_bridge::TestingTimeConverter time(
       configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
-  NodeEventLoopFactory *pi2_factory =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
-  pi2_factory->SetTimeConverter(&time);
+  simulated_event_loop_factory.SetTimeConverter(&time);
 
   constexpr chrono::milliseconds kOffset{1501};
   time.AddNextTimestamp(
       distributed_clock::epoch(),
-      {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
-       logger::BootTimestamp::epoch()});
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+       BootTimestamp::epoch()});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -871,7 +870,7 @@
                                       chrono::milliseconds(5));
 
   EXPECT_EQ(pi1_server_statistics_count, 10);
-  EXPECT_EQ(pi2_server_statistics_count, 9);
+  EXPECT_EQ(pi2_server_statistics_count, 10);
   EXPECT_EQ(pi3_server_statistics_count, 10);
 }
 
@@ -1034,31 +1033,32 @@
 
 // Test that disconnecting nodes actually disconnects them.
 TEST_P(RemoteMessageSimulatedEventLoopTest, MultinodeDisconnect) {
-  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
-  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
-  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
-
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
 
-  std::unique_ptr<EventLoop> ping_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  NodeEventLoopFactory *pi1 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi2");
+  NodeEventLoopFactory *pi3 =
+      simulated_event_loop_factory.GetNodeEventLoopFactory("pi3");
+
+  std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
   Ping ping(ping_event_loop.get());
 
-  std::unique_ptr<EventLoop> pong_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  std::unique_ptr<EventLoop> pong_event_loop = pi2->MakeEventLoop("pong");
   Pong pong(pong_event_loop.get());
 
   std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+      pi2->MakeEventLoop("pi2_pong_counter");
 
   MessageCounter<examples::Pong> pi2_pong_counter(
       pi2_pong_counter_event_loop.get(), "/test");
 
   std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+      pi3->MakeEventLoop("pi3_pong_counter");
 
   std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+      pi1->MakeEventLoop("pi1_pong_counter");
 
   MessageCounter<examples::Pong> pi1_pong_counter(
       pi1_pong_counter_event_loop.get(), "/test");
@@ -1088,8 +1088,13 @@
           MakePi1OnPi2MessageCounters(pi2_pong_counter_event_loop.get());
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
-                                    "/pi1/aos");
+      *pi1_server_statistics_counter;
+  pi1->OnStartup([pi1, &pi1_server_statistics_counter]() {
+    pi1_server_statistics_counter =
+        pi1->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi1_server_statistics_counter", "/pi1/aos");
+  });
+
   aos::Fetcher<message_bridge::ServerStatistics> pi1_server_statistics_fetcher =
       pi1_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi1/aos");
@@ -1098,8 +1103,12 @@
           ->MakeFetcher<message_bridge::ClientStatistics>("/pi1/aos");
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
-                                    "/pi2/aos");
+      *pi2_server_statistics_counter;
+  pi2->OnStartup([pi2, &pi2_server_statistics_counter]() {
+    pi2_server_statistics_counter =
+        pi2->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi2_server_statistics_counter", "/pi2/aos");
+  });
   aos::Fetcher<message_bridge::ServerStatistics> pi2_server_statistics_fetcher =
       pi2_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi2/aos");
@@ -1108,8 +1117,12 @@
           ->MakeFetcher<message_bridge::ClientStatistics>("/pi2/aos");
 
   MessageCounter<message_bridge::ServerStatistics>
-      pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
-                                    "/pi3/aos");
+      *pi3_server_statistics_counter;
+  pi3->OnStartup([pi3, &pi3_server_statistics_counter]() {
+    pi3_server_statistics_counter =
+        pi3->AlwaysStart<MessageCounter<message_bridge::ServerStatistics>>(
+            "pi3_server_statistics_counter", "/pi3/aos");
+  });
   aos::Fetcher<message_bridge::ServerStatistics> pi3_server_statistics_fetcher =
       pi3_pong_counter_event_loop
           ->MakeFetcher<message_bridge::ServerStatistics>("/pi3/aos");
@@ -1141,9 +1154,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 20u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 20u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 2u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 2u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 2u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 2u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 2u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 2u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 20u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 20u);
@@ -1172,7 +1185,7 @@
   EXPECT_TRUE(AllConnected(pi3_client_statistics_fetcher.get()))
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Disconnect(pi3);
+  pi1->Disconnect(pi3->node());
 
   simulated_event_loop_factory.RunFor(chrono::seconds(2));
 
@@ -1187,9 +1200,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 40u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 40u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 4u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 4u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 4u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 4u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 4u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 4u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 40u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 40u);
@@ -1218,7 +1231,7 @@
   EXPECT_TRUE(AllConnectedBut(pi3_client_statistics_fetcher.get(), "pi1"))
       << " : " << aos::FlatbufferToJson(pi3_client_statistics_fetcher.get());
 
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi1)->Connect(pi3);
+  pi1->Connect(pi3->node());
 
   simulated_event_loop_factory.RunFor(chrono::seconds(2));
 
@@ -1233,9 +1246,9 @@
   EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 60u);
   EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 60u);
 
-  EXPECT_EQ(pi1_server_statistics_counter.count(), 6u);
-  EXPECT_EQ(pi2_server_statistics_counter.count(), 6u);
-  EXPECT_EQ(pi3_server_statistics_counter.count(), 6u);
+  EXPECT_EQ(pi1_server_statistics_counter->count(), 6u);
+  EXPECT_EQ(pi2_server_statistics_counter->count(), 6u);
+  EXPECT_EQ(pi3_server_statistics_counter->count(), 6u);
 
   EXPECT_EQ(pi1_client_statistics_counter.count(), 60u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 60u);
@@ -1286,20 +1299,17 @@
   message_bridge::TestingTimeConverter time(
       configuration::NodesCount(&config.message()));
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
-  NodeEventLoopFactory *pi2_factory =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
-  pi2_factory->SetTimeConverter(&time);
+  simulated_event_loop_factory.SetTimeConverter(&time);
 
   constexpr chrono::milliseconds kOffset{150100};
   time.AddNextTimestamp(
       distributed_clock::epoch(),
-      {logger::BootTimestamp::epoch(), logger::BootTimestamp::epoch() + kOffset,
-       logger::BootTimestamp::epoch()});
-  time.AddNextTimestamp(
-      distributed_clock::epoch() + chrono::seconds(10),
-      {logger::BootTimestamp::epoch() + chrono::milliseconds(9999),
-       logger::BootTimestamp::epoch() + kOffset + chrono::seconds(10),
-       logger::BootTimestamp::epoch() + chrono::milliseconds(9999)});
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + kOffset,
+       BootTimestamp::epoch()});
+  time.AddNextTimestamp(distributed_clock::epoch() + chrono::seconds(10),
+                        {BootTimestamp::epoch() + chrono::milliseconds(9999),
+                         BootTimestamp::epoch() + kOffset + chrono::seconds(10),
+                         BootTimestamp::epoch() + chrono::milliseconds(9999)});
 
   std::unique_ptr<EventLoop> ping_event_loop =
       simulated_event_loop_factory.MakeEventLoop("ping", pi1);
@@ -1470,23 +1480,51 @@
 // Tests that rebooting a node changes the ServerStatistics message and the
 // RemoteTimestamp message.
 TEST_P(RemoteMessageSimulatedEventLoopTest, BootUUIDTest) {
-  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
-  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
 
-  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
 
-  std::unique_ptr<EventLoop> ping_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
-  Ping ping(ping_event_loop.get());
+  const size_t pi1_index =
+      configuration::GetNodeIndex(&config.message(), "pi1");
+  const size_t pi2_index =
+      configuration::GetNodeIndex(&config.message(), "pi2");
+  const size_t pi3_index =
+      configuration::GetNodeIndex(&config.message(), "pi3");
 
-  std::unique_ptr<EventLoop> pong_event_loop =
-      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
-  Pong pong(pong_event_loop.get());
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
+  const UUID pi3_boot0 = UUID::Random();
+  {
+    time.AddNextTimestamp(distributed_clock::epoch(),
+                          {BootTimestamp::epoch(), BootTimestamp::epoch(),
+                           BootTimestamp::epoch()});
+
+    const chrono::nanoseconds dt = chrono::milliseconds(2001);
+
+    time.AddNextTimestamp(
+        distributed_clock::epoch() + dt,
+        {BootTimestamp::epoch() + dt,
+         BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+         BootTimestamp::epoch() + dt});
+
+    time.set_boot_uuid(pi1_index, 0, pi1_boot0);
+    time.set_boot_uuid(pi2_index, 0, pi2_boot0);
+    time.set_boot_uuid(pi2_index, 1, pi2_boot1);
+    time.set_boot_uuid(pi3_index, 0, pi3_boot0);
+  }
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  pi1->OnStartup([pi1]() { pi1->AlwaysStart<Ping>("ping"); });
+  pi2->OnStartup([pi2]() { pi2->AlwaysStart<Pong>("pong"); });
 
   std::unique_ptr<EventLoop> pi1_remote_timestamp =
-      simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
-  UUID expected_boot_uuid =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
+      pi1->MakeEventLoop("pi1_remote_timestamp");
+  UUID expected_boot_uuid = pi2_boot0;
 
   int timestamp_count = 0;
   pi1_remote_timestamp->MakeWatcher(
@@ -1512,41 +1550,65 @@
       });
 
   int pi1_server_statistics_count = 0;
+  bool first_pi1_server_statistics = true;
   pi1_remote_timestamp->MakeWatcher(
-      "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
+      "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid,
+                   &first_pi1_server_statistics](
                       const message_bridge::ServerStatistics &stats) {
         VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
         for (const message_bridge::ServerConnection *connection :
              *stats.connections()) {
-          EXPECT_TRUE(connection->has_boot_uuid());
+          if (connection->state() == message_bridge::State::CONNECTED) {
+            ASSERT_TRUE(connection->has_boot_uuid());
+          }
+          if (!first_pi1_server_statistics) {
+            EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          }
           if (connection->node()->name()->string_view() == "pi2") {
+            EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+            ASSERT_TRUE(connection->has_boot_uuid());
             EXPECT_EQ(expected_boot_uuid,
                       UUID::FromString(connection->boot_uuid()))
                 << " : Got " << aos::FlatbufferToJson(&stats);
             ++pi1_server_statistics_count;
           }
         }
+        first_pi1_server_statistics = false;
       });
 
+  int pi1_client_statistics_count = 0;
+  pi1_remote_timestamp->MakeWatcher(
+      "/pi1/aos", [&pi1_client_statistics_count](
+                      const message_bridge::ClientStatistics &stats) {
+        VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+        for (const message_bridge::ClientConnection *connection :
+             *stats.connections()) {
+          EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          if (connection->node()->name()->string_view() == "pi2") {
+            ++pi1_client_statistics_count;
+          }
+        }
+      });
+
+  // Confirm that reboot changes the UUID.
+  pi2->OnShutdown([&expected_boot_uuid, pi2, pi2_boot1]() {
+    expected_boot_uuid = pi2_boot1;
+    LOG(INFO) << "OnShutdown triggered for pi2";
+    pi2->OnStartup([&expected_boot_uuid, pi2]() {
+      EXPECT_EQ(expected_boot_uuid, pi2->boot_uuid());
+    });
+  });
+
   // Let a couple of ServerStatistics messages show up before rebooting.
-  simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
+  factory.RunFor(chrono::milliseconds(2002));
 
   EXPECT_GT(timestamp_count, 100);
   EXPECT_GE(pi1_server_statistics_count, 1u);
 
-  // Confirm that reboot changes the UUID.
-  simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
-
-  EXPECT_NE(
-      expected_boot_uuid,
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
-
-  expected_boot_uuid =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
   timestamp_count = 0;
   pi1_server_statistics_count = 0;
 
-  simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
+  factory.RunFor(chrono::milliseconds(2000));
   EXPECT_GT(timestamp_count, 100);
   EXPECT_GE(pi1_server_statistics_count, 1u);
 }
@@ -1557,5 +1619,321 @@
         Param{"multinode_pingpong_test_combined_config.json", true},
         Param{"multinode_pingpong_test_split_config.json", false}));
 
+// Tests that Startup and Shutdown do reasonable things.
+TEST(SimulatedEventLoopTest, MultinodePingPongStartup) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp::epoch(), BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+  const chrono::nanoseconds dt = chrono::seconds(10) + chrono::milliseconds(6);
+
+  time.AddNextTimestamp(
+      distributed_clock::epoch() + dt,
+      {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+       BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+       BootTimestamp::epoch() + dt});
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  // Configure startup to start Ping and Pong, and count.
+  size_t pi1_startup_counter = 0;
+  size_t pi2_startup_counter = 0;
+  pi1->OnStartup([pi1]() {
+    LOG(INFO) << "Made ping";
+    pi1->AlwaysStart<Ping>("ping");
+  });
+  pi1->OnStartup([&pi1_startup_counter]() { ++pi1_startup_counter; });
+  pi2->OnStartup([pi2]() {
+    LOG(INFO) << "Made pong";
+    pi2->AlwaysStart<Pong>("pong");
+  });
+  pi2->OnStartup([&pi2_startup_counter]() { ++pi2_startup_counter; });
+
+  // Shutdown just counts.
+  size_t pi1_shutdown_counter = 0;
+  size_t pi2_shutdown_counter = 0;
+  pi1->OnShutdown([&pi1_shutdown_counter]() { ++pi1_shutdown_counter; });
+  pi2->OnShutdown([&pi2_shutdown_counter]() { ++pi2_shutdown_counter; });
+
+  MessageCounter<examples::Pong> *pi1_pong_counter = nullptr;
+  MessageCounter<examples::Ping> *pi2_ping_counter = nullptr;
+
+  // Automatically make counters on startup.
+  pi1->OnStartup([&pi1_pong_counter, pi1]() {
+    pi1_pong_counter = pi1->AlwaysStart<MessageCounter<examples::Pong>>(
+        "pi1_pong_counter", "/test");
+  });
+  pi1->OnShutdown([&pi1_pong_counter]() { pi1_pong_counter = nullptr; });
+  pi2->OnStartup([&pi2_ping_counter, pi2]() {
+    pi2_ping_counter = pi2->AlwaysStart<MessageCounter<examples::Ping>>(
+        "pi2_ping_counter", "/test");
+  });
+  pi2->OnShutdown([&pi2_ping_counter]() { pi2_ping_counter = nullptr; });
+
+  EXPECT_EQ(pi2_ping_counter, nullptr);
+  EXPECT_EQ(pi1_pong_counter, nullptr);
+
+  EXPECT_EQ(pi1_startup_counter, 0u);
+  EXPECT_EQ(pi2_startup_counter, 0u);
+  EXPECT_EQ(pi1_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_shutdown_counter, 0u);
+
+  factory.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
+  EXPECT_EQ(pi1_startup_counter, 1u);
+  EXPECT_EQ(pi2_startup_counter, 1u);
+  EXPECT_EQ(pi1_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_shutdown_counter, 0u);
+  EXPECT_EQ(pi2_ping_counter->count(), 1001);
+  EXPECT_EQ(pi1_pong_counter->count(), 1001);
+
+  LOG(INFO) << pi1->monotonic_now();
+  LOG(INFO) << pi2->monotonic_now();
+
+  factory.RunFor(chrono::seconds(5) + chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_startup_counter, 2u);
+  EXPECT_EQ(pi2_startup_counter, 2u);
+  EXPECT_EQ(pi1_shutdown_counter, 1u);
+  EXPECT_EQ(pi2_shutdown_counter, 1u);
+  EXPECT_EQ(pi2_ping_counter->count(), 501);
+  EXPECT_EQ(pi1_pong_counter->count(), 501);
+}
+
+// Tests that OnStartup handlers can be added after running and get called, and
+// can't be called when running.
+TEST(SimulatedEventLoopDeathTest, OnStartupWhileRunning) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  // Test that we can add startup handlers as long as we aren't running, and
+  // they get run when Run gets called again.
+  // Test that adding a startup handler when running fails.
+  //
+  // Test shutdown handlers get called on destruction.
+  SimulatedEventLoopFactory factory(&config.message());
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  int startup_count0 = 0;
+  int startup_count1 = 0;
+
+  pi1->OnStartup([&]() { ++startup_count0; });
+  EXPECT_EQ(startup_count0, 0);
+  EXPECT_EQ(startup_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+
+  pi1->OnStartup([&]() { ++startup_count1; });
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 1);
+
+  std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
+  loop->OnRun([&]() { pi1->OnStartup([]() {}); });
+
+  EXPECT_DEATH({ factory.RunFor(chrono::nanoseconds(1)); },
+               "Can only register OnStartup handlers when not running.");
+}
+
+// Tests that OnStartup handlers can be added after running and get called, and
+// all the handlers get called on reboot.  Shutdown handlers are tested the same
+// way.
+TEST(SimulatedEventLoopTest, OnStartupShutdownAllRestarts) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+  time.RebootAt(0, distributed_clock::epoch() + 2 * dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  int startup_count0 = 0;
+  int shutdown_count0 = 0;
+  int startup_count1 = 0;
+  int shutdown_count1 = 0;
+
+  pi1->OnStartup([&]() { ++startup_count0; });
+  pi1->OnShutdown([&]() { ++shutdown_count0; });
+  EXPECT_EQ(startup_count0, 0);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  pi1->OnStartup([&]() { ++startup_count1; });
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 0);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::nanoseconds(1));
+  EXPECT_EQ(startup_count0, 1);
+  EXPECT_EQ(startup_count1, 1);
+  EXPECT_EQ(shutdown_count0, 0);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  factory.RunFor(chrono::seconds(15));
+
+  EXPECT_EQ(startup_count0, 2);
+  EXPECT_EQ(startup_count1, 2);
+  EXPECT_EQ(shutdown_count0, 1);
+  EXPECT_EQ(shutdown_count1, 0);
+
+  pi1->OnShutdown([&]() { ++shutdown_count1; });
+  factory.RunFor(chrono::seconds(10));
+
+  EXPECT_EQ(startup_count0, 3);
+  EXPECT_EQ(startup_count1, 3);
+  EXPECT_EQ(shutdown_count0, 2);
+  EXPECT_EQ(shutdown_count1, 1);
+}
+
+// Tests that event loops which outlive shutdown crash.
+TEST(SimulatedEventLoopDeathTest, EventLoopOutlivesReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  std::unique_ptr<EventLoop> loop = pi1->MakeEventLoop("foo");
+
+  EXPECT_DEATH({ factory.RunFor(dt * 2); }, "Event loop");
+}
+
+// Tests that messages don't survive a reboot of a node.
+TEST(SimulatedEventLoopTest, ChannelClearedOnReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(10);
+  time.RebootAt(0, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+
+  const UUID boot_uuid = pi1->boot_uuid();
+  EXPECT_NE(boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> test_message_sender =
+        ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&test_message_sender, 1);
+  }
+
+  factory.RunFor(chrono::seconds(5));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+
+  factory.RunFor(chrono::seconds(10));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_FALSE(fetcher.Fetch());
+  }
+  EXPECT_NE(boot_uuid, pi1->boot_uuid());
+}
+
+// Tests that reliable messages get resent on reboot.
+TEST(SimulatedEventLoopTest, ReliableMessageResentOnReboot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  const chrono::nanoseconds dt = chrono::seconds(1);
+  time.RebootAt(1, distributed_clock::epoch() + dt);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  const UUID pi1_boot_uuid = pi1->boot_uuid();
+  const UUID pi2_boot_uuid = pi2->boot_uuid();
+  EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+  EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> test_message_sender =
+        ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&test_message_sender, 1);
+  }
+
+  factory.RunFor(chrono::milliseconds(500));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+
+  factory.RunFor(chrono::seconds(1));
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    EXPECT_TRUE(fetcher.Fetch());
+  }
+  EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
+}
+
 }  // namespace testing
 }  // namespace aos