Make simulated_message_bridge resend reliable messages

Just like message_bridge now sends reliable messages (ttl = 0) sent
before connecting when connecting, simulated_message_bridge now does the
same.  This brings sim up to the same standard as the real world.

The combination of the two of these let us send low rate configuration
messages over message_gateway and guarantee the latest message will
always be delivered.

Change-Id: I7c32ea94f56dfa89e155af807ed284bf47dc4912
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index 970b7e3..097a31c 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -203,6 +203,33 @@
           "time_to_live": 5000000
         }
       ]
+    },
+    {
+      "name": "/reliable",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"],
+          "time_to_live": 0
+        }
+      ]
+    },
+    {
+      "name": "/unreliable",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index d7b3c3d..bdf52c1 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -709,7 +709,6 @@
   std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
 
-
   // Confirm the offsets are being recovered correctly.
   int pi1_server_statistics_count = 0;
   pi1_pong_counter_event_loop->MakeWatcher(
@@ -977,5 +976,78 @@
                                pi2_pong_time);
 }
 
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+  aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
+  examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
+  ping_builder.add_value(value);
+  builder.Send(ping_builder.Finish());
+}
+
+// Tests that reliable (and unreliable) ping messages get forwarded as expected.
+TEST(SimulatedEventLoopTest, MultinodeStartupTesting) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ConfigPrefix() +
+                                     "events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  aos::Sender<examples::Ping> pi1_reliable_sender =
+      ping_event_loop->MakeSender<examples::Ping>("/reliable");
+  aos::Sender<examples::Ping> pi1_unreliable_sender =
+      ping_event_loop->MakeSender<examples::Ping>("/unreliable");
+  SendPing(&pi1_reliable_sender, 1);
+  SendPing(&pi1_unreliable_sender, 1);
+
+  std::unique_ptr<EventLoop> pi2_pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
+                                                      "/reliable");
+  MessageCounter<examples::Ping> pi2_unreliable_counter(
+      pi2_pong_event_loop.get(), "/unreliable");
+  aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
+      pi2_pong_event_loop->MakeFetcher<examples::Ping>("/reliable");
+  aos::Fetcher<examples::Ping> unreliable_on_pi2_fetcher =
+      pi2_pong_event_loop->MakeFetcher<examples::Ping>("/unreliable");
+
+  const size_t reliable_channel_index = configuration::ChannelIndex(
+      pi2_pong_event_loop->configuration(), reliable_on_pi2_fetcher.channel());
+
+  std::unique_ptr<EventLoop> pi1_remote_timestamp =
+      simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
+
+  int reliable_timestamp_count = 0;
+  pi1_remote_timestamp->MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [reliable_channel_index,
+       &reliable_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == reliable_channel_index) {
+          ++reliable_timestamp_count;
+        }
+      });
+
+  // Wait to let timestamp estimation start up before looking for the results.
+  simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+
+  EXPECT_EQ(pi2_reliable_counter.count(), 1u);
+  // This one isn't reliable, but was sent before the start.  It should *not* be
+  // delivered.
+  EXPECT_EQ(pi2_unreliable_counter.count(), 0u);
+  // Confirm we got a timestamp logged for the message that was forwarded.
+  EXPECT_EQ(reliable_timestamp_count, 1u);
+
+  SendPing(&pi1_reliable_sender, 2);
+  SendPing(&pi1_unreliable_sender, 2);
+  simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+  EXPECT_EQ(pi2_reliable_counter.count(), 2u);
+  EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
+
+  EXPECT_EQ(reliable_timestamp_count, 2u);
+}
+
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 5660331..b9ff861 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -40,6 +40,12 @@
 
   const Channel *channel() const { return fetcher_->channel(); }
 
+  uint32_t time_to_live() {
+    return configuration::ConnectionToNode(sender_->channel(),
+                                           send_node_factory_->node())
+        ->time_to_live();
+  }
+
   // Kicks us to re-fetch and schedule the timer.
   void Schedule() {
     // Keep pulling messages out of the fetcher until we find one in the future.
@@ -284,6 +290,16 @@
           });
     } else {
       // And register every delayer to be poked when a new message shows up.
+
+      source_event_loop->second.event_loop->OnRun([captured_delayers =
+                                                       delayers.get()]() {
+        // Poke all the reliable delayers so they send any queued messages.
+        for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
+          if (delayer->time_to_live() == 0) {
+            delayer->Schedule();
+          }
+        }
+      });
       source_event_loop->second.event_loop->MakeRawNoArgWatcher(
           channel, [captured_delayers = delayers.get()](const Context &) {
             for (std::unique_ptr<RawMessageDelayer> &delayer :