Allow LogReader to violate sent-too-fast checks
Otherwise we can't extract too fast messages from old logs.
Change-Id: I7f85e2e0e5235ea4340229ac19eee02a24f8ac4f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 9bfc90a..196ac53 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -406,8 +406,11 @@
EventLoop *MakeEventLoop() {
CHECK(!event_loop_unique_ptr_);
- event_loop_unique_ptr_ =
- node_event_loop_factory_->MakeEventLoop("log_reader");
+ // TODO(james): Enable exclusive senders on LogReader to allow us to
+ // ensure we are remapping channels correctly.
+ event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
+ "log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+ NodeEventLoopFactory::ExclusiveSenders::kNo});
return event_loop_unique_ptr_.get();
}
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 42ba12f..6d828c9 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -416,6 +416,83 @@
}
}
+// Tests that we can read a logfile that has channels which were sent too fast.
+TEST(SingleNodeLoggerNoFixtureTest, ReadTooFast) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/pingpong_config.json"));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ const ::std::string tmpdir = aos::testing::TestTmpDir();
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string config_file =
+ absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+ const ::std::string logfile = base_name + ".part0.bfbs";
+ // Remove the log file.
+ unlink(config_file.c_str());
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ int sent_messages = 0;
+
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory.MakeEventLoop("logger");
+
+ std::unique_ptr<EventLoop> ping_spammer_event_loop =
+ event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
+ "ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+ NodeEventLoopFactory::ExclusiveSenders::kNo});
+ aos::Sender<examples::Ping> ping_sender =
+ ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
+
+ aos::TimerHandler *timer_handler =
+ ping_spammer_event_loop->AddTimer([&ping_sender, &sent_messages]() {
+ aos::Sender<examples::Ping>::Builder builder =
+ ping_sender.MakeBuilder();
+ examples::Ping::Builder ping_builder =
+ builder.MakeBuilder<examples::Ping>();
+ CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
+ ++sent_messages;
+ });
+
+ constexpr std::chrono::microseconds kSendPeriod{10};
+ const int max_legal_messages =
+ ping_sender.channel()->frequency() *
+ event_loop_factory.configuration()->channel_storage_duration() /
+ 1000000000;
+
+ ping_spammer_event_loop->OnRun(
+ [&ping_spammer_event_loop, kSendPeriod, timer_handler]() {
+ timer_handler->Setup(
+ ping_spammer_event_loop->monotonic_now() + kSendPeriod / 2,
+ kSendPeriod);
+ });
+
+ Logger logger(logger_event_loop.get());
+ logger.set_separate_config(false);
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
+
+ event_loop_factory.RunFor(kSendPeriod * max_legal_messages * 2);
+ }
+
+ LogReader reader(logfile);
+
+ reader.Register();
+
+ std::unique_ptr<EventLoop> test_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+ int replay_count = 0;
+
+ test_event_loop->MakeWatcher(
+ "/test", [&replay_count](const examples::Ping &) { ++replay_count; });
+
+ reader.event_loop_factory()->Run();
+ EXPECT_EQ(replay_count, sent_messages);
+}
+
struct CompressionParams {
std::string_view extension;
std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 69e6638..4671f12 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -21,6 +21,10 @@
class SimulatedFetcher;
class SimulatedChannel;
+using CheckSentTooFast = NodeEventLoopFactory::CheckSentTooFast;
+using ExclusiveSenders = NodeEventLoopFactory::ExclusiveSenders;
+using EventLoopOptions = NodeEventLoopFactory::EventLoopOptions;
+
namespace {
std::string NodeName(const Node *node) {
@@ -231,7 +235,8 @@
// Sends the message to all the connected receivers and fetchers. Returns the
// sent queue index, or std::nullopt if messages were sent too fast.
- std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message);
+ std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message,
+ CheckSentTooFast check_sent_too_fast);
// Unregisters a fetcher.
void UnregisterFetcher(SimulatedFetcher *fetcher);
@@ -259,6 +264,9 @@
void CountSenderDestroyed() {
--sender_count_;
CHECK_GE(sender_count_, 0);
+ if (sender_count_ == 0) {
+ allow_new_senders_ = true;
+ }
}
private:
@@ -296,6 +304,9 @@
ipc_lib::QueueIndex next_queue_index_;
int sender_count_ = 0;
+ // Used to track when an exclusive sender has been created (e.g., for log
+ // replay) and we want to prevent new senders from being accidentally created.
+ bool allow_new_senders_ = true;
std::vector<uint16_t> available_buffer_indices_;
@@ -531,7 +542,7 @@
*channels,
const Configuration *configuration,
std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
- pid_t tid)
+ pid_t tid, EventLoopOptions options)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
node_event_loop_factory_(node_event_loop_factory),
@@ -539,7 +550,8 @@
event_loops_(event_loops_),
node_(node),
tid_(tid),
- startup_tracker_(std::make_shared<StartupTracker>()) {
+ startup_tracker_(std::make_shared<StartupTracker>()),
+ options_(options) {
startup_tracker_->loop = this;
scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
if (startup_tracker->loop) {
@@ -675,6 +687,8 @@
return node_event_loop_factory_->boot_uuid();
}
+ const EventLoopOptions &options() const { return options_; }
+
private:
friend class SimulatedTimerHandler;
friend class SimulatedPhasedLoopHandler;
@@ -723,6 +737,8 @@
bool has_run_ = false;
std::shared_ptr<StartupTracker> startup_tracker_;
+
+ EventLoopOptions options_;
};
void SimulatedEventLoopFactory::set_send_delay(
@@ -923,6 +939,16 @@
::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
SimulatedEventLoop *event_loop) {
+ CHECK(allow_new_senders_)
+ << ": Attempted to create a new sender on exclusive channel "
+ << configuration::StrippedChannelToString(channel_);
+ if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+ CHECK_EQ(0, sender_count_)
+ << ": Attempted to add an exclusive sender on a channel with existing "
+ "senders: "
+ << configuration::StrippedChannelToString(channel_);
+ allow_new_senders_ = false;
+ }
return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
}
@@ -936,7 +962,7 @@
}
std::optional<uint32_t> SimulatedChannel::Send(
- std::shared_ptr<SimulatedMessage> message) {
+ std::shared_ptr<SimulatedMessage> message, CheckSentTooFast check_sent_too_fast) {
const auto now = scheduler_->monotonic_now();
// Remove times that are greater than or equal to a channel_storage_duration_
// ago
@@ -946,7 +972,8 @@
}
// Check that we are not sending messages too fast
- if (static_cast<int>(last_times_.size()) >= queue_size()) {
+ if (check_sent_too_fast == CheckSentTooFast::kYes &&
+ static_cast<int>(last_times_.size()) >= queue_size()) {
return std::nullopt;
}
@@ -1025,7 +1052,7 @@
message_->context.size = length;
const std::optional<uint32_t> optional_queue_index =
- simulated_channel_->Send(message_);
+ simulated_channel_->Send(message_, simulated_event_loop_->options().check_sent_too_fast);
// Check that we are not sending messages too fast
if (!optional_queue_index) {
@@ -1462,7 +1489,7 @@
}
::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
- std::string_view name) {
+ std::string_view name, EventLoopOptions options) {
CHECK(!scheduler_.is_running() || !started_)
<< ": Can't create an event loop while running";
@@ -1470,7 +1497,7 @@
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
&scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
- node_, tid));
+ node_, tid, options));
result->set_name(name);
result->set_send_delay(factory_->send_delay());
if (skip_timing_report_) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9a447e9..234afa0 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -154,7 +154,26 @@
public:
~NodeEventLoopFactory();
- std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ // Whether a given event loop should have its senders checked for messages
+ // being sent too fast. Should only be used by the LogReader or other highly
+ // specialized applications that need to be able to bypass normal behaviors.
+ enum class CheckSentTooFast { kNo, kYes };
+ // Whether the created EventLoop should be the only one allowed to send on all
+ // of its channels. Mostly useful for the LogReader, to allow us to confirm
+ // whether the LogReader is conflicting with the output of any applications
+ // being run in replay.
+ enum class ExclusiveSenders { kNo, kYes };
+ struct EventLoopOptions {
+ CheckSentTooFast check_sent_too_fast;
+ ExclusiveSenders exclusive_senders;
+ };
+
+ // Takes the name for the event loop and a struct of options for selecting
+ // what checks to run for the event loop in question.
+ std::unique_ptr<EventLoop> MakeEventLoop(
+ std::string_view name,
+ EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
+ ExclusiveSenders::kNo});
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index a83243d..4b155ce 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -246,6 +246,72 @@
EXPECT_EQ(test_message_counter2.count(), 0u);
}
+// Test that if we configure an event loop to be able to send too fast that we do allow it to do so.
+TEST(SimulatedEventLoopTest, AllowSendTooFast) {
+ SimulatedEventLoopTestFactory factory;
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(
+ factory.configuration());
+
+ // Create two event loops: One will be allowed to send too fast, one won't. We
+ // will then test to ensure that the one that is allowed to send too fast can
+ // indeed send too fast, but that it then makes it so that the second event
+ // loop can no longer send anything because *it* is still limited.
+ ::std::unique_ptr<EventLoop> too_fast_event_loop =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
+ ->MakeEventLoop("too_fast_sender",
+ {NodeEventLoopFactory::CheckSentTooFast::kNo,
+ NodeEventLoopFactory::ExclusiveSenders::kNo});
+ aos::Sender<TestMessage> too_fast_message_sender =
+ too_fast_event_loop->MakeSender<TestMessage>("/test");
+
+ ::std::unique_ptr<EventLoop> limited_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("limited_sender");
+ aos::Sender<TestMessage> limited_message_sender =
+ limited_event_loop->MakeSender<TestMessage>("/test");
+
+ const int queue_size = TestChannelQueueSize(too_fast_event_loop.get());
+ for (int ii = 0; ii < queue_size; ++ii) {
+ ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
+ }
+ // And now we should start being in the sending-too-fast phase.
+ for (int ii = 0; ii < queue_size; ++ii) {
+ ASSERT_EQ(SendTestMessage(too_fast_message_sender), RawSender::Error::kOk);
+ ASSERT_EQ(SendTestMessage(limited_message_sender), RawSender::Error::kMessagesSentTooFast);
+ }
+}
+
+// Test that if we setup an exclusive sender that it is indeed exclusive.
+TEST(SimulatedEventLoopDeathTest, ExclusiveSenders) {
+ SimulatedEventLoopTestFactory factory;
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(
+ factory.configuration());
+
+ ::std::unique_ptr<EventLoop> exclusive_event_loop =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
+ ->MakeEventLoop("too_fast_sender",
+ {NodeEventLoopFactory::CheckSentTooFast::kYes,
+ NodeEventLoopFactory::ExclusiveSenders::kYes});
+ exclusive_event_loop->SkipAosLog();
+ exclusive_event_loop->SkipTimingReport();
+ ::std::unique_ptr<EventLoop> normal_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("limited_sender");
+ // Set things up to have the exclusive sender be destroyed so we can test
+ // recovery.
+ {
+ aos::Sender<TestMessage> exclusive_sender =
+ exclusive_event_loop->MakeSender<TestMessage>("/test");
+
+ EXPECT_DEATH(normal_event_loop->MakeSender<TestMessage>("/test"),
+ "TestMessage");
+ }
+ // This one should succeed now that the exclusive channel is removed.
+ aos::Sender<TestMessage> normal_sender =
+ normal_event_loop->MakeSender<TestMessage>("/test");
+ EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"), "TestMessage");
+}
+
void TestSentTooFastCheckEdgeCase(
const std::function<RawSender::Error(int, int)> expected_err,
const bool send_twice_at_end) {
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 3a7a4e8..f5b3eae 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -83,7 +83,12 @@
}
void MakeEventLoop() {
- SetEventLoop(node_factory_->MakeEventLoop("message_bridge"));
+ // Message bridge isn't the thing that should be catching sent-too-fast,
+ // and may need to be able to forward too-fast messages replayed from old
+ // logfiles.
+ SetEventLoop(node_factory_->MakeEventLoop(
+ "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+ NodeEventLoopFactory::ExclusiveSenders::kNo}));
}
void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);