Merge changes I8894689f,I943de460,If81b3186
* changes:
Quiet down message_bridge_test to reduce flakes
Put remote boot UUID in ServerStatistics and RemoteMessage
Switch magic timestamp channel to RemoteMessage
diff --git a/aos/BUILD b/aos/BUILD
index beeb2b0..4ccfd0e 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -596,3 +596,9 @@
"//aos/testing:googletest",
],
)
+
+py_binary(
+ name = "flatbuffers_static",
+ srcs = ["flatbuffers_static.py"],
+ visibility = ["//visibility:public"],
+)
diff --git a/aos/config_flattener.cc b/aos/config_flattener.cc
index a9500e3..8018484 100644
--- a/aos/config_flattener.cc
+++ b/aos/config_flattener.cc
@@ -35,10 +35,10 @@
}
}
- std::vector<aos::FlatbufferString<reflection::Schema>> schemas;
+ std::vector<aos::FlatbufferVector<reflection::Schema>> schemas;
for (int i = 6; i < argc; ++i) {
- schemas.emplace_back(util::ReadFileToStringOrDie(argv[i]));
+ schemas.emplace_back(FileToFlatbuffer<reflection::Schema>(argv[i]));
}
aos::FlatbufferDetachedBuffer<Configuration> merged_config =
diff --git a/aos/configuration.cc b/aos/configuration.cc
index d32cae8..ad2e295 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -591,7 +591,8 @@
const Channel *GetChannel(const Configuration *config, std::string_view name,
std::string_view type,
- std::string_view application_name, const Node *node) {
+ std::string_view application_name, const Node *node,
+ bool quiet) {
const std::string_view original_name = name;
std::string mutable_name;
if (node != nullptr) {
@@ -647,7 +648,7 @@
} else {
VLOG(1) << "No match for { \"name\": \"" << name << "\", \"type\": \""
<< type << "\" }";
- if (original_name != name) {
+ if (original_name != name && !quiet) {
LOG(WARNING) << "Remapped from {\"name\": \"" << original_name
<< "\", \"type\": \"" << type << "\"}, to {\"name\": \""
<< name << "\", \"type\": \"" << type
@@ -683,7 +684,7 @@
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config,
- const std::vector<aos::FlatbufferString<reflection::Schema>> &schemas) {
+ const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) {
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
@@ -701,8 +702,8 @@
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
for (const Channel *c : *config.message().channels()) {
// Search for a schema with a matching type.
- const aos::FlatbufferString<reflection::Schema> *found_schema = nullptr;
- for (const aos::FlatbufferString<reflection::Schema> &schema : schemas) {
+ const aos::FlatbufferVector<reflection::Schema> *found_schema = nullptr;
+ for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) {
if (schema.message().root_table() != nullptr) {
if (schema.message().root_table()->name()->string_view() ==
c->type()->string_view()) {
diff --git a/aos/configuration.h b/aos/configuration.h
index 7383a1a..d10d16d 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -31,7 +31,7 @@
// schema list.
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config,
- const std::vector<aos::FlatbufferString<reflection::Schema>> &schemas);
+ const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas);
// Merges a configuration json snippet into the provided configuration and
// returns the modified config.
@@ -49,7 +49,7 @@
const std::string_view name,
const std::string_view type,
const std::string_view application_name,
- const Node *node);
+ const Node *node, bool quiet = false);
inline const Channel *GetChannel(const Flatbuffer<Configuration> &config,
const std::string_view name,
const std::string_view type,
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 95e6cdc..646f597 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -83,6 +83,7 @@
"//aos:configuration_fbs",
"//aos:flatbuffers",
"//aos:ftrace",
+ "//aos/events/logging:uuid",
"//aos/ipc_lib:data_alignment",
"//aos/logging:implementations",
"//aos/time",
@@ -157,7 +158,7 @@
":ping_fbs",
":pong_fbs",
"//aos/network:message_bridge_client_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:timestamp_fbs",
"//aos/network:message_bridge_server_fbs",
],
@@ -301,6 +302,7 @@
":ping_lib",
":pong_lib",
":simulated_event_loop",
+ "//aos/network:remote_message_fbs",
"//aos/testing:googletest",
],
)
@@ -342,6 +344,7 @@
"//aos/ipc_lib:index",
"//aos/network:message_bridge_client_status",
"//aos/network:message_bridge_server_status",
+ "//aos/network:remote_message_fbs",
"//aos/util:phased_loop",
"@com_google_absl//absl/container:btree",
],
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 08fa7fd..8d01549 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -79,8 +79,9 @@
PhasedLoopHandler::~PhasedLoopHandler() {}
-EventLoop::EventLoop(const Configuration *configuration)
- : timing_report_(flatbuffers::DetachedBuffer()),
+EventLoop::EventLoop(const Configuration *configuration, UUID boot_uuid)
+ : boot_uuid_(boot_uuid),
+ timing_report_(flatbuffers::DetachedBuffer()),
configuration_(configuration) {}
EventLoop::~EventLoop() {
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index a1f87cd..3916ce4 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -12,6 +12,7 @@
#include "aos/events/channel_preallocated_allocator.h"
#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
+#include "aos/events/logging/uuid.h"
#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
#include "aos/ftrace.h"
@@ -473,7 +474,7 @@
class EventLoop {
public:
- EventLoop(const Configuration *configuration);
+ EventLoop(const Configuration *configuration, UUID boot_uuid);
virtual ~EventLoop();
@@ -485,8 +486,8 @@
template <typename T>
bool HasChannel(const std::string_view channel_name) {
return configuration::GetChannel(configuration(), channel_name,
- T::GetFullyQualifiedName(), name(),
- node()) != nullptr;
+ T::GetFullyQualifiedName(), name(), node(),
+ true) != nullptr;
}
// Note, it is supported to create:
@@ -653,6 +654,9 @@
// range of Context::buffer_index values for this channel.
virtual int NumberBuffers(const Channel *channel) = 0;
+ // Returns the boot UUID.
+ const UUID &boot_uuid() { return boot_uuid_; }
+
protected:
// Sets the name of the event loop. This is the application name.
virtual void set_name(const std::string_view name) = 0;
@@ -732,6 +736,8 @@
// If true, don't send AOS_LOG to /aos
bool skip_logger_ = false;
+ UUID boot_uuid_;
+
private:
virtual pid_t GetTid() = 0;
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index db718ba..45ec394 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -26,8 +26,8 @@
visibility = ["//visibility:public"],
deps = [
":buffer_encoder",
- ":uuid",
":logger_fbs",
+ ":uuid",
"//aos:configuration",
"//aos:flatbuffer_merge",
"//aos:flatbuffers",
@@ -185,6 +185,8 @@
"//aos/events:event_loop",
"//aos/events:simulated_event_loop",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:remote_message_schema",
"//aos/network:team_number",
"//aos/network:timestamp_filter",
"//aos/time",
@@ -273,11 +275,11 @@
name = "multinode_pingpong_config",
src = "multinode_pingpong.json",
flatbuffers = [
- ":logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:timestamp_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
@@ -314,6 +316,10 @@
srcs = ["uuid.cc"],
hdrs = ["uuid.h"],
target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
)
cc_test(
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index cd153c2..a73d678 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,8 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "aos/util/file.h"
@@ -47,7 +49,69 @@
CHECK(result);
return result.value();
}
+
+// Copies the channel, removing the schema as we go. If new_name is provided,
+// it is used instead of the name inside the channel. If new_type is provided,
+// it is used instead of the type in the channel.
+flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
+ std::string_view new_name,
+ std::string_view new_type,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
+ : new_name);
+ flatbuffers::Offset<flatbuffers::String> type_offset =
+ fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
+ flatbuffers::Offset<flatbuffers::String> source_node_offset =
+ c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
+ : 0;
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset =
+ aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
+
+ Channel::Builder channel_builder(*fbb);
+ channel_builder.add_name(name_offset);
+ channel_builder.add_type(type_offset);
+ if (c->has_frequency()) {
+ channel_builder.add_frequency(c->frequency());
+ }
+ if (c->has_max_size()) {
+ channel_builder.add_max_size(c->max_size());
+ }
+ if (c->has_num_senders()) {
+ channel_builder.add_num_senders(c->num_senders());
+ }
+ if (c->has_num_watchers()) {
+ channel_builder.add_num_watchers(c->num_watchers());
+ }
+ if (!source_node_offset.IsNull()) {
+ channel_builder.add_source_node(source_node_offset);
+ }
+ if (!destination_nodes_offset.IsNull()) {
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ }
+ if (c->has_logger()) {
+ channel_builder.add_logger(c->logger());
+ }
+ if (!logger_nodes_offset.IsNull()) {
+ channel_builder.add_logger_nodes(logger_nodes_offset);
+ }
+ if (c->has_read_method()) {
+ channel_builder.add_read_method(c->read_method());
+ }
+ if (c->has_num_readers()) {
+ channel_builder.add_num_readers(c->num_readers());
+ }
+ return channel_builder.Finish();
+}
+
namespace chrono = std::chrono;
+using message_bridge::RemoteMessage;
} // namespace
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
@@ -83,7 +147,7 @@
const Channel *channel = configuration::GetChannel(
event_loop->configuration(),
absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
- logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
event_loop_->node());
CHECK(channel != nullptr)
@@ -137,7 +201,7 @@
channel, event_loop_->node(), event_loop_->node());
}
- // Now, detect a MessageHeader timestamp logger where we should just log the
+ // Now, detect a RemoteMessage timestamp logger where we should just log the
// contents to a file directly.
const bool log_contents = timestamp_logger_channels.find(channel) !=
timestamp_logger_channels.end();
@@ -607,8 +671,8 @@
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
- const MessageHeader *msg =
- flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+ const RemoteMessage *msg =
+ flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
logger::MessageHeader::Builder message_header_builder(fbb);
@@ -735,11 +799,22 @@
for (const Node *remote_node : timestamp_logger_nodes) {
const std::string channel = absl::StrCat(
"/aos/remote_timestamps/", remote_node->name()->string_view());
- CHECK(HasChannel<logger::MessageHeader>(channel, node))
- << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
- << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
- << node->name()->string_view();
- RemapLoggedChannel<logger::MessageHeader>(channel, node);
+ // See if the log file is an old log with MessageHeader channels in it, or
+ // a newer log with RemoteMessage. If we find an older log, rename the
+ // type too along with the name.
+ if (HasChannel<MessageHeader>(channel, node)) {
+ CHECK(!HasChannel<RemoteMessage>(channel, node))
+ << ": Can't have both a MessageHeader and RemoteMessage remote "
+ "timestamp channel.";
+ RemapLoggedChannel<MessageHeader>(channel, node, "/original",
+ "aos.message_bridge.RemoteMessage");
+ } else {
+ CHECK(HasChannel<RemoteMessage>(channel, node))
+ << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+ << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
+ << node->name()->string_view();
+ RemapLoggedChannel<RemoteMessage>(channel, node);
+ }
}
}
@@ -1239,7 +1314,7 @@
logged_configuration()->channels()->Get(logged_channel_index));
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+ aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
@@ -1542,7 +1617,8 @@
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
const Channel *const channel = logged_configuration()->channels()->Get(ii);
if (channel->name()->str() == name &&
@@ -1550,10 +1626,14 @@
CHECK_EQ(0u, remapped_channels_.count(ii))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(channel);
- remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
+ RemappedChannel remapped_channel;
+ remapped_channel.remapped_name =
+ std::string(add_prefix) + std::string(name);
+ remapped_channel.new_type = new_type;
+ remapped_channels_[ii] = std::move(remapped_channel);
VLOG(1) << "Remapping channel "
<< configuration::CleanedChannelToString(channel)
- << " to have name " << remapped_channels_[ii];
+ << " to have name " << remapped_channels_[ii].remapped_name;
MakeRemappedConfig();
return;
}
@@ -1564,7 +1644,8 @@
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
- std::string_view add_prefix) {
+ std::string_view add_prefix,
+ std::string_view new_type) {
VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
const Channel *remapped_channel =
configuration::GetChannel(logged_configuration(), name, type, "", node);
@@ -1601,8 +1682,13 @@
CHECK_EQ(0u, remapped_channels_.count(channel_index))
<< "Already remapped channel "
<< configuration::CleanedChannelToString(remapped_channel);
- remapped_channels_[channel_index] =
- absl::StrCat(add_prefix, remapped_channel->name()->string_view());
+
+ RemappedChannel remapped_channel_struct;
+ remapped_channel_struct.remapped_name =
+ std::string(add_prefix) +
+ std::string(remapped_channel->name()->string_view());
+ remapped_channel_struct.new_type = new_type;
+ remapped_channels_[channel_index] = std::move(remapped_channel_struct);
MakeRemappedConfig();
}
@@ -1625,73 +1711,96 @@
const Configuration *const base_config = replay_configuration_ == nullptr
? logged_configuration()
: replay_configuration_;
- // The remapped config will be identical to the base_config, except that it
- // will have a bunch of extra channels in the channel list, which are exact
- // copies of the remapped channels, but with different names.
- // Because the flatbuffers API is a pain to work with, this requires a bit of
- // a song-and-dance to get copied over.
- // The order of operations is to:
- // 1) Make a flatbuffer builder for a config that will just contain a list of
- // the new channels that we want to add.
- // 2) For each channel that we are remapping:
- // a) Make a buffer/builder and construct into it a Channel table that only
- // contains the new name for the channel.
- // b) Merge the new channel with just the name into the channel that we are
- // trying to copy, built in the flatbuffer builder made in 1. This gives
- // us the new channel definition that we need.
- // 3) Using this list of offsets, build the Configuration of just new
- // Channels.
- // 4) Merge the Configuration with the new Channels into the base_config.
- // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
- // chance to sanitize the config.
+
+ // Create a config with all the channels, but un-sorted/merged. Collect up
+ // the schemas while we do this. Call MergeConfiguration to sort everything,
+ // and then merge it all in together.
// This is the builder that we use for the config containing all the new
// channels.
- flatbuffers::FlatBufferBuilder new_config_fbb;
- new_config_fbb.ForceDefaults(true);
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ // List of schemas.
+ std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
+ // Make sure our new RemoteMessage schema is in there for old logs without it.
+ schema_map.insert(std::make_pair(
+ RemoteMessage::GetFullyQualifiedName(),
+ FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
+ message_bridge::RemoteMessageSchema()))));
+
+ // Reconstruct the remapped channels.
for (auto &pair : remapped_channels_) {
- // This is the builder that we use for creating the Channel with just the
- // new name.
- flatbuffers::FlatBufferBuilder new_name_fbb;
- new_name_fbb.ForceDefaults(true);
- const flatbuffers::Offset<flatbuffers::String> name_offset =
- new_name_fbb.CreateString(pair.second);
- ChannelBuilder new_name_builder(new_name_fbb);
- new_name_builder.add_name(name_offset);
- new_name_fbb.Finish(new_name_builder.Finish());
- const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is
- // actually present in base_config.
- const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
+ const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
- // Actually create the new channel and put it into the vector of Offsets
- // that we will use to create the new Configuration.
- channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
- reinterpret_cast<const flatbuffers::Table *>(base_channel),
- reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
- &new_config_fbb));
+ channel_offsets.emplace_back(
+ CopyChannel(c, pair.second.remapped_name, "", &fbb));
}
- // Create the Configuration containing the new channels that we want to add.
- const auto new_channel_vector_offsets =
- new_config_fbb.CreateVector(channel_offsets);
- // Now create the new maps.
+ // Now reconstruct the original channels, translating types as needed
+ for (const Channel *c : *base_config->channels()) {
+ // Search for a mapping channel.
+ std::string_view new_type = "";
+ for (auto &pair : remapped_channels_) {
+ const Channel *const remapped_channel =
+ logged_configuration()->channels()->Get(pair.first);
+ if (remapped_channel->name()->string_view() == c->name()->string_view() &&
+ remapped_channel->type()->string_view() == c->type()->string_view()) {
+ new_type = pair.second.new_type;
+ break;
+ }
+ }
+
+ // Copy everything over.
+ channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
+
+ // Add the schema if it doesn't exist.
+ if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
+ CHECK(c->has_schema());
+ schema_map.insert(std::make_pair(c->type()->string_view(),
+ RecursiveCopyFlatBuffer(c->schema())));
+ }
+ }
+
+ // The MergeConfiguration API takes a vector, not a map. Convert.
+ std::vector<FlatbufferVector<reflection::Schema>> schemas;
+ while (!schema_map.empty()) {
+ schemas.emplace_back(std::move(schema_map.begin()->second));
+ schema_map.erase(schema_map.begin());
+ }
+
+ // Create the Configuration containing the new channels that we want to add.
+ const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset =
+ channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
+
+ // Copy over the old maps.
std::vector<flatbuffers::Offset<Map>> map_offsets;
+ if (base_config->maps()) {
+ for (const Map *map : *base_config->maps()) {
+ map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
+ }
+ }
+
+ // Now create the new maps. These are second so they take effect first.
for (const MapT &map : maps_) {
const flatbuffers::Offset<flatbuffers::String> match_name_offset =
- new_config_fbb.CreateString(map.match->name);
+ fbb.CreateString(map.match->name);
const flatbuffers::Offset<flatbuffers::String> match_type_offset =
- new_config_fbb.CreateString(map.match->type);
+ fbb.CreateString(map.match->type);
const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
- new_config_fbb.CreateString(map.rename->name);
+ fbb.CreateString(map.rename->name);
flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
if (!map.match->source_node.empty()) {
- match_source_node_offset =
- new_config_fbb.CreateString(map.match->source_node);
+ match_source_node_offset = fbb.CreateString(map.match->source_node);
}
- Channel::Builder match_builder(new_config_fbb);
+ Channel::Builder match_builder(fbb);
match_builder.add_name(match_name_offset);
match_builder.add_type(match_type_offset);
if (!map.match->source_node.empty()) {
@@ -1699,36 +1808,66 @@
}
const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
- Channel::Builder rename_builder(new_config_fbb);
+ Channel::Builder rename_builder(fbb);
rename_builder.add_name(rename_name_offset);
const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
- Map::Builder map_builder(new_config_fbb);
+ Map::Builder map_builder(fbb);
map_builder.add_match(match_offset);
map_builder.add_rename(rename_offset);
map_offsets.emplace_back(map_builder.Finish());
}
- const auto new_maps_offsets = new_config_fbb.CreateVector(map_offsets);
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
- ConfigurationBuilder new_config_builder(new_config_fbb);
- new_config_builder.add_channels(new_channel_vector_offsets);
- new_config_builder.add_maps(new_maps_offsets);
- new_config_fbb.Finish(new_config_builder.Finish());
- const FlatbufferDetachedBuffer<Configuration> new_name_config =
- new_config_fbb.Release();
- // Merge the new channels configuration into the base_config, giving us the
- // remapped configuration.
+ // And copy everything else over.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+ nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+ applications_offset =
+ aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
+
+ // Now insert everything else in unmodified.
+ ConfigurationBuilder configuration_builder(fbb);
+ if (!channels_offset.IsNull()) {
+ configuration_builder.add_channels(channels_offset);
+ }
+ if (!maps_offsets.IsNull()) {
+ configuration_builder.add_maps(maps_offsets);
+ }
+ if (!nodes_offset.IsNull()) {
+ configuration_builder.add_nodes(nodes_offset);
+ }
+ if (!applications_offset.IsNull()) {
+ configuration_builder.add_applications(applications_offset);
+ }
+
+ if (base_config->has_channel_storage_duration()) {
+ configuration_builder.add_channel_storage_duration(
+ base_config->channel_storage_duration());
+ }
+
+ CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
+ << ": Merging logic needs to be updated when the number of configuration "
+ "fields changes.";
+
+ fbb.Finish(configuration_builder.Finish());
+
+ // Clean it up and return it! By using MergeConfiguration here, we'll
+ // actually get a deduplicated config for free too.
+ FlatbufferDetachedBuffer<Configuration> new_merged_config =
+ configuration::MergeConfiguration(
+ FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
+
remapped_configuration_buffer_ =
std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- MergeFlatBuffers<Configuration>(base_config,
- &new_name_config.message()));
- // Call MergeConfiguration to deal with sanitizing the config.
- remapped_configuration_buffer_ =
- std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
- configuration::MergeConfiguration(*remapped_configuration_buffer_));
+ configuration::MergeConfiguration(new_merged_config, schemas));
remapped_configuration_ = &remapped_configuration_buffer_->message();
+
+ // TODO(austin): Lazily re-build to save CPU?
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
@@ -1741,7 +1880,7 @@
if (remapped_channels_.count(channel_index) > 0) {
VLOG(3) << "Got remapped channel on "
<< configuration::CleanedChannelToString(channel);
- channel_name = remapped_channels_[channel_index];
+ channel_name = remapped_channels_[channel_index].remapped_name;
}
VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
@@ -1786,7 +1925,7 @@
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+ aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1874,12 +2013,12 @@
timestamp);
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
- aos::Sender<MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
@@ -1905,7 +2044,7 @@
return true;
}
-aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
const Node *delivered_node) {
auto sender = remote_timestamp_senders_map_.find(delivered_node);
@@ -1913,7 +2052,7 @@
sender = remote_timestamp_senders_map_
.emplace(std::make_pair(
delivered_node,
- event_loop()->MakeSender<MessageHeader>(
+ event_loop()->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
delivered_node->name()->string_view()))))
.first;
@@ -1984,7 +2123,8 @@
timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time !=
+ monotonic_clock::min_time) {
// Got a forwarding timestamp!
filter = filters_[timestamped_message.channel_index];
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f0f0a69..a248f80 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -19,6 +19,7 @@
#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
@@ -376,35 +377,41 @@
// interference. This operates on raw channel names, without any node or
// application specific mappings.
void RemapLoggedChannel(std::string_view name, std::string_view type,
- std::string_view add_prefix = "/original");
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "");
template <typename T>
void RemapLoggedChannel(std::string_view name,
- std::string_view add_prefix = "/original") {
- RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix, new_type);
}
// Remaps the provided channel, though this respects node mappings, and
// preserves them too. This makes it so if /aos -> /pi1/aos on one node,
// /original/aos -> /original/pi1/aos on the same node after renaming, just
- // like you would hope.
+ // like you would hope. If new_type is not empty, the new channel will use
+ // the provided type instead. This allows for renaming messages.
//
// TODO(austin): If you have 2 nodes remapping something to the same channel,
// this doesn't handle that. No use cases exist yet for that, so it isn't
// being done yet.
void RemapLoggedChannel(std::string_view name, std::string_view type,
const Node *node,
- std::string_view add_prefix = "/original");
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "");
template <typename T>
void RemapLoggedChannel(std::string_view name, const Node *node,
- std::string_view add_prefix = "/original") {
- RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
+ std::string_view add_prefix = "/original",
+ std::string_view new_type = "") {
+ RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix,
+ new_type);
}
template <typename T>
bool HasChannel(std::string_view name, const Node *node = nullptr) {
return configuration::GetChannel(log_file_header()->configuration(), name,
- T::GetFullyQualifiedName(), "",
- node) != nullptr;
+ T::GetFullyQualifiedName(), "", node,
+ true) != nullptr;
}
SimulatedEventLoopFactory *event_loop_factory() {
@@ -500,7 +507,7 @@
// Returns the MessageHeader sender to log delivery timestamps to for the
// provided remote node.
- aos::Sender<MessageHeader> *RemoteTimestampSender(
+ aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
const Node *delivered_node);
// Converts a timestamp from the monotonic clock on this node to the
@@ -548,11 +555,12 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
- std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<MessageHeader> *remote_timestamp_sender,
- State *source_state);
+ void SetChannel(
+ size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const {
@@ -608,7 +616,8 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
- std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+ std::vector<aos::Sender<message_bridge::RemoteMessage> *>
+ remote_timestamp_senders_;
// The mapping from logged channel index to sent channel index. Needed for
// sending out MessageHeaders.
std::vector<int> factory_channel_index_;
@@ -649,7 +658,7 @@
// channel) which correspond to the originating node.
std::vector<State *> channel_source_state_;
- std::map<const Node *, aos::Sender<MessageHeader>>
+ std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
remote_timestamp_senders_map_;
};
@@ -754,7 +763,11 @@
// Map of channel indices to new name. The channel index will be an index into
// logged_configuration(), and the string key will be the name of the channel
// to send on instead of the logged channel name.
- std::map<size_t, std::string> remapped_channels_;
+ struct RemappedChannel {
+ std::string remapped_name;
+ std::string new_type;
+ };
+ std::map<size_t, RemappedChannel> remapped_channels_;
std::vector<MapT> maps_;
// Number of nodes which still have data to send. This is used to figure out
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a6385d3..8115400 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "aos/testing/tmpdir.h"
#include "aos/util/file.h"
@@ -21,6 +22,7 @@
namespace testing {
namespace chrono = std::chrono;
+using aos::message_bridge::RemoteMessage;
using aos::testing::MessageCounter;
class LoggerTest : public ::testing::Test {
@@ -376,13 +378,13 @@
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
logfile_base_ + "_pi2_data.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.logger.MessageHeader.part0.bfbs",
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
- "aos.logger.MessageHeader.part1.bfbs",
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.logger.MessageHeader.part0.bfbs",
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.logger.MessageHeader.part1.bfbs",
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
logfile_base_ +
"_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
logfile_base_ +
@@ -684,25 +686,31 @@
UnorderedElementsAre(
std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ std::make_tuple("/test", "aos.examples.Ping", 2001)))
+ << " : " << logfiles_[0];
// Timestamps for pong
EXPECT_THAT(
CountChannelsTimestamp(logfiles_[0]),
UnorderedElementsAre(
std::make_tuple("/test", "aos.examples.Pong", 2001),
- std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)));
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
+ << " : " << logfiles_[0];
// Pong data.
- EXPECT_THAT(CountChannelsData(logfiles_[1]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 101)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[1]),
+ UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
+ << " : " << logfiles_[1];
EXPECT_THAT(CountChannelsData(logfiles_[2]),
UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 1900)));
+ std::make_tuple("/test", "aos.examples.Pong", 1910)))
+ << " : " << logfiles_[1];
// No timestamps
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre())
+ << " : " << logfiles_[1];
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre())
+ << " : " << logfiles_[2];
// Timing reports and pongs.
EXPECT_THAT(
@@ -710,56 +718,74 @@
UnorderedElementsAre(
std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ std::make_tuple("/test", "aos.examples.Pong", 2001)))
+ << " : " << logfiles_[3];
// And ping timestamps.
EXPECT_THAT(
CountChannelsTimestamp(logfiles_[3]),
UnorderedElementsAre(
std::make_tuple("/test", "aos.examples.Ping", 2001),
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)));
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
+ << " : " << logfiles_[3];
// Timestamps from pi2 on pi1, and the other way.
- EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre())
+ << " : " << logfiles_[4];
+ EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre())
+ << " : " << logfiles_[5];
+ EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre())
+ << " : " << logfiles_[6];
+ EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre())
+ << " : " << logfiles_[7];
EXPECT_THAT(
CountChannelsTimestamp(logfiles_[4]),
UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 10),
- std::make_tuple("/test", "aos.examples.Ping", 101)));
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
+ std::make_tuple("/test", "aos.examples.Ping", 91)))
+ << " : " << logfiles_[4];
EXPECT_THAT(
CountChannelsTimestamp(logfiles_[5]),
UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 190),
- std::make_tuple("/test", "aos.examples.Ping", 1900)));
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
+ std::make_tuple("/test", "aos.examples.Ping", 1910)))
+ << " : " << logfiles_[5];
EXPECT_THAT(CountChannelsTimestamp(logfiles_[6]),
UnorderedElementsAre(std::make_tuple(
- "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
+ << " : " << logfiles_[6];
EXPECT_THAT(CountChannelsTimestamp(logfiles_[7]),
UnorderedElementsAre(std::make_tuple(
- "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+ "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
+ << " : " << logfiles_[7];
// And then test that the remotely logged timestamp data files only have
// timestamps in them.
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre());
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre())
+ << " : " << logfiles_[8];
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre())
+ << " : " << logfiles_[9];
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre())
+ << " : " << logfiles_[10];
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre())
+ << " : " << logfiles_[11];
EXPECT_THAT(CountChannelsData(logfiles_[8]),
UnorderedElementsAre(std::make_tuple(
- "/pi1/aos", "aos.message_bridge.Timestamp", 10)));
+ "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
+ << " : " << logfiles_[8];
EXPECT_THAT(CountChannelsData(logfiles_[9]),
UnorderedElementsAre(std::make_tuple(
- "/pi1/aos", "aos.message_bridge.Timestamp", 190)));
+ "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
+ << " : " << logfiles_[9];
EXPECT_THAT(CountChannelsData(logfiles_[10]),
UnorderedElementsAre(std::make_tuple(
- "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
+ << " : " << logfiles_[10];
EXPECT_THAT(CountChannelsData(logfiles_[11]),
UnorderedElementsAre(std::make_tuple(
- "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+ "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
+ << " : " << logfiles_[11];
}
LogReader reader(SortParts(logfiles_));
@@ -1415,9 +1441,9 @@
std::unique_ptr<EventLoop> pi2_event_loop =
log_reader_factory.MakeEventLoop("test", pi2);
- MessageCounter<MessageHeader> pi1_original_message_header_counter(
+ MessageCounter<RemoteMessage> pi1_original_message_header_counter(
pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
- MessageCounter<MessageHeader> pi2_original_message_header_counter(
+ MessageCounter<RemoteMessage> pi2_original_message_header_counter(
pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
@@ -1455,7 +1481,7 @@
[&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
&pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
&ping_on_pi1_fetcher,
- &ping_on_pi2_fetcher](const MessageHeader &header) {
+ &ping_on_pi2_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1507,7 +1533,7 @@
[&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
&pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
&pong_on_pi2_fetcher,
- &pong_on_pi1_fetcher](const MessageHeader &header) {
+ &pong_on_pi1_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index 6ac51e7..fde7525 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -87,13 +87,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi1"
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi2"
},
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
index 376fa95..3914740 100644
--- a/aos/events/logging/uuid.cc
+++ b/aos/events/logging/uuid.cc
@@ -1,9 +1,14 @@
#include "aos/events/logging/uuid.h"
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
#include <array>
#include <random>
#include <string_view>
+#include "glog/logging.h"
+
namespace aos {
namespace {
char ToHex(int val) {
@@ -57,9 +62,24 @@
return result;
}
-UUID UUID::Zero() {
+UUID UUID::Zero() { return FromString("00000000-0000-0000-0000-000000000000"); }
+
+UUID UUID::FromString(std::string_view str) {
UUID result;
- result.data_.fill(0);
+ CHECK_EQ(str.size(), kSize);
+
+ std::copy(str.begin(), str.end(), result.data_.begin());
+ return result;
+}
+
+UUID UUID::BootUUID() {
+ int fd = open("/proc/sys/kernel/random/boot_id", O_RDONLY);
+ PCHECK(fd != -1);
+
+ UUID result;
+ CHECK_EQ(static_cast<ssize_t>(kSize), read(fd, result.data_.begin(), kSize));
+ close(fd);
+
return result;
}
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
index 0387a4b..d7ec33a 100644
--- a/aos/events/logging/uuid.h
+++ b/aos/events/logging/uuid.h
@@ -13,8 +13,16 @@
// Returns a randomly generated UUID. This is known as a UUID4.
static UUID Random();
+ // Returns a uuid with all '0' characters.
static UUID Zero();
+ static UUID FromString(std::string_view);
+
+ static UUID BootUUID();
+
+ // Size of a UUID.
+ static constexpr size_t kSize = 36;
+
std::string_view string_view() const {
return std::string_view(data_.data(), data_.size());
}
@@ -30,7 +38,7 @@
UUID() {}
// Fixed size storage for the data. Non-null terminated.
- std::array<char, 36> data_;
+ std::array<char, kSize> data_;
};
} // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index 097a31c..54da1c1 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -116,13 +116,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi1"
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "pi2"
},
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index fed5f4c..2a0d332 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -222,7 +222,7 @@
} // namespace
ShmEventLoop::ShmEventLoop(const Configuration *configuration)
- : EventLoop(configuration),
+ : EventLoop(configuration, UUID::BootUUID()),
shm_base_(FLAGS_shm_base),
name_(FLAGS_application_name),
node_(MaybeMyNode(configuration)) {
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 9c8bb09..80ba88c 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -455,7 +455,8 @@
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops,
const Node *node, pid_t tid)
- : EventLoop(CHECK_NOTNULL(configuration)),
+ : EventLoop(CHECK_NOTNULL(configuration),
+ node_event_loop_factory->boot_uuid()),
scheduler_(scheduler),
node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 5ba7a8a..40deb43 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -13,6 +13,7 @@
#include "absl/container/btree_map.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_scheduler.h"
+#include "aos/events/logging/uuid.h"
#include "aos/events/simple_channel.h"
#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
@@ -175,6 +176,15 @@
scheduler_.SetDistributedOffset(monotonic_offset, monotonic_slope);
}
+ // Returns the boot UUID for this node.
+ const UUID &boot_uuid() const { return boot_uuid_; }
+
+ // Reboots the node. This just resets the boot_uuid_, nothing else.
+ // TODO(austin): This is here for a test case or two, not for general
+ // consumption. The interactions with the rest of the system need to be
+ // worked out better. Don't use this for anything real yet.
+ void Reboot() { boot_uuid_ = UUID::Random(); }
+
private:
friend class SimulatedEventLoopFactory;
NodeEventLoopFactory(
@@ -186,6 +196,8 @@
EventScheduler scheduler_;
SimulatedEventLoopFactory *const factory_;
+ UUID boot_uuid_ = UUID::Random();
+
const Node *const node_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index bdf52c1..3768348 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -10,6 +10,7 @@
#include "aos/events/test_message_generated.h"
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "gtest/gtest.h"
@@ -19,10 +20,11 @@
std::string ConfigPrefix() { return "aos/"; }
-} // namespace
-
+using message_bridge::RemoteMessage;
namespace chrono = ::std::chrono;
+} // namespace
+
class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
public:
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
@@ -422,9 +424,9 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
// Wait to let timestamp estimation start up before looking for the results.
@@ -446,6 +448,7 @@
for (const message_bridge::ServerConnection *connection :
*stats.connections()) {
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_TRUE(connection->has_boot_uuid());
if (connection->node()->name()->string_view() == "pi2") {
EXPECT_GT(connection->sent_packets(), 50);
} else if (connection->node()->name()->string_view() == "pi3") {
@@ -469,6 +472,7 @@
const message_bridge::ServerConnection *connection =
stats.connections()->Get(0);
+ EXPECT_TRUE(connection->has_boot_uuid());
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
EXPECT_GT(connection->sent_packets(), 50);
EXPECT_TRUE(connection->has_monotonic_offset());
@@ -485,6 +489,7 @@
const message_bridge::ServerConnection *connection =
stats.connections()->Get(0);
+ EXPECT_TRUE(connection->has_boot_uuid());
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
EXPECT_GE(connection->sent_packets(), 5);
EXPECT_TRUE(connection->has_monotonic_offset());
@@ -575,8 +580,14 @@
"/pi1/aos/remote_timestamps/pi2",
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory,
+ pi2](const RemoteMessage &header) {
VLOG(1) << aos::FlatbufferToJson(&header);
+ EXPECT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view());
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -659,9 +670,9 @@
EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100);
EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100);
- EXPECT_EQ(pi1_server_statistics_count, 9);
- EXPECT_EQ(pi2_server_statistics_count, 9);
- EXPECT_EQ(pi3_server_statistics_count, 9);
+ EXPECT_EQ(pi1_server_statistics_count, 10);
+ EXPECT_EQ(pi2_server_statistics_count, 10);
+ EXPECT_EQ(pi3_server_statistics_count, 10);
EXPECT_EQ(pi1_client_statistics_count, 95);
EXPECT_EQ(pi2_client_statistics_count, 95);
@@ -719,6 +730,7 @@
for (const message_bridge::ServerConnection *connection :
*stats.connections()) {
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+ EXPECT_TRUE(connection->has_boot_uuid());
if (connection->node()->name()->string_view() == "pi2") {
EXPECT_EQ(connection->monotonic_offset(),
chrono::nanoseconds(kOffset).count());
@@ -742,6 +754,7 @@
const message_bridge::ServerConnection *connection =
stats.connections()->Get(0);
+ EXPECT_TRUE(connection->has_boot_uuid());
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
EXPECT_TRUE(connection->has_monotonic_offset());
EXPECT_EQ(connection->monotonic_offset(),
@@ -758,6 +771,7 @@
const message_bridge::ServerConnection *connection =
stats.connections()->Get(0);
+ EXPECT_TRUE(connection->has_boot_uuid());
EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
EXPECT_TRUE(connection->has_monotonic_offset());
EXPECT_EQ(connection->monotonic_offset(), 0);
@@ -768,9 +782,9 @@
chrono::milliseconds(500) +
chrono::milliseconds(5));
- EXPECT_EQ(pi1_server_statistics_count, 9);
+ EXPECT_EQ(pi1_server_statistics_count, 10);
EXPECT_EQ(pi2_server_statistics_count, 9);
- EXPECT_EQ(pi3_server_statistics_count, 9);
+ EXPECT_EQ(pi3_server_statistics_count, 10);
}
// Test that disabling statistics actually disables them.
@@ -825,9 +839,9 @@
pi3_pong_counter_event_loop.get(), "/pi3/aos");
// Count remote timestamps
- MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ MessageCounter<RemoteMessage> remote_timestamps_pi2_on_pi1(
pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
- MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ MessageCounter<RemoteMessage> remote_timestamps_pi1_on_pi2(
pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
MessageCounter<message_bridge::ServerStatistics>
@@ -1022,8 +1036,13 @@
int reliable_timestamp_count = 0;
pi1_remote_timestamp->MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [reliable_channel_index,
- &reliable_timestamp_count](const logger::MessageHeader &header) {
+ [reliable_channel_index, &reliable_timestamp_count,
+ &simulated_event_loop_factory, pi2](const RemoteMessage &header) {
+ EXPECT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view());
VLOG(1) << aos::FlatbufferToJson(&header);
if (header.channel_index() == reliable_channel_index) {
++reliable_timestamp_count;
@@ -1049,5 +1068,84 @@
EXPECT_EQ(reliable_timestamp_count, 2u);
}
+// Tests that rebooting a node changes the ServerStatistics message and the
+// RemoteTimestamp message.
+TEST(SimulatedEventLoopTest, BootUUIDTest) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ConfigPrefix() +
+ "events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi1_remote_timestamp =
+ simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
+ std::string expected_boot_uuid(
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view());
+
+ int timestamp_count = 0;
+ pi1_remote_timestamp->MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [×tamp_count, &expected_boot_uuid](const RemoteMessage &header) {
+ EXPECT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(), expected_boot_uuid);
+ VLOG(1) << aos::FlatbufferToJson(&header);
+ ++timestamp_count;
+ });
+
+ int pi1_server_statistics_count = 0;
+ pi1_remote_timestamp->MakeWatcher(
+ "/pi1/aos", [&pi1_server_statistics_count, &expected_boot_uuid](
+ const message_bridge::ServerStatistics &stats) {
+ VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ for (const message_bridge::ServerConnection *connection :
+ *stats.connections()) {
+ EXPECT_TRUE(connection->has_boot_uuid());
+ if (connection->node()->name()->string_view() == "pi2") {
+ EXPECT_EQ(expected_boot_uuid,
+ connection->boot_uuid()->string_view())
+ << " : Got " << aos::FlatbufferToJson(&stats);
+ ++pi1_server_statistics_count;
+ }
+ }
+ });
+
+ // Let a couple of ServerStatistics messages show up before rebooting.
+ simulated_event_loop_factory.RunFor(chrono::milliseconds(2001));
+
+ EXPECT_GT(timestamp_count, 100);
+ EXPECT_GE(pi1_server_statistics_count, 1u);
+
+ // Confirm that reboot changes the UUID.
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
+
+ EXPECT_NE(expected_boot_uuid,
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view());
+
+ expected_boot_uuid =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
+ ->boot_uuid()
+ .string_view();
+ timestamp_count = 0;
+ pi1_server_statistics_count = 0;
+
+ simulated_event_loop_factory.RunFor(chrono::milliseconds(2000));
+ EXPECT_GT(timestamp_count, 100);
+ EXPECT_GE(pi1_server_statistics_count, 1u);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index b9ff861..45bc6e7 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -1,7 +1,10 @@
#include "aos/events/simulated_network_bridge.h"
+#include "absl/strings/str_cat.h"
+#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/remote_message_generated.h"
namespace aos {
namespace message_bridge {
@@ -18,15 +21,19 @@
aos::EventLoop *send_event_loop,
std::unique_ptr<aos::RawFetcher> fetcher,
std::unique_ptr<aos::RawSender> sender,
+ MessageBridgeServerStatus *server_status,
+ size_t destination_node_index,
ServerConnection *server_connection, int client_index,
MessageBridgeClientStatus *client_status,
size_t channel_index,
- aos::Sender<logger::MessageHeader> *timestamp_logger)
+ aos::Sender<RemoteMessage> *timestamp_logger)
: fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
send_event_loop_(send_event_loop),
fetcher_(std::move(fetcher)),
sender_(std::move(sender)),
+ server_status_(server_status),
+ destination_node_index_(destination_node_index),
server_connection_(server_connection),
client_status_(client_status),
client_index_(client_index),
@@ -117,11 +124,25 @@
client_connection_->received_packets() + 1);
if (timestamp_logger_) {
- aos::Sender<logger::MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
timestamp_logger_->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ // Reset the filter every time the UUID changes. There's probably a more
+ // clever way to do this, but that means a better concept of rebooting.
+ if (server_status_->BootUUID(destination_node_index_) !=
+ send_node_factory_->boot_uuid().string_view()) {
+ server_status_->ResetFilter(destination_node_index_);
+ server_status_->SetBootUUID(
+ destination_node_index_,
+ send_node_factory_->boot_uuid().string_view());
+ }
+
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ builder.fbb()->CreateString(
+ send_node_factory_->boot_uuid().string_view());
+
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(channel_index_);
@@ -139,6 +160,7 @@
message_header_builder.add_realtime_sent_time(
sender_->realtime_sent_time().time_since_epoch().count());
message_header_builder.add_queue_index(sender_->sent_queue_index());
+ message_header_builder.add_boot_uuid(boot_uuid_offset);
builder.Send(message_header_builder.Finish());
}
@@ -169,6 +191,9 @@
std::unique_ptr<aos::RawFetcher> fetcher_;
// Sender to send them back out.
std::unique_ptr<aos::RawSender> sender_;
+
+ MessageBridgeServerStatus *server_status_;
+ const size_t destination_node_index_;
// True if we have sent the message in the fetcher.
bool sent_ = false;
@@ -178,7 +203,7 @@
ClientConnection *client_connection_ = nullptr;
size_t channel_index_;
- aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
+ aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -219,6 +244,28 @@
}
}
+ for (const Node *node : simulated_event_loop_factory->nodes()) {
+ auto it = event_loop_map_.find(node);
+
+ CHECK(it != event_loop_map_.end());
+
+ size_t node_index = 0;
+ for (ServerConnection *connection :
+ it->second.server_status.server_connection()) {
+ if (connection != nullptr) {
+ const Node *client_node =
+ simulated_event_loop_factory->configuration()->nodes()->Get(
+ node_index);
+ auto client_event_loop = event_loop_map_.find(client_node);
+ it->second.server_status.ResetFilter(node_index);
+ it->second.server_status.SetBootUUID(
+ node_index,
+ client_event_loop->second.event_loop->boot_uuid().string_view());
+ }
+ ++node_index;
+ }
+ }
+
for (const Channel *channel :
*simulated_event_loop_factory->configuration()->channels()) {
if (!channel->has_destination_nodes()) {
@@ -265,6 +312,7 @@
destination_event_loop->second.event_loop.get(),
source_event_loop->second.event_loop->MakeRawFetcher(channel),
destination_event_loop->second.event_loop->MakeRawSender(channel),
+ &source_event_loop->second.server_status, destination_node_index,
server_connection, client_index,
&destination_event_loop->second.client_status,
configuration::ChannelIndex(
@@ -370,7 +418,7 @@
if (!timestamp_loggers[other_node_index]) {
timestamp_loggers[other_node_index] =
- event_loop->MakeSender<logger::MessageHeader>(
+ event_loop->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
connection->name()->string_view()));
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 2a4da63..5130c65 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -2,10 +2,10 @@
#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
#include "aos/events/event_loop.h"
-#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_client_status.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
namespace aos {
namespace message_bridge {
@@ -41,7 +41,7 @@
MessageBridgeServerStatus server_status;
MessageBridgeClientStatus client_status;
- std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers;
+ std::vector<aos::Sender<RemoteMessage>> timestamp_loggers;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
diff --git a/aos/flatbuffer_introspection_test.cc b/aos/flatbuffer_introspection_test.cc
index cd005ce..1119b3b 100644
--- a/aos/flatbuffer_introspection_test.cc
+++ b/aos/flatbuffer_introspection_test.cc
@@ -10,13 +10,13 @@
class FlatbufferIntrospectionTest : public ::testing::Test {
public:
FlatbufferIntrospectionTest()
- : schema_data_(
- util::ReadFileToStringOrDie("aos/json_to_flatbuffer.bfbs")) {
+ : schema_data_(FileToFlatbuffer<reflection::Schema>(
+ "aos/json_to_flatbuffer.bfbs")) {
schema_ = reflection::GetSchema(schema_data_.span().data());
}
protected:
- FlatbufferString<reflection::Schema> schema_data_;
+ FlatbufferVector<reflection::Schema> schema_data_;
const reflection::Schema *schema_;
};
diff --git a/aos/flatbuffers.bzl b/aos/flatbuffers.bzl
new file mode 100644
index 0000000..eb19beb
--- /dev/null
+++ b/aos/flatbuffers.bzl
@@ -0,0 +1,22 @@
+def cc_static_flatbuffer(name, target, function):
+ """Creates a cc_library which encodes a file as a Span.
+
+ args:
+ target, The file to encode.
+ function, The inline function, with full namespaces, to create.
+ """
+ native.genrule(
+ name = name + "_gen",
+ tools = ["//aos:flatbuffers_static"],
+ srcs = [target],
+ outs = [name + ".h"],
+ cmd = "$(location //aos:flatbuffers_static) $(SRCS) $(OUTS) '" + function + "'",
+ )
+
+ native.cc_library(
+ name = name,
+ hdrs = [name + ".h"],
+ deps = [
+ "@com_google_absl//absl/types:span",
+ ],
+ )
diff --git a/aos/flatbuffers_static.py b/aos/flatbuffers_static.py
new file mode 100644
index 0000000..199c5b9
--- /dev/null
+++ b/aos/flatbuffers_static.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python3
+
+# Application to generate C++ code with a binary flatbuffer file embedded in it
+# as a Span.
+
+import sys
+from pathlib import Path
+
+
+def main(argv):
+ if len(argv) != 4:
+ return 1
+
+ input_path = sys.argv[1]
+ output_path = sys.argv[2]
+ function = sys.argv[3].split("::")
+ include_guard = output_path.replace('/', '_').replace('-', '_').replace(
+ '.', '_').upper() + '_'
+
+ output_prefix = [
+ b'#ifndef ' + include_guard.encode(),
+ b'#define ' + include_guard.encode(),
+ b'',
+ b'#include "absl/types/span.h"',
+ b'',
+ ]
+
+ for f in function[:-1]:
+ output_prefix.append(b'namespace ' + f.encode() + b' {')
+
+ output_prefix.append(b'')
+ output_prefix.append(b'inline absl::Span<const uint8_t> ' +
+ function[-1].encode() + b'() {')
+
+ output_suffix = [
+ b' return absl::Span<const uint8_t>(reinterpret_cast<const uint8_t*>(kData), sizeof(kData));',
+ b'}'
+ ]
+ output_suffix.append(b'')
+
+ for f in function[:-1]:
+ output_suffix.append(b'} // namespace ' + f.encode())
+
+ output_suffix.append(b'')
+ output_suffix.append(b'#endif // ' + include_guard.encode())
+
+ with open(input_path, 'rb') as binary_file:
+ bfbs = binary_file.read()
+
+ # Write out the header file
+ with open(output_path, 'wb') as output:
+ for line in output_prefix:
+ output.write(line)
+ output.write(b'\n')
+ output.write(b' alignas(64) static constexpr char kData[] = "')
+ for byte in bfbs:
+ output.write(b'\\x' + (b'%x' % byte).zfill(2))
+ output.write(b'";\n')
+ for line in output_suffix:
+ output.write(line)
+ output.write(b'\n')
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv))
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 4818aa2..13ba42e 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,10 +1,23 @@
load("//aos/seasocks:gen_embedded.bzl", "gen_embedded")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
package(default_visibility = ["//visibility:public"])
flatbuffer_cc_library(
+ name = "remote_message_fbs",
+ srcs = ["remote_message.fbs"],
+ gen_reflections = 1,
+)
+
+cc_static_flatbuffer(
+ name = "remote_message_schema",
+ function = "aos::message_bridge::RemoteMessageSchema",
+ target = ":remote_message_fbs_reflection_out",
+)
+
+flatbuffer_cc_library(
name = "connect_fbs",
srcs = ["connect.fbs"],
gen_reflections = 1,
@@ -178,6 +191,7 @@
":message_bridge_protocol",
":message_bridge_server_fbs",
":message_bridge_server_status",
+ ":remote_message_fbs",
":sctp_lib",
":sctp_server",
":timestamp_fbs",
@@ -257,6 +271,7 @@
":message_bridge_client_status",
":message_bridge_protocol",
":message_bridge_server_fbs",
+ ":remote_message_fbs",
":sctp_client",
":timestamp_fbs",
"//aos/events:shm_event_loop",
@@ -285,7 +300,7 @@
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
- "//aos/events/logging:logger_fbs",
+ ":remote_message_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/connect.fbs b/aos/network/connect.fbs
index b593472..7ff2fbe 100644
--- a/aos/network/connect.fbs
+++ b/aos/network/connect.fbs
@@ -10,4 +10,9 @@
// The channels that we want transfered to this client.
channels_to_transfer:[Channel] (id: 1);
+
+ // The UUID that this node booted with.
+ boot_uuid: string (id: 2);
}
+
+root_type Connect;
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 99b2ac1..38c3831 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -11,6 +11,7 @@
#include "aos/network/sctp_client.h"
#include "aos/network/timestamp_generated.h"
#include "aos/unique_malloc_ptr.h"
+#include "aos/util/file.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -97,8 +98,9 @@
std::vector<SctpClientChannelState> *channels, int client_index,
MessageBridgeClientStatus *client_status)
: event_loop_(event_loop),
- connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
- remote_name)),
+ connect_message_(
+ MakeConnectMessage(event_loop->configuration(), my_node, remote_name,
+ event_loop->boot_uuid().string_view())),
message_reception_reply_(MakeMessageHeaderReply()),
remote_node_(CHECK_NOTNULL(
configuration::GetNode(event_loop->configuration(), remote_name))),
diff --git a/aos/network/message_bridge_protocol.cc b/aos/network/message_bridge_protocol.cc
index 8936da8..53102c2 100644
--- a/aos/network/message_bridge_protocol.cc
+++ b/aos/network/message_bridge_protocol.cc
@@ -13,12 +13,15 @@
aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
const Configuration *config, const Node *my_node,
- std::string_view remote_name) {
+ std::string_view remote_name, std::string_view boot_uuid) {
CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ fbb.CreateString(boot_uuid);
+
flatbuffers::Offset<Node> node_offset =
RecursiveCopyFlatBuffer<Node>(my_node, &fbb);
const std::string_view node_name = my_node->name()->string_view();
@@ -46,6 +49,7 @@
Connect::Builder connect_builder(fbb);
connect_builder.add_channels_to_transfer(channels_offset);
connect_builder.add_node(node_offset);
+ connect_builder.add_boot_uuid(boot_uuid_offset);
fbb.Finish(connect_builder.Finish());
return fbb.Release();
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
index 7b6a248..5759a29 100644
--- a/aos/network/message_bridge_protocol.h
+++ b/aos/network/message_bridge_protocol.h
@@ -33,7 +33,7 @@
// Builds up a subscription request for my_node to remote_name.
aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
const Configuration *config, const Node *my_node,
- std::string_view remote_name);
+ std::string_view remote_name, std::string_view boot_uuid);
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index 1be0796..fa73ba5 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -27,6 +27,9 @@
// monotonic time.
monotonic_offset:int64 (id: 4);
+ // Boot UUID of the client.
+ boot_uuid:string (id: 5);
+
// TODO(austin): Per channel counts?
}
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b6c94e5..1ac6cb5 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "glog/logging.h"
@@ -93,8 +94,9 @@
// and flushes. Whee.
}
-void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
- absl::Span<const uint8_t> data) {
+void ChannelState::HandleDelivery(
+ sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/, absl::Span<const uint8_t> data,
+ const MessageBridgeServerStatus &server_status) {
const logger::MessageHeader *message_header =
flatbuffers::GetRoot<logger::MessageHeader>(data.data());
for (Peer &peer : peers_) {
@@ -106,11 +108,15 @@
// This needs to be munged and cleaned up to match the timestamp
// standard.
- aos::Sender<logger::MessageHeader>::Builder builder =
+ aos::Sender<RemoteMessage>::Builder builder =
peer.timestamp_logger->MakeBuilder();
- logger::MessageHeader::Builder message_header_builder =
- builder.MakeBuilder<logger::MessageHeader>();
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ builder.fbb()->CreateString(
+ server_status.BootUUID(peer.node_index));
+
+ RemoteMessage::Builder message_header_builder =
+ builder.MakeBuilder<RemoteMessage>();
message_header_builder.add_channel_index(
message_header->channel_index());
@@ -130,6 +136,7 @@
message_header->realtime_sent_time());
message_header_builder.add_remote_queue_index(
message_header->queue_index());
+ message_header_builder.add_boot_uuid(boot_uuid_offset);
builder.Send(message_header_builder.Finish());
}
@@ -173,10 +180,10 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(
- const Connection *connection, int node_index,
- ServerConnection *server_connection_statistics, bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger) {
+void ChannelState::AddPeer(const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely,
+ aos::Sender<RemoteMessage> *timestamp_logger) {
peers_.emplace_back(connection, node_index, server_connection_statistics,
logged_remotely, timestamp_logger);
}
@@ -270,7 +277,8 @@
MakeConnectMessage(event_loop->configuration(),
configuration::GetNode(event_loop->configuration(),
destination_node_name),
- event_loop->node()->name()->string_view())
+ event_loop->node()->name()->string_view(),
+ UUID::Zero().string_view())
.span()
.size());
VLOG(1) << "Connection to " << destination_node_name << " has size "
@@ -330,7 +338,7 @@
// timestamps from it.
if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
timestamp_loggers_[other_node_index] =
- event_loop_->MakeSender<logger::MessageHeader>(
+ event_loop_->MakeSender<RemoteMessage>(
absl::StrCat("/aos/remote_timestamps/",
connection->name()->string_view()));
}
@@ -409,6 +417,7 @@
->name()
->string_view();
server_status_.ResetFilter(node_index);
+ server_status_.SetBootUUID(node_index, "");
}
}
@@ -483,6 +492,7 @@
}
}
server_status_.ResetFilter(node_index);
+ server_status_.SetBootUUID(node_index, connect->boot_uuid()->string_view());
VLOG(1) << "Resetting filters for " << node_index << " "
<< event_loop_->configuration()
->nodes()
@@ -499,7 +509,8 @@
->HandleDelivery(
message->header.rcvinfo.rcv_assoc_id,
message->header.rcvinfo.rcv_ssn,
- absl::Span<const uint8_t>(message->data(), message->size));
+ absl::Span<const uint8_t>(message->data(), message->size),
+ server_status_);
}
if (VLOG_IS_ON(1)) {
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index c0c1d56..55f3fea 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
#include "aos/network/message_bridge_client_generated.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
@@ -36,7 +37,7 @@
Peer(const Connection *new_connection, int new_node_index,
ServerConnection *new_server_connection_statistics,
bool new_logged_remotely,
- aos::Sender<logger::MessageHeader> *new_timestamp_logger)
+ aos::Sender<RemoteMessage> *new_timestamp_logger)
: connection(new_connection),
node_index(new_node_index),
server_connection_statistics(new_server_connection_statistics),
@@ -50,7 +51,7 @@
const aos::Connection *connection;
const int node_index;
ServerConnection *server_connection_statistics;
- aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
+ aos::Sender<RemoteMessage> *timestamp_logger = nullptr;
// If true, this message will be logged on a receiving node. We need to
// keep it around to log it locally if that fails.
@@ -67,7 +68,7 @@
void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
bool logged_remotely,
- aos::Sender<logger::MessageHeader> *timestamp_logger);
+ aos::Sender<RemoteMessage> *timestamp_logger);
// Returns true if this channel has the same name and type as the other
// channel.
@@ -81,7 +82,8 @@
// Handles reception of delivery times.
void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
- absl::Span<const uint8_t> data);
+ absl::Span<const uint8_t> data,
+ const MessageBridgeServerStatus &server_status);
// Handles (by consuming) failure to deliver a message.
void HandleFailure(
@@ -125,7 +127,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
+ std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 1f2de0b..c5c30d7 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -85,6 +85,11 @@
statistics_.message().connections()->size());
filters_.resize(event_loop->configuration()->nodes()->size());
+ boot_uuids_.resize(event_loop->configuration()->nodes()->size());
+ for (std::string &boot_uuid : boot_uuids_) {
+ // Make sure the memory gets allocated.
+ boot_uuid.reserve(UUID::kSize);
+ }
timestamp_fetchers_.resize(event_loop->configuration()->nodes()->size());
server_connection_.resize(event_loop->configuration()->nodes()->size());
@@ -129,6 +134,13 @@
node_name);
}
+void MessageBridgeServerStatus::SetBootUUID(int node_index,
+ std::string_view boot_uuid) {
+ boot_uuids_[node_index] = boot_uuid;
+ SendStatistics();
+ last_statistics_send_time_ = event_loop_->monotonic_now();
+}
+
void MessageBridgeServerStatus::ResetFilter(int node_index) {
filters_[node_index].Reset();
server_connection_[node_index]->mutate_monotonic_offset(0);
@@ -153,6 +165,12 @@
node_builder.add_name(node_name_offset);
flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset;
+ if (!boot_uuids_[node_index].empty() &&
+ connection->state() == State::CONNECTED) {
+ boot_uuid_offset = builder.fbb()->CreateString(boot_uuids_[node_index]);
+ }
+
ServerConnection::Builder server_connection_builder =
builder.MakeBuilder<ServerConnection>();
server_connection_builder.add_node(node_offset);
@@ -167,6 +185,10 @@
connection->monotonic_offset());
}
+ if (!boot_uuid_offset.IsNull()) {
+ server_connection_builder.add_boot_uuid(boot_uuid_offset);
+ }
+
server_connection_offsets_.emplace_back(server_connection_builder.Finish());
}
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index 814fccb..0e70e24 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -40,6 +40,14 @@
// Resets the filter and clears the entry from the server statistics.
void ResetFilter(int node_index);
+ // Sets the boot UUID for the provided node.
+ void SetBootUUID(int node_index, std::string_view boot_uuid);
+
+ // Returns the boot UUID for a node, or an empty string_view if there isn't
+ // one.
+ std::string_view BootUUID(int node_index) const {
+ return boot_uuids_[node_index];
+ }
// Returns the ServerConnection message which is updated by the server.
ServerConnection *FindServerConnection(std::string_view node_name);
@@ -83,6 +91,9 @@
// Bidirectional filters for each connection.
std::vector<ClippedAverageFilter> filters_;
+ // List of UUIDs for each node.
+ std::vector<std::string> boot_uuids_;
+
// Sender for the timestamps that we are forwarding over the network.
aos::Sender<Timestamp> timestamp_sender_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 36bdde9..4d5bd49 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -34,18 +34,18 @@
}
class MessageBridgeTest : public ::testing::Test {
- public:
- MessageBridgeTest()
- : pi1_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_server_config.json")),
- pi2_config(aos::configuration::ReadConfig(
- "aos/network/message_bridge_test_client_config.json")) {
- util::UnlinkRecursive(ShmBase("pi1"));
- util::UnlinkRecursive(ShmBase("pi2"));
- }
+ public:
+ MessageBridgeTest()
+ : pi1_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json")),
+ pi2_config(aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json")) {
+ util::UnlinkRecursive(ShmBase("pi1"));
+ util::UnlinkRecursive(ShmBase("pi2"));
+ }
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
- aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
};
// Test that we can send a ping message over sctp and receive it.
@@ -89,8 +89,8 @@
ping_event_loop.MakeSender<examples::Ping>("/test");
aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
- aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
- pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher1 =
+ pi1_test_event_loop.MakeFetcher<RemoteMessage>(
"/pi1/aos/remote_timestamps/pi2");
// Fetchers for confirming the remote timestamps made it.
@@ -121,8 +121,8 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
- aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
- test_event_loop.MakeFetcher<logger::MessageHeader>(
+ aos::Fetcher<RemoteMessage> message_header_fetcher2 =
+ test_event_loop.MakeFetcher<RemoteMessage>(
"/pi2/aos/remote_timestamps/pi1");
// Event loop for fetching data delivered to pi2 from pi1 to match up
@@ -140,7 +140,7 @@
pong_event_loop.MakeWatcher(
"/test", [&pong_count](const examples::Ping &ping) {
++pong_count;
- LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
+ VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
});
FLAGS_override_hostname = "";
@@ -152,7 +152,7 @@
"/pi1/aos",
[&ping_count, &pi2_client_event_loop, &ping_sender,
&pi1_server_statistics_count](const ServerStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
ASSERT_TRUE(stats.has_connections());
EXPECT_EQ(stats.connections()->size(), 1);
@@ -172,13 +172,14 @@
if (connection->node()->name()->string_view() ==
pi2_client_event_loop.node()->name()->string_view()) {
if (connection->state() == State::CONNECTED) {
+ EXPECT_TRUE(connection->has_boot_uuid());
connected = true;
}
}
}
if (connected) {
- LOG(INFO) << "Connected! Sent ping.";
+ VLOG(1) << "Connected! Sent ping.";
auto builder = ping_sender.MakeBuilder();
examples::Ping::Builder ping_builder =
builder.MakeBuilder<examples::Ping>();
@@ -193,7 +194,7 @@
int pi2_server_statistics_count = 0;
pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_server_statistics_count](
const ServerStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
for (const ServerConnection *connection : *stats.connections()) {
if (connection->has_monotonic_offset()) {
++pi2_server_statistics_count;
@@ -203,6 +204,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(connection->has_boot_uuid());
}
}
});
@@ -210,7 +212,7 @@
int pi1_client_statistics_count = 0;
ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
const ClientStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
for (const ClientConnection *connection : *stats.connections()) {
if (connection->has_monotonic_offset()) {
@@ -230,7 +232,7 @@
int pi2_client_statistics_count = 0;
pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
const ClientStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
for (const ClientConnection *connection : *stats.connections()) {
if (connection->has_monotonic_offset()) {
@@ -245,11 +247,11 @@
ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp ×tamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
});
pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp ×tamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
});
// Run for 5 seconds to make sure we have time to estimate the offset.
@@ -282,8 +284,11 @@
"/pi1/aos/remote_timestamps/pi2",
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
+
+ EXPECT_TRUE(header.has_boot_uuid());
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -461,29 +466,29 @@
// Wait until we are connected, then send.
pi1_test_event_loop.MakeWatcher(
"/pi1/aos", [](const ServerStatistics &stats) {
- LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
});
pi2_test_event_loop.MakeWatcher(
"/pi2/aos", [](const ServerStatistics &stats) {
- LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
});
pi1_test_event_loop.MakeWatcher(
"/pi1/aos", [](const ClientStatistics &stats) {
- LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
});
pi2_test_event_loop.MakeWatcher(
"/pi2/aos", [](const ClientStatistics &stats) {
- LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
});
pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp ×tamp) {
- LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
});
pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp ×tamp) {
- LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
});
// Start everything up. Pong is the only thing we don't know how to wait on,
@@ -532,6 +537,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi1_connection->has_boot_uuid());
EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
EXPECT_TRUE(pi2_connection->has_monotonic_offset());
@@ -539,6 +545,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi2_connection->has_boot_uuid());
}
std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -554,8 +561,10 @@
EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
EXPECT_FALSE(pi1_connection->has_monotonic_offset());
+ EXPECT_FALSE(pi1_connection->has_boot_uuid());
EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
EXPECT_FALSE(pi2_connection->has_monotonic_offset());
+ EXPECT_TRUE(pi2_connection->has_boot_uuid());
}
{
@@ -590,6 +599,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi1_connection->has_boot_uuid());
EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
EXPECT_TRUE(pi2_connection->has_monotonic_offset());
@@ -597,6 +607,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi2_connection->has_boot_uuid());
}
// Shut everyone else down
@@ -664,31 +675,31 @@
// Wait until we are connected, then send.
pi1_test_event_loop.MakeWatcher(
"/pi1/aos", [](const ServerStatistics &stats) {
- LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
});
// Confirm both client and server statistics messages have decent offsets in
// them.
pi2_test_event_loop.MakeWatcher(
"/pi2/aos", [](const ServerStatistics &stats) {
- LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
});
pi1_test_event_loop.MakeWatcher(
"/pi1/aos", [](const ClientStatistics &stats) {
- LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
});
pi2_test_event_loop.MakeWatcher(
"/pi2/aos", [](const ClientStatistics &stats) {
- LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+ VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
});
pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp ×tamp) {
- LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
});
pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp ×tamp) {
- LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(×tamp);
+ VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
});
// Start everything up. Pong is the only thing we don't know how to wait on,
@@ -737,6 +748,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi1_connection->has_boot_uuid());
EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
EXPECT_TRUE(pi2_connection->has_monotonic_offset());
@@ -744,6 +756,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi2_connection->has_boot_uuid());
}
std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -760,6 +773,7 @@
EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
+ EXPECT_TRUE(pi1_server_connection->has_boot_uuid());
EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
}
@@ -796,6 +810,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi1_connection->has_boot_uuid());
EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
EXPECT_TRUE(pi2_connection->has_monotonic_offset());
@@ -803,6 +818,7 @@
chrono::milliseconds(1));
EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
chrono::milliseconds(-1));
+ EXPECT_TRUE(pi2_connection->has_boot_uuid());
}
// Shut everyone else down
@@ -879,9 +895,10 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
+ EXPECT_TRUE(header.has_boot_uuid());
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
@@ -1034,9 +1051,10 @@
std::atomic<int> ping_timestamp_count{0};
pi1_remote_timestamp_event_loop.MakeWatcher(
"/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index,
- &ping_timestamp_count](const logger::MessageHeader &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
+ EXPECT_TRUE(header.has_boot_uuid());
if (header.channel_index() == ping_channel_index) {
++ping_timestamp_count;
}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a7c0d1b..2b89080 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -74,13 +74,13 @@
},
{
"name": "/pi1/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
"frequency": 10
},
{
"name": "/pi2/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
"frequency": 10
},
diff --git a/aos/network/remote_message.fbs b/aos/network/remote_message.fbs
new file mode 100644
index 0000000..b43f055
--- /dev/null
+++ b/aos/network/remote_message.fbs
@@ -0,0 +1,34 @@
+namespace aos.message_bridge;
+
+table RemoteMessage {
+ // Index into the channel datastructure in the log file header. This
+ // provides the data type.
+ channel_index:uint (id: 0);
+ // Time this message was sent on the monotonic clock in nanoseconds on this
+ // node.
+ monotonic_sent_time:long (id: 1);
+ // Time this message was sent on the realtime clock in nanoseconds on this
+ // node.
+ realtime_sent_time:long (id: 2);
+ // Index into the ipc queue of this message. This should start with 0 and
+ // always monotonically increment if no messages were ever lost. It will
+ // wrap at a multiple of the queue size.
+ queue_index:uint (id: 3);
+
+ // The nested flatbuffer.
+ data:[ubyte] (id: 4);
+
+ // Time this message was sent on the monotonic clock of the remote node in
+ // nanoseconds.
+ monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
+ // Time this message was sent on the realtime clock of the remote node in
+ // nanoseconds.
+ realtime_remote_time:int64 = -9223372036854775808 (id: 6);
+ // Queue index of this message on the remote node.
+ remote_queue_index:uint32 = 4294967295 (id: 7);
+
+ // UUID for this boot.
+ boot_uuid:string (id: 8);
+}
+
+root_type RemoteMessage;
diff --git a/third_party/flatbuffers/BUILD b/third_party/flatbuffers/BUILD
index 57f2b8e..ed39893 100644
--- a/third_party/flatbuffers/BUILD
+++ b/third_party/flatbuffers/BUILD
@@ -7,6 +7,7 @@
exports_files([
"LICENSE",
"tsconfig.json",
+ "include/flatbuffers/reflection.bfbs",
])
# Public flatc library to compile flatbuffer files at runtime.
diff --git a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
index c623126..1a24984 100644
--- a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
+++ b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
@@ -1638,6 +1638,17 @@
return CreateSharedString(str->c_str(), str->size());
}
+#ifdef FLATBUFFERS_HAS_STRING_VIEW
+ /// @brief Store a string in the buffer, which can contain any binary data.
+ /// If a string with this exact contents has already been serialized before,
+ /// instead simply returns the offset of the existing string.
+ /// @param[in] str A const pointer to a `String` struct to add to the buffer.
+ /// @return Returns the offset in the buffer where the string starts
+ Offset<String> CreateSharedString(const flatbuffers::string_view str) {
+ return CreateSharedString(str.data(), str.size());
+ }
+#endif
+
/// @cond FLATBUFFERS_INTERNAL
uoffset_t EndVector(size_t len) {
FLATBUFFERS_ASSERT(nested); // Hit if no corresponding StartVector.
diff --git a/y2020/BUILD b/y2020/BUILD
index e54c168..a759681 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -172,7 +172,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
@@ -200,7 +200,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
@@ -216,7 +216,7 @@
src = "y2020_roborio.json",
flatbuffers = [
":setpoint_fbs",
- "//aos/events/logging:logger_fbs",
+ "//aos/network:remote_message_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
"//aos/network:timestamp_fbs",
diff --git a/y2020/y2020_laptop.json b/y2020/y2020_laptop.json
index 37961f8..164e707 100644
--- a/y2020/y2020_laptop.json
+++ b/y2020/y2020_laptop.json
@@ -176,7 +176,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/roborio",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -185,7 +185,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -194,7 +194,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -203,7 +203,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi3",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
@@ -212,7 +212,7 @@
},
{
"name": "/laptop/aos/remote_timestamps/pi4",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"source_node": "laptop",
"logger": "NOT_LOGGED",
"frequency": 20,
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index 703d560..d463848 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -44,25 +44,25 @@
},
{
"name": "/roborio/aos/remote_timestamps/pi1",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi2",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi3",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},
{
"name": "/roborio/aos/remote_timestamps/pi4",
- "type": "aos.logger.MessageHeader",
+ "type": "aos.message_bridge.RemoteMessage",
"logger": "NOT_LOGGED",
"source_node": "roborio"
},