Move log reader and writer over to split timestamp channels
Reuse TimestampChannel (and split it up further) to implement channel
naming and lookup for the log reader and writer. Test both the old
(combined) and new (split) timestamp channel configurations.
Once the config changes go in for y2020, this should fix reading the log
file James broke.
Change-Id: I6b09eec69c064ded3b3c149e0fdf23162bd352cf
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index c111a73..6e92568 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -19,6 +19,7 @@
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
+#include "aos/network/timestamp_channel.h"
#include "aos/time/time.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
@@ -143,11 +144,48 @@
// Remap all existing remote timestamp channels. They will be recreated, and
// the data logged isn't relevant anymore.
for (const Node *node : configuration::GetNodes(logged_configuration())) {
+ message_bridge::ChannelTimestampFinder finder(logged_configuration(),
+ "log_reader", node);
+
+ absl::btree_set<std::string_view> remote_nodes;
+
+ for (const Channel *channel : *logged_configuration()->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, node)) {
+ continue;
+ }
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ node)) {
+ // Start by seeing if the split timestamp channels are being used for
+ // this message. If so, remap them.
+ const Channel *timestamp_channel = configuration::GetChannel(
+ logged_configuration(),
+ finder.SplitChannelName(channel, connection),
+ RemoteMessage::GetFullyQualifiedName(), "", node, true);
+
+ if (timestamp_channel != nullptr) {
+ if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) {
+ RemapLoggedChannel<RemoteMessage>(
+ timestamp_channel->name()->string_view(), node);
+ }
+ continue;
+ }
+
+ // Otherwise collect this one up as a node to look for a combined
+ // channel from. It is more efficient to compare nodes than channels.
+ remote_nodes.insert(connection->name()->string_view());
+ }
+ }
+ }
+
std::vector<const Node *> timestamp_logger_nodes =
configuration::TimestampNodes(logged_configuration(), node);
- for (const Node *remote_node : timestamp_logger_nodes) {
- const std::string channel = absl::StrCat(
- "/aos/remote_timestamps/", remote_node->name()->string_view());
+ for (const std::string_view remote_node : remote_nodes) {
+ const std::string channel = finder.CombinedChannelName(remote_node);
+
// See if the log file is an old log with MessageHeader channels in it, or
// a newer log with RemoteMessage. If we find an older log, rename the
// type too along with the name.
@@ -467,9 +505,11 @@
// Delivery timestamps are supposed to be logged back on the source node.
// Configure remote timestamps to be sent.
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, event_loop->node());
const bool delivery_time_is_logged =
- configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, event_loop->node(), source_node);
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ source_node);
source_state =
states_[configuration::GetNodeIndex(configuration(), source_node)]
@@ -477,7 +517,7 @@
if (delivery_time_is_logged) {
remote_timestamp_sender =
- source_state->RemoteTimestampSender(event_loop->node());
+ source_state->RemoteTimestampSender(channel, connection);
}
}
@@ -1233,22 +1273,44 @@
}
LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
- const Node *delivered_node) {
- auto sender = remote_timestamp_senders_map_.find(delivered_node);
-
- if (sender == remote_timestamp_senders_map_.end()) {
- sender =
- remote_timestamp_senders_map_
- .emplace(delivered_node,
- std::make_unique<RemoteMessageSender>(
- event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
- "/aos/remote_timestamps/",
- delivered_node->name()->string_view())),
- event_loop()))
- .first;
+ const Channel *channel, const Connection *connection) {
+ message_bridge::ChannelTimestampFinder finder(event_loop_);
+ // Fast path: reuse the sender already cached for this channel/connection.
+ {
+ auto it =
+ channel_timestamp_loggers_.find(std::make_pair(channel, connection));
+ if (it != channel_timestamp_loggers_.end()) {
+ return it->second.get();
+ }
}
- return sender->second.get();
+ // Cache miss, so resolve the RemoteMessage channel that timestamps for this
+ // channel/connection pair will be logged to.
+ const Channel *timestamp_channel = finder.ForChannel(channel, connection);
+
+ {
+ // See if a sender for that timestamp channel has been created before. If
+ // so, cache it in channel_timestamp_loggers_ for this pair and return.
+ auto it = timestamp_loggers_.find(timestamp_channel);
+ if (it != timestamp_loggers_.end()) {
+ CHECK(channel_timestamp_loggers_
+ .try_emplace(std::make_pair(channel, connection), it->second)
+ .second);
+ return it->second.get();
+ }
+ }
+
+ // Otherwise, make a sender, save it in both maps, and return it.
+ auto result = channel_timestamp_loggers_.try_emplace(
+ std::make_pair(channel, connection),
+ std::make_shared<RemoteMessageSender>(
+ event_loop()->MakeSender<RemoteMessage>(
+ timestamp_channel->name()->string_view()),
+ event_loop()));
+
+ CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second)
+ .second);
+ return result.first->second.get();
}
TimestampedMessage LogReader::State::PopOldest() {
@@ -1303,7 +1365,8 @@
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}
- remote_timestamp_senders_map_.clear();
+ channel_timestamp_loggers_.clear();
+ timestamp_loggers_.clear();
event_loop_unique_ptr_.reset();
event_loop_ = nullptr;
timer_handler_ = nullptr;