Allow LogReader to violate sent-too-fast checks

Otherwise we can't extract too fast messages from old logs.

Change-Id: I7f85e2e0e5235ea4340229ac19eee02a24f8ac4f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 9bfc90a..196ac53 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -406,8 +406,11 @@
 
     EventLoop *MakeEventLoop() {
       CHECK(!event_loop_unique_ptr_);
-      event_loop_unique_ptr_ =
-          node_event_loop_factory_->MakeEventLoop("log_reader");
+      // TODO(james): Enable exclusive senders on LogReader to allow us to
+      // ensure we are remapping channels correctly.
+      event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
+          "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+                         NodeEventLoopFactory::ExclusiveSenders::kNo});
       return event_loop_unique_ptr_.get();
     }
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 42ba12f..6d828c9 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -416,6 +416,83 @@
   }
 }
 
+// Tests that we can read a logfile that has channels which were sent too fast.
+TEST(SingleNodeLoggerNoFixtureTest, ReadTooFast) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/pingpong_config.json"));
+  SimulatedEventLoopFactory event_loop_factory(&config.message());
+  const ::std::string tmpdir = aos::testing::TestTmpDir();
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config_file =
+      absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove the log file.
+  unlink(config_file.c_str());
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  int sent_messages = 0;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory.MakeEventLoop("logger");
+
+    std::unique_ptr<EventLoop> ping_spammer_event_loop =
+        event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
+            "ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+                             NodeEventLoopFactory::ExclusiveSenders::kNo});
+    aos::Sender<examples::Ping> ping_sender =
+        ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
+
+    aos::TimerHandler *timer_handler =
+        ping_spammer_event_loop->AddTimer([&ping_sender, &sent_messages]() {
+          aos::Sender<examples::Ping>::Builder builder =
+              ping_sender.MakeBuilder();
+          examples::Ping::Builder ping_builder =
+              builder.MakeBuilder<examples::Ping>();
+          CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
+          ++sent_messages;
+        });
+
+    constexpr std::chrono::microseconds kSendPeriod{10};
+    const int max_legal_messages =
+        ping_sender.channel()->frequency() *
+        event_loop_factory.configuration()->channel_storage_duration() /
+        1000000000;
+
+    ping_spammer_event_loop->OnRun(
+        [&ping_spammer_event_loop, kSendPeriod, timer_handler]() {
+          timer_handler->Setup(
+              ping_spammer_event_loop->monotonic_now() + kSendPeriod / 2,
+              kSendPeriod);
+        });
+
+    Logger logger(logger_event_loop.get());
+    logger.set_separate_config(false);
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
+
+    event_loop_factory.RunFor(kSendPeriod * max_legal_messages * 2);
+  }
+
+  LogReader reader(logfile);
+
+  reader.Register();
+
+  std::unique_ptr<EventLoop> test_event_loop =
+      reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+  int replay_count = 0;
+
+  test_event_loop->MakeWatcher(
+      "/test", [&replay_count](const examples::Ping &) { ++replay_count; });
+
+  reader.event_loop_factory()->Run();
+  EXPECT_EQ(replay_count, sent_messages);
+}
+
 struct CompressionParams {
   std::string_view extension;
   std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 69e6638..4671f12 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -21,6 +21,10 @@
 class SimulatedFetcher;
 class SimulatedChannel;
 
+using CheckSentTooFast = NodeEventLoopFactory::CheckSentTooFast;
+using ExclusiveSenders = NodeEventLoopFactory::ExclusiveSenders;
+using EventLoopOptions = NodeEventLoopFactory::EventLoopOptions;
+
 namespace {
 
 std::string NodeName(const Node *node) {
@@ -231,7 +235,8 @@
 
   // Sends the message to all the connected receivers and fetchers.  Returns the
   // sent queue index, or std::nullopt if messages were sent too fast.
-  std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message);
+  std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message,
+                               CheckSentTooFast check_sent_too_fast);
 
   // Unregisters a fetcher.
   void UnregisterFetcher(SimulatedFetcher *fetcher);
@@ -259,6 +264,9 @@
   void CountSenderDestroyed() {
     --sender_count_;
     CHECK_GE(sender_count_, 0);
+    if (sender_count_ == 0) {
+      allow_new_senders_ = true;
+    }
   }
 
  private:
@@ -296,6 +304,9 @@
   ipc_lib::QueueIndex next_queue_index_;
 
   int sender_count_ = 0;
+  // Used to track when an exclusive sender has been created (e.g., for log
+  // replay) and we want to prevent new senders from being accidentally created.
+  bool allow_new_senders_ = true;
 
   std::vector<uint16_t> available_buffer_indices_;
 
@@ -531,7 +542,7 @@
           *channels,
       const Configuration *configuration,
       std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
-      pid_t tid)
+      pid_t tid, EventLoopOptions options)
       : EventLoop(CHECK_NOTNULL(configuration)),
         scheduler_(scheduler),
         node_event_loop_factory_(node_event_loop_factory),
@@ -539,7 +550,8 @@
         event_loops_(event_loops_),
         node_(node),
         tid_(tid),
-        startup_tracker_(std::make_shared<StartupTracker>()) {
+        startup_tracker_(std::make_shared<StartupTracker>()),
+        options_(options) {
     startup_tracker_->loop = this;
     scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
       if (startup_tracker->loop) {
@@ -675,6 +687,8 @@
     return node_event_loop_factory_->boot_uuid();
   }
 
+  const EventLoopOptions &options() const { return options_; }
+
  private:
   friend class SimulatedTimerHandler;
   friend class SimulatedPhasedLoopHandler;
@@ -723,6 +737,8 @@
   bool has_run_ = false;
 
   std::shared_ptr<StartupTracker> startup_tracker_;
+
+  EventLoopOptions options_;
 };
 
 void SimulatedEventLoopFactory::set_send_delay(
@@ -923,6 +939,16 @@
 
 ::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
     SimulatedEventLoop *event_loop) {
+  CHECK(allow_new_senders_)
+      << ": Attempted to create a new sender on exclusive channel "
+      << configuration::StrippedChannelToString(channel_);
+  if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+    CHECK_EQ(0, sender_count_)
+        << ": Attempted to add an exclusive sender on a channel with existing "
+           "senders: "
+        << configuration::StrippedChannelToString(channel_);
+    allow_new_senders_ = false;
+  }
   return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
 }
 
@@ -936,7 +962,7 @@
 }
 
 std::optional<uint32_t> SimulatedChannel::Send(
-    std::shared_ptr<SimulatedMessage> message) {
+    std::shared_ptr<SimulatedMessage> message, CheckSentTooFast check_sent_too_fast) {
   const auto now = scheduler_->monotonic_now();
   // Remove times that are greater than or equal to a channel_storage_duration_
   // ago
@@ -946,7 +972,8 @@
   }
 
   // Check that we are not sending messages too fast
-  if (static_cast<int>(last_times_.size()) >= queue_size()) {
+  if (check_sent_too_fast == CheckSentTooFast::kYes &&
+      static_cast<int>(last_times_.size()) >= queue_size()) {
     return std::nullopt;
   }
 
@@ -1025,7 +1052,7 @@
   message_->context.size = length;
 
   const std::optional<uint32_t> optional_queue_index =
-      simulated_channel_->Send(message_);
+      simulated_channel_->Send(message_, simulated_event_loop_->options().check_sent_too_fast);
 
   // Check that we are not sending messages too fast
   if (!optional_queue_index) {
@@ -1462,7 +1489,7 @@
 }
 
 ::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
-    std::string_view name) {
+    std::string_view name, EventLoopOptions options) {
   CHECK(!scheduler_.is_running() || !started_)
       << ": Can't create an event loop while running";
 
@@ -1470,7 +1497,7 @@
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
       &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
-      node_, tid));
+      node_, tid, options));
   result->set_name(name);
   result->set_send_delay(factory_->send_delay());
   if (skip_timing_report_) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9a447e9..234afa0 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -154,7 +154,26 @@
  public:
   ~NodeEventLoopFactory();
 
-  std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+  // Whether a given event loop should have its senders checked for messages
+  // being sent too fast. Should only be used by the LogReader or other highly
+  // specialized applications that need to be able to bypass normal behaviors.
+  enum class CheckSentTooFast { kNo, kYes };
+  // Whether the created EventLoop should be the only one allowed to send on all
+  // of its channels. Mostly useful for the LogReader, to allow us to confirm
+  // whether the LogReader is conflicting with the output of any applications
+  // being run in replay.
+  enum class ExclusiveSenders { kNo, kYes };
+  struct EventLoopOptions {
+    CheckSentTooFast check_sent_too_fast;
+    ExclusiveSenders exclusive_senders;
+  };
+
+  // Takes the name for the event loop and a struct of options for selecting
+  // what checks to run for the event loop in question.
+  std::unique_ptr<EventLoop> MakeEventLoop(
+      std::string_view name,
+      EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
+                                                  ExclusiveSenders::kNo});
 
   // Returns the node that this factory is running as, or nullptr if this is a
   // single node setup.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index a83243d..4b155ce 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -246,6 +246,72 @@
   EXPECT_EQ(test_message_counter2.count(), 0u);
 }
 
+// Test that if we configure an event loop to be able to send too fast that we do allow it to do so.
+TEST(SimulatedEventLoopTest, AllowSendTooFast) {
+  SimulatedEventLoopTestFactory factory;
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(
+      factory.configuration());
+
+  // Create two event loops: One will be allowed to send too fast, one won't. We
+  // will then test to ensure that the one that is allowed to send too fast can
+  // indeed send too fast, but that it then makes it so that the second event
+  // loop can no longer send anything because *it* is still limited.
+  ::std::unique_ptr<EventLoop> too_fast_event_loop =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
+          ->MakeEventLoop("too_fast_sender",
+                          {NodeEventLoopFactory::CheckSentTooFast::kNo,
+                           NodeEventLoopFactory::ExclusiveSenders::kNo});
+  aos::Sender<TestMessage> too_fast_message_sender =
+      too_fast_event_loop->MakeSender<TestMessage>("/test");
+
+  ::std::unique_ptr<EventLoop> limited_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("limited_sender");
+  aos::Sender<TestMessage> limited_message_sender =
+      limited_event_loop->MakeSender<TestMessage>("/test");
+
+  const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
+  for (int ii = 0; ii < queue_size; ++ii) {
+    ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
+  }
+  // And now we should start being in the sending-too-fast phase.
+  for (int ii = 0; ii < queue_size; ++ii) {
+    ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
+    ASSERT_EQ(SendTestMessage(limited_message_sender), RawSender::Error::kMessagesSentTooFast);
+  }
+}
+
+// Test that if we setup an exclusive sender that it is indeed exclusive.
+TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
+  SimulatedEventLoopTestFactory factory;
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(
+      factory.configuration());
+
+  ::std::unique_ptr<EventLoop> exclusive_event_loop =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
+          ->MakeEventLoop("too_fast_sender",
+                          {NodeEventLoopFactory::CheckSentTooFast::kYes,
+                           NodeEventLoopFactory::ExclusiveSenders::kYes});
+  exclusive_event_loop->SkipAosLog();
+  exclusive_event_loop->SkipTimingReport();
+  ::std::unique_ptr<EventLoop> normal_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("limited_sender");
+  // Set things up to have the exclusive sender be destroyed so we can test
+  // recovery.
+  {
+    aos::Sender<TestMessage> exclusive_sender =
+        exclusive_event_loop->MakeSender<TestMessage>("/test");
+
+    EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
+                 "TestMessage");
+  }
+  // This one should succeed now that the exclusive channel is removed.
+  aos::Sender<TestMessage> normal_sender =
+      normal_event_loop->MakeSender<TestMessage>("/test");
+  EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"), "TestMessage");
+}
+
 void TestSentTooFastCheckEdgeCase(
     const std::function<RawSender::Error(int, int)> expected_err,
     const bool send_twice_at_end) {
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 3a7a4e8..f5b3eae 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -83,7 +83,12 @@
     }
 
     void MakeEventLoop() {
-      SetEventLoop(node_factory_->MakeEventLoop("message_bridge"));
+      // Message bridge isn't the thing that should be catching sent-too-fast,
+      // and may need to be able to forward too-fast messages replayed from old
+      // logfiles.
+      SetEventLoop(node_factory_->MakeEventLoop(
+          "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+                             NodeEventLoopFactory::ExclusiveSenders::kNo}));
     }
 
     void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);