Allow simulated nodes to startup after global startup

This lets us not boot nodes until their monotonic clock has reached
zero.

This also changes the semantics of OnStartup slightly--even if all the
nodes startup at the start of the simulation, they will each complete
their own startup sequence before going to the next node. This doesn't
appear to have had any negative consequences (and is similar
to if the nodes had tiny monotonic clock offsets that forced several of
the nodes to start late), but is a change.

Change-Id: I25d343b9509a3cdae6db9747f60a212f1cb21187
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index a46cc1c..2d59c5d 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -139,72 +139,6 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> config;
 };
 
-class FunctionEvent : public EventScheduler::Event {
- public:
-  FunctionEvent(std::function<void()> fn) : fn_(fn) {}
-
-  void Handle() noexcept override { fn_(); }
-
- private:
-  std::function<void()> fn_;
-};
-
-// Test that creating an event and running the scheduler runs the event.
-TEST(EventSchedulerTest, ScheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
-// Test that descheduling an already scheduled event doesn't run the event.
-TEST(EventSchedulerTest, DescheduleEvent) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  FunctionEvent e([&counter]() { counter += 1; });
-  auto token =
-      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler.Deschedule(token);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 0);
-}
-
-// Test that TemporarilyStopAndRun respects and preserves running.
-TEST(EventSchedulerTest, TemporarilyStopAndRun) {
-  int counter = 0;
-  EventSchedulerScheduler scheduler_scheduler;
-  EventScheduler scheduler(0);
-  scheduler_scheduler.AddEventScheduler(&scheduler);
-
-  scheduler_scheduler.TemporarilyStopAndRun(
-      [&]() { CHECK(!scheduler_scheduler.is_running()); });
-  ASSERT_FALSE(scheduler_scheduler.is_running());
-
-  FunctionEvent e([&]() {
-    counter += 1;
-    CHECK(scheduler_scheduler.is_running());
-    scheduler_scheduler.TemporarilyStopAndRun(
-        [&]() { CHECK(!scheduler_scheduler.is_running()); });
-    CHECK(scheduler_scheduler.is_running());
-  });
-  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
-  scheduler_scheduler.Run();
-  EXPECT_EQ(counter, 1);
-}
-
 // Test that sending a message after running gets properly notified.
 TEST(SimulatedEventLoopTest, SendAfterRunFor) {
   SimulatedEventLoopTestFactory factory;
@@ -1347,9 +1281,34 @@
       pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
                                     "/pi3/aos");
 
+  std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
+  statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
+  statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
+  // The currenct contract is that, if all nodes boot simultaneously in
+  // simulation, that they should all act as if they area already connected,
+  // without ever observing the transition from disconnected to connected (note
+  // that on a real system the ServerStatistics message will get resent for each
+  // and every new connection, even if the new connections happen
+  // "simultaneously"--in simulation, we are essentially acting as if we are
+  // starting execution in an already running system, rather than observing the
+  // boot process).
+  for (auto &event_loop : statistics_watcher_loops) {
+    event_loop->MakeWatcher(
+        "/aos", [](const message_bridge::ServerStatistics &msg) {
+          for (const message_bridge::ServerConnection *connection :
+               *msg.connections()) {
+            EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
+                << connection->node()->name()->string_view();
+          }
+        });
+  }
+
   simulated_event_loop_factory.RunFor(chrono::seconds(2) +
                                       chrono::milliseconds(5));
 
+  statistics_watcher_loops.clear();
+
   EXPECT_EQ(pi1_pong_counter.count(), 201u);
   EXPECT_EQ(pi2_pong_counter.count(), 201u);
 
@@ -1642,8 +1601,13 @@
 
   std::unique_ptr<EventLoop> pi2_pong_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  aos::Sender<examples::Ping> pi2_reliable_sender =
+      pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_reliable_sender, 1);
   MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
                                                       "/reliable");
+  MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
+                                                      "/reliable2");
   MessageCounter<examples::Ping> pi2_unreliable_counter(
       pi2_pong_event_loop.get(), "/unreliable");
   aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
@@ -1699,6 +1663,7 @@
   SendPing(&pi1_unreliable_sender, 2);
   simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
   EXPECT_EQ(pi2_reliable_counter.count(), 2u);
+  EXPECT_EQ(pi1_reliable_counter.count(), 1u);
   EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
 
   EXPECT_EQ(reliable_timestamp_count, 2u);
@@ -2184,6 +2149,71 @@
   EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
 }
 
+TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  time.AddNextTimestamp(
+      distributed_clock::epoch(),
+      {BootTimestamp{0, monotonic_clock::epoch()},
+       BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
+       BootTimestamp{0, monotonic_clock::epoch()}});
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  const UUID pi1_boot_uuid = pi1->boot_uuid();
+  const UUID pi2_boot_uuid = pi2->boot_uuid();
+  EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+  EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> pi1_sender =
+        pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+    SendPing(&pi1_sender, 1);
+  }
+  ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
+  aos::Sender<examples::Ping> pi2_sender =
+      pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
+  SendPing(&pi2_sender, 1);
+  // Verify that we staggered the OnRun callback correctly.
+  pi2_event_loop->OnRun([pi1, pi2]() {
+    EXPECT_EQ(pi1->monotonic_now(),
+              monotonic_clock::epoch() + std::chrono::seconds(1));
+    EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
+  });
+
+  factory.RunFor(chrono::seconds(2));
+
+  {
+    ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch());
+  }
+  {
+    ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
+    aos::Fetcher<examples::Ping> fetcher =
+        pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + std::chrono::seconds(1) +
+                  factory.network_delay());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch() - std::chrono::seconds(1));
+  }
+}
+
 class SimulatedEventLoopDisconnectTest : public ::testing::Test {
  public:
   SimulatedEventLoopDisconnectTest()