Turn on exclusive senders in LogReader

This makes it so that we now automatically detect situations where a
user attempts to send messages during replay that are simultaneously
*being* replayed.

Change-Id: I40f30693fe93c94018b6ddbc9f748e655cdf1fe3
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1ad60c9..09c585b 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -528,6 +528,48 @@
   }
 }
 
+std::vector<
+    std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {
+  CHECK_NOTNULL(node_event_loop_factory_);
+  const aos::Configuration *config = node_event_loop_factory_->configuration();
+  std::vector<
+      std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+      result{// Timing reports can be sent by logged and replayed applications.
+             {aos::configuration::GetChannel(config, "/aos",
+                                             "aos.timing.Report", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo},
+             // AOS_LOG may be used in the log and in replay.
+             {aos::configuration::GetChannel(
+                  config, "/aos", "aos.logging.LogMessageFbs", "", node_),
+              NodeEventLoopFactory::ExclusiveSenders::kNo}};
+  for (const Node *const node : configuration::GetNodes(config)) {
+    if (node == nullptr) {
+      break;
+    }
+    const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
+        config,
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        "aos.message_bridge.RemoteMessage", "", node_);
+    // The old-style remote timestamp channel can be populated from any
+    // channel, simulated or replayed.
+    if (old_timestamp_channel != nullptr) {
+      result.push_back(std::make_pair(
+          old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+    }
+  }
+  // Remove any channels that weren't found due to not existing in the
+  // config.
+  for (size_t ii = 0; ii < result.size();) {
+    if (result[ii].first == nullptr) {
+      result.erase(result.begin() + ii);
+    } else {
+      ++ii;
+    }
+  }
+  return result;
+}
+
 void LogReader::Register() {
   event_loop_factory_unique_ptr_ =
       std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -647,7 +689,7 @@
 
     // If we are replaying a log, we don't want a bunch of redundant messages
     // from both the real message bridge and simulated message bridge.
-    event_loop_factory_->DisableStatistics();
+    event_loop_factory_->PermanentlyDisableStatistics();
   }
 
   // Write pseudo start times out to file now that we are all setup.
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 157249c..04d2888 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -453,7 +453,8 @@
       // ensure we are remapping channels correctly.
       event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
           "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                         NodeEventLoopFactory::ExclusiveSenders::kNo});
+                         NodeEventLoopFactory::ExclusiveSenders::kYes,
+                         NonExclusiveChannels()});
       return event_loop_unique_ptr_.get();
     }
 
@@ -620,6 +621,12 @@
       uint32_t actual_queue_index = 0xffffffff;
     };
 
+    // Returns a list of channels which LogReader will send on but which may
+    // *also* get sent on by other applications in replay.
+    std::vector<
+        std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+    NonExclusiveChannels();
+
     // Stores all the timestamps that have been sent on this channel.  This is
     // only done for channels which are forwarded and on the node which
     // initially sends the message.  Compress using ranges and offsets.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 02081fd..35474c7 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
   }
 }
 
+// Tests that we die if the replayer attempts to send on a logged channel.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/pingpong_config.json"));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  const ::std::string tmpdir = aos::testing::TestTmpDir();
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config_file =
+      absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove the log file.
+  unlink(config_file.c_str());
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory.MakeEventLoop("logger");
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+
+    event_loop_factory.RunFor(chrono::seconds(2));
+  }
+
+  LogReader reader(logfile);
+
+  reader.Register();
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+  EXPECT_DEATH(test_event_loop->MakeSender<examples::Ping>("/test"),
+               "exclusive channel.*examples.Ping");
+}
+
 // Tests calling StopLogging twice.
 TEST_F(LoggerDeathTest, ExtraStop) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
     std::unique_ptr<EventLoop> ping_spammer_event_loop =
         event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
             "ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                             NodeEventLoopFactory::ExclusiveSenders::kNo});
+                             NodeEventLoopFactory::ExclusiveSenders::kNo,
+                             {}});
     aos::Sender<examples::Ping> ping_sender =
         ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
 
@@ -2216,14 +2257,14 @@
   SimulatedEventLoopFactory full_factory(full_reader.configuration());
   SimulatedEventLoopFactory single_node_factory(
       single_node_reader.configuration());
+  single_node_factory.SkipTimingReport();
+  single_node_factory.DisableStatistics();
   std::unique_ptr<EventLoop> replay_event_loop =
       single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
           "log_reader");
 
   full_reader.Register(&full_factory);
   single_node_reader.Register(replay_event_loop.get());
-  single_node_factory.SkipTimingReport();
-  single_node_factory.DisableStatistics();
 
   const Node *full_pi1 =
       configuration::GetNode(full_factory.configuration(), "pi1");
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c218df6..c78405d 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -949,7 +949,27 @@
   CHECK(allow_new_senders_)
       << ": Attempted to create a new sender on exclusive channel "
       << configuration::StrippedChannelToString(channel_);
-  if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+  std::optional<ExclusiveSenders> per_channel_option;
+  for (const std::pair<const aos::Channel *, ExclusiveSenders> &per_channel :
+       event_loop->options().per_channel_exclusivity) {
+    if (per_channel.first->name()->string_view() ==
+            channel_->name()->string_view() &&
+        per_channel.first->type()->string_view() ==
+            channel_->type()->string_view()) {
+      CHECK(!per_channel_option.has_value())
+          << ": Channel " << configuration::StrippedChannelToString(channel_)
+          << " listed twice in per-channel list.";
+      per_channel_option = per_channel.second;
+    }
+  }
+  if (!per_channel_option.has_value()) {
+    // This could just as easily be implemented by setting
+    // per_channel_option to the global setting when we initialize it, but
+    // then we'd lose track of whether a given channel appears twice in
+    // the list.
+    per_channel_option = event_loop->options().exclusive_senders;
+  }
+  if (per_channel_option.value() == ExclusiveSenders::kYes) {
     CHECK_EQ(0, sender_count_)
         << ": Attempted to add an exclusive sender on a channel with existing "
            "senders: "
@@ -1466,7 +1486,14 @@
 
 void SimulatedEventLoopFactory::DisableStatistics() {
   CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
-  bridge_->DisableStatistics();
+  bridge_->DisableStatistics(
+      message_bridge::SimulatedMessageBridge::DestroySenders::kNo);
+}
+
+void SimulatedEventLoopFactory::PermanentlyDisableStatistics() {
+  CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+  bridge_->DisableStatistics(
+      message_bridge::SimulatedMessageBridge::DestroySenders::kYes);
 }
 
 void SimulatedEventLoopFactory::EnableStatistics() {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index e6ba4bf..9d5d11f 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -120,6 +120,9 @@
 
   // Disables the messages sent by the simulated message gateway.
   void DisableStatistics();
+  // Disables statistics sent by the simulated message gateway, and prevents
+  // EnableStatistcs from ever being called again (used by LogReader).
+  void PermanentlyDisableStatistics();
   // Enables the messages sent by the simulated message gateway.
   void EnableStatistics();
 
@@ -175,14 +178,18 @@
   struct EventLoopOptions {
     CheckSentTooFast check_sent_too_fast;
     ExclusiveSenders exclusive_senders;
+    // per_channel_exclusivity is used to list any exceptions to the overall
+    // exclusive_senders policy for this event loop.
+    std::vector<std::pair<const aos::Channel *, ExclusiveSenders>>
+        per_channel_exclusivity;
   };
 
   // Takes the name for the event loop and a struct of options for selecting
   // what checks to run for the event loop in question.
   std::unique_ptr<EventLoop> MakeEventLoop(
       std::string_view name,
-      EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
-                                                  ExclusiveSenders::kNo});
+      EventLoopOptions options = EventLoopOptions{
+          CheckSentTooFast::kYes, ExclusiveSenders::kNo, {}});
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 1a35b69..a46cc1c 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -262,7 +262,8 @@
       simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
           ->MakeEventLoop("too_fast_sender",
                           {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                           NodeEventLoopFactory::ExclusiveSenders::kNo});
+                           NodeEventLoopFactory::ExclusiveSenders::kNo,
+                           {}});
   aos::Sender<TestMessage> too_fast_message_sender =
       too_fast_event_loop->MakeSender<TestMessage>("/test");
 
@@ -292,9 +293,13 @@
 
   ::std::unique_ptr<EventLoop> exclusive_event_loop =
       simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
-          ->MakeEventLoop("too_fast_sender",
-                          {NodeEventLoopFactory::CheckSentTooFast::kYes,
-                           NodeEventLoopFactory::ExclusiveSenders::kYes});
+          ->MakeEventLoop(
+              "too_fast_sender",
+              {NodeEventLoopFactory::CheckSentTooFast::kYes,
+               NodeEventLoopFactory::ExclusiveSenders::kYes,
+               {{configuration::GetChannel(factory.configuration(), "/test1",
+                                           "aos.TestMessage", "", nullptr),
+                 NodeEventLoopFactory::ExclusiveSenders::kNo}}});
   exclusive_event_loop->SkipAosLog();
   exclusive_event_loop->SkipTimingReport();
   ::std::unique_ptr<EventLoop> normal_event_loop =
@@ -313,6 +318,12 @@
       normal_event_loop->MakeSender<TestMessage>("/test");
   EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
                "TestMessage");
+
+  // And check an explicitly exempted channel:
+  aos::Sender<TestMessage> non_exclusive_sender =
+      exclusive_event_loop->MakeSender<TestMessage>("/test1");
+  aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
+      normal_event_loop->MakeSender<TestMessage>("/test1");
 }
 
 void TestSentTooFastCheckEdgeCase(
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 19ca658..108176e 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,6 +32,10 @@
   bool forwarding_disabled() const { return forwarding_disabled_; }
   void set_forwarding_disabled(bool forwarding_disabled) {
     forwarding_disabled_ = forwarding_disabled;
+    if (!forwarding_disabled_) {
+      CHECK(timestamp_logger_ == nullptr);
+      CHECK(sender_ == nullptr);
+    }
   }
 
   void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
@@ -50,7 +54,8 @@
       server_connection_ =
           server_status_->FindServerConnection(send_node_factory_->node());
     }
-    if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+    if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
+        !forwarding_disabled_) {
       timestamp_logger_ =
           timestamp_loggers->SenderForChannel(channel_, connection_);
     } else {
@@ -79,7 +84,7 @@
                         MessageBridgeClientStatus *client_status) {
     sent_ = false;
     send_event_loop_ = send_event_loop;
-    if (send_event_loop_) {
+    if (send_event_loop_ && !forwarding_disabled_) {
       sender_ = send_event_loop_->MakeRawSender(channel_);
     } else {
       sender_ = nullptr;
@@ -561,16 +566,17 @@
   destination_state->second.SetClientState(source, state);
 }
 
-void SimulatedMessageBridge::DisableStatistics() {
+void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
   for (std::pair<const Node *const, State> &state : event_loop_map_) {
-    state.second.DisableStatistics();
+    state.second.DisableStatistics(destroy_senders);
   }
 }
 
-void SimulatedMessageBridge::DisableStatistics(const Node *node) {
+void SimulatedMessageBridge::DisableStatistics(const Node *node,
+                                               DestroySenders destroy_senders) {
   auto it = event_loop_map_.find(node);
   CHECK(it != event_loop_map_.end());
-  it->second.DisableStatistics();
+  it->second.DisableStatistics(destroy_senders);
 }
 
 void SimulatedMessageBridge::EnableStatistics() {
@@ -625,7 +631,7 @@
   timestamp_loggers = ChannelTimestampSender(event_loop.get());
   server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
   if (disable_statistics_) {
-    server_status->DisableStatistics();
+    server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
   }
 
   {
@@ -659,7 +665,7 @@
   }
   client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
   if (disable_statistics_) {
-    client_status->DisableStatistics();
+    client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
   }
 
   for (size_t i = 0;
@@ -701,8 +707,16 @@
             configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                 connection, event_loop->node());
 
+        const RawMessageDelayer *delayer = nullptr;
+        for (const RawMessageDelayer *candidate : source_delayers_) {
+          if (candidate->channel() == channel) {
+            delayer = candidate;
+          }
+        }
+
         // And the timestamps are then logged back by us again.
-        if (!delivery_time_is_logged) {
+        if (!delivery_time_is_logged ||
+            CHECK_NOTNULL(delayer)->forwarding_disabled()) {
           continue;
         }
 
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index f5b3eae..9565029 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -35,8 +35,10 @@
   // Disables generating and sending the messages which message_gateway sends.
   // The messages are the ClientStatistics, ServerStatistics and Timestamp
   // messages.
-  void DisableStatistics();
-  void DisableStatistics(const Node *node);
+  enum class DestroySenders { kNo, kYes };
+  void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
+  void DisableStatistics(const Node *node,
+                         DestroySenders destroy_senders = DestroySenders::kNo);
   void EnableStatistics();
   void EnableStatistics(const Node *node);
 
@@ -55,13 +57,16 @@
     }
     State(const State &state) = delete;
 
-    void DisableStatistics() {
+    void DisableStatistics(DestroySenders destroy_senders) {
       disable_statistics_ = true;
+      destroy_senders_ = destroy_senders;
       if (server_status) {
-        server_status->DisableStatistics();
+        server_status->DisableStatistics(destroy_senders ==
+                                         DestroySenders::kYes);
       }
       if (client_status) {
-        client_status->DisableStatistics();
+        client_status->DisableStatistics(destroy_senders ==
+                                         DestroySenders::kYes);
       }
     }
 
@@ -88,7 +93,8 @@
       // logfiles.
       SetEventLoop(node_factory_->MakeEventLoop(
           "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
-                             NodeEventLoopFactory::ExclusiveSenders::kNo}));
+                             NodeEventLoopFactory::ExclusiveSenders::kNo,
+                             {}}));
     }
 
     void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
@@ -218,7 +224,9 @@
     std::vector<RawMessageDelayer *> destination_delayers_;
 
     bool disable_statistics_ = false;
+    DestroySenders destroy_senders_ = DestroySenders::kNo;
   };
+
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
   std::map<const Node *, State> event_loop_map_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index aa2484d..f3511a0 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -219,12 +219,16 @@
   filter->Sample(monotonic_delivered_time, offset);
 }
 
-void MessageBridgeClientStatus::DisableStatistics() {
+void MessageBridgeClientStatus::DisableStatistics(bool destroy_sender) {
   statistics_timer_->Disable();
   send_ = false;
+  if (destroy_sender) {
+    sender_ = aos::Sender<ClientStatistics>();
+  }
 }
 
 void MessageBridgeClientStatus::EnableStatistics() {
+  CHECK(sender_.valid());
   send_ = true;
   statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
                            kStatisticsPeriod);
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 9c21169..033af95 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -54,7 +54,10 @@
   void Connect(int client_index);
 
   // Disables sending out any statistics messages.
-  void DisableStatistics();
+  // If destroy_sender is set, will clear the ClientStatistics Sender.
+  // EnableStatistics cannot be called again if destroy_sender is set. This is
+  // used by the LogReader to enforce one-sender-per-channel checks.
+  void DisableStatistics(bool destroy_sender);
   // Enables sending out any statistics messages.
   void EnableStatistics();
 
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index c82158b..4f6abff 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -411,13 +411,19 @@
   }
 }
 
-void MessageBridgeServerStatus::DisableStatistics() {
+void MessageBridgeServerStatus::DisableStatistics(bool destroy_senders) {
   send_ = false;
   statistics_timer_->Disable();
+  if (destroy_senders) {
+    sender_ = aos::Sender<ServerStatistics>();
+    timestamp_sender_ = aos::Sender<Timestamp>();
+  }
 }
 
 void MessageBridgeServerStatus::EnableStatistics() {
   send_ = true;
+  CHECK(sender_.valid());
+  CHECK(timestamp_sender_.valid());
   statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
                            kPingPeriod);
 }
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index feb1b05..327930c 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -73,7 +73,7 @@
   }
 
   // Disables sending out any statistics messages.
-  void DisableStatistics();
+  void DisableStatistics(bool destroy_senders);
   // Enables sending out any statistics messages.
   void EnableStatistics();
 
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index e35a170..d95edda 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -59,6 +59,8 @@
 
   aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
                                                const Connection *connection);
+  void ClearSenderForChannel(const Channel *channel,
+                             const Connection *connection);
 
  private:
   aos::EventLoop *event_loop_;