Turn on exclusive senders in LogReader

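LogReader now automatically detects when a user attempts to send on a
channel during replay while that same channel is simultaneously *being*
replayed from the log. Channels that may legitimately have both logged
and replay-time senders (timing reports, AOS_LOG messages, and old-style
remote timestamps) are exempted.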

Change-Id: I40f30693fe93c94018b6ddbc9f748e655cdf1fe3
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1ad60c9..09c585b 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -528,6 +528,48 @@
   }
 }
 
+std::vector<
+    std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {
+  CHECK_NOTNULL(node_event_loop_factory_);  // Dereferenced on the next line.
+  const aos::Configuration *config = node_event_loop_factory_->configuration();
+  std::vector<
+      std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+      result{// Both logged and replayed applications may send timing reports.
+             {aos::configuration::GetChannel(config, "/aos",
+                                             "aos.timing.Report", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo},
+             // AOS_LOG may be used in the log and in replay.
+             {aos::configuration::GetChannel(
+                  config, "/aos", "aos.logging.LogMessageFbs", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo}};
+  for (const Node *const node : configuration::GetNodes(config)) {
+    if (node == nullptr) {  // Presumably a single-node config; TODO confirm.
+      break;
+    }
+    const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
+        config,
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        "aos.message_bridge.RemoteMessage", "", node_);
+    // The old-style remote timestamp channel can be populated from any
+    // channel, simulated or replayed, so it is also marked kNo.
+    if (old_timestamp_channel != nullptr) {
+      result.push_back(std::make_pair(
+          old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+    }
+  }
+  // Drop channels that don't exist in the config (null lookups). Only the
+  // initializer entries can be null; loop entries are checked on insert.
+  for (size_t ii = 0; ii < result.size();) {
+    if (result[ii].first == nullptr) {
+      result.erase(result.begin() + ii);
+    } else {
+      ++ii;
+    }
+  }
+  return result;
+}
+
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
       std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -647,7 +689,7 @@
 
     // If we are replaying a log, we don't want a bunch of redundant messages
     // from both the real message bridge and simulated message bridge.
-    event_loop_factory_->DisableStatistics();
+    event_loop_factory_->PermanentlyDisableStatistics();
   }
 
   // Write pseudo start times out to file now that we are all setup.
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 157249c..04d2888 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -453,7 +453,8 @@
       // ensure we are remapping channels correctly.
       event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
           "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                         NodeEventLoopFactory::ExclusiveSenders::kNo});
+                         NodeEventLoopFactory::ExclusiveSenders::kYes,
+                         NonExclusiveChannels()});
       return event_loop_unique_ptr_.get();
     }
 
@@ -620,6 +621,12 @@
       uint32_t actual_queue_index = 0xffffffff;
     };
 
+    // Returns the channels that LogReader sends on but that other applications
+    // may *also* send on during replay, each paired with ExclusiveSenders::kNo.
+    std::vector<
+        std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+    NonExclusiveChannels();
+
     // Stores all the timestamps that have been sent on this channel.  This is
     // only done for channels which are forwarded and on the node which
     // initially sends the message.  Compress using ranges and offsets.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 02081fd..35474c7 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
   }
 }
 
+// Tests that creating a sender in replay on a logged channel is fatal.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/pingpong_config.json"));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  const ::std::string tmpdir = aos::testing::TestTmpDir();
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config_file =
+      absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove any pre-existing config and log files.
+  unlink(config_file.c_str());
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {  // Scope the logger so it is destroyed before the log is read back.
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory.MakeEventLoop("logger");
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+
+    event_loop_factory.RunFor(chrono::seconds(2));
+  }
+
+  LogReader reader(logfile);
+
+  reader.Register();  // Builds the reader's own SimulatedEventLoopFactory.
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+  EXPECT_DEATH(test_event_loop->MakeSender<examples::Ping>("/test"),
+               "exclusive channel.*examples.Ping");
+}
+
 // Tests calling StopLogging twice.
 TEST_F(LoggerDeathTest, ExtraStop) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
     std::unique_ptr<EventLoop> ping_spammer_event_loop =
         event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
             "ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                             NodeEventLoopFactory::ExclusiveSenders::kNo});
+                             NodeEventLoopFactory::ExclusiveSenders::kNo,
+                             {}});
     aos::Sender<examples::Ping> ping_sender =
         ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
 
@@ -2216,14 +2257,14 @@
   SimulatedEventLoopFactory full_factory(full_reader.configuration());
   SimulatedEventLoopFactory single_node_factory(
       single_node_reader.configuration());
+  single_node_factory.SkipTimingReport();
+  single_node_factory.DisableStatistics();
   std::unique_ptr<EventLoop> replay_event_loop =
       single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
           "log_reader");
 
   full_reader.Register(&full_factory);
   single_node_reader.Register(replay_event_loop.get());
-  single_node_factory.SkipTimingReport();
-  single_node_factory.DisableStatistics();
 
   const Node *full_pi1 =
       configuration::GetNode(full_factory.configuration(), "pi1");