Add tests for max out of order duration.

Add test cases to check max out of order duration doesn't explode when
we're logging messages from a previous boot and when we don't have a
start time because the remote node is disconnected when we start
logging.

Change-Id: Ie0a3e1758503d128d0ef98d85e5b6313fabc2a9e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 648f430..48f1b8b 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -4156,6 +4156,224 @@
   // up in both logs correctly.
 }
 
+// Tests that when we have evidence of 2 boots, and then start logging, the
+// max_out_of_order_duration ends up reasonable on the boot with the start time.
+TEST(MultinodeLoggerLoopTest, PreviousBootData) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+
+  const UUID pi1_boot0 = UUID::Random();
+  const UUID pi2_boot0 = UUID::Random();
+  const UUID pi2_boot1 = UUID::Random();
+
+  const std::string kLogfile1_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile1/";
+  util::UnlinkRecursive(kLogfile1_1);
+
+  {
+    constexpr size_t kPi1Index = 0;
+    constexpr size_t kPi2Index = 1;
+    time_converter.set_boot_uuid(kPi1Index, 0, pi1_boot0);
+    time_converter.set_boot_uuid(kPi2Index, 0, pi2_boot0);
+    time_converter.set_boot_uuid(kPi2Index, 1, pi2_boot1);
+
+    // Make pi1 boot before everything else.
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(),
+         BootTimestamp::epoch() - chrono::milliseconds(100)});
+
+    const chrono::nanoseconds reboot_time = chrono::seconds(1005);
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp::epoch() + reboot_time,
+         BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()}});
+  }
+
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+
+  // What we want is for pi2 to send a message at t=1000 on the first channel
+  // (/atest1 pong), and t=1 on the second channel (/atest3 pong).  That'll make
+  // the max out of order duration be large.
+  //
+  // Then, we reboot, and only send messages on a third channel (/atest2 pong).
+  // The order is key, they need to sort in this order in the config.
+
+  std::vector<std::string> filenames;
+  {
+    {
+      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+      aos::Sender<examples::Pong> pong_sender =
+          pi2_event_loop->MakeSender<examples::Pong>("/atest3");
+
+      pi2_event_loop->OnRun([&]() {
+        aos::Sender<examples::Pong>::Builder builder =
+            pong_sender.MakeBuilder();
+        examples::Pong::Builder pong_builder =
+            builder.MakeBuilder<examples::Pong>();
+        CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+      });
+
+      event_loop_factory.RunFor(chrono::seconds(1000));
+    }
+
+    {
+      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+      aos::Sender<examples::Pong> pong_sender =
+          pi2_event_loop->MakeSender<examples::Pong>("/atest1");
+
+      aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
+      examples::Pong::Builder pong_builder =
+          builder.MakeBuilder<examples::Pong>();
+      CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+    }
+
+    event_loop_factory.RunFor(chrono::seconds(10));
+
+    // Now start a receiving node first.  This sets up 2 tight bounds between
+    // 2 of the nodes.
+    LoggerState pi1_logger = MakeLoggerState(
+        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    pi1_logger.StartLogger(kLogfile1_1);
+
+    std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+    aos::Sender<examples::Pong> pong_sender =
+        pi2_event_loop->MakeSender<examples::Pong>("/atest2");
+
+    pi2_event_loop->AddPhasedLoop(
+        [&pong_sender](int) {
+          aos::Sender<examples::Pong>::Builder builder =
+              pong_sender.MakeBuilder();
+          examples::Pong::Builder pong_builder =
+              builder.MakeBuilder<examples::Pong>();
+          CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+        },
+        chrono::milliseconds(10));
+
+    event_loop_factory.RunFor(chrono::seconds(100));
+
+    pi1_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Make sure we can read this.
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  EXPECT_TRUE(AllRebootPartsMatchOutOfOrderDuration(sorted_parts, "pi2"));
+  auto result = ConfirmReadable(filenames);
+}
+
+// Tests that when we start without a connection, and then start logging, the
+// max_out_of_order_duration ends up reasonable.
+TEST(MultinodeLoggerLoopTest, StartDisconnected) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_reboot_ooo_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+
+  time_converter.StartEqual();
+
+  const std::string kLogfile1_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile1/";
+  util::UnlinkRecursive(kLogfile1_1);
+
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+
+  // What we want is for pi2 to send a message at t=1000 on the first channel
+  // (/atest1 pong), and t=1 on the second channel (/atest3 pong).  That'll make
+  // the max out of order duration be large.
+  //
+  // Then, we disconnect, and only send messages on a third channel
+  // (/atest2 pong). The order is key, they need to sort in this order in the
+  // config so we observe them in the order which grows the
+  // max_out_of_order_duration.
+
+  std::vector<std::string> filenames;
+  {
+    {
+      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+      aos::Sender<examples::Pong> pong_sender =
+          pi2_event_loop->MakeSender<examples::Pong>("/atest3");
+
+      pi2_event_loop->OnRun([&]() {
+        aos::Sender<examples::Pong>::Builder builder =
+            pong_sender.MakeBuilder();
+        examples::Pong::Builder pong_builder =
+            builder.MakeBuilder<examples::Pong>();
+        CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+      });
+
+      event_loop_factory.RunFor(chrono::seconds(1000));
+    }
+
+    {
+      std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+      aos::Sender<examples::Pong> pong_sender =
+          pi2_event_loop->MakeSender<examples::Pong>("/atest1");
+
+      aos::Sender<examples::Pong>::Builder builder = pong_sender.MakeBuilder();
+      examples::Pong::Builder pong_builder =
+          builder.MakeBuilder<examples::Pong>();
+      CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+    }
+
+    event_loop_factory.RunFor(chrono::seconds(10));
+
+    pi1->Disconnect(pi2->node());
+    pi2->Disconnect(pi1->node());
+
+    // Make data flow.
+    std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+    aos::Sender<examples::Pong> pong_sender =
+        pi2_event_loop->MakeSender<examples::Pong>("/atest2");
+
+    pi2_event_loop->AddPhasedLoop(
+        [&pong_sender](int) {
+          aos::Sender<examples::Pong>::Builder builder =
+              pong_sender.MakeBuilder();
+          examples::Pong::Builder pong_builder =
+              builder.MakeBuilder<examples::Pong>();
+          CHECK_EQ(builder.Send(pong_builder.Finish()), RawSender::Error::kOk);
+        },
+        chrono::milliseconds(10));
+
+    event_loop_factory.RunFor(chrono::seconds(10));
+
+    // Now start a receiving node first.  This sets up 2 tight bounds between
+    // 2 of the nodes.
+    LoggerState pi1_logger = MakeLoggerState(
+        pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+    pi1_logger.StartLogger(kLogfile1_1);
+
+    event_loop_factory.RunFor(chrono::seconds(10));
+
+    // Now, reconnect, and everything should recover.
+    pi1->Connect(pi2->node());
+    pi2->Connect(pi1->node());
+
+    event_loop_factory.RunFor(chrono::seconds(10));
+
+    pi1_logger.AppendAllFilenames(&filenames);
+  }
+
+  // Make sure we can read this.
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+  auto result = ConfirmReadable(filenames);
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos