Turn on exclusive senders in LogReader
This makes LogReader automatically detect situations where a user
attempts to send on a channel during replay while that same channel is
simultaneously *being* replayed from the log.
Change-Id: I40f30693fe93c94018b6ddbc9f748e655cdf1fe3
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1ad60c9..09c585b 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -528,6 +528,48 @@
}
}
+// Collects the channels on which the log reader's event loop is exempted
+// from exclusive-sender enforcement. Lookups that come back null because
+// the channel does not exist in the config are dropped.
+std::vector<
+    std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {
+  CHECK_NOTNULL(node_event_loop_factory_);
+  const aos::Configuration *config = node_event_loop_factory_->configuration();
+  std::vector<
+      std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+      exemptions;
+  // Appends a kNo entry for the channel, ignoring failed lookups.
+  const auto exempt = [&exemptions](const aos::Channel *channel) {
+    if (channel != nullptr) {
+      exemptions.push_back(std::make_pair(
+          channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+    }
+  };
+  // Timing reports can be sent by logged and replayed applications.
+  exempt(aos::configuration::GetChannel(config, "/aos", "aos.timing.Report", "",
+                                        node_));
+  // AOS_LOG may be used in the log and in replay.
+  exempt(aos::configuration::GetChannel(config, "/aos",
+                                        "aos.logging.LogMessageFbs", "", node_));
+  for (const Node *const node : configuration::GetNodes(config)) {
+    if (node == nullptr) {
+      break;
+    }
+    // The old-style remote timestamp channel can be populated from any
+    // channel, simulated or replayed.
+    exempt(aos::configuration::GetChannel(
+        config,
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        "aos.message_bridge.RemoteMessage", "", node_));
+  }
+  return exemptions;
+}
+
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -647,7 +689,7 @@
// If we are replaying a log, we don't want a bunch of redundant messages
// from both the real message bridge and simulated message bridge.
- event_loop_factory_->DisableStatistics();
+ event_loop_factory_->PermanentlyDisableStatistics();
}
// Write pseudo start times out to file now that we are all setup.
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 157249c..04d2888 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -453,7 +453,8 @@
// ensure we are remapping channels correctly.
event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
"log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kYes,
+ NonExclusiveChannels()});
return event_loop_unique_ptr_.get();
}
@@ -620,6 +621,12 @@
uint32_t actual_queue_index = 0xffffffff;
};
+ // Returns the channels LogReader sends on that other applications may *also*
+ // send on in replay. Channels missing from the config are omitted.
+ std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+ NonExclusiveChannels();
+
// Stores all the timestamps that have been sent on this channel. This is
// only done for channels which are forwarded and on the node which
// initially sends the message. Compress using ranges and offsets.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 02081fd..35474c7 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
}
}
+// Verifies that creating a sender on a channel which the log is replaying
+// trips the exclusive-sender CHECK and kills the process.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pingpong_config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/pingpong_config.json"));
+  SimulatedEventLoopFactory logging_factory(&pingpong_config.message());
+
+  const ::std::string tmp_root = aos::testing::TestTmpDir();
+  const ::std::string log_base = tmp_root + "/logfile";
+  const ::std::string log_path = log_base + ".part0.bfbs";
+  const ::std::string config_path =
+      absl::StrCat(log_base, kSingleConfigSha256, ".bfbs");
+  // Clear out anything left behind by a previous run.
+  unlink(config_path.c_str());
+  unlink(log_path.c_str());
+
+  LOG(INFO) << "Logging data to " << log_path;
+
+  // Scope the logger so that it is destroyed before the log is read back.
+  {
+    std::unique_ptr<EventLoop> logger_loop =
+        logging_factory.MakeEventLoop("logger");
+
+    Logger log_writer(logger_loop.get());
+    log_writer.set_separate_config(false);
+    log_writer.set_polling_period(std::chrono::milliseconds(100));
+    log_writer.StartLoggingLocalNamerOnRun(log_base);
+
+    logging_factory.RunFor(chrono::seconds(2));
+  }
+
+  LogReader replay(log_path);
+  replay.Register();
+
+  // An event loop created from the replay's own factory.
+  std::unique_ptr<EventLoop> application_loop =
+      replay.event_loop_factory()->MakeEventLoop("log_reader");
+
+  EXPECT_DEATH(application_loop->MakeSender<examples::Ping>("/test"),
+               "exclusive channel.*examples.Ping");
+}
+
// Tests calling StopLogging twice.
TEST_F(LoggerDeathTest, ExtraStop) {
const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
std::unique_ptr<EventLoop> ping_spammer_event_loop =
event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
"ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}});
aos::Sender<examples::Ping> ping_sender =
ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
@@ -2216,14 +2257,14 @@
SimulatedEventLoopFactory full_factory(full_reader.configuration());
SimulatedEventLoopFactory single_node_factory(
single_node_reader.configuration());
+ single_node_factory.SkipTimingReport();
+ single_node_factory.DisableStatistics();
std::unique_ptr<EventLoop> replay_event_loop =
single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
"log_reader");
full_reader.Register(&full_factory);
single_node_reader.Register(replay_event_loop.get());
- single_node_factory.SkipTimingReport();
- single_node_factory.DisableStatistics();
const Node *full_pi1 =
configuration::GetNode(full_factory.configuration(), "pi1");
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c218df6..c78405d 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -949,7 +949,27 @@
CHECK(allow_new_senders_)
<< ": Attempted to create a new sender on exclusive channel "
<< configuration::StrippedChannelToString(channel_);
- if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+ std::optional<ExclusiveSenders> per_channel_option;
+ for (const std::pair<const aos::Channel *, ExclusiveSenders> &per_channel :
+ event_loop->options().per_channel_exclusivity) {
+ if (per_channel.first->name()->string_view() ==
+ channel_->name()->string_view() &&
+ per_channel.first->type()->string_view() ==
+ channel_->type()->string_view()) {
+ CHECK(!per_channel_option.has_value())
+ << ": Channel " << configuration::StrippedChannelToString(channel_)
+ << " listed twice in per-channel list.";
+ per_channel_option = per_channel.second;
+ }
+ }
+ if (!per_channel_option.has_value()) {
+ // This could just as easily be implemented by setting
+ // per_channel_option to the global setting when we initialize it, but
+ // then we'd lose track of whether a given channel appears twice in
+ // the list.
+ per_channel_option = event_loop->options().exclusive_senders;
+ }
+ if (per_channel_option.value() == ExclusiveSenders::kYes) {
CHECK_EQ(0, sender_count_)
<< ": Attempted to add an exclusive sender on a channel with existing "
"senders: "
@@ -1466,7 +1486,14 @@
void SimulatedEventLoopFactory::DisableStatistics() {
CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
- bridge_->DisableStatistics();
+ bridge_->DisableStatistics(
+ message_bridge::SimulatedMessageBridge::DestroySenders::kNo);  // Senders are kept.
+}
+
+void SimulatedEventLoopFactory::PermanentlyDisableStatistics() {
+ CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+ bridge_->DisableStatistics(
+ message_bridge::SimulatedMessageBridge::DestroySenders::kYes);  // Senders are destroyed too.
}
void SimulatedEventLoopFactory::EnableStatistics() {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index e6ba4bf..9d5d11f 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -120,6 +120,9 @@
// Disables the messages sent by the simulated message gateway.
void DisableStatistics();
+ // Disables statistics sent by the simulated message gateway, and prevents
+ // EnableStatistics from ever being called again (used by LogReader).
+ void PermanentlyDisableStatistics();
// Enables the messages sent by the simulated message gateway.
void EnableStatistics();
@@ -175,14 +178,18 @@
struct EventLoopOptions {
CheckSentTooFast check_sent_too_fast;
ExclusiveSenders exclusive_senders;
+ // Per-channel exceptions to the overall exclusive_senders policy for this
+ // event loop. Matched by channel name and type; duplicates CHECK-fail.
+ std::vector<std::pair<const aos::Channel *, ExclusiveSenders>>
+ per_channel_exclusivity;
};
// Takes the name for the event loop and a struct of options for selecting
// what checks to run for the event loop in question.
std::unique_ptr<EventLoop> MakeEventLoop(
std::string_view name,
- EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
- ExclusiveSenders::kNo});
+ EventLoopOptions options = EventLoopOptions{
+ CheckSentTooFast::kYes, ExclusiveSenders::kNo, {}});
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 1a35b69..a46cc1c 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -262,7 +262,8 @@
simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
->MakeEventLoop("too_fast_sender",
{NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}});
aos::Sender<TestMessage> too_fast_message_sender =
too_fast_event_loop->MakeSender<TestMessage>("/test");
@@ -292,9 +293,13 @@
::std::unique_ptr<EventLoop> exclusive_event_loop =
simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
- ->MakeEventLoop("too_fast_sender",
- {NodeEventLoopFactory::CheckSentTooFast::kYes,
- NodeEventLoopFactory::ExclusiveSenders::kYes});
+ ->MakeEventLoop(
+ "too_fast_sender",
+ {NodeEventLoopFactory::CheckSentTooFast::kYes,
+ NodeEventLoopFactory::ExclusiveSenders::kYes,
+ {{configuration::GetChannel(factory.configuration(), "/test1",
+ "aos.TestMessage", "", nullptr),
+ NodeEventLoopFactory::ExclusiveSenders::kNo}}});
exclusive_event_loop->SkipAosLog();
exclusive_event_loop->SkipTimingReport();
::std::unique_ptr<EventLoop> normal_event_loop =
@@ -313,6 +318,12 @@
normal_event_loop->MakeSender<TestMessage>("/test");
EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
"TestMessage");
+
+ // And check an explicitly exempted channel:
+ aos::Sender<TestMessage> non_exclusive_sender =
+ exclusive_event_loop->MakeSender<TestMessage>("/test1");
+ aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
+ normal_event_loop->MakeSender<TestMessage>("/test1");
}
void TestSentTooFastCheckEdgeCase(
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 19ca658..108176e 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,6 +32,10 @@
bool forwarding_disabled() const { return forwarding_disabled_; }
void set_forwarding_disabled(bool forwarding_disabled) {
forwarding_disabled_ = forwarding_disabled;
+ if (!forwarding_disabled_) {  // Enabling is only valid with no senders held.
+ CHECK(timestamp_logger_ == nullptr);
+ CHECK(sender_ == nullptr);
+ }
}
void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
@@ -50,7 +54,8 @@
server_connection_ =
server_status_->FindServerConnection(send_node_factory_->node());
}
- if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+ if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
+ !forwarding_disabled_) {
timestamp_logger_ =
timestamp_loggers->SenderForChannel(channel_, connection_);
} else {
@@ -79,7 +84,7 @@
MessageBridgeClientStatus *client_status) {
sent_ = false;
send_event_loop_ = send_event_loop;
- if (send_event_loop_) {
+ if (send_event_loop_ && !forwarding_disabled_) {
sender_ = send_event_loop_->MakeRawSender(channel_);
} else {
sender_ = nullptr;
@@ -561,16 +566,17 @@
destination_state->second.SetClientState(source, state);
}
-void SimulatedMessageBridge::DisableStatistics() {
+void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
for (std::pair<const Node *const, State> &state : event_loop_map_) {
- state.second.DisableStatistics();
+ state.second.DisableStatistics(destroy_senders);  // Same mode on every node.
}
}
-void SimulatedMessageBridge::DisableStatistics(const Node *node) {
+void SimulatedMessageBridge::DisableStatistics(const Node *node,
+ DestroySenders destroy_senders) {
auto it = event_loop_map_.find(node);
CHECK(it != event_loop_map_.end());
- it->second.DisableStatistics();
+ it->second.DisableStatistics(destroy_senders);  // Only this node's State.
}
void SimulatedMessageBridge::EnableStatistics() {
@@ -625,7 +631,7 @@
timestamp_loggers = ChannelTimestampSender(event_loop.get());
server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
if (disable_statistics_) {
- server_status->DisableStatistics();
+ server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
}
{
@@ -659,7 +665,7 @@
}
client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
if (disable_statistics_) {
- client_status->DisableStatistics();
+ client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
}
for (size_t i = 0;
@@ -701,8 +707,16 @@
configuration::ConnectionDeliveryTimeIsLoggedOnNode(
connection, event_loop->node());
+ const RawMessageDelayer *delayer = nullptr;
+ for (const RawMessageDelayer *candidate : source_delayers_) {
+ if (candidate->channel() == channel) {
+ delayer = candidate;
+ }
+ }
+
// And the timestamps are then logged back by us again.
- if (!delivery_time_is_logged) {
+ if (!delivery_time_is_logged ||
+ CHECK_NOTNULL(delayer)->forwarding_disabled()) {
continue;
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index f5b3eae..9565029 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -35,8 +35,10 @@
// Disables generating and sending the messages which message_gateway sends.
// The messages are the ClientStatistics, ServerStatistics and Timestamp
// messages.
- void DisableStatistics();
- void DisableStatistics(const Node *node);
+ enum class DestroySenders { kNo, kYes };  // kYes also destroys the senders.
+ void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
+ void DisableStatistics(const Node *node,
+ DestroySenders destroy_senders = DestroySenders::kNo);
void EnableStatistics();
void EnableStatistics(const Node *node);
@@ -55,13 +57,16 @@
}
State(const State &state) = delete;
- void DisableStatistics() {
+ void DisableStatistics(DestroySenders destroy_senders) {
disable_statistics_ = true;
+ destroy_senders_ = destroy_senders;  // Remembered for status objects made later.
if (server_status) {
- server_status->DisableStatistics();
+ server_status->DisableStatistics(destroy_senders ==
+ DestroySenders::kYes);
}
if (client_status) {
- client_status->DisableStatistics();
+ client_status->DisableStatistics(destroy_senders ==
+ DestroySenders::kYes);
}
}
@@ -88,7 +93,8 @@
// logfiles.
SetEventLoop(node_factory_->MakeEventLoop(
"message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo}));
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}}));
}
void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
@@ -218,7 +224,9 @@
std::vector<RawMessageDelayer *> destination_delayers_;
bool disable_statistics_ = false;
+ DestroySenders destroy_senders_ = DestroySenders::kNo;
};
+
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
std::map<const Node *, State> event_loop_map_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index aa2484d..f3511a0 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -219,12 +219,16 @@
filter->Sample(monotonic_delivered_time, offset);
}
-void MessageBridgeClientStatus::DisableStatistics() {
+void MessageBridgeClientStatus::DisableStatistics(bool destroy_sender) {
statistics_timer_->Disable();
send_ = false;
+ if (destroy_sender) {
+ sender_ = aos::Sender<ClientStatistics>();  // EnableStatistics() CHECKs valid().
+ }
}
void MessageBridgeClientStatus::EnableStatistics() {
+ CHECK(sender_.valid());
send_ = true;
statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
kStatisticsPeriod);
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 9c21169..033af95 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -54,7 +54,10 @@
void Connect(int client_index);
// Disables sending out any statistics messages.
- void DisableStatistics();
+ // If destroy_sender is set, will clear the ClientStatistics Sender.
+ // EnableStatistics cannot be called again if destroy_sender is set. This is
+ // used by the LogReader to enforce one-sender-per-channel checks.
+ void DisableStatistics(bool destroy_sender);
// Enables sending out any statistics messages.
void EnableStatistics();
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index c82158b..4f6abff 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -411,13 +411,19 @@
}
}
-void MessageBridgeServerStatus::DisableStatistics() {
+void MessageBridgeServerStatus::DisableStatistics(bool destroy_senders) {
send_ = false;
statistics_timer_->Disable();
+ if (destroy_senders) {
+ sender_ = aos::Sender<ServerStatistics>();  // EnableStatistics() CHECKs valid()
+ timestamp_sender_ = aos::Sender<Timestamp>();  // on both of these senders.
+ }
}
void MessageBridgeServerStatus::EnableStatistics() {
send_ = true;
+ CHECK(sender_.valid());  // Guards against use after DisableStatistics(true).
+ CHECK(timestamp_sender_.valid());
statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
kPingPeriod);
}
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index feb1b05..327930c 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -73,7 +73,7 @@
}
// Disables sending out any statistics messages.
- void DisableStatistics();
+ void DisableStatistics(bool destroy_senders);  // true: also clears the senders.
// Enables sending out any statistics messages.
void EnableStatistics();
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index e35a170..d95edda 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -59,6 +59,8 @@
aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
const Connection *connection);
+ void ClearSenderForChannel(const Channel *channel,
+ const Connection *connection);  // NOTE(review): no definition visible here — confirm.
private:
aos::EventLoop *event_loop_;