Handle logs with only reliable messages from a boot

We have logs with only a reliable message from the previous boot, and
both reliable and unreliable messages from the subsequent boot.  These
are still sortable, though the logic needed to be expanded.

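Some holes remain: comparing a boot with only reliable timestamps against
one with only unreliable timestamps, or two boots which both received the
same reliable message, is still a fatal error in the sort, and the reverse
direction still ignores boots with only reliable timestamps.  Let's tackle
those when they happen for real.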

Change-Id: Ie9eabf2662216c8fd30ea764e6f78039e7eba60e
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3da0733..a4de942 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -417,6 +417,21 @@
 )
 
 aos_config(
+    name = "multinode_pingpong_split4_reliable_config",
+    src = "multinode_pingpong_split4_reliable.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+        "//aos/network:remote_message_fbs",
+        "//aos/network:timestamp_fbs",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = ["//aos/events:aos_config"],
+)
+
+aos_config(
     name = "multinode_pingpong_combined_config",
     src = "multinode_pingpong_combined.json",
     flatbuffers = [
@@ -443,6 +458,7 @@
         ":multinode_pingpong_combined_config",
         ":multinode_pingpong_split3_config",
         ":multinode_pingpong_split4_config",
+        ":multinode_pingpong_split4_reliable_config",
         ":multinode_pingpong_split_config",
         "//aos/events:pingpong_config",
     ],
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 9921063..e0c378e 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -1017,6 +1017,9 @@
         const std::string remote_node_name =
             config->nodes()->Get(remote_node.first)->name()->str();
 
+        VLOG(1) << "Local " << local_node_name << " boot " << local_boot_uuid
+                << " remote " << remote_node_name;
+
         // Now, we have a bunch of remote boots for the same local boot and
         // remote node.  We want to sort them by observed local time.  This will
         // tell us which ones happened first.  Hold on to the max time on that
@@ -1030,6 +1033,7 @@
           BootPairTimes max_boot_time = boot_time_list.second[0];
           for (size_t i = 0; i < boot_time_list.second.size(); ++i) {
             const BootPairTimes &next_boot_time = boot_time_list.second[i];
+            VLOG(1) << " Found " << next_boot_time;
             if (next_boot_time.oldest_local_unreliable_monotonic_timestamp !=
                 aos::monotonic_clock::max_time) {
               VLOG(1)
@@ -1093,7 +1097,12 @@
 
           // Skip anything without a time in it.
           if (boot_time.oldest_remote_unreliable_monotonic_timestamp ==
-              aos::monotonic_clock::max_time) {
+                  aos::monotonic_clock::max_time &&
+              boot_time.oldest_remote_reliable_monotonic_timestamp ==
+                  aos::monotonic_clock::max_time) {
+            VLOG(1) << " Skipping local " << local_node_name << " boot "
+                    << local_boot_uuid << " remote " << remote_node_name
+                    << " boot " << boot_time_list.first << " " << boot_time;
             continue;
           }
 
@@ -1134,8 +1143,7 @@
           }
 
           auto reverse_local_node_it =
-              reverse_remote_node_boot_uuid_it->second.find(
-                  local_node_name);
+              reverse_remote_node_boot_uuid_it->second.find(local_node_name);
           if (reverse_local_node_it ==
               reverse_remote_node_boot_uuid_it->second.end()) {
             reverse_local_node_it =
@@ -1150,14 +1158,155 @@
           CHECK(reverse_local_node_boot_uuid_it ==
                 reverse_local_node_it->second.end());
           reverse_local_node_it->second.emplace(local_boot_uuid, boot_time);
+          VLOG(1) << " Boot time for local " << local_node_name << " boot "
+                  << local_boot_uuid << " remote " << remote_node_name
+                  << " boot " << boot_time_list.first << " " << boot_time;
         }
+
+        // All the nodes are from the same local boot here.  We are trying to
+        // order the remote boots.  Let's list out the combinatorics.
+        //  - Each entry can have reliable and/or unreliable timestamps, but
+        //    never neither (such entries are skipped and not in the list).
+        //  - Reliable timestamps can match or not match (ie, same message got
+        //    forwarded to 2 boots)
+        //
+        //  1) Both have unreliable, various reliable.  1 < 2.
+        //     Unreliable timestamps tell us more
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=10.122999611sec,
+        //       .oldest_local_reliable_monotonic_timestamp=9.400951024sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=23431.315344025sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=23413.172709284sec
+        //      }
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=11.798054208sec,
+        //       .oldest_local_reliable_monotonic_timestamp=23457.772660691sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=12.315344025sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=23458.772660691sec
+        //      }
+        //  2) Only reliable, or only one unreliable, local times don't match.
+        //     1 < 2
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=10.122999611sec,
+        //       .oldest_local_reliable_monotonic_timestamp=9.400951024sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=9223372036.854775807sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=9223372036.854775807sec
+        //      }
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=11.798054208sec,
+        //       .oldest_local_reliable_monotonic_timestamp=23457.772660691sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=9223372036.854775807sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=9223372036.854775807sec
+        //      }
+        //  3) Only reliable, local times match.  Unable to compare because the
+        //     same message got sent, and with reliable timestamps, we don't
+        //     know how long it took to cross the network.
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=10.122999611sec,
+        //       .oldest_local_reliable_monotonic_timestamp=9.400951024sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=9223372036.854775807sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=9223372036.854775807sec
+        //      }
+        //      {
+        //       .oldest_remote_reliable_monotonic_timestamp=11.798054208sec,
+        //       .oldest_local_reliable_monotonic_timestamp=9.400951024sec,
+        //       .oldest_remote_unreliable_monotonic_timestamp=9223372036.854775807sec,
+        //       .oldest_local_unreliable_monotonic_timestamp=9223372036.854775807sec
+        //      }
+        //
+        //  Writing all this out for which timestamps we have out of all 32
+        //  combinations, and which case each of them corresponds to:
+        //
+        //  {  }, {  } no match -> fail, won't be in the list
+        //  {  }, { u} no match -> fail, won't be in the list
+        //  {  }, {r } no match -> fail, won't be in the list
+        //  {  }, {ru} no match -> fail, won't be in the list
+        //  { u}, {  } no match -> fail, won't be in the list
+        //  { u}, { u} no match -> 1
+        //  { u}, {r } no match -> fail
+        //  { u}, {ru} no match -> 1
+        //  {r }, {  } no match -> fail, won't be in the list
+        //  {r }, { u} no match -> fail
+        //  {r }, {r } no match -> 2
+        //  {r }, {ru} no match -> 2
+        //  {ru}, {  } no match -> fail, won't be in the list
+        //  {ru}, { u} no match -> 1
+        //  {ru}, {r } no match -> 2
+        //  {ru}, {ru} no match -> 1
+        //  {  }, {  } match -> fail, won't be in the list
+        //  {  }, { u} match -> fail, won't be in the list
+        //  {  }, {r } match -> fail, won't be in the list
+        //  {  }, {ru} match -> fail, won't be in the list
+        //  { u}, {  } match -> fail, won't be in the list
+        //  { u}, { u} match -> 1
+        //  { u}, {r } match -> fail
+        //  { u}, {ru} match -> 1
+        //  {r }, {  } match -> fail, won't be in the list
+        //  {r }, { u} match -> fail
+        //  {r }, {r } match -> fail (case 3)
+        //  {r }, {ru} match -> fail (case 3)
+        //  {ru}, {  } match -> fail, won't be in the list
+        //  {ru}, { u} match -> 1
+        //  {ru}, {r } match -> fail (case 3)
+        //  {ru}, {ru} match -> 1
+        //
+        //  Combined, we get:
+        //
+        //  { u}, { u} no match -> 1
+        //  { u}, {ru} no match -> 1
+        //  {ru}, { u} no match -> 1
+        //  {ru}, {ru} no match -> 1
+        //  { u}, { u} match -> 1
+        //  { u}, {ru} match -> 1
+        //  {ru}, { u} match -> 1
+        //  {ru}, {ru} match -> 1
+        //
+        //  {r }, {r } no match -> 2
+        //  {r }, {ru} no match -> 2
+        //  {ru}, {r } no match -> 2
+        //
+        //  { u}, {r } no match -> fail
+        //  { u}, {r } match -> fail
+        //  {r }, { u} no match -> fail
+        //  {r }, { u} match -> fail
+        //
+        //  {r }, {r } match -> fail (case 3)
+        //  {r }, {ru} match -> fail (case 3)
+        //  {ru}, {r } match -> fail (case 3)
+
         std::sort(
             remote_boot_times.begin(), remote_boot_times.end(),
             [](const std::tuple<std::string, BootPairTimes, BootPairTimes> &a,
                const std::tuple<std::string, BootPairTimes, BootPairTimes> &b) {
-              return std::get<1>(a)
-                         .oldest_local_unreliable_monotonic_timestamp <
-                     std::get<1>(b).oldest_local_unreliable_monotonic_timestamp;
+              const bool both_reliable =
+                  std::get<1>(a).oldest_local_reliable_monotonic_timestamp !=
+                      aos::monotonic_clock::max_time &&
+                  std::get<1>(b).oldest_local_reliable_monotonic_timestamp !=
+                      aos::monotonic_clock::max_time;
+              const bool both_unreliable =
+                  std::get<1>(a).oldest_local_unreliable_monotonic_timestamp !=
+                      aos::monotonic_clock::max_time &&
+                  std::get<1>(b).oldest_local_unreliable_monotonic_timestamp !=
+                      aos::monotonic_clock::max_time;
+
+              if (both_unreliable) {
+                return std::get<1>(a)
+                           .oldest_local_unreliable_monotonic_timestamp <
+                       std::get<1>(b)
+                           .oldest_local_unreliable_monotonic_timestamp;
+              } else if (both_reliable) {
+                CHECK_NE(
+                    std::get<1>(a).oldest_local_reliable_monotonic_timestamp,
+                    std::get<1>(b).oldest_local_reliable_monotonic_timestamp)
+                    << ": The same reliable message has been forwarded to both "
+                       "boots.  This is ambiguous, please investigate.";
+                return std::get<1>(a)
+                           .oldest_local_reliable_monotonic_timestamp <
+                       std::get<1>(b).oldest_local_reliable_monotonic_timestamp;
+              } else {
+                LOG(FATAL) << "Unable to compare timestamps " << std::get<1>(a)
+                           << ", " << std::get<1>(b);
+              }
             });
 
         // The last time from the local node on the logger node.
@@ -1172,6 +1321,8 @@
           const std::tuple<std::string, BootPairTimes, BootPairTimes>
               &boot_time = remote_boot_times[boot_id];
           const std::string &local_boot_uuid = std::get<0>(boot_time);
+          VLOG(1) << " Boot " << local_boot_uuid << " is "
+                  << std::get<1>(boot_time);
 
           // Enforce that the last time observed in the headers on the previous
           // boot is less than the first time on the next boot.  This equates to
@@ -1204,14 +1355,12 @@
           if (remote_node_boot_constraints_it == boot_constraints.end()) {
             remote_node_boot_constraints_it =
                 boot_constraints
-                    .insert(
-                        std::make_pair(remote_node_name, NodeBootState()))
+                    .insert(std::make_pair(remote_node_name, NodeBootState()))
                     .first;
           }
 
           // Track that this boot happened.
-          remote_node_boot_constraints_it->second.boots.insert(
-              local_boot_uuid);
+          remote_node_boot_constraints_it->second.boots.insert(local_boot_uuid);
 
           if (boot_id > 0) {
             // And now add the constraints.  The vector is in order, so all we
@@ -1248,6 +1397,10 @@
                       .first;
             }
 
+            VLOG(1) << "Inserting " << first_per_boot_constraints->first
+                    << " < " << local_boot_uuid;
+            VLOG(1) << "Inserting " << second_per_boot_constraints->first
+                    << " > " << prior_boot_uuid;
             first_per_boot_constraints->second.emplace_back(
                 std::make_pair(local_boot_uuid, true));
             second_per_boot_constraints->second.emplace_back(
@@ -1261,17 +1414,23 @@
   // Now, sort the reverse direction so we can handle when we only get
   // timestamps and need to make sense of the boots.
   for (const auto &remote_node : reverse_boot_times) {
+    const std::string &remote_node_name = remote_node.first;
     for (const auto &remote_node_boot_uuid : remote_node.second) {
       for (const auto &local_node : remote_node_boot_uuid.second) {
         // Now, we need to take all the boots + times and put them in a list to
-        // sort and derive constraints from.
+        // sort and derive constraints from.  This is very similar to the
+        // forward direction.
 
-        std::vector<std::pair<std::string, BootPairTimes>>
-            local_boot_times;
+        std::vector<std::pair<std::string, BootPairTimes>> local_boot_times;
         for (const auto &local_boot_uuid : local_node.second) {
-          CHECK_NE(local_boot_uuid.second
-                       .oldest_remote_unreliable_monotonic_timestamp,
-                   monotonic_clock::max_time);
+          // TODO(austin): If we only have reliable timestamps going the other
+          // way, we need to update this logic (and the sorting logic below) to
+          // use them.  This should be quite rare, so I feel safe delaying it.
+          if (local_boot_uuid.second
+                  .oldest_remote_unreliable_monotonic_timestamp ==
+              monotonic_clock::max_time) {
+            continue;
+          }
           local_boot_times.emplace_back(local_boot_uuid);
         }
 
@@ -1283,8 +1442,11 @@
                      b.second.oldest_remote_unreliable_monotonic_timestamp;
             });
 
-        for (size_t boot_id = 0; boot_id < local_boot_times.size();
-             ++boot_id) {
+        VLOG(1) << "Reverse sort from " << remote_node_name << " boot "
+                << remote_node_boot_uuid.first << " to " << local_node.first;
+        for (size_t boot_id = 0; boot_id < local_boot_times.size(); ++boot_id) {
+          VLOG(1) << "Sorted local times: " << local_boot_times[boot_id].first
+                  << " time " << local_boot_times[boot_id].second;
           const std::pair<std::string, BootPairTimes> &boot_time =
               local_boot_times[boot_id];
           const std::string &local_boot_uuid = boot_time.first;
@@ -1299,8 +1461,7 @@
           }
 
           // Track that this boot happened.
-          local_node_boot_constraints_it->second.boots.insert(
-              local_boot_uuid);
+          local_node_boot_constraints_it->second.boots.insert(local_boot_uuid);
           if (boot_id > 0) {
             const std::pair<std::string, BootPairTimes> &prior_boot_time =
                 local_boot_times[boot_id - 1];
@@ -1333,6 +1494,10 @@
                       .first;
             }
 
+            VLOG(1) << "Inserting " << first_per_boot_constraints->first
+                    << " < " << local_boot_uuid;
+            VLOG(1) << "Inserting " << second_per_boot_constraints->first
+                    << " > " << prior_boot_uuid;
             first_per_boot_constraints->second.emplace_back(
                 std::make_pair(local_boot_uuid, true));
             second_per_boot_constraints->second.emplace_back(
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3b8d70b..aee2883 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2966,8 +2966,8 @@
                   .oldest_remote_reliable_monotonic_timestamps()
                   ->size(),
               2u);
-    ASSERT_TRUE(log_header->message()
-                    .has_oldest_local_reliable_monotonic_timestamps());
+    ASSERT_TRUE(
+        log_header->message().has_oldest_local_reliable_monotonic_timestamps());
     ASSERT_EQ(log_header->message()
                   .oldest_local_reliable_monotonic_timestamps()
                   ->size(),
@@ -3090,9 +3090,11 @@
             EXPECT_EQ(oldest_local_monotonic_timestamps,
                       expected_oldest_local_monotonic_timestamps);
             EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
-                      expected_oldest_local_monotonic_timestamps) << file;
+                      expected_oldest_local_monotonic_timestamps)
+                << file;
             EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
-                      expected_oldest_timestamp_monotonic_timestamps) << file;
+                      expected_oldest_timestamp_monotonic_timestamps)
+                << file;
 
             if (reliable) {
               EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
@@ -3153,9 +3155,11 @@
                 oldest_local_monotonic_timestamps,
                 monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
             EXPECT_EQ(oldest_logger_remote_unreliable_monotonic_timestamps,
-                      expected_oldest_local_monotonic_timestamps) << file;
+                      expected_oldest_local_monotonic_timestamps)
+                << file;
             EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
-                      expected_oldest_timestamp_monotonic_timestamps) << file;
+                      expected_oldest_timestamp_monotonic_timestamps)
+                << file;
             if (reliable) {
               EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
                         expected_oldest_remote_monotonic_timestamps);
@@ -3900,18 +3904,24 @@
       filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
       realtime_clock::epoch() + chrono::milliseconds(3000));
 
-  EXPECT_THAT(start_stop_result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                      chrono::seconds(2)));
-  EXPECT_THAT(start_stop_result[0].second, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                       chrono::seconds(3)));
-  EXPECT_THAT(start_stop_result[1].first, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                      chrono::seconds(2)));
-  EXPECT_THAT(start_stop_result[1].second, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                       chrono::seconds(3)));
-  EXPECT_THAT(start_stop_result[2].first, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                      chrono::seconds(2)));
-  EXPECT_THAT(start_stop_result[2].second, ::testing::ElementsAre(realtime_clock::epoch() +
-                                                       chrono::seconds(3)));
+  EXPECT_THAT(
+      start_stop_result[0].first,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+  EXPECT_THAT(
+      start_stop_result[0].second,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
+  EXPECT_THAT(
+      start_stop_result[1].first,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+  EXPECT_THAT(
+      start_stop_result[1].second,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
+  EXPECT_THAT(
+      start_stop_result[2].first,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(2)));
+  EXPECT_THAT(
+      start_stop_result[2].second,
+      ::testing::ElementsAre(realtime_clock::epoch() + chrono::seconds(3)));
 }
 
 // Tests that setting the start and stop flags across a reboot works as
@@ -4102,8 +4112,7 @@
     const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
     time_converter.AddNextTimestamp(
         distributed_clock::epoch() + reboot_time,
-        {BootTimestamp{.boot = 1,
-                       .time = monotonic_clock::epoch()},
+        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
          BootTimestamp::epoch() + reboot_time});
   }
 
@@ -4176,8 +4185,7 @@
     const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
     time_converter.AddNextTimestamp(
         distributed_clock::epoch() + reboot_time,
-        {BootTimestamp{.boot = 1,
-                       .time = monotonic_clock::epoch()},
+        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
          BootTimestamp::epoch() + reboot_time});
   }
 
@@ -4185,6 +4193,73 @@
       aos::testing::TestTmpDir() + "/multi_logfile2.1/";
   util::UnlinkRecursive(kLogfile2_1);
 
+  pi1->AlwaysStart<Ping>("ping");
+
+  // Pi1 sends to pi2.  Reboot pi1, but don't let pi2 connect to pi1.  This
+  // makes it such that we will only get timestamps from pi1 -> pi2 on the
+  // second boot.
+  {
+    LoggerState pi2_logger = LoggerState::MakeLogger(
+        pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+    event_loop_factory.RunFor(chrono::milliseconds(95));
+
+    pi2_logger.StartLogger(kLogfile2_1);
+
+    event_loop_factory.RunFor(chrono::milliseconds(4000));
+
+    pi2->Disconnect(pi1->node());
+
+    event_loop_factory.RunFor(chrono::milliseconds(1000));
+    pi1->AlwaysStart<Ping>("ping");
+
+    event_loop_factory.RunFor(chrono::milliseconds(5000));
+    pi2_logger.AppendAllFilenames(&filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(filenames);
+  ConfirmReadable(filenames);
+}
+
+// Tests that we properly handle only one direction ever existing after a reboot
+// with only reliable data.
+TEST(MissingDirectionTest, OneDirectionAfterRebootReliable) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(ArtifactPath(
+          "aos/events/logging/multinode_pingpong_split4_reliable_config.json"));
+  message_bridge::TestingTimeConverter time_converter(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  event_loop_factory.SetTimeConverter(&time_converter);
+
+  NodeEventLoopFactory *const pi1 =
+      event_loop_factory.GetNodeEventLoopFactory("pi1");
+  const size_t pi1_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi1->node());
+  NodeEventLoopFactory *const pi2 =
+      event_loop_factory.GetNodeEventLoopFactory("pi2");
+  const size_t pi2_index = configuration::GetNodeIndex(
+      event_loop_factory.configuration(), pi2->node());
+  std::vector<std::string> filenames;
+
+  {
+    CHECK_EQ(pi1_index, 0u);
+    CHECK_EQ(pi2_index, 1u);
+
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch(),
+        {BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+    const chrono::nanoseconds reboot_time = chrono::milliseconds(5000);
+    time_converter.AddNextTimestamp(
+        distributed_clock::epoch() + reboot_time,
+        {BootTimestamp{.boot = 1, .time = monotonic_clock::epoch()},
+         BootTimestamp::epoch() + reboot_time});
+  }
+
+  const std::string kLogfile2_1 =
+      aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+  util::UnlinkRecursive(kLogfile2_1);
 
   pi1->AlwaysStart<Ping>("ping");
 
diff --git a/aos/events/logging/multinode_pingpong_split4_reliable.json b/aos/events/logging/multinode_pingpong_split4_reliable.json
new file mode 100644
index 0000000..3de3785
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong_split4_reliable.json
@@ -0,0 +1,175 @@
+{
+  "channels": [
+    {
+      "name": "/pi1/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "source_node": "pi1",
+      "frequency": 200,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.logging.LogMessageFbs",
+      "source_node": "pi2",
+      "frequency": 200,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    /* Logged on pi1 locally */
+    {
+      "name": "/pi1/aos",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "LOCAL_LOGGER",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi2"],
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        }
+      ]
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1"],
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1"
+        }
+      ]
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi1",
+      "frequency": 150
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi1/test/aos-examples-Pong",
+      "type": "aos.message_bridge.RemoteMessage",
+      "logger": "NOT_LOGGED",
+      "num_senders": 2,
+      "source_node": "pi2",
+      "frequency": 150
+    },
+    /* Forwarded to pi2 */
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi2"],
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1
+        }
+      ],
+      "frequency": 150
+    },
+    /* Forwarded back to pi1.
+     * The message is logged both on the sending node and the receiving node
+     * (to make it easier to look at the results for now).
+     *
+     * The timestamps are logged on the receiving node.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1"],
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1
+        }
+      ],
+      "frequency": 150
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos*",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/pi1/aos"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos*",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/pi2/aos"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}