Switch magic timestamp channel to RemoteMessage
In order to track reboots and such, we really need to communicate more
information from the message_bridge to the logger. Adding that information
to MessageHeader would be convenient, but it would be a shame to have to
modify MessageHeader just for this. Switch the remote timestamp channels
over to a new RemoteMessage channel instead, and add code to rename the
channel when replaying.
There are no known log files with a MessageHeader actually logged, so
most of this should be pretty safe. I've tested this on an old log file
by hand.
Change-Id: If81b31869b95040151d833d20ec3eb8623ab1cd4
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 0aa37e7..a8bf77c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,8 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "aos/util/file.h"
@@ -47,7 +49,69 @@
CHECK(result);
return result.value();
}
+
+// Builds a copy of channel c in fbb and returns its offset; the schema field
+// is never copied. A non-empty new_name replaces the name and a non-empty
+// new_type replaces the type; empty arguments keep the values stored in c.
+flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
+ std::string_view new_name,
+ std::string_view new_type,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
+ : new_name);
+ flatbuffers::Offset<flatbuffers::String> type_offset =
+ fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
+ : 0;  // 0 is the null offset; checked with IsNull() below.
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset =
+ aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
+
+ Channel::Builder channel_builder(*fbb);  // Strings/vectors were built above.
+ channel_builder.add_name(name_offset);
+ channel_builder.add_type(type_offset);
+ if (c->has_frequency()) {  // Scalar fields are copied only when set in c.
+ channel_builder.add_frequency(c->frequency());
+ }
+ if (c->has_max_size()) {
+ channel_builder.add_max_size(c->max_size());
+ }
+ if (c->has_num_senders()) {
+ channel_builder.add_num_senders(c->num_senders());
+ }
+ if (c->has_num_watchers()) {
+ channel_builder.add_num_watchers(c->num_watchers());
+ }
+ if (!source_node_offset.IsNull()) {  // Offset fields are added only if built.
+ channel_builder.add_source_node(source_node_offset);
+ }
+ if (!destination_nodes_offset.IsNull()) {
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ }
+ if (c->has_logger()) {
+ channel_builder.add_logger(c->logger());
+ }
+ if (!logger_nodes_offset.IsNull()) {
+ channel_builder.add_logger_nodes(logger_nodes_offset);
+ }
+ if (c->has_read_method()) {
+ channel_builder.add_read_method(c->read_method());
+ }
+ if (c->has_num_readers()) {
+ channel_builder.add_num_readers(c->num_readers());
+ }
+ return channel_builder.Finish();  // No add_schema(): the schema is dropped.
+}
+
namespace chrono = std::chrono;
+using message_bridge::RemoteMessage;
} // namespace
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
@@ -83,7 +147,7 @@
const Channel *channel = configuration::GetChannel(
event_loop->configuration(),
absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
- logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
event_loop_->node());
CHECK(channel != nullptr)
@@ -137,7 +201,7 @@
channel, event_loop_->node(), event_loop_->node());
}
- // Now, detect a MessageHeader timestamp logger where we should just log the
+ // Now, detect a RemoteMessage timestamp logger where we should just log the
// contents to a file directly.
const bool log_contents = timestamp_logger_channels.find(channel) !=
timestamp_logger_channels.end();
@@ -607,8 +671,8 @@
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
- const MessageHeader *msg =
- flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+ const RemoteMessage *msg =
+ flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
logger::MessageHeader::Builder message_header_builder(fbb);
@@ -735,11 +799,22 @@
for (const Node *remote_node : timestamp_logger_nodes) {
const std::string channel = absl::StrCat(
"/aos/remote_timestamps/", remote_node->name()->string_view());
- CHECK(HasChannel<logger::MessageHeader>(channel, node))
- << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
- << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
- << node->name()->string_view();
- RemapLoggedChannel<logger::MessageHeader>(channel, node);
+ // See if the log file is an old log with MessageHeader channels in it, or
+ // a newer log with RemoteMessage. If we find an older log, rename the
+ // type too along with the name.
+ if (HasChannel<MessageHeader>(channel, node)) {
+ CHECK(!HasChannel<RemoteMessage>(channel, node))
+ << ": Can't have both a MessageHeader and RemoteMessage remote "
+ "timestamp channel.";
+ RemapLoggedChannel<MessageHeader>(channel, node, "/original",
+ "aos.message_bridge.RemoteMessage");
+ } else {
+ CHECK(HasChannel<RemoteMessage>(channel, node))
+ << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+ << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
+ << node->name()->string_view();
+ RemapLoggedChannel<RemoteMessage>(channel, node);
+ }
}
}
@@ -1239,7 +1314,7 @@
logged_configuration()->channels()->Get(logged_channel_index));
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+ aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
@@ -1542,7 +1617,8 @@
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
const Channel *const channel = logged_configuration()->channels()->Get(ii);
if (channel->name()->str() == name &&
@@ -1550,10 +1626,14 @@
CHECK_EQ(0u, remapped_channels_.count(ii))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(channel);
- remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+ RemappedChannel remapped_channel;
+ remapped_channel.remapped_name =
+ std::string(add_prefix) + std::string(name);
+ remapped_channel.new_type = new_type;
+ remapped_channels_[ii] = std::move(remapped_channel);
VLOG(1) << "Remapping channel "
<< configuration::CleanedChannelToString(channel)
- << " to have name " << remapped_channels_[ii];
+ << " to have name " << remapped_channels_[ii].remapped_name;
MakeRemappedConfig();
return;
}
@@ -1564,7 +1644,8 @@
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
const Channel *remapped_channel =
configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1601,8 +1682,13 @@
CHECK_EQ(0u, remapped_channels_.count(channel_index))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(remapped_channel);
- remapped_channels_[channel_index] =
- absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+
+ RemappedChannel remapped_channel_struct;
+ remapped_channel_struct.remapped_name =
+ std::string(add_prefix) +
+ std::string(remapped_channel->name()->string_view());
+ remapped_channel_struct.new_type = new_type;
+ remapped_channels_[channel_index] = std::move(remapped_channel_struct);
MakeRemappedConfig();
}
@@ -1625,73 +1711,96 @@
const Configuration *const base_config = replay_configuration_ == nullptr
? logged_configuration()
: replay_configuration_;
- // The remapped config will be identical to the base_config, except that it
- // will have a bunch of extra channels in the channel list, which are exact
- // copies of the remapped channels, but with different names.
- // Because the flatbuffers API is a pain to work with, this requires a bit of
- // a song-and-dance to get copied over.
- // The order of operations is to:
- // 1) Make a flatbuffer builder for a config that will just contain a list of
- // the new channels that we want to add.
- // 2) For each channel that we are remapping:
- // a) Make a buffer/builder and construct into it a Channel table that only
- // contains the new name for the channel.
- // b) Merge the new channel with just the name into the channel that we are
- // trying to copy, built in the flatbuffer builder made in 1. This gives
- // us the new channel definition that we need.
- // 3) Using this list of offsets, build the Configuration of just new
- // Channels.
- // 4) Merge the Configuration with the new Channels into the base_config.
- // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
- // chance to sanitize the config.
+
+ // Create a config with all the channels, but un-sorted/merged. Collect up
+ // the schemas while we do this. Call MergeConfiguration to sort everything,
+ // and then merge it all in together.
// This is the builder that we use for the config containing all the new
// channels.
- flatbuffers::FlatBufferBuilder new_config_fbb;
- new_config_fbb.ForceDefaults(true);
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ // List of schemas.
+ std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
+ // Make sure our new RemoteMessage schema is in there for old logs without it.
+ schema_map.insert(std::make_pair(
+ RemoteMessage::GetFullyQualifiedName(),
+ FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
+ message_bridge::RemoteMessageSchema()))));
+
+ // Reconstruct the remapped channels.
for (auto &pair : remapped_channels_) {
- // This is the builder that we use for creating the Channel with just the
- // new name.
- flatbuffers::FlatBufferBuilder new_name_fbb;
- new_name_fbb.ForceDefaults(true);
- const flatbuffers::Offset<flatbuffers::String> name_offset =
- new_name_fbb.CreateString(pair.second);
- ChannelBuilder new_name_builder(new_name_fbb);
- new_name_builder.add_name(name_offset);
- new_name_fbb.Finish(new_name_builder.Finish());
- const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is
- // actually present in base_config.
- const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+ const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
- // Actually create the new channel and put it into the vector of Offsets
- // that we will use to create the new Configuration.
- channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
- reinterpret_cast<const flatbuffers::Table *>(base_channel),
- reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
- &new_config_fbb));
+ channel_offsets.emplace_back(
+ CopyChannel(c, pair.second.remapped_name, "", &fbb));
}
- // Create the Configuration containing the new channels that we want to add.
- const auto new_channel_vector_offsets =
- new_config_fbb.CreateVector(channel_offsets);
- // Now create the new maps.
+ // Now reconstruct the original channels, translating types as needed
+ for (const Channel *c : *base_config->channels()) {
+ // Search for a mapping channel.
+ std::string_view new_type = "";
+ for (auto &pair : remapped_channels_) {
+ const Channel *const remapped_channel =
+ logged_configuration()->channels()->Get(pair.first);
+ if (remapped_channel->name()->string_view() == c->name()->string_view() &&
+ remapped_channel->type()->string_view() == c->type()->string_view()) {
+ new_type = pair.second.new_type;
+ break;
+ }
+ }
+
+ // Copy everything over.
+ channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
+
+ // Add the schema if it doesn't exist.
+ if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
+ CHECK(c->has_schema());
+ schema_map.insert(std::make_pair(c->type()->string_view(),
+ RecursiveCopyFlatBuffer(c->schema())));
+ }
+ }
+
+ // The MergeConfiguration API takes a vector, not a map. Convert.
+ std::vector<FlatbufferVector<reflection::Schema>> schemas;
+ while (!schema_map.empty()) {
+ schemas.emplace_back(std::move(schema_map.begin()->second));
+ schema_map.erase(schema_map.begin());
+ }
+
+ // Create the Configuration containing the new channels that we want to add.
+ const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset =
+ channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
+
+ // Copy over the old maps.
std::vector<flatbuffers::Offset<Map>> map_offsets;
+ if (base_config->maps()) {
+ for (const Map *map : *base_config->maps()) {
+ map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
+ }
+ }
+
+ // Now create the new maps. These are second so they take effect first.
for (const MapT &map : maps_) {
const flatbuffers::Offset<flatbuffers::String> match_name_offset =
- new_config_fbb.CreateString(map.match->name);
+ fbb.CreateString(map.match->name);
const flatbuffers::Offset<flatbuffers::String> match_type_offset =
- new_config_fbb.CreateString(map.match->type);
+ fbb.CreateString(map.match->type);
const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
- new_config_fbb.CreateString(map.rename->name);
+ fbb.CreateString(map.rename->name);
flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
if (!map.match->source_node.empty()) {
- match_source_node_offset =
- new_config_fbb.CreateString(map.match->source_node);
+ match_source_node_offset = fbb.CreateString(map.match->source_node);
}
- Channel::Builder match_builder(new_config_fbb);
+ Channel::Builder match_builder(fbb);
match_builder.add_name(match_name_offset);
match_builder.add_type(match_type_offset);
if (!map.match->source_node.empty()) {
@@ -1699,36 +1808,66 @@
}
const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
- Channel::Builder rename_builder(new_config_fbb);
+ Channel::Builder rename_builder(fbb);
rename_builder.add_name(rename_name_offset);
const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
- Map::Builder map_builder(new_config_fbb);
+ Map::Builder map_builder(fbb);
map_builder.add_match(match_offset);
map_builder.add_rename(rename_offset);
map_offsets.emplace_back(map_builder.Finish());
}
- const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
- ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_channel_vector_offsets);
- new_config_builder.add_maps(new_maps_offsets);
- new_config_fbb.Finish(new_config_builder.Finish());
- const FlatbufferDetachedBuffer<Configuration> new_name_config =
- new_config_fbb.Release();
- // Merge the new channels configuration into the base_config, giving us the
- // remapped configuration.
+ // And copy everything else over.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+ nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+ applications_offset =
+ aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
+
+ // Now insert everything else in unmodified.
+ ConfigurationBuilder configuration_builder(fbb);
+ if (!channels_offset.IsNull()) {
+ configuration_builder.add_channels(channels_offset);
+ }
+ if (!maps_offsets.IsNull()) {
+ configuration_builder.add_maps(maps_offsets);
+ }
+ if (!nodes_offset.IsNull()) {
+ configuration_builder.add_nodes(nodes_offset);
+ }
+ if (!applications_offset.IsNull()) {
+ configuration_builder.add_applications(applications_offset);
+ }
+
+ if (base_config->has_channel_storage_duration()) {
+ configuration_builder.add_channel_storage_duration(
+ base_config->channel_storage_duration());
+ }
+
+ CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
+ << ": Merging logic needs to be updated when the number of configuration "
+ "fields changes.";
+
+ fbb.Finish(configuration_builder.Finish());
+
+ // Clean it up and return it! By using MergeConfiguration here, we'll
+ // actually get a deduplicated config for free too.
+ FlatbufferDetachedBuffer<Configuration> new_merged_config =
+ configuration::MergeConfiguration(
+ FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
+
remapped_configuration_buffer_ =
std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- MergeFlatBuffers<Configuration>(base_config,
- &new_name_config.message()));
- // Call MergeConfiguration to deal with sanitizing the config.
- remapped_configuration_buffer_ =
- std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- configuration::MergeConfiguration(*remapped_configuration_buffer_));
+ configuration::MergeConfiguration(new_merged_config, schemas));
remapped_configuration_ = &remapped_configuration_buffer_->message();
+
+ // TODO(austin): Lazily re-build to save CPU?
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
@@ -1741,7 +1880,7 @@
if (remapped_channels_.count(channel_index) > 0) {
VLOG(3) << "Got remapped channel on "
<< configuration::CleanedChannelToString(channel);
- channel_name = remapped_channels_[channel_index];
+ channel_name = remapped_channels_[channel_index].remapped_name;
}
VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
@@ -1786,7 +1925,7 @@
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+ aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1874,12 +2013,12 @@
timestamp);
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
- aos::Sender<MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
@@ -1905,7 +2044,7 @@
return true;
}
-aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
const Node *delivered_node) {
auto sender = remote_timestamp_senders_map_.find(delivered_node);
@@ -1913,7 +2052,7 @@
sender = remote_timestamp_senders_map_
.emplace(std::make_pair(
delivered_node,
- event_loop()->MakeSender<MessageHeader>(
+ event_loop()->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
delivered_node->name()->string_view()))))
.first;
@@ -1984,7 +2123,8 @@
timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time !=
+ monotonic_clock::min_time) {
// Got a forwarding timestamp!
filter = filters_[timestamped_message.channel_index];