Deduplicate logger nodes and tags when merging configs
When merging new configuration snippets with an existing config, it is
really easy to end up with duplicated nodes or tags. This is generally
not what the user actually wants, they just want things to work. So,
deduplicate those when merging.
Turns out, the way we were merging configuration before was both slow
and hard to update to deduplicate things. It ended up being easier to
transform the config into an intermediate representation which is more
suited to deduplication, and then transform it back to a flatbuffer.
This sets us up to do this efficiently inside the log reader too if we
want.
Change-Id: Ia8636569493e82c55d4e92f046fbbb09e1fb4c7a
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index f2488a5..68984d5 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -14,6 +14,7 @@
#include <string_view>
#include <vector>
+#include "absl/container/btree_map.h"
#include "absl/container/btree_set.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
@@ -36,6 +37,7 @@
"for config validation, and shouldn't be touched.");
namespace aos {
+namespace configuration {
namespace {
namespace chrono = std::chrono;
@@ -59,6 +61,8 @@
return absl::StrCat(filename, replacement);
}
+void ValidateUnmergedConfiguration(const Flatbuffer<Configuration> &config);
+
FlatbufferDetachedBuffer<Configuration> ReadConfigFile(std::string_view path,
bool binary) {
if (binary) {
@@ -72,92 +76,708 @@
CHECK_GT(buffer.size(), 0u) << ": Failed to parse JSON file: " << path;
- return FlatbufferDetachedBuffer<Configuration>(std::move(buffer));
+ FlatbufferDetachedBuffer<Configuration> result(std::move(buffer));
+ configuration::ValidateUnmergedConfiguration(result);
+ return result;
}
-} // namespace
-// Define the compare and equal operators for Channel and Application so we can
-// insert them in the btree below.
-bool operator<(const FlatbufferDetachedBuffer<Channel> &lhs,
- const FlatbufferDetachedBuffer<Channel> &rhs) {
- int name_compare = lhs.message().name()->string_view().compare(
- rhs.message().name()->string_view());
- if (name_compare == 0) {
- return lhs.message().type()->string_view() <
- rhs.message().type()->string_view();
- } else if (name_compare < 0) {
- return true;
- } else {
- return false;
+// Struct representing a Connection in a channel in a way that is easy to work
+// with.
+struct MutableConnection {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ std::string_view name;
+ std::optional<LoggerConfig> timestamp_logger;
+ absl::btree_set<std::string_view> timestamp_logger_nodes;
+ std::optional<uint16_t> priority;
+ std::optional<uint32_t> time_to_live;
+};
+
+// The name of a channel.
+struct MutableChannelName {
+ std::string_view name;
+ std::string_view type;
+
+ bool operator==(const MutableChannelName &other) const {
+ return std::make_tuple(name, type) ==
+ std::make_tuple(other.name, other.type);
}
-}
-
-bool operator==(const FlatbufferDetachedBuffer<Channel> &lhs,
- const FlatbufferDetachedBuffer<Channel> &rhs) {
- return lhs.message().name()->string_view() ==
- rhs.message().name()->string_view() &&
- lhs.message().type()->string_view() ==
- rhs.message().type()->string_view();
-}
-
-bool operator<(const FlatbufferDetachedBuffer<Connection> &lhs,
- const FlatbufferDetachedBuffer<Connection> &rhs) {
- return lhs.message().name()->string_view() <
- rhs.message().name()->string_view();
-}
-
-bool operator==(const FlatbufferDetachedBuffer<Connection> &lhs,
- const FlatbufferDetachedBuffer<Connection> &rhs) {
- return lhs.message().name()->string_view() ==
- rhs.message().name()->string_view();
-}
-
-bool operator==(const FlatbufferDetachedBuffer<Application> &lhs,
- const FlatbufferDetachedBuffer<Application> &rhs) {
- return lhs.message().name()->string_view() ==
- rhs.message().name()->string_view();
-}
-
-bool operator<(const FlatbufferDetachedBuffer<Application> &lhs,
- const FlatbufferDetachedBuffer<Application> &rhs) {
- return lhs.message().name()->string_view() <
- rhs.message().name()->string_view();
-}
-
-bool operator==(const FlatbufferDetachedBuffer<Node> &lhs,
- const FlatbufferDetachedBuffer<Node> &rhs) {
- return lhs.message().name()->string_view() ==
- rhs.message().name()->string_view();
-}
-
-bool operator<(const FlatbufferDetachedBuffer<Node> &lhs,
- const FlatbufferDetachedBuffer<Node> &rhs) {
- return lhs.message().name()->string_view() <
- rhs.message().name()->string_view();
-}
-
-namespace configuration {
-namespace {
-
-template <typename T>
-struct FbsContainer {
- FbsContainer(aos::FlatbufferDetachedBuffer<T> table) {
- this->table =
- std::make_unique<FlatbufferDetachedBuffer<T>>(std::move(table));
- }
- std::unique_ptr<FlatbufferDetachedBuffer<T>> table;
- bool operator==(const FbsContainer<T> &other) const {
- return *this->table == *other.table;
- }
- bool operator<(const FbsContainer<T> &other) const {
- return *this->table < *other.table;
+ std::strong_ordering operator<=>(const MutableChannelName &other) const {
+ return std::make_tuple(name, type) <=>
+ std::make_tuple(other.name, other.type);
}
};
-typedef FbsContainer<Channel> ChannelContainer;
-typedef FbsContainer<Connection> ConnectionContainer;
-typedef FbsContainer<Application> ApplicationContainer;
-typedef FbsContainer<Node> NodeContainer;
+// Struct representing a Channel in a way that is easy to work with.
+struct MutableChannel {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ std::string_view name;
+ std::string_view type;
+ std::optional<int32_t> frequency;
+ std::optional<int32_t> max_size;
+ std::optional<int32_t> num_senders;
+ std::optional<int32_t> num_watchers;
+
+ std::string_view source_node;
+ absl::btree_map<std::string_view, MutableConnection> destination_nodes;
+ std::optional<LoggerConfig> logger;
+ absl::btree_set<std::string_view> logger_nodes;
+ std::optional<ReadMethod> read_method;
+ std::optional<int32_t> num_readers;
+
+ std::optional<int64_t> channel_storage_duration;
+};
+
+// Struct representing a Node in a way that is easy to work with.
+struct MutableNode {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ std::string_view name;
+ std::string_view hostname;
+ std::optional<uint16_t> port;
+ absl::btree_set<std::string_view> hostnames;
+ absl::btree_set<std::string_view> tags;
+};
+
+// Struct representing a Map in a way that is easy to work with.
+struct MutableMap {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ MutableChannel match;
+ MutableChannel rename;
+};
+
+// Struct representing an Application in a way that is easy to work with.
+struct MutableApplication {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ std::string_view name;
+ std::string_view executable_name;
+ std::vector<MutableMap> maps;
+ absl::btree_set<std::string_view> nodes;
+ std::string_view user;
+ std::vector<std::string_view> args;
+ std::optional<bool> autostart;
+ std::optional<bool> autorestart;
+ std::optional<uint64_t> memory_limit;
+ std::optional<int64_t> stop_time;
+
+ bool operator==(const MutableApplication &other) const {
+ return name == other.name;
+ }
+ std::strong_ordering operator<=>(const MutableApplication &other) const {
+ return name <=> other.name;
+ }
+};
+
+// Struct representing a Configuration in a way that is easy to work with. To
+// use this class, start with a flatbuffer configuration, call
+// UnpackConfiguration() on it, and then manipulate from there. (Note: this API
+// generally assumes that the lifetime of strings is managed by something else,
+// and a string_view is good enough).
+// PackConfiguration() creates the corresponding flatbuffer from a mutable
+// configuration for downstream use.
+struct MutableConfiguration {
+ // See configuration.fbs for a description of what each of these fields
+ // represents.
+ // TODO(austin): Is this a better object for LogReader to manipulate than raw
+ // configs?
+ absl::btree_map<MutableChannelName, MutableChannel> channels;
+ std::vector<MutableMap> maps;
+ absl::btree_map<std::string_view, MutableNode> nodes;
+ absl::btree_map<std::string_view, MutableApplication> applications;
+ std::optional<uint64_t> channel_storage_duration;
+
+ absl::btree_map<std::string_view, const reflection::Schema *> schemas;
+};
+
+// Unpacks a vector of strings into a set.
+void UnpackStringSet(
+ const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
+ *strings,
+ absl::btree_set<std::string_view> *result) {
+ for (const flatbuffers::String *str : *strings) {
+ result->insert(str->string_view());
+ }
+}
+
+void UnpackConnection(const Connection *destination_node,
+ MutableConnection *result) {
+ CHECK_EQ(Connection::MiniReflectTypeTable()->num_elems, 5u)
+ << ": Merging logic needs to be updated when the number of connection "
+ "fields changes.";
+ if (destination_node->has_timestamp_logger()) {
+ result->timestamp_logger = destination_node->timestamp_logger();
+ }
+
+ if (destination_node->has_timestamp_logger_nodes()) {
+ UnpackStringSet(destination_node->timestamp_logger_nodes(),
+ &result->timestamp_logger_nodes);
+ }
+ if (destination_node->has_priority()) {
+ result->priority = destination_node->priority();
+ }
+ if (destination_node->has_time_to_live()) {
+ result->time_to_live = destination_node->time_to_live();
+ }
+}
+
+void UnpackChannel(const Channel *channel, MutableChannel *result) {
+ CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
+ << ": Merging logic needs to be updated when the number of channel "
+ "fields changes.";
+
+ if (channel->has_name()) {
+ result->name = channel->name()->string_view();
+ }
+ if (channel->has_type()) {
+ result->type = channel->type()->string_view();
+ }
+ if (channel->has_frequency()) {
+ result->frequency = channel->frequency();
+ }
+ if (channel->has_max_size()) {
+ result->max_size = channel->max_size();
+ }
+ if (channel->has_num_senders()) {
+ result->num_senders = channel->num_senders();
+ }
+ if (channel->has_num_watchers()) {
+ result->num_watchers = channel->num_watchers();
+ }
+ if (channel->has_source_node()) {
+ result->source_node = channel->source_node()->string_view();
+ }
+ if (channel->has_destination_nodes()) {
+ for (const Connection *destination_node : *channel->destination_nodes()) {
+ MutableConnection &destination =
+ result->destination_nodes
+ .try_emplace(destination_node->name()->string_view(),
+ MutableConnection{
+ .name = destination_node->name()->string_view(),
+ })
+ .first->second;
+ UnpackConnection(destination_node, &destination);
+ }
+ }
+ if (channel->has_logger()) {
+ result->logger = channel->logger();
+ }
+ if (channel->has_logger_nodes()) {
+ UnpackStringSet(channel->logger_nodes(), &(result->logger_nodes));
+ }
+
+ if (channel->has_read_method()) {
+ result->read_method = channel->read_method();
+ }
+ if (channel->has_num_readers()) {
+ result->num_readers = channel->num_readers();
+ }
+ if (channel->has_channel_storage_duration()) {
+ result->channel_storage_duration = channel->channel_storage_duration();
+ }
+}
+
+void UnpackNode(const Node *node, MutableNode *result) {
+ CHECK_EQ(node->name()->string_view(), result->name);
+ CHECK_EQ(Node::MiniReflectTypeTable()->num_elems, 5u)
+ << ": Merging logic needs to be updated when the number of node "
+ "fields changes.";
+ if (node->has_hostname()) {
+ result->hostname = node->hostname()->string_view();
+ }
+ if (node->has_port()) {
+ result->port = node->port();
+ }
+
+ if (node->has_hostnames()) {
+ UnpackStringSet(node->hostnames(), &(result->hostnames));
+ }
+ if (node->has_tags()) {
+ UnpackStringSet(node->tags(), &(result->tags));
+ }
+}
+
+void UnpackMap(const Map *map, MutableMap *result) {
+ CHECK(map->has_match());
+ CHECK(map->has_rename());
+ UnpackChannel(map->match(), &(result->match));
+ UnpackChannel(map->rename(), &(result->rename));
+}
+
+void UnpackApplication(const Application *application,
+ MutableApplication *result) {
+ CHECK_EQ(application->name()->string_view(), result->name);
+
+ if (application->has_executable_name()) {
+ result->executable_name = application->executable_name()->string_view();
+ }
+ if (application->has_maps()) {
+ result->maps.reserve(application->maps()->size());
+ for (const Map *map : *application->maps()) {
+ result->maps.emplace_back();
+ UnpackMap(map, &(result->maps.back()));
+ }
+ }
+
+ if (application->has_nodes()) {
+ UnpackStringSet(application->nodes(), &(result->nodes));
+ }
+
+ if (application->has_user()) {
+ result->user = application->user()->string_view();
+ }
+
+ if (application->has_args()) {
+ // Very important, arguments replace old arguments.
+ result->args.clear();
+
+ result->args.reserve(application->args()->size());
+ for (const flatbuffers::String *arg : *application->args()) {
+ result->args.emplace_back(arg->string_view());
+ }
+ }
+
+ if (application->has_autostart()) {
+ result->autostart = application->autostart();
+ }
+ if (application->has_autorestart()) {
+ result->autorestart = application->autorestart();
+ }
+ if (application->has_memory_limit()) {
+ result->memory_limit = application->memory_limit();
+ }
+ if (application->has_stop_time()) {
+ result->stop_time = application->stop_time();
+ }
+}
+
+void UnpackConfiguration(const Configuration *configuration,
+ MutableConfiguration *result) {
+ if (configuration->has_channels()) {
+ for (const Channel *channel : *configuration->channels()) {
+ // Explode on malformed entries.
+ CHECK(channel->has_name() && channel->has_type());
+
+ // Attempt to insert the channel.
+ MutableChannel &unpacked_channel =
+ result->channels
+ .try_emplace(
+ MutableChannelName{
+ .name = channel->name()->string_view(),
+ .type = channel->type()->string_view(),
+ },
+ MutableChannel{
+ .name = channel->name()->string_view(),
+ .type = channel->type()->string_view(),
+ })
+ .first->second;
+
+ UnpackChannel(channel, &unpacked_channel);
+ if (channel->has_schema()) {
+ result->schemas.emplace(channel->type()->string_view(),
+ channel->schema());
+ }
+ }
+ }
+
+ if (configuration->has_maps()) {
+ result->maps.reserve(configuration->maps()->size());
+
+ for (const Map *map : *configuration->maps()) {
+ CHECK(map->has_match());
+ CHECK(map->has_rename());
+
+ result->maps.emplace_back();
+ UnpackMap(map, &(result->maps.back()));
+ }
+ }
+
+ if (configuration->has_nodes()) {
+ for (const Node *node : *configuration->nodes()) {
+ CHECK(node->has_name());
+
+ MutableNode &unpacked_node =
+ result->nodes
+ .try_emplace(node->name()->string_view(),
+ MutableNode{
+ .name = node->name()->string_view(),
+ })
+ .first->second;
+ UnpackNode(node, &unpacked_node);
+ }
+ }
+
+ if (configuration->has_applications()) {
+ for (const Application *application : *configuration->applications()) {
+ CHECK(application->has_name());
+
+ MutableApplication &unpacked_application =
+ result->applications
+ .try_emplace(application->name()->string_view(),
+ MutableApplication{
+ .name = application->name()->string_view(),
+ })
+ .first->second;
+ UnpackApplication(application, &unpacked_application);
+ }
+ }
+
+ if (configuration->has_channel_storage_duration()) {
+ result->channel_storage_duration =
+ configuration->channel_storage_duration();
+ }
+}
+
+flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+PackStringSet(const absl::btree_set<std::string_view> &set,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ std::vector<flatbuffers::Offset<flatbuffers::String>> strings_offsets;
+ for (const std::string_view &str : set) {
+ strings_offsets.push_back(fbb->CreateSharedString(str));
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ result;
+
+ if (!strings_offsets.empty()) {
+ result = fbb->CreateVector(strings_offsets);
+ }
+ return result;
+}
+
+flatbuffers::Offset<Connection> PackConnection(
+ const MutableConnection &destination_node,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(destination_node.name);
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ timestamp_logger_nodes_offset =
+ PackStringSet(destination_node.timestamp_logger_nodes, fbb);
+
+ Connection::Builder connection_builder(*fbb);
+ connection_builder.add_name(name_offset);
+ if (destination_node.timestamp_logger.has_value()) {
+ connection_builder.add_timestamp_logger(
+ destination_node.timestamp_logger.value());
+ }
+
+ if (!timestamp_logger_nodes_offset.IsNull()) {
+ connection_builder.add_timestamp_logger_nodes(
+ timestamp_logger_nodes_offset);
+ }
+ if (destination_node.priority.has_value()) {
+ connection_builder.add_priority(destination_node.priority.value());
+ }
+ if (destination_node.time_to_live.has_value()) {
+ connection_builder.add_time_to_live(destination_node.time_to_live.value());
+ }
+ return connection_builder.Finish();
+}
+
+flatbuffers::Offset<Channel> PackChannel(
+ const MutableChannel &channel,
+ flatbuffers::Offset<reflection::Schema> schema_offset,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ std::vector<flatbuffers::Offset<Connection>> connection_offsets;
+
+ for (const std::pair<const std::string_view, MutableConnection>
+ &destination_node : channel.destination_nodes) {
+ CHECK_EQ(destination_node.first, destination_node.second.name);
+ connection_offsets.push_back(PackConnection(destination_node.second, fbb));
+ }
+
+ flatbuffers::Offset<flatbuffers::String> name_offset;
+ if (!channel.name.empty()) {
+ name_offset = fbb->CreateSharedString(channel.name);
+ }
+ flatbuffers::Offset<flatbuffers::String> type_offset;
+ if (!channel.type.empty()) {
+ type_offset = fbb->CreateSharedString(channel.type);
+ }
+ flatbuffers::Offset<flatbuffers::String> source_node_offset;
+ if (!channel.source_node.empty()) {
+ source_node_offset = fbb->CreateSharedString(channel.source_node);
+ }
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+ destination_nodes_offset;
+ if (!connection_offsets.empty()) {
+ destination_nodes_offset = fbb->CreateVector(connection_offsets);
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ logger_nodes_offset = PackStringSet(channel.logger_nodes, fbb);
+
+ Channel::Builder channel_builder(*fbb);
+
+ if (!name_offset.IsNull()) {
+ channel_builder.add_name(name_offset);
+ }
+ if (!type_offset.IsNull()) {
+ channel_builder.add_type(type_offset);
+ }
+ if (channel.frequency.has_value()) {
+ channel_builder.add_frequency(channel.frequency.value());
+ }
+ if (channel.max_size.has_value()) {
+ channel_builder.add_max_size(channel.max_size.value());
+ }
+ if (channel.num_senders.has_value()) {
+ channel_builder.add_num_senders(channel.num_senders.value());
+ }
+ if (channel.num_watchers.has_value()) {
+ channel_builder.add_num_watchers(channel.num_watchers.value());
+ }
+
+ if (!schema_offset.IsNull()) {
+ channel_builder.add_schema(schema_offset);
+ }
+
+ if (!source_node_offset.IsNull()) {
+ channel_builder.add_source_node(source_node_offset);
+ }
+ if (!destination_nodes_offset.IsNull()) {
+ channel_builder.add_destination_nodes(destination_nodes_offset);
+ }
+
+ if (channel.logger.has_value()) {
+ channel_builder.add_logger(channel.logger.value());
+ }
+ if (!logger_nodes_offset.IsNull()) {
+ channel_builder.add_logger_nodes(logger_nodes_offset);
+ }
+ if (channel.read_method.has_value()) {
+ channel_builder.add_read_method(channel.read_method.value());
+ }
+ if (channel.num_readers.has_value()) {
+ channel_builder.add_num_readers(channel.num_readers.value());
+ }
+ if (channel.channel_storage_duration.has_value()) {
+ channel_builder.add_channel_storage_duration(
+ channel.channel_storage_duration.value());
+ }
+
+ return channel_builder.Finish();
+}
+
+flatbuffers::Offset<Node> PackNode(const MutableNode &node,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(node.name);
+ flatbuffers::Offset<flatbuffers::String> hostname_offset;
+ if (!node.hostname.empty()) {
+ hostname_offset = fbb->CreateSharedString(node.hostname);
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ hostnames_offset = PackStringSet(node.hostnames, fbb);
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ tags_offset = PackStringSet(node.tags, fbb);
+ Node::Builder node_builder(*fbb);
+ node_builder.add_name(name_offset);
+ if (!hostname_offset.IsNull()) {
+ node_builder.add_hostname(hostname_offset);
+ }
+ if (node.port) {
+ node_builder.add_port(node.port.value());
+ }
+ if (!hostnames_offset.IsNull()) {
+ node_builder.add_hostnames(hostnames_offset);
+ }
+ if (!tags_offset.IsNull()) {
+ node_builder.add_tags(tags_offset);
+ }
+ return node_builder.Finish();
+}
+
+flatbuffers::Offset<Map> PackMap(const MutableMap &map,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<Channel> match_offset =
+ PackChannel(map.match, flatbuffers::Offset<reflection::Schema>(), fbb);
+ flatbuffers::Offset<Channel> rename_offset =
+ PackChannel(map.rename, flatbuffers::Offset<reflection::Schema>(), fbb);
+ Map::Builder map_builder(*fbb);
+ map_builder.add_match(match_offset);
+ map_builder.add_rename(rename_offset);
+ return map_builder.Finish();
+}
+
+flatbuffers::Offset<Application> PackApplication(
+ const MutableApplication &application,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb->CreateSharedString(application.name);
+
+ flatbuffers::Offset<flatbuffers::String> executable_name_offset;
+ if (!application.executable_name.empty()) {
+ executable_name_offset =
+ fbb->CreateSharedString(application.executable_name);
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offset;
+ if (!application.maps.empty()) {
+ std::vector<flatbuffers::Offset<Map>> maps_offsets;
+ maps_offsets.reserve(application.maps.size());
+ for (const MutableMap &map : application.maps) {
+ maps_offsets.emplace_back(PackMap(map, fbb));
+ }
+ maps_offset = fbb->CreateVector(maps_offsets);
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ nodes_offset = PackStringSet(application.nodes, fbb);
+
+ flatbuffers::Offset<flatbuffers::String> user_offset;
+ if (!application.user.empty()) {
+ user_offset = fbb->CreateSharedString(application.user);
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ args_offset;
+ if (!application.args.empty()) {
+ std::vector<flatbuffers::Offset<flatbuffers::String>> args_offsets;
+ for (const std::string_view arg : application.args) {
+ args_offsets.emplace_back(fbb->CreateSharedString(arg));
+ }
+ args_offset = fbb->CreateVector(args_offsets);
+ }
+
+ Application::Builder application_builder(*fbb);
+ application_builder.add_name(name_offset);
+ if (!executable_name_offset.IsNull()) {
+ application_builder.add_executable_name(executable_name_offset);
+ }
+ if (!maps_offset.IsNull()) {
+ application_builder.add_maps(maps_offset);
+ }
+ if (!nodes_offset.IsNull()) {
+ application_builder.add_nodes(nodes_offset);
+ }
+ if (!user_offset.IsNull()) {
+ application_builder.add_user(user_offset);
+ }
+ if (!args_offset.IsNull()) {
+ application_builder.add_args(args_offset);
+ }
+ if (application.autostart) {
+ application_builder.add_autostart(application.autostart.value());
+ }
+ if (application.autorestart) {
+ application_builder.add_autorestart(application.autorestart.value());
+ }
+ if (application.memory_limit) {
+ application_builder.add_memory_limit(application.memory_limit.value());
+ }
+ if (application.stop_time) {
+ application_builder.add_stop_time(application.stop_time.value());
+ }
+ return application_builder.Finish();
+}
+
+flatbuffers::Offset<Configuration> PackConfiguration(
+ const MutableConfiguration &configuration,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ // Start by building the vectors. They need to come before the final table.
+ // Channels
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset;
+ {
+ // We want to add channels unconditionally since everyone expects them to be
+ // there.
+ std::map<std::string_view, flatbuffers::Offset<reflection::Schema>>
+ schema_cache;
+
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (const std::pair<const MutableChannelName, MutableChannel>
+ &channel_key : configuration.channels) {
+ CHECK_EQ(channel_key.first.name, channel_key.second.name);
+ CHECK_EQ(channel_key.first.type, channel_key.second.type);
+ const MutableChannel &channel = channel_key.second;
+ auto cached_schema = schema_cache.find(channel.type);
+ flatbuffers::Offset<reflection::Schema> schema_offset;
+ if (cached_schema != schema_cache.end()) {
+ schema_offset = cached_schema->second;
+ } else {
+ auto schema_to_copy_it = configuration.schemas.find(channel.type);
+ if (schema_to_copy_it != configuration.schemas.end()) {
+ schema_offset = RecursiveCopyFlatBuffer<reflection::Schema>(
+ schema_to_copy_it->second, fbb);
+ schema_cache.emplace(channel.type, schema_offset);
+ }
+ }
+
+ channel_offsets.emplace_back(PackChannel(channel, schema_offset, fbb));
+ }
+ channels_offset = fbb->CreateVector(channel_offsets);
+ }
+
+ // Maps
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
+ maps_offset;
+ if (!configuration.maps.empty()) {
+ std::vector<flatbuffers::Offset<Map>> map_offsets;
+ for (const MutableMap &map : configuration.maps) {
+ map_offsets.emplace_back(PackMap(map, fbb));
+ }
+ maps_offset = fbb->CreateVector(map_offsets);
+ }
+
+ // Nodes
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
+ nodes_offset;
+ if (!configuration.nodes.empty()) {
+ std::vector<flatbuffers::Offset<Node>> node_offsets;
+ for (const std::pair<const std::string_view, MutableNode> &node :
+ configuration.nodes) {
+ CHECK_EQ(node.first, node.second.name);
+ node_offsets.emplace_back(PackNode(node.second, fbb));
+ }
+ nodes_offset = fbb->CreateVector(node_offsets);
+ }
+
+ // Applications
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
+ applications_offset;
+ if (!configuration.applications.empty()) {
+ std::vector<flatbuffers::Offset<Application>> applications_offsets;
+ for (const std::pair<const std::string_view, MutableApplication>
+ &application : configuration.applications) {
+ CHECK_EQ(application.first, application.second.name);
+ applications_offsets.emplace_back(
+ PackApplication(application.second, fbb));
+ }
+ applications_offset = fbb->CreateVector(applications_offsets);
+ }
+
+ // And then build a Configuration with them all.
+ ConfigurationBuilder configuration_builder(*fbb);
+ configuration_builder.add_channels(channels_offset);
+ if (!maps_offset.IsNull()) {
+ configuration_builder.add_maps(maps_offset);
+ }
+ if (!nodes_offset.IsNull()) {
+ configuration_builder.add_nodes(nodes_offset);
+ }
+ if (!applications_offset.IsNull()) {
+ configuration_builder.add_applications(applications_offset);
+ }
+ if (configuration.channel_storage_duration) {
+ configuration_builder.add_channel_storage_duration(
+ configuration.channel_storage_duration.value());
+ }
+
+ return configuration_builder.Finish();
+}
// Extracts the folder part of a path. Returns ./ if there is no path.
std::string_view ExtractFolder(const std::string_view filename) {
@@ -351,15 +971,11 @@
return a->name()->string_view() == name;
}
-void ValidateConfiguration(const Flatbuffer<Configuration> &config) {
- // No imports should be left.
- CHECK(!config.message().has_imports());
-
+void ValidateUnmergedConfiguration(const Flatbuffer<Configuration> &config) {
// Check that if there is a node list, all the source nodes are filled out and
// valid, and all the destination nodes are valid (and not the source). This
// is a basic consistency check.
if (config.message().has_channels()) {
- const Channel *last_channel = nullptr;
for (const Channel *c : *config.message().channels()) {
CHECK(c->has_name());
CHECK(c->has_type());
@@ -438,9 +1054,24 @@
}
}
}
+ CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
+ << ": num_readers may be set if and only if read_method is PIN,"
+ " if you want 0 readers do not set PIN: "
+ << CleanedChannelToString(c);
+ }
+ }
+}
- // Make sure everything is sorted while we are here... If this fails,
- // there will be a bunch of weird errors.
+void ValidateConfiguration(const Flatbuffer<Configuration> &config) {
+ // No imports should be left.
+ CHECK(!config.message().has_imports());
+
+ ValidateUnmergedConfiguration(config);
+
+ // A final config is also sorted. Lookups in the config assume it is sorted.
+ if (config.message().has_channels()) {
+ const Channel *last_channel = nullptr;
+ for (const Channel *c : *config.message().channels()) {
if (last_channel != nullptr) {
CHECK(CompareChannels(
last_channel,
@@ -645,180 +1276,17 @@
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config) {
- // auto_merge_config will contain all the fields of the Configuration that are
- // to be passed through unmodified to the result of MergeConfiguration().
- // In the processing below, we mutate auto_merge_config to remove any fields
- // which we do need to alter (hence why we can't use the input config
- // directly), and then merge auto_merge_config back in at the end.
- aos::FlatbufferDetachedBuffer<aos::Configuration> auto_merge_config =
- aos::RecursiveCopyFlatBuffer(&config.message());
+ MutableConfiguration unpacked_config;
- // Store all the channels in a sorted set. This lets us track channels we
- // have seen before and merge the updates in.
- absl::btree_set<ChannelContainer> channels;
-
- if (config.message().has_channels()) {
- auto_merge_config.mutable_message()->clear_channels();
- for (const Channel *c : *config.message().channels()) {
- // Ignore malformed entries.
- if (!c->has_name()) {
- continue;
- }
- if (!c->has_type()) {
- continue;
- }
-
- CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
- << ": num_readers may be set if and only if read_method is PIN,"
- " if you want 0 readers do not set PIN: "
- << CleanedChannelToString(c);
-
- // Attempt to insert the channel.
- auto result = channels.insert(RecursiveCopyFlatBuffer(c));
- if (!result.second) {
- // Already there, so merge the new table into the original.
- // Schemas merge poorly, so pick the newest one.
- if (result.first->table->message().has_schema() && c->has_schema()) {
- result.first->table->mutable_message()->clear_schema();
- }
- auto merged =
- MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(c));
-
- if (merged.message().has_destination_nodes()) {
- absl::btree_set<ConnectionContainer> connections;
- for (const Connection *connection :
- *merged.message().destination_nodes()) {
- auto connection_result =
- connections.insert(RecursiveCopyFlatBuffer(connection));
- if (!connection_result.second) {
- *connection_result.first->table =
- MergeFlatBuffers(*connection_result.first->table,
- RecursiveCopyFlatBuffer(connection));
- }
- }
- if (static_cast<size_t>(connections.size()) !=
- merged.message().destination_nodes()->size()) {
- merged.mutable_message()->clear_destination_nodes();
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
- std::vector<flatbuffers::Offset<Connection>> connection_offsets;
- for (const ConnectionContainer &connection : connections) {
- connection_offsets.push_back(
- RecursiveCopyFlatBuffer(&connection.table->message(), &fbb));
- }
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<Connection>>>
- destination_nodes_offset = fbb.CreateVector(connection_offsets);
- Channel::Builder channel_builder(fbb);
- channel_builder.add_destination_nodes(destination_nodes_offset);
- fbb.Finish(channel_builder.Finish());
- FlatbufferDetachedBuffer<Channel> destinations_channel(
- fbb.Release());
- merged = MergeFlatBuffers(merged, destinations_channel);
- }
- }
-
- *result.first->table = std::move(merged);
- }
- }
- }
-
- // Now repeat this for the application list.
- absl::btree_set<ApplicationContainer> applications;
- if (config.message().has_applications()) {
- auto_merge_config.mutable_message()->clear_applications();
- for (const Application *a : *config.message().applications()) {
- if (!a->has_name()) {
- continue;
- }
-
- auto result = applications.insert(RecursiveCopyFlatBuffer(a));
- if (!result.second) {
- if (a->has_args()) {
- result.first->table->mutable_message()->clear_args();
- }
- *result.first->table =
- MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(a));
- }
- }
- }
-
- // Now repeat this for the node list.
- absl::btree_set<NodeContainer> nodes;
- if (config.message().has_nodes()) {
- auto_merge_config.mutable_message()->clear_nodes();
- for (const Node *n : *config.message().nodes()) {
- if (!n->has_name()) {
- continue;
- }
-
- auto result = nodes.insert(RecursiveCopyFlatBuffer(n));
- if (!result.second) {
- *result.first->table =
- MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(n));
- }
- }
- }
+ // The act of unpacking a config merges everything.
+ UnpackConfiguration(&config.message(), &unpacked_config);
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
- // Start by building the vectors. They need to come before the final table.
- // Channels
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
- channels_offset;
- {
- ::std::vector<flatbuffers::Offset<Channel>> channel_offsets;
- for (const ChannelContainer &c : channels) {
- channel_offsets.emplace_back(
- RecursiveCopyFlatBuffer<Channel>(&c.table->message(), &fbb));
- }
- channels_offset = fbb.CreateVector(channel_offsets);
- }
+ fbb.Finish(PackConfiguration(unpacked_config, &fbb));
- // Applications
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
- applications_offset;
- {
- ::std::vector<flatbuffers::Offset<Application>> applications_offsets;
- for (const ApplicationContainer &a : applications) {
- applications_offsets.emplace_back(
- RecursiveCopyFlatBuffer<Application>(&a.table->message(), &fbb));
- }
- applications_offset = fbb.CreateVector(applications_offsets);
- }
-
- // Nodes
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
- nodes_offset;
- {
- ::std::vector<flatbuffers::Offset<Node>> node_offsets;
- for (const NodeContainer &n : nodes) {
- node_offsets.emplace_back(
- RecursiveCopyFlatBuffer<Node>(&n.table->message(), &fbb));
- }
- nodes_offset = fbb.CreateVector(node_offsets);
- }
-
- // And then build a Configuration with them all.
- ConfigurationBuilder configuration_builder(fbb);
- configuration_builder.add_channels(channels_offset);
- if (config.message().has_applications()) {
- configuration_builder.add_applications(applications_offset);
- }
- if (config.message().has_nodes()) {
- configuration_builder.add_nodes(nodes_offset);
- }
-
- fbb.Finish(configuration_builder.Finish());
-
- aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config(
- fbb.Release());
-
- // Now, validate that if there is a node list, every channel has a source
- // node.
- FlatbufferDetachedBuffer<Configuration> result =
- MergeFlatBuffers(modified_config, auto_merge_config);
+ FlatbufferDetachedBuffer<aos::Configuration> result(fbb.Release());
ValidateConfiguration(result);
@@ -990,160 +1458,24 @@
FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
const Flatbuffer<Configuration> &config,
const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) {
+ MutableConfiguration unpacked_config;
+ UnpackConfiguration(&config.message(), &unpacked_config);
+
+ // Now, add the schemas in so they will get packed.
+ for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) {
+ CHECK(schema.message().has_root_table());
+ CHECK(schema.message().root_table()->has_name());
+ unpacked_config.schemas.emplace(
+ schema.message().root_table()->name()->string_view(),
+ &schema.message());
+ }
+
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
- // Cache for holding already inserted schemas.
- std::map<std::string_view, flatbuffers::Offset<reflection::Schema>>
- schema_cache;
+ fbb.Finish(PackConfiguration(unpacked_config, &fbb));
- CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
- << ": Merging logic needs to be updated when the number of channel "
- "fields changes.";
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
- channels_offset;
- if (config.message().has_channels()) {
- std::vector<flatbuffers::Offset<Channel>> channel_offsets;
- for (const Channel *c : *config.message().channels()) {
- // Search for a schema with a matching type.
- const aos::FlatbufferVector<reflection::Schema> *found_schema = nullptr;
- for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) {
- if (schema.message().root_table() != nullptr) {
- if (schema.message().root_table()->name()->string_view() ==
- c->type()->string_view()) {
- found_schema = &schema;
- }
- }
- }
-
- if (found_schema == nullptr) {
- std::stringstream ss;
- for (const aos::FlatbufferVector<reflection::Schema> &schema :
- schemas) {
- if (schema.message().root_table() == nullptr) {
- continue;
- }
- auto name = schema.message().root_table()->name()->string_view();
- ss << "\n\tname: " << name;
- }
- LOG(FATAL) << ": Failed to find schema for " << FlatbufferToJson(c)
- << "\n\tThe following schemas were found:\n"
- << ss.str();
- }
-
- // Now copy the message manually.
- auto cached_schema = schema_cache.find(c->type()->string_view());
- flatbuffers::Offset<reflection::Schema> schema_offset;
- if (cached_schema != schema_cache.end()) {
- schema_offset = cached_schema->second;
- } else {
- schema_offset = RecursiveCopyFlatBuffer<reflection::Schema>(
- &found_schema->message(), &fbb);
- schema_cache.emplace(c->type()->string_view(), schema_offset);
- }
-
- flatbuffers::Offset<flatbuffers::String> name_offset =
- fbb.CreateSharedString(c->name()->str());
- flatbuffers::Offset<flatbuffers::String> type_offset =
- fbb.CreateSharedString(c->type()->str());
- flatbuffers::Offset<flatbuffers::String> source_node_offset =
- c->has_source_node() ? fbb.CreateSharedString(c->source_node()->str())
- : 0;
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
- destination_nodes_offset =
- aos::RecursiveCopyVectorTable(c->destination_nodes(), &fbb);
-
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
- logger_nodes_offset =
- aos::CopyVectorSharedString(c->logger_nodes(), &fbb);
-
- Channel::Builder channel_builder(fbb);
- channel_builder.add_name(name_offset);
- channel_builder.add_type(type_offset);
- if (c->has_frequency()) {
- channel_builder.add_frequency(c->frequency());
- }
- if (c->has_max_size()) {
- channel_builder.add_max_size(c->max_size());
- }
- if (c->has_num_senders()) {
- channel_builder.add_num_senders(c->num_senders());
- }
- if (c->has_num_watchers()) {
- channel_builder.add_num_watchers(c->num_watchers());
- }
- channel_builder.add_schema(schema_offset);
- if (!source_node_offset.IsNull()) {
- channel_builder.add_source_node(source_node_offset);
- }
- if (!destination_nodes_offset.IsNull()) {
- channel_builder.add_destination_nodes(destination_nodes_offset);
- }
- if (c->has_logger()) {
- channel_builder.add_logger(c->logger());
- }
- if (!logger_nodes_offset.IsNull()) {
- channel_builder.add_logger_nodes(logger_nodes_offset);
- }
- if (c->has_read_method()) {
- channel_builder.add_read_method(c->read_method());
- }
- if (c->has_num_readers()) {
- channel_builder.add_num_readers(c->num_readers());
- }
- if (c->has_channel_storage_duration()) {
- channel_builder.add_channel_storage_duration(
- c->channel_storage_duration());
- }
- channel_offsets.emplace_back(channel_builder.Finish());
- }
- channels_offset = fbb.CreateVector(channel_offsets);
- }
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
- maps_offset =
- aos::RecursiveCopyVectorTable(config.message().maps(), &fbb);
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
- nodes_offset =
- aos::RecursiveCopyVectorTable(config.message().nodes(), &fbb);
-
- flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
- applications_offset =
- aos::RecursiveCopyVectorTable(config.message().applications(), &fbb);
-
- // Now insert everything else in unmodified.
- ConfigurationBuilder configuration_builder(fbb);
- if (config.message().has_channels()) {
- configuration_builder.add_channels(channels_offset);
- }
- if (!maps_offset.IsNull()) {
- configuration_builder.add_maps(maps_offset);
- }
- if (!nodes_offset.IsNull()) {
- configuration_builder.add_nodes(nodes_offset);
- }
- if (!applications_offset.IsNull()) {
- configuration_builder.add_applications(applications_offset);
- }
-
- if (config.message().has_channel_storage_duration()) {
- configuration_builder.add_channel_storage_duration(
- config.message().channel_storage_duration());
- }
-
- CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
- << ": Merging logic needs to be updated when the number of configuration "
- "fields changes.";
-
- fbb.Finish(configuration_builder.Finish());
- aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config(
- fbb.Release());
-
- return modified_config;
+ return aos::FlatbufferDetachedBuffer<aos::Configuration>(fbb.Release());
}
const Node *GetNodeFromHostname(const Configuration *config,
@@ -1744,6 +2076,9 @@
CHECK(node->has_name());
overrides.source_node = node->name()->string_view();
}
+
+ // TODO(austin): Use MutableConfiguration to represent this transform more
+ // efficiently.
flatbuffers::FlatBufferBuilder fbb;
// Don't populate fields from overrides that the user doesn't explicitly
// override.
@@ -1763,6 +2098,8 @@
FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
const Configuration &configuration,
std::function<bool(const Channel &)> should_include_channel) {
+ // TODO(austin): Use MutableConfiguration to represent this better.
+ //
// create new_configuration1, containing everything except the `channels`
// field.
FlatbufferDetachedBuffer<Configuration> new_configuration1 =