Switch magic timestamp channel to RemoteMessage
In order to track reboots and such, we really need to communicate more
information from the message_bridge to the logger. It is a shame to
have to modify the MessageHeader to do this, even though it would be
nice. Switch the remote timestamp channels over to a new RemoteMessage
channel instead, and add code to rename the channel when replaying.
There are no known log files with a MessageHeader actually logged, so
most of this should be pretty safe. I've tested this on an old log file
by hand.
Change-Id: If81b31869b95040151d833d20ec3eb8623ab1cd4
diff --git a/aos/BUILD b/aos/BUILD
index beeb2b0..4ccfd0e 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -596,3 +596,9 @@
"//aos/testing:googletest",
],
)
+
+py_binary(
+ name = "flatbuffers_static",
+ srcs = ["flatbuffers_static.py"],
+ visibility = ["//visibility:public"],
+)
diff --git a/aos/config_flattener.cc b/aos/config_flattener.cc
index a9500e3..8018484 100644
--- a/aos/config_flattener.cc
+++ b/aos/config_flattener.cc
@@ -35,10 +35,10 @@
}
}
- std::vector<aos::FlatbufferString<reflection::Schema>> schemas;
+ std::vector<aos::FlatbufferVector<reflection::Schema>> schemas;
for (int i = 6; i < argc; ++i) {
- schemas.emplace_back(util::ReadFileToStringOrDie(argv[i]));
+ schemas.emplace_back(FileToFlatbuffer<reflection::Schema>(argv[i]));
}
aos::FlatbufferDetachedBuffer<Configuration> merged_config =
diff --git a/aos/configuration.cc b/aos/configuration.cc
index d32cae8..ad2e295 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -591,7 +591,8 @@
const Channel *GetChannel(const Configuration *config, std::string_view name,
std::string_view type,
- std::string_view application_name, const Node *node) {
+ std::string_view application_name, const Node *node,
+ bool quiet) {
const std::string_view original_name = name;
std::string mutable_name;
if (node != nullptr) {
@@ -647,7 +648,7 @@
} else {
VLOG(1) << "No match for { \"name\": \"" << name << "\", \"type\": \""
<< type << "\" }";
- if (original_name != name) {
+ if (original_name != name && !quiet) {
LOG(WARNING) << "Remapped from {\"name\": \"" << original_name
<< "\", \"type\": \"" << type << "\"}, to {\"name\": \""
<< name << "\", \"type\": \"" << type
@@ -683,7 +684,7 @@
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config,
- const std::vector<aos::FlatbufferString<reflection::Schema>> &schemas) {
+ const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) {
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
@@ -701,8 +702,8 @@
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
for (const Channel *c : *config.message().channels()) {
// Search for a schema with a matching type.
- const aos::FlatbufferString<reflection::Schema> *found_schema = nullptr;
- for (const aos::FlatbufferString<reflection::Schema> &schema : schemas) {
+ const aos::FlatbufferVector<reflection::Schema> *found_schema = nullptr;
+ for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) {
if (schema.message().root_table() != nullptr) {
if (schema.message().root_table()->name()->string_view() ==
c->type()->string_view()) {
diff --git a/aos/configuration.h b/aos/configuration.h
index 7383a1a..d10d16d 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -31,7 +31,7 @@
// schema list.
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config,
- const std::vector<aos::FlatbufferString<reflection::Schema>> &schemas);
+ const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas);
// Merges a configuration json snippet into the provided configuration and
// returns the modified config.
@@ -49,7 +49,7 @@
const std::string_view name,
const std::string_view type,
const std::string_view application_name,
- const Node *node);
+ const Node *node, bool quiet = false);
inline const Channel *GetChannel(const Flatbuffer<Configuration> &config,
const std::string_view name,
const std::string_view type,
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 95e6cdc..239b324 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -157,7 +157,7 @@
":ping_fbs",
":pong_fbs",
"//aos/network:message_bridge_client_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:timestamp_fbs",
"//aos/network:message_bridge_server_fbs",
],
@@ -301,6 +301,7 @@
":ping_lib",
":pong_lib",
":simulated_event_loop",
+ "//aos/network:remote_message_fbs",
"//aos/testing:googletest",
],
)
@@ -342,6 +343,7 @@
"//aos/ipc_lib:index",
"//aos/network:message_bridge_client_status",
"//aos/network:message_bridge_server_status",
+ "//aos/network:remote_message_fbs",
"//aos/util:phased_loop",
"@com_google_absl//absl/container:btree",
],
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index a1f87cd..bf7381c 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -485,8 +485,8 @@
template <typename T>
bool HasChannel(const std::string_view channel_name) {
return configuration::GetChannel(configuration(), channel_name,
- T::GetFullyQualifiedName(), name(),
- node()) != nullptr;
+ T::GetFullyQualifiedName(), name(), node(),
+ true) != nullptr;
}
// Note, it is supported to create:
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index db718ba..e05f2b3 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -185,6 +185,8 @@
"//aos/events:event_loop",
"//aos/events:simulated_event_loop",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:remote_message_schema",
"//aos/network:team_number",
"//aos/network:timestamp_filter",
"//aos/time",
@@ -273,11 +275,11 @@
name = "multinode_pingpong_config",
src = "multinode_pingpong.json",
flatbuffers = [
- ":logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:timestamp_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 0aa37e7..a8bf77c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,8 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "aos/util/file.h"
@@ -47,7 +49,69 @@
CHECK(result);
return result.value();
}
+
+// Copies the channel, removing the schema as we go. If new_name is provided,
+// it is used instead of the name inside the channel. If new_type is provided,
+// it is used instead of the type in the channel.
+flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
+ std::string_view new_name,
+ std::string_view new_type,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
+ : new_name);
+ flatbuffers::Offset<flatbuffers::String> type_offset =
+ fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
+ : 0;
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset =
+ aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
+
+ Channel::Builder channel_builder(*fbb);
+ channel_builder.add_name(name_offset);
+ channel_builder.add_type(type_offset);
+ if (c->has_frequency()) {
+ channel_builder.add_frequency(c->frequency());
+ }
+ if (c->has_max_size()) {
+ channel_builder.add_max_size(c->max_size());
+ }
+ if (c->has_num_senders()) {
+ channel_builder.add_num_senders(c->num_senders());
+ }
+ if (c->has_num_watchers()) {
+ channel_builder.add_num_watchers(c->num_watchers());
+ }
+ if (!source_node_offset.IsNull()) {
+ channel_builder.add_source_node(source_node_offset);
+ }
+ if (!destination_nodes_offset.IsNull()) {
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ }
+ if (c->has_logger()) {
+ channel_builder.add_logger(c->logger());
+ }
+ if (!logger_nodes_offset.IsNull()) {
+ channel_builder.add_logger_nodes(logger_nodes_offset);
+ }
+ if (c->has_read_method()) {
+ channel_builder.add_read_method(c->read_method());
+ }
+ if (c->has_num_readers()) {
+ channel_builder.add_num_readers(c->num_readers());
+ }
+ return channel_builder.Finish();
+}
+
namespace chrono = std::chrono;
+using message_bridge::RemoteMessage;
} // namespace
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
@@ -83,7 +147,7 @@
const Channel *channel = configuration::GetChannel(
event_loop->configuration(),
absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
- logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
event_loop_->node());
CHECK(channel != nullptr)
@@ -137,7 +201,7 @@
channel, event_loop_->node(), event_loop_->node());
}
- // Now, detect a MessageHeader timestamp logger where we should just log the
+ // Now, detect a RemoteMessage timestamp logger where we should just log the
// contents to a file directly.
const bool log_contents = timestamp_logger_channels.find(channel) !=
timestamp_logger_channels.end();
@@ -607,8 +671,8 @@
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
- const MessageHeader *msg =
- flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+ const RemoteMessage *msg =
+ flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
logger::MessageHeader::Builder message_header_builder(fbb);
@@ -735,11 +799,22 @@
for (const Node *remote_node : timestamp_logger_nodes) {
const std::string channel = absl::StrCat(
"/aos/remote_timestamps/", remote_node->name()->string_view());
- CHECK(HasChannel<logger::MessageHeader>(channel, node))
- << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
- << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
- << node->name()->string_view();
- RemapLoggedChannel<logger::MessageHeader>(channel, node);
+ // See if the log file is an old log with MessageHeader channels in it, or
+ // a newer log with RemoteMessage. If we find an older log, rename the
+ // type too along with the name.
+ if (HasChannel<MessageHeader>(channel, node)) {
+ CHECK(!HasChannel<RemoteMessage>(channel, node))
+ << ": Can't have both a MessageHeader and RemoteMessage remote "
+ "timestamp channel.";
+ RemapLoggedChannel<MessageHeader>(channel, node, "/original",
+ "aos.message_bridge.RemoteMessage");
+ } else {
+ CHECK(HasChannel<RemoteMessage>(channel, node))
+ << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+ << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
+ << node->name()->string_view();
+ RemapLoggedChannel<RemoteMessage>(channel, node);
+ }
}
}
@@ -1239,7 +1314,7 @@
logged_configuration()->channels()->Get(logged_channel_index));
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+ aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
@@ -1542,7 +1617,8 @@
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
const Channel *const channel = logged_configuration()->channels()->Get(ii);
if (channel->name()->str() == name &&
@@ -1550,10 +1626,14 @@
CHECK_EQ(0u, remapped_channels_.count(ii))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(channel);
- remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+ RemappedChannel remapped_channel;
+ remapped_channel.remapped_name =
+ std::string(add_prefix) + std::string(name);
+ remapped_channel.new_type = new_type;
+ remapped_channels_[ii] = std::move(remapped_channel);
VLOG(1) << "Remapping channel "
<< configuration::CleanedChannelToString(channel)
- << " to have name " << remapped_channels_[ii];
+ << " to have name " << remapped_channels_[ii].remapped_name;
MakeRemappedConfig();
return;
}
@@ -1564,7 +1644,8 @@
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
const Channel *remapped_channel =
configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1601,8 +1682,13 @@
CHECK_EQ(0u, remapped_channels_.count(channel_index))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(remapped_channel);
- remapped_channels_[channel_index] =
- absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+
+ RemappedChannel remapped_channel_struct;
+ remapped_channel_struct.remapped_name =
+ std::string(add_prefix) +
+ std::string(remapped_channel->name()->string_view());
+ remapped_channel_struct.new_type = new_type;
+ remapped_channels_[channel_index] = std::move(remapped_channel_struct);
MakeRemappedConfig();
}
@@ -1625,73 +1711,96 @@
const Configuration *const base_config = replay_configuration_ == nullptr
? logged_configuration()
: replay_configuration_;
- // The remapped config will be identical to the base_config, except that it
- // will have a bunch of extra channels in the channel list, which are exact
- // copies of the remapped channels, but with different names.
- // Because the flatbuffers API is a pain to work with, this requires a bit of
- // a song-and-dance to get copied over.
- // The order of operations is to:
- // 1) Make a flatbuffer builder for a config that will just contain a list of
- // the new channels that we want to add.
- // 2) For each channel that we are remapping:
- // a) Make a buffer/builder and construct into it a Channel table that only
- // contains the new name for the channel.
- // b) Merge the new channel with just the name into the channel that we are
- // trying to copy, built in the flatbuffer builder made in 1. This gives
- // us the new channel definition that we need.
- // 3) Using this list of offsets, build the Configuration of just new
- // Channels.
- // 4) Merge the Configuration with the new Channels into the base_config.
- // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
- // chance to sanitize the config.
+
+ // Create a config with all the channels, but un-sorted/merged. Collect up
+ // the schemas while we do this. Call MergeConfiguration to sort everything,
+ // and then merge it all in together.
// This is the builder that we use for the config containing all the new
// channels.
- flatbuffers::FlatBufferBuilder new_config_fbb;
- new_config_fbb.ForceDefaults(true);
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ // List of schemas.
+ std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
+ // Make sure our new RemoteMessage schema is in there for old logs without it.
+ schema_map.insert(std::make_pair(
+ RemoteMessage::GetFullyQualifiedName(),
+ FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
+ message_bridge::RemoteMessageSchema()))));
+
+ // Reconstruct the remapped channels.
for (auto &pair : remapped_channels_) {
- // This is the builder that we use for creating the Channel with just the
- // new name.
- flatbuffers::FlatBufferBuilder new_name_fbb;
- new_name_fbb.ForceDefaults(true);
- const flatbuffers::Offset<flatbuffers::String> name_offset =
- new_name_fbb.CreateString(pair.second);
- ChannelBuilder new_name_builder(new_name_fbb);
- new_name_builder.add_name(name_offset);
- new_name_fbb.Finish(new_name_builder.Finish());
- const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is
- // actually present in base_config.
- const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+ const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
- // Actually create the new channel and put it into the vector of Offsets
- // that we will use to create the new Configuration.
- channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
- reinterpret_cast<const flatbuffers::Table *>(base_channel),
- reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
- &new_config_fbb));
+ channel_offsets.emplace_back(
+ CopyChannel(c, pair.second.remapped_name, "", &fbb));
}
- // Create the Configuration containing the new channels that we want to add.
- const auto new_channel_vector_offsets =
- new_config_fbb.CreateVector(channel_offsets);
- // Now create the new maps.
+ // Now reconstruct the original channels, translating types as needed
+ for (const Channel *c : *base_config->channels()) {
+ // Search for a mapping channel.
+ std::string_view new_type = "";
+ for (auto &pair : remapped_channels_) {
+ const Channel *const remapped_channel =
+ logged_configuration()->channels()->Get(pair.first);
+ if (remapped_channel->name()->string_view() == c->name()->string_view() &&
+ remapped_channel->type()->string_view() == c->type()->string_view()) {
+ new_type = pair.second.new_type;
+ break;
+ }
+ }
+
+ // Copy everything over.
+ channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
+
+ // Add the schema if it doesn't exist.
+ if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
+ CHECK(c->has_schema());
+ schema_map.insert(std::make_pair(c->type()->string_view(),
+ RecursiveCopyFlatBuffer(c->schema())));
+ }
+ }
+
+ // The MergeConfiguration API takes a vector, not a map. Convert.
+ std::vector<FlatbufferVector<reflection::Schema>> schemas;
+ while (!schema_map.empty()) {
+ schemas.emplace_back(std::move(schema_map.begin()->second));
+ schema_map.erase(schema_map.begin());
+ }
+
+ // Create the Configuration containing the new channels that we want to add.
+ const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset =
+ channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
+
+ // Copy over the old maps.
std::vector<flatbuffers::Offset<Map>> map_offsets;
+ if (base_config->maps()) {
+ for (const Map *map : *base_config->maps()) {
+ map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
+ }
+ }
+
+ // Now create the new maps. These are second so they take effect first.
for (const MapT &map : maps_) {
const flatbuffers::Offset<flatbuffers::String> match_name_offset =
- new_config_fbb.CreateString(map.match->name);
+ fbb.CreateString(map.match->name);
const flatbuffers::Offset<flatbuffers::String> match_type_offset =
- new_config_fbb.CreateString(map.match->type);
+ fbb.CreateString(map.match->type);
const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
- new_config_fbb.CreateString(map.rename->name);
+ fbb.CreateString(map.rename->name);
flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
if (!map.match->source_node.empty()) {
- match_source_node_offset =
- new_config_fbb.CreateString(map.match->source_node);
+ match_source_node_offset = fbb.CreateString(map.match->source_node);
}
- Channel::Builder match_builder(new_config_fbb);
+ Channel::Builder match_builder(fbb);
match_builder.add_name(match_name_offset);
match_builder.add_type(match_type_offset);
if (!map.match->source_node.empty()) {
@@ -1699,36 +1808,66 @@
}
const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
- Channel::Builder rename_builder(new_config_fbb);
+ Channel::Builder rename_builder(fbb);
rename_builder.add_name(rename_name_offset);
const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
- Map::Builder map_builder(new_config_fbb);
+ Map::Builder map_builder(fbb);
map_builder.add_match(match_offset);
map_builder.add_rename(rename_offset);
map_offsets.emplace_back(map_builder.Finish());
}
- const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
- ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_channel_vector_offsets);
- new_config_builder.add_maps(new_maps_offsets);
- new_config_fbb.Finish(new_config_builder.Finish());
- const FlatbufferDetachedBuffer<Configuration> new_name_config =
- new_config_fbb.Release();
- // Merge the new channels configuration into the base_config, giving us the
- // remapped configuration.
+ // And copy everything else over.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+ nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+ applications_offset =
+ aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
+
+ // Now insert everything else in unmodified.
+ ConfigurationBuilder configuration_builder(fbb);
+ if (!channels_offset.IsNull()) {
+ configuration_builder.add_channels(channels_offset);
+ }
+ if (!maps_offsets.IsNull()) {
+ configuration_builder.add_maps(maps_offsets);
+ }
+ if (!nodes_offset.IsNull()) {
+ configuration_builder.add_nodes(nodes_offset);
+ }
+ if (!applications_offset.IsNull()) {
+ configuration_builder.add_applications(applications_offset);
+ }
+
+ if (base_config->has_channel_storage_duration()) {
+ configuration_builder.add_channel_storage_duration(
+ base_config->channel_storage_duration());
+ }
+
+ CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
+ << ": Merging logic needs to be updated when the number of configuration "
+ "fields changes.";
+
+ fbb.Finish(configuration_builder.Finish());
+
+ // Clean it up and return it! By using MergeConfiguration here, we'll
+ // actually get a deduplicated config for free too.
+ FlatbufferDetachedBuffer<Configuration> new_merged_config =
+ configuration::MergeConfiguration(
+ FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
+
remapped_configuration_buffer_ =
std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- MergeFlatBuffers<Configuration>(base_config,
- &new_name_config.message()));
- // Call MergeConfiguration to deal with sanitizing the config.
- remapped_configuration_buffer_ =
- std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- configuration::MergeConfiguration(*remapped_configuration_buffer_));
+ configuration::MergeConfiguration(new_merged_config, schemas));
remapped_configuration_ = &remapped_configuration_buffer_->message();
+
+ // TODO(austin): Lazily re-build to save CPU?
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
@@ -1741,7 +1880,7 @@
if (remapped_channels_.count(channel_index) > 0) {
VLOG(3) << "Got remapped channel on "
<< configuration::CleanedChannelToString(channel);
- channel_name = remapped_channels_[channel_index];
+ channel_name = remapped_channels_[channel_index].remapped_name;
}
VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
@@ -1786,7 +1925,7 @@
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+ aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1874,12 +2013,12 @@
timestamp);
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
- aos::Sender<MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
@@ -1905,7 +2044,7 @@
return true;
}
-aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
const Node *delivered_node) {
auto sender = remote_timestamp_senders_map_.find(delivered_node);
@@ -1913,7 +2052,7 @@
sender = remote_timestamp_senders_map_
.emplace(std::make_pair(
delivered_node,
- event_loop()->MakeSender<MessageHeader>(
+ event_loop()->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
delivered_node->name()->string_view()))))
.first;
@@ -1984,7 +2123,8 @@
timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time !=
+ monotonic_clock::min_time) {
// Got a forwarding timestamp!
filter = filters_[timestamped_message.channel_index];
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f0f0a69..a248f80 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -19,6 +19,7 @@
#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
@@ -376,35 +377,41 @@
// interference. This operates on raw channel names, without any node or
// application specific mappings.
void RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix = "/original");
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "");
template <typename T>
void RemapLoggedChannel(std::string_view name,
- std::string_view add_prefix = "/original") {
- RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type);
}
// Remaps the provided channel, though this respects node mappings, and
// preserves them too. This makes it so if /aos -> /pi1/aos on one node,
// /original/aos -> /original/pi1/aos on the same node after renaming, just
- // like you would hope.
+ // like you would hope. If new_type is not empty, the new channel will use
+ // the provided type instead. This allows for renaming messages.
//
// TODO(austin): If you have 2 nodes remapping something to the same channel,
// this doesn't handle that. No use cases exist yet for that, so it isn't
// being done yet.
void RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
- std::string_view add_prefix = "/original");
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "");
template <typename T>
void RemapLoggedChannel(std::string_view name, const Node *node,
- std::string_view add_prefix = "/original") {
- RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
+ new_type);
}
template <typename T>
bool HasChannel(std::string_view name, const Node *node = nullptr) {
return configuration::GetChannel(log_file_header()->configuration(), name,
- T::GetFullyQualifiedName(), "",
- node) != nullptr;
+ T::GetFullyQualifiedName(), "", node,
+ true) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
@@ -500,7 +507,7 @@
// Returns the MessageHeader sender to log delivery timestamps to for the
// provided remote node.
- aos::Sender<MessageHeader> *RemoteTimestampSender(
+ aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
const Node *delivered_node);
// Converts a timestamp from the monotonic clock on this node to the
@@ -548,11 +555,12 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
- std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<MessageHeader> *remote_timestamp_sender,
- State *source_state);
+ void SetChannel(
+ size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const {
@@ -608,7 +616,8 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
- std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+ std::vector<aos::Sender<message_bridge::RemoteMessage> *>
+ remote_timestamp_senders_;
// The mapping from logged channel index to sent channel index. Needed for
// sending out MessageHeaders.
std::vector<int> factory_channel_index_;
@@ -649,7 +658,7 @@
// channel) which correspond to the originating node.
std::vector<State *> channel_source_state_;
- std::map<const Node *, aos::Sender<MessageHeader>>
+ std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
remote_timestamp_senders_map_;
};
@@ -754,7 +763,11 @@
// Map of channel indices to new name. The channel index will be an index into
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
- std::map<size_t, std::string> remapped_channels_;
+ struct RemappedChannel {
+ std::string remapped_name;
+ std::string new_type;
+ };
+ std::map<size_t, RemappedChannel> remapped_channels_;
std::vector<MapT> maps_;
// Number of nodes which still have data to send. This is used to figure out
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a6385d3..c536711 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "aos/testing/tmpdir.h"
#include "aos/util/file.h"
@@ -21,6 +22,7 @@
namespace testing {
namespace chrono = std::chrono;
+using aos::message_bridge::RemoteMessage;
using aos::testing::MessageCounter;
class LoggerTest : public ::testing::Test {
@@ -376,13 +378,13 @@
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
logfile_base_ + "_pi2_data.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.logger.MessageHeader.part0.bfbs",
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.logger.MessageHeader.part1.bfbs",
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.logger.MessageHeader.part0.bfbs",
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.logger.MessageHeader.part1.bfbs",
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
logfile_base_ +
"_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
logfile_base_ +
@@ -1415,9 +1417,9 @@
std::unique_ptr<EventLoop> pi2_event_loop =
log_reader_factory.MakeEventLoop("test", pi2);
- MessageCounter<MessageHeader> pi1_original_message_header_counter(
+ MessageCounter<RemoteMessage> pi1_original_message_header_counter(
pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
- MessageCounter<MessageHeader> pi2_original_message_header_counter(
+ MessageCounter<RemoteMessage> pi2_original_message_header_counter(
pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
@@ -1455,7 +1457,7 @@
[&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
&pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
&ping_on_pi1_fetcher,
- &ping_on_pi2_fetcher](const MessageHeader &header) {
+ &ping_on_pi2_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1507,7 +1509,7 @@
[&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
&pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
&pong_on_pi2_fetcher,
- &pong_on_pi1_fetcher](const MessageHeader &header) {
+ &pong_on_pi1_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index 6ac51e7..fde7525 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -87,13 +87,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi1"
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi2"
},
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index 097a31c..54da1c1 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -116,13 +116,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi1"
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi2"
},
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index bdf52c1..fb144c3 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -10,6 +10,7 @@
#include "aos/events/test_message_generated.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "gtest/gtest.h"
@@ -19,10 +20,11 @@
std::string ConfigPrefix() { return "aos/"; }
-} // namespace
-
+using message_bridge::RemoteMessage;
namespace chrono = ::std::chrono;
+} // namespace
+
class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
public:
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
@@ -422,9 +424,9 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
// Wait to let timestamp estimation start up before looking for the results.
@@ -575,7 +577,7 @@
"/pi1/aos/remote_timestamps/pi2",
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
VLOG(1) << aos::FlatbufferToJson(&header);
const aos::monotonic_clock::time_point header_monotonic_sent_time(
@@ -825,9 +827,9 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
MessageCounter<message_bridge::ServerStatistics>
@@ -1023,7 +1025,7 @@
pi1_remote_timestamp->MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
[reliable_channel_index,
- &reliable_timestamp_count](const logger::MessageHeader &header) {
+ &reliable_timestamp_count](const RemoteMessage &header) {
VLOG(1) << aos::FlatbufferToJson(&header);
if (header.channel_index() == reliable_channel_index) {
++reliable_timestamp_count;
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index b9ff861..1ba1408 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -1,7 +1,10 @@
#include "aos/events/simulated_network_bridge.h"
+#include "absl/strings/str_cat.h"
+#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/remote_message_generated.h"
namespace aos {
namespace message_bridge {
@@ -21,7 +24,7 @@
ServerConnection *server_connection, int client_index,
MessageBridgeClientStatus *client_status,
size_t channel_index,
- aos::Sender<logger::MessageHeader> *timestamp_logger)
+ aos::Sender<RemoteMessage> *timestamp_logger)
: fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
send_event_loop_(send_event_loop),
@@ -117,11 +120,11 @@
client_connection_->received_packets() + 1);
if (timestamp_logger_) {
- aos::Sender<logger::MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
timestamp_logger_->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(channel_index_);
@@ -178,7 +181,7 @@
ClientConnection *client_connection_ = nullptr;
size_t channel_index_;
- aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
+ aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -370,7 +373,7 @@
if (!timestamp_loggers[other_node_index]) {
timestamp_loggers[other_node_index] =
- event_loop->MakeSender<logger::MessageHeader>(
+ event_loop->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
connection->name()->string_view()));
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 2a4da63..5130c65 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -2,10 +2,10 @@
#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
#include "aos/events/event_loop.h"
-#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_client_status.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
namespace aos {
namespace message_bridge {
@@ -41,7 +41,7 @@
MessageBridgeServerStatus server_status;
MessageBridgeClientStatus client_status;
- std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers;
+ std::vector<aos::Sender<RemoteMessage>> timestamp_loggers;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
diff --git a/aos/flatbuffer_introspection_test.cc b/aos/flatbuffer_introspection_test.cc
index cd005ce..1119b3b 100644
--- a/aos/flatbuffer_introspection_test.cc
+++ b/aos/flatbuffer_introspection_test.cc
@@ -10,13 +10,13 @@
class FlatbufferIntrospectionTest : public ::testing::Test {
public:
FlatbufferIntrospectionTest()
- : schema_data_(
- util::ReadFileToStringOrDie("aos/json_to_flatbuffer.bfbs")) {
+ : schema_data_(FileToFlatbuffer<reflection::Schema>(
+ "aos/json_to_flatbuffer.bfbs")) {
schema_ = reflection::GetSchema(schema_data_.span().data());
}
protected:
- FlatbufferString<reflection::Schema> schema_data_;
+ FlatbufferVector<reflection::Schema> schema_data_;
const reflection::Schema *schema_;
};
diff --git a/aos/flatbuffers.bzl b/aos/flatbuffers.bzl
new file mode 100644
index 0000000..eb19beb
--- /dev/null
+++ b/aos/flatbuffers.bzl
@@ -0,0 +1,22 @@
+def cc_static_flatbuffer(name, target, function):
+ """Creates a cc_library which encodes a file as a Span.
+
+ args:
+ target, The file to encode.
+ function, The inline function, with full namespaces, to create.
+ """
+ native.genrule(
+ name = name + "_gen",
+ tools = ["//aos:flatbuffers_static"],
+ srcs = [target],
+ outs = [name + ".h"],
+ cmd = "$(location //aos:flatbuffers_static) $(SRCS) $(OUTS) '" + function + "'",
+ )
+
+ native.cc_library(
+ name = name,
+ hdrs = [name + ".h"],
+ deps = [
+ "@com_google_absl//absl/types:span",
+ ],
+ )
diff --git a/aos/flatbuffers_static.py b/aos/flatbuffers_static.py
new file mode 100644
index 0000000..199c5b9
--- /dev/null
+++ b/aos/flatbuffers_static.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python3
+
+# Application to generate C++ code with a binary flatbuffer file embedded in it
+# as a Span.
+
+import sys
+from pathlib import Path
+
+
+def main(argv):
+ if len(argv) != 4:
+ return 1
+
+ input_path = sys.argv[1]
+ output_path = sys.argv[2]
+ function = sys.argv[3].split("::")
+ include_guard = output_path.replace('/', '_').replace('-', '_').replace(
+ '.', '_').upper() + '_'
+
+ output_prefix = [
+ b'#ifndef ' + include_guard.encode(),
+ b'#define ' + include_guard.encode(),
+ b'',
+ b'#include "absl/types/span.h"',
+ b'',
+ ]
+
+ for f in function[:-1]:
+ output_prefix.append(b'namespace ' + f.encode() + b' {')
+
+ output_prefix.append(b'')
+ output_prefix.append(b'inline absl::Span<const uint8_t> ' +
+ function[-1].encode() + b'() {')
+
+ output_suffix = [
+ b' return absl::Span<const uint8_t>(reinterpret_cast<const uint8_t*>(kData), sizeof(kData));',
+ b'}'
+ ]
+ output_suffix.append(b'')
+
+ for f in function[:-1]:
+ output_suffix.append(b'} // namespace ' + f.encode())
+
+ output_suffix.append(b'')
+ output_suffix.append(b'#endif // ' + include_guard.encode())
+
+ with open(input_path, 'rb') as binary_file:
+ bfbs = binary_file.read()
+
+ # Write out the header file
+ with open(output_path, 'wb') as output:
+ for line in output_prefix:
+ output.write(line)
+ output.write(b'\n')
+ output.write(b' alignas(64) static constexpr char kData[] = "')
+ for byte in bfbs:
+ output.write(b'\\x' + (b'%x' % byte).zfill(2))
+ output.write(b'";\n')
+ for line in output_suffix:
+ output.write(line)
+ output.write(b'\n')
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv))
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 4818aa2..13ba42e 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,10 +1,23 @@
load("//aos/seasocks:gen_embedded.bzl", "gen_embedded")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
package(default_visibility = ["//visibility:public"])
flatbuffer_cc_library(
+ name = "remote_message_fbs",
+ srcs = ["remote_message.fbs"],
+ gen_reflections = 1,
+)
+
+cc_static_flatbuffer(
+ name = "remote_message_schema",
+ function = "aos::message_bridge::RemoteMessageSchema",
+ target = ":remote_message_fbs_reflection_out",
+)
+
+flatbuffer_cc_library(
name = "connect_fbs",
srcs = ["connect.fbs"],
gen_reflections = 1,
@@ -178,6 +191,7 @@
":message_bridge_protocol",
":message_bridge_server_fbs",
":message_bridge_server_status",
+ ":remote_message_fbs",
":sctp_lib",
":sctp_server",
":timestamp_fbs",
@@ -257,6 +271,7 @@
":message_bridge_client_status",
":message_bridge_protocol",
":message_bridge_server_fbs",
+ ":remote_message_fbs",
":sctp_client",
":timestamp_fbs",
"//aos/events:shm_event_loop",
@@ -285,7 +300,7 @@
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
- "//aos/events/logging:logger_fbs",
+ ":remote_message_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b6c94e5..d8a632e 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "glog/logging.h"
@@ -106,11 +107,11 @@
// This needs to be munged and cleaned up to match the timestamp
// standard.
- aos::Sender<logger::MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
peer.timestamp_logger->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
message_header->channel_index());
@@ -173,10 +174,10 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(
- const Connection *connection, int node_index,
- ServerConnection *server_connection_statistics, bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger) {
+void ChannelState::AddPeer(const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely,
+ aos::Sender<RemoteMessage> *timestamp_logger) {
peers_.emplace_back(connection, node_index, server_connection_statistics,
logged_remotely, timestamp_logger);
}
@@ -330,7 +331,7 @@
// timestamps from it.
if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
timestamp_loggers_[other_node_index] =
- event_loop_->MakeSender<logger::MessageHeader>(
+ event_loop_->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
connection->name()->string_view()));
}
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index c0c1d56..04b0e16 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
@@ -36,7 +37,7 @@
Peer(const Connection *new_connection, int new_node_index,
ServerConnection *new_server_connection_statistics,
bool new_logged_remotely,
- aos::Sender<logger::MessageHeader> *new_timestamp_logger)
+ aos::Sender<RemoteMessage> *new_timestamp_logger)
: connection(new_connection),
node_index(new_node_index),
server_connection_statistics(new_server_connection_statistics),
@@ -50,7 +51,7 @@
const aos::Connection *connection;
const int node_index;
ServerConnection *server_connection_statistics;
- aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
+ aos::Sender<RemoteMessage> *timestamp_logger = nullptr;
// If true, this message will be logged on a receiving node. We need to
// keep it around to log it locally if that fails.
@@ -67,7 +68,7 @@
void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger);
+ aos::Sender<RemoteMessage> *timestamp_logger);
// Returns true if this channel has the same name and type as the other
// channel.
@@ -125,7 +126,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
+ std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 36bdde9..ed192b7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -34,18 +34,18 @@
}
class MessageBridgeTest : public ::testing::Test {
- public:
- MessageBridgeTest()
- : pi1_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_server_config.json")),
- pi2_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_client_config.json")) {
- util::UnlinkRecursive(ShmBase("pi1"));
- util::UnlinkRecursive(ShmBase("pi2"));
- }
+ public:
+ MessageBridgeTest()
+ : pi1_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json")),
+ pi2_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json")) {
+ util::UnlinkRecursive(ShmBase("pi1"));
+ util::UnlinkRecursive(ShmBase("pi2"));
+ }
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
};
// Test that we can send a ping message over sctp and receive it.
@@ -89,8 +89,8 @@
ping_event_loop.MakeSender<examples::Ping>("/test");
aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
- aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
- pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher1 =
+ pi1_test_event_loop.MakeFetcher<RemoteMessage>(
"/pi1/aos/remote_timestamps/pi2");
// Fetchers for confirming the remote timestamps made it.
@@ -121,8 +121,8 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
- aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
- test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher2 =
+ test_event_loop.MakeFetcher<RemoteMessage>(
"/pi2/aos/remote_timestamps/pi1");
// Event loop for fetching data delivered to pi2 from pi1 to match up
@@ -282,8 +282,9 @@
"/pi1/aos/remote_timestamps/pi2",
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -879,9 +880,9 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
@@ -1034,9 +1035,9 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a7c0d1b..2b89080 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -74,13 +74,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
"frequency": 10
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
"frequency": 10
},
diff --git a/aos/network/remote_message.fbs b/aos/network/remote_message.fbs
new file mode 100644
index 0000000..5659704
--- /dev/null
+++ b/aos/network/remote_message.fbs
@@ -0,0 +1,31 @@
+namespace aos.message_bridge;
+
+table RemoteMessage {
+ // Index into the channel datastructure in the log file header. This
+ // provides the data type.
+ channel_index:uint (id: 0);
+ // Time this message was sent on the monotonic clock in nanoseconds on this
+ // node.
+ monotonic_sent_time:long (id: 1);
+ // Time this message was sent on the realtime clock in nanoseconds on this
+ // node.
+ realtime_sent_time:long (id: 2);
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint (id: 3);
+
+ // The nested flatbuffer.
+ data:[ubyte] (id: 4);
+
+ // Time this message was sent on the monotonic clock of the remote node in
+ // nanoseconds.
+ monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
+ // Time this message was sent on the realtime clock of the remote node in
+ // nanoseconds.
+ realtime_remote_time:int64 = -9223372036854775808 (id: 6);
+ // Queue index of this message on the remote node.
+ remote_queue_index:uint32 = 4294967295 (id: 7);
+}
+
+root_type RemoteMessage;
diff --git a/third_party/flatbuffers/BUILD b/third_party/flatbuffers/BUILD
index 57f2b8e..ed39893 100644
--- a/third_party/flatbuffers/BUILD
+++ b/third_party/flatbuffers/BUILD
@@ -7,6 +7,7 @@
exports_files([
"LICENSE",
"tsconfig.json",
+ "include/flatbuffers/reflection.bfbs",
])
# Public flatc library to compile flatbuffer files at runtime.
diff --git a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
index c623126..1a24984 100644
--- a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
+++ b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
@@ -1638,6 +1638,17 @@
return CreateSharedString(str->c_str(), str->size());
}
+#ifdef FLATBUFFERS_HAS_STRING_VIEW
+ /// @brief Store a string in the buffer, which can contain any binary data.
+ /// If a string with this exact contents has already been serialized before,
+ /// instead simply returns the offset of the existing string.
+ /// @param[in] str A const pointer to a `String` struct to add to the buffer.
+ /// @return Returns the offset in the buffer where the string starts
+ Offset<String> CreateSharedString(const flatbuffers::string_view str) {
+ return CreateSharedString(str.data(), str.size());
+ }
+#endif
+
/// @cond FLATBUFFERS_INTERNAL
uoffset_t EndVector(size_t len) {
FLATBUFFERS_ASSERT(nested); // Hit if no corresponding StartVector.
diff --git a/y2020/BUILD b/y2020/BUILD
index e54c168..a759681 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -172,7 +172,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
@@ -200,7 +200,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
@@ -216,7 +216,7 @@
src = "y2020_roborio.json",
flatbuffers = [
":setpoint_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
"//aos/network:timestamp_fbs",
diff --git a/y2020/y2020_laptop.json b/y2020/y2020_laptop.json
index 37961f8..164e707 100644
--- a/y2020/y2020_laptop.json
+++ b/y2020/y2020_laptop.json
@@ -176,7 +176,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/roborio",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -185,7 +185,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -194,7 +194,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -203,7 +203,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi3",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -212,7 +212,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi4",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index 703d560..d463848 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -44,25 +44,25 @@
},
{
"name": "/roborio/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi3",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi4",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},