Turn on exclusive senders in LogReader
This makes it so that we now automatically detect situations where a
user attempts to send messages during replay that are simultaneously
*being* replayed.
Change-Id: I40f30693fe93c94018b6ddbc9f748e655cdf1fe3
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1ad60c9..09c585b 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -528,6 +528,48 @@
}
}
+std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {  // Channels replay may share with other senders.
+ CHECK_NOTNULL(node_event_loop_factory_);  // The config is fetched from this factory.
+ const aos::Configuration *config = node_event_loop_factory_->configuration();
+ std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+ result{// Timing reports can be sent by logged and replayed applications.
+ {aos::configuration::GetChannel(config, "/aos",
+ "aos.timing.Report", "", node_),
+ NodeEventLoopFactory::ExclusiveSenders::kNo},
+ // AOS_LOG may be used in the log and in replay.
+ {aos::configuration::GetChannel(
+ config, "/aos", "aos.logging.LogMessageFbs", "", node_),
+ NodeEventLoopFactory::ExclusiveSenders::kNo}};
+ for (const Node *const node : configuration::GetNodes(config)) {
+ if (node == nullptr) {  // NOTE(review): presumably a single-node config -- confirm.
+ break;
+ }
+ const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
+ config,
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ "aos.message_bridge.RemoteMessage", "", node_);
+ // The old-style remote timestamp channel can be populated from any
+ // channel, simulated or replayed. It is only added if the config has it.
+ if (old_timestamp_channel != nullptr) {
+ result.push_back(std::make_pair(
+ old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+ }
+ }
+ // Remove any channels that weren't found due to not existing in the
+ // config. Only the two fixed entries above can still be null here.
+ for (size_t ii = 0; ii < result.size();) {
+ if (result[ii].first == nullptr) {
+ result.erase(result.begin() + ii);
+ } else {
+ ++ii;
+ }
+ }
+ return result;
+}
+
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -647,7 +689,7 @@
// If we are replaying a log, we don't want a bunch of redundant messages
// from both the real message bridge and simulated message bridge.
- event_loop_factory_->DisableStatistics();
+ event_loop_factory_->PermanentlyDisableStatistics();
}
// Write pseudo start times out to file now that we are all setup.
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 157249c..04d2888 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -453,7 +453,8 @@
// ensure we are remapping channels correctly.
event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
"log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kYes,
+ NonExclusiveChannels()});
return event_loop_unique_ptr_.get();
}
@@ -620,6 +621,12 @@
uint32_t actual_queue_index = 0xffffffff;
};
+ // Returns the channels LogReader sends on which other applications may *also*
+ // send on during replay; channels absent from the config are omitted.
+ std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+ NonExclusiveChannels();
+
// Stores all the timestamps that have been sent on this channel. This is
// only done for channels which are forwarded and on the node which
// initially sends the message. Compress using ranges and offsets.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 02081fd..35474c7 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
}
}
+// Tests that we die if the replayer attempts to send on a logged channel.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/pingpong_config.json"));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ const ::std::string tmpdir = aos::testing::TestTmpDir();
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string config_file =
+ absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+ const ::std::string logfile = base_name + ".part0.bfbs";
+ // Remove any pre-existing config and log files.
+ unlink(config_file.c_str());
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ {  // Log for two seconds; the logger is destroyed at the end of this scope.
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory.MakeEventLoop("logger");
+
+ Logger logger(logger_event_loop.get());
+ logger.set_separate_config(false);
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
+
+ event_loop_factory.RunFor(chrono::seconds(2));
+ }
+
+ LogReader reader(logfile);  // Replay the log written above.
+
+ reader.Register();
+
+ std::unique_ptr<EventLoop> test_event_loop =  // Made on the replay factory.
+ reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+ EXPECT_DEATH(test_event_loop->MakeSender<examples::Ping>("/test"),  // Creating the sender must die.
+ "exclusive channel.*examples.Ping");
+}
+
// Tests calling StopLogging twice.
TEST_F(LoggerDeathTest, ExtraStop) {
const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
std::unique_ptr<EventLoop> ping_spammer_event_loop =
event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
"ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}});
aos::Sender<examples::Ping> ping_sender =
ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
@@ -2216,14 +2257,14 @@
SimulatedEventLoopFactory full_factory(full_reader.configuration());
SimulatedEventLoopFactory single_node_factory(
single_node_reader.configuration());
+ single_node_factory.SkipTimingReport();
+ single_node_factory.DisableStatistics();
std::unique_ptr<EventLoop> replay_event_loop =
single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
"log_reader");
full_reader.Register(&full_factory);
single_node_reader.Register(replay_event_loop.get());
- single_node_factory.SkipTimingReport();
- single_node_factory.DisableStatistics();
const Node *full_pi1 =
configuration::GetNode(full_factory.configuration(), "pi1");