| #include "aos/configuration.h" |
| |
| #include <arpa/inet.h> |
| #include <ifaddrs.h> |
| #include <netinet/in.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| |
| #include <cstdlib> |
| #include <cstring> |
| #include <map> |
| #include <set> |
| #include <string> |
| #include <string_view> |
| #include <vector> |
| |
| #include "absl/container/btree_set.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_join.h" |
| #include "absl/strings/str_split.h" |
| #include "aos/configuration_generated.h" |
| #include "aos/flatbuffer_merge.h" |
| #include "aos/json_to_flatbuffer.h" |
| #include "aos/network/team_number.h" |
| #include "aos/unique_malloc_ptr.h" |
| #include "aos/util/file.h" |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| |
| namespace aos { |
| namespace { |
| namespace chrono = std::chrono; |
| |
| bool EndsWith(std::string_view str, std::string_view end) { |
| if (str.size() < end.size()) { |
| return false; |
| } |
| if (str.substr(str.size() - end.size(), end.size()) != end) { |
| return false; |
| } |
| return true; |
| } |
| |
| std::string MaybeReplaceExtension(std::string_view filename, |
| std::string_view extension, |
| std::string_view replacement) { |
| if (!EndsWith(filename, extension)) { |
| return std::string(filename); |
| } |
| filename.remove_suffix(extension.size()); |
| return absl::StrCat(filename, replacement); |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> ReadConfigFile(std::string_view path, |
| bool binary) { |
| if (binary) { |
| FlatbufferVector<Configuration> config = |
| FileToFlatbuffer<Configuration>(path); |
| return CopySpanAsDetachedBuffer(config.span()); |
| } |
| |
| flatbuffers::DetachedBuffer buffer = JsonToFlatbuffer( |
| util::ReadFileToStringOrDie(path), ConfigurationTypeTable()); |
| |
| CHECK_GT(buffer.size(), 0u) << ": Failed to parse JSON file: " << path; |
| |
| return FlatbufferDetachedBuffer<Configuration>(std::move(buffer)); |
| } |
| |
| } // namespace |
| // Define the compare and equal operators for Channel and Application so we can |
| // insert them in the btree below. |
| bool operator<(const FlatbufferDetachedBuffer<Channel> &lhs, |
| const FlatbufferDetachedBuffer<Channel> &rhs) { |
| int name_compare = lhs.message().name()->string_view().compare( |
| rhs.message().name()->string_view()); |
| if (name_compare == 0) { |
| return lhs.message().type()->string_view() < |
| rhs.message().type()->string_view(); |
| } else if (name_compare < 0) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool operator==(const FlatbufferDetachedBuffer<Channel> &lhs, |
| const FlatbufferDetachedBuffer<Channel> &rhs) { |
| return lhs.message().name()->string_view() == |
| rhs.message().name()->string_view() && |
| lhs.message().type()->string_view() == |
| rhs.message().type()->string_view(); |
| } |
| |
| bool operator<(const FlatbufferDetachedBuffer<Connection> &lhs, |
| const FlatbufferDetachedBuffer<Connection> &rhs) { |
| return lhs.message().name()->string_view() < |
| rhs.message().name()->string_view(); |
| } |
| |
| bool operator==(const FlatbufferDetachedBuffer<Connection> &lhs, |
| const FlatbufferDetachedBuffer<Connection> &rhs) { |
| return lhs.message().name()->string_view() == |
| rhs.message().name()->string_view(); |
| } |
| |
| bool operator==(const FlatbufferDetachedBuffer<Application> &lhs, |
| const FlatbufferDetachedBuffer<Application> &rhs) { |
| return lhs.message().name()->string_view() == |
| rhs.message().name()->string_view(); |
| } |
| |
| bool operator<(const FlatbufferDetachedBuffer<Application> &lhs, |
| const FlatbufferDetachedBuffer<Application> &rhs) { |
| return lhs.message().name()->string_view() < |
| rhs.message().name()->string_view(); |
| } |
| |
| bool operator==(const FlatbufferDetachedBuffer<Node> &lhs, |
| const FlatbufferDetachedBuffer<Node> &rhs) { |
| return lhs.message().name()->string_view() == |
| rhs.message().name()->string_view(); |
| } |
| |
| bool operator<(const FlatbufferDetachedBuffer<Node> &lhs, |
| const FlatbufferDetachedBuffer<Node> &rhs) { |
| return lhs.message().name()->string_view() < |
| rhs.message().name()->string_view(); |
| } |
| |
| namespace configuration { |
| namespace { |
| |
| // Extracts the folder part of a path. Returns ./ if there is no path. |
| std::string_view ExtractFolder(const std::string_view filename) { |
| auto last_slash_pos = filename.find_last_of("/\\"); |
| |
| return last_slash_pos == std::string_view::npos |
| ? std::string_view("./") |
| : filename.substr(0, last_slash_pos + 1); |
| } |
| |
| std::string AbsolutePath(const std::string_view filename) { |
| // Uses an std::string so that we know the input will be null-terminated. |
| const std::string terminated_file(filename); |
| char buffer[PATH_MAX]; |
| PCHECK(NULL != realpath(terminated_file.c_str(), buffer)); |
| return buffer; |
| } |
| |
| std::string RemoveDotDots(const std::string_view filename) { |
| std::vector<std::string> split = absl::StrSplit(filename, '/'); |
| auto iterator = split.begin(); |
| while (iterator != split.end()) { |
| if (iterator->empty()) { |
| iterator = split.erase(iterator); |
| } else if (*iterator == ".") { |
| iterator = split.erase(iterator); |
| } else if (*iterator == "..") { |
| CHECK(iterator != split.begin()) |
| << ": Import path may not start with ..: " << filename; |
| auto previous = iterator; |
| --previous; |
| split.erase(iterator); |
| iterator = split.erase(previous); |
| } else { |
| ++iterator; |
| } |
| } |
| return absl::StrJoin(split, "/"); |
| } |
| |
| std::optional<FlatbufferDetachedBuffer<Configuration>> MaybeReadConfig( |
| const std::string_view path, absl::btree_set<std::string> *visited_paths, |
| const std::vector<std::string_view> &extra_import_paths) { |
| std::string binary_path = MaybeReplaceExtension(path, ".json", ".bfbs"); |
| VLOG(1) << "Looking up: " << path << ", starting with: " << binary_path; |
| bool binary_path_exists = util::PathExists(binary_path); |
| std::string raw_path(path); |
| // For each .json file, look and see if we can find a .bfbs file next to it |
| // with the same base name. If we can, assume it is the same and use it |
| // instead. It is much faster to load .bfbs files than .json files. |
| if (!binary_path_exists && !util::PathExists(raw_path)) { |
| const bool path_is_absolute = raw_path.size() > 0 && raw_path[0] == '/'; |
| if (path_is_absolute) { |
| // Nowhere else to look up an absolute path, so fail now. Note that we |
| // always have at least one extra import path based on /proc/self/exe, so |
| // warning about those paths existing isn't helpful. |
| LOG(ERROR) << ": Failed to find file " << path << "."; |
| return std::nullopt; |
| } |
| |
| bool found_path = false; |
| for (const auto &import_path : extra_import_paths) { |
| raw_path = std::string(import_path) + "/" + RemoveDotDots(path); |
| binary_path = MaybeReplaceExtension(raw_path, ".json", ".bfbs"); |
| VLOG(1) << "Checking: " << binary_path; |
| binary_path_exists = util::PathExists(binary_path); |
| if (binary_path_exists) { |
| found_path = true; |
| break; |
| } |
| VLOG(1) << "Checking: " << raw_path; |
| if (util::PathExists(raw_path)) { |
| found_path = true; |
| break; |
| } |
| } |
| if (!found_path) { |
| LOG(ERROR) << ": Failed to find file " << path << "."; |
| return std::nullopt; |
| } |
| } |
| |
| std::optional<FlatbufferDetachedBuffer<Configuration>> config = |
| ReadConfigFile(binary_path_exists ? binary_path : raw_path, |
| binary_path_exists); |
| |
| // Depth first. Take the following example: |
| // |
| // config1.json: |
| // { |
| // "channels": [ |
| // { |
| // "name": "/foo", |
| // "type": ".aos.bar", |
| // "max_size": 5 |
| // } |
| // ], |
| // "imports": [ |
| // "config2.json", |
| // ] |
| // } |
| // |
| // config2.json: |
| // { |
| // "channels": [ |
| // { |
| // "name": "/foo", |
| // "type": ".aos.bar", |
| // "max_size": 7 |
| // } |
| // ], |
| // } |
| // |
| // We want the main config (config1.json) to be able to override the imported |
| // config. That means that it needs to be merged into the imported configs, |
| // not the other way around. |
| |
| const std::string absolute_path = |
| AbsolutePath(binary_path_exists ? binary_path : raw_path); |
| // Track that we have seen this file before recursing. Track the path we |
| // actually loaded (which should be consistent if imported twice). |
| if (!visited_paths->insert(absolute_path).second) { |
| for (const auto &visited_path : *visited_paths) { |
| LOG(INFO) << "Already visited: " << visited_path; |
| } |
| LOG(FATAL) |
| << "Already imported " << path << " (i.e. " << absolute_path |
| << "). See above for the files that have already been processed."; |
| return std::nullopt; |
| } |
| |
| if (config->message().has_imports()) { |
| // Capture the imports. |
| const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *v = |
| config->message().imports(); |
| |
| // And then wipe them. This gets GCed when we merge later. |
| config->mutable_message()->clear_imports(); |
| |
| // Start with an empty configuration to merge into. |
| FlatbufferDetachedBuffer<Configuration> merged_config = |
| FlatbufferDetachedBuffer<Configuration>::Empty(); |
| |
| const std::string path_folder(ExtractFolder(path)); |
| for (const flatbuffers::String *str : *v) { |
| const std::string included_config = |
| path_folder + "/" + std::string(str->string_view()); |
| |
| const auto optional_config = |
| MaybeReadConfig(included_config, visited_paths, extra_import_paths); |
| if (!optional_config.has_value()) { |
| return std::nullopt; |
| } |
| // And them merge everything in. |
| merged_config = MergeFlatBuffers(merged_config, *optional_config); |
| } |
| |
| // Finally, merge this file in. |
| config = MergeFlatBuffers(merged_config, *config); |
| } |
| return config; |
| } |
| |
| // Compares (c < p) a channel, and a name, type tuple. |
| bool CompareChannels(const Channel *c, |
| ::std::pair<std::string_view, std::string_view> p) { |
| int name_compare = c->name()->string_view().compare(p.first); |
| if (name_compare == 0) { |
| return c->type()->string_view() < p.second; |
| } else if (name_compare < 0) { |
| return true; |
| } else { |
| return false; |
| } |
| }; |
| |
| // Compares for equality (c == p) a channel, and a name, type tuple. |
| bool EqualsChannels(const Channel *c, |
| ::std::pair<std::string_view, std::string_view> p) { |
| return c->name()->string_view() == p.first && |
| c->type()->string_view() == p.second; |
| } |
| |
| // Compares (c < p) an application, and a name; |
| bool CompareApplications(const Application *a, std::string_view name) { |
| return a->name()->string_view() < name; |
| }; |
| |
| // Compares for equality (c == p) an application, and a name; |
| bool EqualsApplications(const Application *a, std::string_view name) { |
| return a->name()->string_view() == name; |
| } |
| |
| void ValidateConfiguration(const Flatbuffer<Configuration> &config) { |
| // No imports should be left. |
| CHECK(!config.message().has_imports()); |
| |
| // Check that if there is a node list, all the source nodes are filled out and |
| // valid, and all the destination nodes are valid (and not the source). This |
| // is a basic consistency check. |
| if (config.message().has_channels()) { |
| const Channel *last_channel = nullptr; |
| for (const Channel *c : *config.message().channels()) { |
| CHECK(c->has_name()); |
| CHECK(c->has_type()); |
| if (c->name()->string_view().back() == '/') { |
| LOG(FATAL) << "Channel names can't end with '/'"; |
| } |
| if (c->name()->string_view().find("//") != std::string_view::npos) { |
| LOG(FATAL) << ": Invalid channel name " << c->name()->string_view() |
| << ", can't use //."; |
| } |
| for (const char data : c->name()->string_view()) { |
| if (data >= '0' && data <= '9') { |
| continue; |
| } |
| if (data >= 'a' && data <= 'z') { |
| continue; |
| } |
| if (data >= 'A' && data <= 'Z') { |
| continue; |
| } |
| if (data == '-' || data == '_' || data == '/') { |
| continue; |
| } |
| LOG(FATAL) << "Invalid channel name " << c->name()->string_view() |
| << ", can only use [-a-zA-Z0-9_/]"; |
| } |
| |
| CHECK_LT(QueueSize(&config.message(), c) + QueueScratchBufferSize(c), |
| std::numeric_limits<uint16_t>::max()) |
| << ": More messages/second configured than the queue can hold on " |
| << CleanedChannelToString(c) << ", " << c->frequency() << "hz for " |
| << config.message().channel_storage_duration() << "ns"; |
| |
| if (c->has_logger_nodes()) { |
| // Confirm that we don't have duplicate logger nodes. |
| absl::btree_set<std::string_view> logger_nodes; |
| for (const flatbuffers::String *s : *c->logger_nodes()) { |
| logger_nodes.insert(s->string_view()); |
| } |
| CHECK_EQ(static_cast<size_t>(logger_nodes.size()), |
| c->logger_nodes()->size()) |
| << ": Found duplicate logger_nodes in " |
| << CleanedChannelToString(c); |
| } |
| |
| if (c->has_destination_nodes()) { |
| // Confirm that we don't have duplicate timestamp logger nodes. |
| for (const Connection *d : *c->destination_nodes()) { |
| if (d->has_timestamp_logger_nodes()) { |
| absl::btree_set<std::string_view> timestamp_logger_nodes; |
| for (const flatbuffers::String *s : *d->timestamp_logger_nodes()) { |
| timestamp_logger_nodes.insert(s->string_view()); |
| } |
| CHECK_EQ(static_cast<size_t>(timestamp_logger_nodes.size()), |
| d->timestamp_logger_nodes()->size()) |
| << ": Found duplicate timestamp_logger_nodes in " |
| << CleanedChannelToString(c); |
| } |
| } |
| |
| // There is no good use case today for logging timestamps but not the |
| // corresponding data. Instead of plumbing through all of this on the |
| // reader side, let'd just disallow it for now. |
| if (c->logger() == LoggerConfig::NOT_LOGGED) { |
| for (const Connection *d : *c->destination_nodes()) { |
| CHECK(d->timestamp_logger() == LoggerConfig::NOT_LOGGED) |
| << ": Logging timestamps without data is not supported. If " |
| "you have a good use case, let's talk. " |
| << CleanedChannelToString(c); |
| } |
| } |
| } |
| |
| // Make sure everything is sorted while we are here... If this fails, |
| // there will be a bunch of weird errors. |
| if (last_channel != nullptr) { |
| CHECK(CompareChannels( |
| last_channel, |
| std::make_pair(c->name()->string_view(), c->type()->string_view()))) |
| << ": Channels not sorted!"; |
| } |
| last_channel = c; |
| } |
| } |
| |
| if (config.message().has_nodes() && config.message().has_channels()) { |
| for (const Channel *c : *config.message().channels()) { |
| CHECK(c->has_source_node()) << ": Channel " << FlatbufferToJson(c) |
| << " is missing \"source_node\""; |
| CHECK(GetNode(&config.message(), c->source_node()->string_view()) != |
| nullptr) |
| << ": Channel " << FlatbufferToJson(c) |
| << " has an unknown \"source_node\""; |
| |
| if (c->has_destination_nodes()) { |
| for (const Connection *connection : *c->destination_nodes()) { |
| CHECK(connection->has_name()); |
| CHECK(GetNode(&config.message(), connection->name()->string_view()) != |
| nullptr) |
| << ": Channel " << FlatbufferToJson(c) |
| << " has an unknown \"destination_nodes\" " |
| << connection->name()->string_view(); |
| |
| switch (connection->timestamp_logger()) { |
| case LoggerConfig::LOCAL_LOGGER: |
| case LoggerConfig::NOT_LOGGED: |
| CHECK(!connection->has_timestamp_logger_nodes()) |
| << ": " << CleanedChannelToString(c); |
| break; |
| case LoggerConfig::REMOTE_LOGGER: |
| case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: |
| CHECK(connection->has_timestamp_logger_nodes()); |
| CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u); |
| for (const flatbuffers::String *timestamp_logger_node : |
| *connection->timestamp_logger_nodes()) { |
| CHECK(GetNode(&config.message(), |
| timestamp_logger_node->string_view()) != nullptr) |
| << ": Channel " << FlatbufferToJson(c) |
| << " has an unknown \"timestamp_logger_node\"" |
| << connection->name()->string_view(); |
| } |
| break; |
| } |
| |
| CHECK_NE(connection->name()->string_view(), |
| c->source_node()->string_view()) |
| << ": Channel " << FlatbufferToJson(c) |
| << " is forwarding data to itself"; |
| } |
| } |
| } |
| } |
| } |
| |
| void HandleReverseMaps( |
| const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps, |
| std::string_view type, const Node *node, std::set<std::string> *names) { |
| for (const Map *map : *maps) { |
| CHECK_NOTNULL(map); |
| const Channel *const match = CHECK_NOTNULL(map->match()); |
| const Channel *const rename = CHECK_NOTNULL(map->rename()); |
| |
| // Handle type specific maps. |
| const flatbuffers::String *const match_type_string = match->type(); |
| if (match_type_string != nullptr && |
| match_type_string->string_view() != type) { |
| continue; |
| } |
| |
| // Now handle node specific maps. |
| const flatbuffers::String *const match_source_node_string = |
| match->source_node(); |
| if (node != nullptr && match_source_node_string != nullptr && |
| match_source_node_string->string_view() != |
| node->name()->string_view()) { |
| continue; |
| } |
| |
| const flatbuffers::String *const match_name_string = match->name(); |
| const flatbuffers::String *const rename_name_string = rename->name(); |
| if (match_name_string == nullptr || rename_name_string == nullptr) { |
| continue; |
| } |
| |
| const std::string rename_name = rename_name_string->str(); |
| const std::string_view match_name = match_name_string->string_view(); |
| |
| std::set<std::string> possible_renames; |
| |
| // Check if the current name(s) could have been reached using the provided |
| // rename. |
| if (match_name.back() == '*') { |
| for (const std::string &option : *names) { |
| if (option.substr(0, rename_name.size()) == rename_name) { |
| possible_renames.insert( |
| absl::StrCat(match_name.substr(0, match_name.size() - 1), |
| option.substr(rename_name.size()))); |
| } |
| } |
| names->insert(possible_renames.begin(), possible_renames.end()); |
| } else if (names->count(rename_name) != 0) { |
| names->insert(std::string(match_name)); |
| } |
| } |
| } |
| |
| } // namespace |
| |
| // Maps name for the provided maps. Modifies name. |
| // |
| // This is called many times during startup, and it dereferences a lot of |
| // pointers. These combine to make it a performance hotspot during many tests |
| // under msan, so there is some optimizing around caching intermediates instead |
| // of dereferencing the pointer multiple times. |
| // |
| // Deliberately not in an anonymous namespace so that the log-reading code can |
| // reference it. |
| void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps, |
| std::string *name, std::string_view type, const Node *node) { |
| // For the same reason we merge configs in reverse order, we want to process |
| // maps in reverse order. That lets the outer config overwrite channels from |
| // the inner configs. |
| for (auto i = maps->rbegin(); i != maps->rend(); ++i) { |
| const Channel *const match = i->match(); |
| if (!match) { |
| continue; |
| } |
| const flatbuffers::String *const match_name_string = match->name(); |
| if (!match_name_string) { |
| continue; |
| } |
| const Channel *const rename = i->rename(); |
| if (!rename) { |
| continue; |
| } |
| const flatbuffers::String *const rename_name_string = rename->name(); |
| if (!rename_name_string) { |
| continue; |
| } |
| |
| // Handle normal maps (now that we know that match and rename are filled |
| // out). |
| const std::string_view match_name = match_name_string->string_view(); |
| if (match_name != *name) { |
| if (match_name.back() == '*' && |
| std::string_view(*name).substr( |
| 0, std::min(name->size(), match_name.size() - 1)) == |
| match_name.substr(0, match_name.size() - 1)) { |
| CHECK_EQ(match_name.find('*'), match_name.size() - 1); |
| } else { |
| continue; |
| } |
| } |
| |
| // Handle type specific maps. |
| const flatbuffers::String *const match_type_string = match->type(); |
| if (match_type_string && match_type_string->string_view() != type) { |
| continue; |
| } |
| |
| // Now handle node specific maps. |
| const flatbuffers::String *const match_source_node_string = |
| match->source_node(); |
| if (node && match_source_node_string && |
| match_source_node_string->string_view() != |
| node->name()->string_view()) { |
| continue; |
| } |
| |
| std::string new_name(rename_name_string->string_view()); |
| if (match_name.back() == '*') { |
| new_name += std::string(name->substr(match_name.size() - 1)); |
| } |
| VLOG(1) << "Renamed \"" << *name << "\" to \"" << new_name << "\""; |
| *name = std::move(new_name); |
| } |
| } |
| |
| std::set<std::string> GetChannelAliases(const Configuration *config, |
| std::string_view name, |
| std::string_view type, |
| const std::string_view application_name, |
| const Node *node) { |
| std::set<std::string> names{std::string(name)}; |
| if (config->has_maps()) { |
| HandleReverseMaps(config->maps(), type, node, &names); |
| } |
| { |
| const Application *application = |
| GetApplication(config, node, application_name); |
| if (application != nullptr && application->has_maps()) { |
| HandleReverseMaps(application->maps(), type, node, &names); |
| } |
| } |
| return names; |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> MergeConfiguration( |
| const Flatbuffer<Configuration> &config) { |
| // auto_merge_config will contain all the fields of the Configuration that are |
| // to be passed through unmodified to the result of MergeConfiguration(). |
| // In the processing below, we mutate auto_merge_config to remove any fields |
| // which we do need to alter (hence why we can't use the input config |
| // directly), and then merge auto_merge_config back in at the end. |
| aos::FlatbufferDetachedBuffer<aos::Configuration> auto_merge_config = |
| aos::RecursiveCopyFlatBuffer(&config.message()); |
| |
| // Store all the channels in a sorted set. This lets us track channels we |
| // have seen before and merge the updates in. |
| absl::btree_set<FlatbufferDetachedBuffer<Channel>> channels; |
| |
| if (config.message().has_channels()) { |
| auto_merge_config.mutable_message()->clear_channels(); |
| for (const Channel *c : *config.message().channels()) { |
| // Ignore malformed entries. |
| if (!c->has_name()) { |
| continue; |
| } |
| if (!c->has_type()) { |
| continue; |
| } |
| |
| CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0) |
| << ": num_readers may be set if and only if read_method is PIN," |
| " if you want 0 readers do not set PIN: " |
| << CleanedChannelToString(c); |
| |
| // Attempt to insert the channel. |
| auto result = channels.insert(RecursiveCopyFlatBuffer(c)); |
| if (!result.second) { |
| // Already there, so merge the new table into the original. |
| // Schemas merge poorly, so pick the newest one. |
| if (result.first->message().has_schema() && c->has_schema()) { |
| result.first->mutable_message()->clear_schema(); |
| } |
| auto merged = |
| MergeFlatBuffers(*result.first, RecursiveCopyFlatBuffer(c)); |
| |
| if (merged.message().has_destination_nodes()) { |
| absl::btree_set<FlatbufferDetachedBuffer<Connection>> connections; |
| for (const Connection *connection : |
| *merged.message().destination_nodes()) { |
| auto connection_result = |
| connections.insert(RecursiveCopyFlatBuffer(connection)); |
| if (!connection_result.second) { |
| *connection_result.first = |
| MergeFlatBuffers(*connection_result.first, |
| RecursiveCopyFlatBuffer(connection)); |
| } |
| } |
| if (static_cast<size_t>(connections.size()) != |
| merged.message().destination_nodes()->size()) { |
| merged.mutable_message()->clear_destination_nodes(); |
| flatbuffers::FlatBufferBuilder fbb; |
| fbb.ForceDefaults(true); |
| std::vector<flatbuffers::Offset<Connection>> connection_offsets; |
| for (const FlatbufferDetachedBuffer<Connection> &connection : |
| connections) { |
| connection_offsets.push_back( |
| RecursiveCopyFlatBuffer(&connection.message(), &fbb)); |
| } |
| flatbuffers::Offset< |
| flatbuffers::Vector<flatbuffers::Offset<Connection>>> |
| destination_nodes_offset = fbb.CreateVector(connection_offsets); |
| Channel::Builder channel_builder(fbb); |
| channel_builder.add_destination_nodes(destination_nodes_offset); |
| fbb.Finish(channel_builder.Finish()); |
| FlatbufferDetachedBuffer<Channel> destinations_channel( |
| fbb.Release()); |
| merged = MergeFlatBuffers(merged, destinations_channel); |
| } |
| } |
| |
| *result.first = std::move(merged); |
| } |
| } |
| } |
| |
| // Now repeat this for the application list. |
| absl::btree_set<FlatbufferDetachedBuffer<Application>> applications; |
| if (config.message().has_applications()) { |
| auto_merge_config.mutable_message()->clear_applications(); |
| for (const Application *a : *config.message().applications()) { |
| if (!a->has_name()) { |
| continue; |
| } |
| |
| auto result = applications.insert(RecursiveCopyFlatBuffer(a)); |
| if (!result.second) { |
| if (a->has_args()) { |
| result.first->mutable_message()->clear_args(); |
| } |
| *result.first = |
| MergeFlatBuffers(*result.first, RecursiveCopyFlatBuffer(a)); |
| } |
| } |
| } |
| |
| // Now repeat this for the node list. |
| absl::btree_set<FlatbufferDetachedBuffer<Node>> nodes; |
| if (config.message().has_nodes()) { |
| auto_merge_config.mutable_message()->clear_nodes(); |
| for (const Node *n : *config.message().nodes()) { |
| if (!n->has_name()) { |
| continue; |
| } |
| |
| auto result = nodes.insert(RecursiveCopyFlatBuffer(n)); |
| if (!result.second) { |
| *result.first = |
| MergeFlatBuffers(*result.first, RecursiveCopyFlatBuffer(n)); |
| } |
| } |
| } |
| |
| flatbuffers::FlatBufferBuilder fbb; |
| fbb.ForceDefaults(true); |
| |
| // Start by building the vectors. They need to come before the final table. |
| // Channels |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>> |
| channels_offset; |
| { |
| ::std::vector<flatbuffers::Offset<Channel>> channel_offsets; |
| for (const FlatbufferDetachedBuffer<Channel> &c : channels) { |
| channel_offsets.emplace_back( |
| RecursiveCopyFlatBuffer<Channel>(&c.message(), &fbb)); |
| } |
| channels_offset = fbb.CreateVector(channel_offsets); |
| } |
| |
| // Applications |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>> |
| applications_offset; |
| { |
| ::std::vector<flatbuffers::Offset<Application>> applications_offsets; |
| for (const FlatbufferDetachedBuffer<Application> &a : applications) { |
| applications_offsets.emplace_back( |
| RecursiveCopyFlatBuffer<Application>(&a.message(), &fbb)); |
| } |
| applications_offset = fbb.CreateVector(applications_offsets); |
| } |
| |
| // Nodes |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>> |
| nodes_offset; |
| { |
| ::std::vector<flatbuffers::Offset<Node>> node_offsets; |
| for (const FlatbufferDetachedBuffer<Node> &n : nodes) { |
| node_offsets.emplace_back( |
| RecursiveCopyFlatBuffer<Node>(&n.message(), &fbb)); |
| } |
| nodes_offset = fbb.CreateVector(node_offsets); |
| } |
| |
| // And then build a Configuration with them all. |
| ConfigurationBuilder configuration_builder(fbb); |
| configuration_builder.add_channels(channels_offset); |
| if (config.message().has_applications()) { |
| configuration_builder.add_applications(applications_offset); |
| } |
| if (config.message().has_nodes()) { |
| configuration_builder.add_nodes(nodes_offset); |
| } |
| |
| fbb.Finish(configuration_builder.Finish()); |
| |
| aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config( |
| fbb.Release()); |
| |
| // Now, validate that if there is a node list, every channel has a source |
| // node. |
| FlatbufferDetachedBuffer<Configuration> result = |
| MergeFlatBuffers(modified_config, auto_merge_config); |
| |
| ValidateConfiguration(result); |
| |
| return result; |
| } |
| |
| std::optional<FlatbufferDetachedBuffer<Configuration>> MaybeReadConfig( |
| const std::string_view path, |
| const std::vector<std::string_view> &extra_import_paths) { |
| // Add the executable directory to the search path. That makes it so that |
| // tools can be run from any directory without hard-coding an absolute path to |
| // the config into all binaries. |
| std::vector<std::string_view> extra_import_paths_with_exe = |
| extra_import_paths; |
| char proc_self_exec_buffer[PATH_MAX + 1]; |
| std::memset(proc_self_exec_buffer, 0, sizeof(proc_self_exec_buffer)); |
| ssize_t s = readlink("/proc/self/exe", proc_self_exec_buffer, PATH_MAX); |
| if (s > 0) { |
| // If the readlink call fails, the worst thing that happens is that we don't |
| // automatically find the config next to the binary. VLOG to make it easier |
| // to debug. |
| std::string_view proc_self_exec(proc_self_exec_buffer); |
| |
| extra_import_paths_with_exe.emplace_back( |
| proc_self_exec.substr(0, proc_self_exec.rfind("/"))); |
| } else { |
| VLOG(1) << "Failed to read /proc/self/exe"; |
| } |
| |
| // We only want to read a file once. So track the visited files in a set. |
| absl::btree_set<std::string> visited_paths; |
| std::optional<FlatbufferDetachedBuffer<Configuration>> read_config = |
| MaybeReadConfig(path, &visited_paths, extra_import_paths_with_exe); |
| |
| if (read_config == std::nullopt) { |
| return read_config; |
| } |
| |
| // If we only read one file, and it had a .bfbs extension, it has to be a |
| // fully formatted config. Do a quick verification and return it. |
| if (visited_paths.size() == 1 && EndsWith(*visited_paths.begin(), ".bfbs")) { |
| ValidateConfiguration(*read_config); |
| return read_config; |
| } |
| |
| return MergeConfiguration(*read_config); |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> ReadConfig( |
| const std::string_view path, |
| const std::vector<std::string_view> &extra_import_paths) { |
| auto optional_config = MaybeReadConfig(path, extra_import_paths); |
| CHECK(optional_config) << "Could not read config. See above errors"; |
| return std::move(*optional_config); |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> MergeWithConfig( |
| const Configuration *config, const Flatbuffer<Configuration> &addition) { |
| return MergeConfiguration(MergeFlatBuffers(config, &addition.message())); |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> MergeWithConfig( |
| const Configuration *config, std::string_view json) { |
| FlatbufferDetachedBuffer<Configuration> addition = |
| JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()); |
| |
| return MergeWithConfig(config, addition); |
| } |
| |
| const Channel *GetChannel(const Configuration *config, std::string_view name, |
| std::string_view type, |
| std::string_view application_name, const Node *node, |
| bool quiet) { |
| if (!config->has_channels()) { |
| return nullptr; |
| } |
| |
| const std::string_view original_name = name; |
| std::string mutable_name; |
| if (node != nullptr) { |
| VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type |
| << "\" } on " << aos::FlatbufferToJson(node); |
| } else { |
| VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type |
| << "\" }"; |
| } |
| |
| // First handle application specific maps. Only do this if we have a matching |
| // application name, and it has maps. |
| { |
| const Application *application = |
| GetApplication(config, node, application_name); |
| if (application != nullptr && application->has_maps()) { |
| mutable_name = std::string(name); |
| HandleMaps(application->maps(), &mutable_name, type, node); |
| name = std::string_view(mutable_name); |
| } |
| } |
| |
| // Now do global maps. |
| if (config->has_maps()) { |
| mutable_name = std::string(name); |
| HandleMaps(config->maps(), &mutable_name, type, node); |
| name = std::string_view(mutable_name); |
| } |
| |
| if (original_name != name) { |
| VLOG(1) << "Remapped to { \"name\": \"" << name << "\", \"type\": \"" |
| << type << "\" }"; |
| } |
| |
| // Then look for the channel (note that this relies on the channels being |
| // sorted in the config). |
| auto channel_iterator = |
| std::lower_bound(config->channels()->cbegin(), config->channels()->cend(), |
| std::make_pair(name, type), CompareChannels); |
| |
| // Make sure we actually found it, and it matches. |
| if (channel_iterator != config->channels()->cend() && |
| EqualsChannels(*channel_iterator, std::make_pair(name, type))) { |
| if (VLOG_IS_ON(2)) { |
| VLOG(2) << "Found: " << FlatbufferToJson(*channel_iterator); |
| } else if (VLOG_IS_ON(1)) { |
| VLOG(1) << "Found: " << CleanedChannelToString(*channel_iterator); |
| } |
| return *channel_iterator; |
| } else { |
| VLOG(1) << "No match for { \"name\": \"" << name << "\", \"type\": \"" |
| << type << "\" }"; |
| if (original_name != name && !quiet) { |
| LOG(WARNING) << "Remapped from {\"name\": \"" << original_name |
| << "\", \"type\": \"" << type << "\"}, to {\"name\": \"" |
| << name << "\", \"type\": \"" << type |
| << "\"}, but no channel by that name exists."; |
| } |
| return nullptr; |
| } |
| } |
| |
| size_t ChannelIndex(const Configuration *configuration, |
| const Channel *channel) { |
| CHECK(configuration->channels() != nullptr) << ": No channels"; |
| |
| const auto c = std::lower_bound( |
| configuration->channels()->cbegin(), configuration->channels()->cend(), |
| std::make_pair(channel->name()->string_view(), |
| channel->type()->string_view()), |
| CompareChannels); |
| CHECK(c != configuration->channels()->cend()) |
| << ": Channel pointer not found in configuration()->channels()"; |
| CHECK(*c == channel) |
| << ": Channel pointer not found in configuration()->channels()"; |
| |
| return std::distance(configuration->channels()->cbegin(), c); |
| } |
| |
| std::string CleanedChannelToString(const Channel *channel) { |
| FlatbufferDetachedBuffer<Channel> cleaned_channel = CopyFlatBuffer(channel); |
| cleaned_channel.mutable_message()->clear_schema(); |
| return FlatbufferToJson(cleaned_channel); |
| } |
| |
| std::string StrippedChannelToString(const Channel *channel) { |
| return absl::StrCat("{ \"name\": \"", channel->name()->string_view(), |
| "\", \"type\": \"", channel->type()->string_view(), |
| "\" }"); |
| } |
| |
| FlatbufferDetachedBuffer<Configuration> MergeConfiguration( |
| const Flatbuffer<Configuration> &config, |
| const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) { |
| flatbuffers::FlatBufferBuilder fbb; |
| fbb.ForceDefaults(true); |
| |
| // Cache for holding already inserted schemas. |
| std::map<std::string_view, flatbuffers::Offset<reflection::Schema>> |
| schema_cache; |
| |
| CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u) |
| << ": Merging logic needs to be updated when the number of channel " |
| "fields changes."; |
| |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>> |
| channels_offset; |
| if (config.message().has_channels()) { |
| std::vector<flatbuffers::Offset<Channel>> channel_offsets; |
| for (const Channel *c : *config.message().channels()) { |
| // Search for a schema with a matching type. |
| const aos::FlatbufferVector<reflection::Schema> *found_schema = nullptr; |
| for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) { |
| if (schema.message().root_table() != nullptr) { |
| if (schema.message().root_table()->name()->string_view() == |
| c->type()->string_view()) { |
| found_schema = &schema; |
| } |
| } |
| } |
| |
| CHECK(found_schema != nullptr) |
| << ": Failed to find schema for " << FlatbufferToJson(c); |
| |
| // Now copy the message manually. |
| auto cached_schema = schema_cache.find(c->type()->string_view()); |
| flatbuffers::Offset<reflection::Schema> schema_offset; |
| if (cached_schema != schema_cache.end()) { |
| schema_offset = cached_schema->second; |
| } else { |
| schema_offset = RecursiveCopyFlatBuffer<reflection::Schema>( |
| &found_schema->message(), &fbb); |
| schema_cache.emplace(c->type()->string_view(), schema_offset); |
| } |
| |
| flatbuffers::Offset<flatbuffers::String> name_offset = |
| fbb.CreateSharedString(c->name()->str()); |
| flatbuffers::Offset<flatbuffers::String> type_offset = |
| fbb.CreateSharedString(c->type()->str()); |
| flatbuffers::Offset<flatbuffers::String> source_node_offset = |
| c->has_source_node() ? fbb.CreateSharedString(c->source_node()->str()) |
| : 0; |
| |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>> |
| destination_nodes_offset = |
| aos::RecursiveCopyVectorTable(c->destination_nodes(), &fbb); |
| |
| flatbuffers::Offset< |
| flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> |
| logger_nodes_offset = |
| aos::CopyVectorSharedString(c->logger_nodes(), &fbb); |
| |
| Channel::Builder channel_builder(fbb); |
| channel_builder.add_name(name_offset); |
| channel_builder.add_type(type_offset); |
| if (c->has_frequency()) { |
| channel_builder.add_frequency(c->frequency()); |
| } |
| if (c->has_max_size()) { |
| channel_builder.add_max_size(c->max_size()); |
| } |
| if (c->has_num_senders()) { |
| channel_builder.add_num_senders(c->num_senders()); |
| } |
| if (c->has_num_watchers()) { |
| channel_builder.add_num_watchers(c->num_watchers()); |
| } |
| channel_builder.add_schema(schema_offset); |
| if (!source_node_offset.IsNull()) { |
| channel_builder.add_source_node(source_node_offset); |
| } |
| if (!destination_nodes_offset.IsNull()) { |
| channel_builder.add_destination_nodes(destination_nodes_offset); |
| } |
| if (c->has_logger()) { |
| channel_builder.add_logger(c->logger()); |
| } |
| if (!logger_nodes_offset.IsNull()) { |
| channel_builder.add_logger_nodes(logger_nodes_offset); |
| } |
| if (c->has_read_method()) { |
| channel_builder.add_read_method(c->read_method()); |
| } |
| if (c->has_num_readers()) { |
| channel_builder.add_num_readers(c->num_readers()); |
| } |
| channel_offsets.emplace_back(channel_builder.Finish()); |
| } |
| channels_offset = fbb.CreateVector(channel_offsets); |
| } |
| |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>> |
| maps_offset = |
| aos::RecursiveCopyVectorTable(config.message().maps(), &fbb); |
| |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>> |
| nodes_offset = |
| aos::RecursiveCopyVectorTable(config.message().nodes(), &fbb); |
| |
| flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>> |
| applications_offset = |
| aos::RecursiveCopyVectorTable(config.message().applications(), &fbb); |
| |
| // Now insert everything else in unmodified. |
| ConfigurationBuilder configuration_builder(fbb); |
| if (config.message().has_channels()) { |
| configuration_builder.add_channels(channels_offset); |
| } |
| if (!maps_offset.IsNull()) { |
| configuration_builder.add_maps(maps_offset); |
| } |
| if (!nodes_offset.IsNull()) { |
| configuration_builder.add_nodes(nodes_offset); |
| } |
| if (!applications_offset.IsNull()) { |
| configuration_builder.add_applications(applications_offset); |
| } |
| |
| if (config.message().has_channel_storage_duration()) { |
| configuration_builder.add_channel_storage_duration( |
| config.message().channel_storage_duration()); |
| } |
| |
| CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u) |
| << ": Merging logic needs to be updated when the number of configuration " |
| "fields changes."; |
| |
| fbb.Finish(configuration_builder.Finish()); |
| aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config( |
| fbb.Release()); |
| |
| return modified_config; |
| } |
| |
| const Node *GetNodeFromHostname(const Configuration *config, |
| std::string_view hostname) { |
| for (const Node *node : *config->nodes()) { |
| if (node->has_hostname() && node->hostname()->string_view() == hostname) { |
| return node; |
| } |
| if (node->has_hostnames()) { |
| for (const auto &candidate : *node->hostnames()) { |
| if (candidate->string_view() == hostname) { |
| return node; |
| } |
| } |
| } |
| } |
| return nullptr; |
| } |
| |
| const Node *GetMyNode(const Configuration *config) { |
| const std::string hostname = (FLAGS_override_hostname.size() > 0) |
| ? FLAGS_override_hostname |
| : network::GetHostname(); |
| const Node *node = GetNodeFromHostname(config, hostname); |
| if (node != nullptr) return node; |
| |
| LOG(FATAL) << "Unknown node for host: " << hostname |
| << ". Consider using --override_hostname if hostname detection " |
| "is wrong."; |
| return nullptr; |
| } |
| |
| const Node *GetNode(const Configuration *config, const Node *node) { |
| if (!MultiNode(config)) { |
| CHECK(node == nullptr) << ": Provided a node in a single node world."; |
| return nullptr; |
| } else { |
| CHECK(node != nullptr); |
| CHECK(node->has_name()); |
| return GetNode(config, node->name()->string_view()); |
| } |
| } |
| |
| const Node *GetNode(const Configuration *config, std::string_view name) { |
| CHECK(config->has_nodes()) |
| << ": Asking for a node from a single node configuration."; |
| for (const Node *node : *config->nodes()) { |
| CHECK(node->has_name()) << ": Malformed node " << FlatbufferToJson(node); |
| if (node->name()->string_view() == name) { |
| return node; |
| } |
| } |
| return nullptr; |
| } |
| |
| const Node *GetNode(const Configuration *config, size_t node_index) { |
| if (!MultiNode(config)) { |
| CHECK_EQ(node_index, 0u) << ": Invalid node in a single node world."; |
| return nullptr; |
| } else { |
| CHECK_LT(node_index, config->nodes()->size()); |
| return config->nodes()->Get(node_index); |
| } |
| } |
| |
| const Node *GetNodeOrDie(const Configuration *config, const Node *node) { |
| if (!MultiNode(config)) { |
| CHECK(node == nullptr) << ": Provided a node in a single node world."; |
| return nullptr; |
| } else { |
| const Node *config_node = GetNode(config, node); |
| if (config_node == nullptr) { |
| LOG(FATAL) << "Couldn't find node matching " << FlatbufferToJson(node); |
| } |
| return config_node; |
| } |
| } |
| |
| namespace { |
| int GetNodeIndexFromConfig(const Configuration *config, const Node *node) { |
| int node_index = 0; |
| for (const Node *iterated_node : *config->nodes()) { |
| if (iterated_node == node) { |
| return node_index; |
| } |
| ++node_index; |
| } |
| return -1; |
| } |
| } // namespace |
| |
| aos::FlatbufferDetachedBuffer<aos::Configuration> AddSchema( |
| std::string_view json, |
| const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) { |
| FlatbufferDetachedBuffer<Configuration> addition = |
| JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()); |
| return MergeConfiguration(addition, schemas); |
| } |
| |
| int GetNodeIndex(const Configuration *config, const Node *node) { |
| if (!MultiNode(config)) { |
| return 0; |
| } |
| |
| { |
| int node_index = GetNodeIndexFromConfig(config, node); |
| if (node_index != -1) { |
| return node_index; |
| } |
| } |
| |
| const Node *result = GetNode(config, node); |
| CHECK(result != nullptr); |
| |
| { |
| int node_index = GetNodeIndexFromConfig(config, result); |
| if (node_index != -1) { |
| return node_index; |
| } |
| } |
| |
| LOG(FATAL) << "Node " << FlatbufferToJson(node) |
| << " not found in the configuration."; |
| } |
| |
| int GetNodeIndex(const Configuration *config, std::string_view name) { |
| if (!MultiNode(config)) { |
| return 0; |
| } |
| |
| { |
| int node_index = 0; |
| for (const Node *iterated_node : *config->nodes()) { |
| if (iterated_node->name()->string_view() == name) { |
| return node_index; |
| } |
| ++node_index; |
| } |
| } |
| LOG(FATAL) << "Node " << name << " not found in the configuration."; |
| } |
| |
| size_t NodesCount(const Configuration *config) { |
| if (!MultiNode(config)) { |
| return 1u; |
| } |
| |
| return config->nodes()->size(); |
| } |
| |
| std::vector<const Node *> GetNodes(const Configuration *config) { |
| std::vector<const Node *> nodes; |
| if (MultiNode(config)) { |
| for (const Node *node : *config->nodes()) { |
| nodes.emplace_back(node); |
| } |
| } else { |
| nodes.emplace_back(nullptr); |
| } |
| return nodes; |
| } |
| |
| std::vector<const Node *> GetNodesWithTag(const Configuration *config, |
| std::string_view tag) { |
| std::vector<const Node *> nodes; |
| if (!MultiNode(config)) { |
| nodes.emplace_back(nullptr); |
| } else { |
| for (const Node *node : *config->nodes()) { |
| if (!node->has_tags()) { |
| continue; |
| } |
| bool did_found_tag = false; |
| for (const flatbuffers::String *found_tag : *node->tags()) { |
| if (found_tag->string_view() == tag) { |
| did_found_tag = true; |
| break; |
| } |
| } |
| if (did_found_tag) { |
| nodes.emplace_back(node); |
| } |
| } |
| } |
| return nodes; |
| } |
| |
| bool NodeHasTag(const Node *node, std::string_view tag) { |
| if (node == nullptr) { |
| return true; |
| } |
| |
| if (!node->has_tags()) { |
| return false; |
| } |
| |
| const auto *const tags = node->tags(); |
| return std::find_if(tags->begin(), tags->end(), |
| [tag](const flatbuffers::String *candidate) { |
| return candidate->string_view() == tag; |
| }) != tags->end(); |
| } |
| |
| bool MultiNode(const Configuration *config) { return config->has_nodes(); } |
| |
| bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) { |
| if (node == nullptr) { |
| return true; |
| } |
| CHECK(channel->has_source_node()) << FlatbufferToJson(channel); |
| CHECK(node->has_name()) << FlatbufferToJson(node); |
| return (CHECK_NOTNULL(channel)->source_node()->string_view() == |
| node->name()->string_view()); |
| } |
| |
| bool ChannelIsReadableOnNode(const Channel *channel, const Node *node) { |
| if (node == nullptr) { |
| return true; |
| } |
| |
| if (channel->source_node()->string_view() == node->name()->string_view()) { |
| return true; |
| } |
| |
| if (!channel->has_destination_nodes()) { |
| return false; |
| } |
| |
| for (const Connection *connection : *channel->destination_nodes()) { |
| CHECK(connection->has_name()); |
| if (connection->name()->string_view() == node->name()->string_view()) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node) { |
| if (node == nullptr) { |
| // Single node world. If there is a local logger, then we want to use |
| // it. |
| if (channel->logger() == LoggerConfig::LOCAL_LOGGER) { |
| return true; |
| } else if (channel->logger() == LoggerConfig::NOT_LOGGED) { |
| return false; |
| } |
| LOG(FATAL) << "Unsupported logging configuration in a single node world: " |
| << CleanedChannelToString(channel); |
| } |
| return ChannelMessageIsLoggedOnNode( |
| channel, CHECK_NOTNULL(node)->name()->string_view()); |
| } |
| |
| bool ChannelMessageIsLoggedOnNode(const Channel *channel, |
| std::string_view node_name) { |
| switch (channel->logger()) { |
| case LoggerConfig::LOCAL_LOGGER: |
| return channel->source_node()->string_view() == node_name; |
| case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: |
| CHECK(channel->has_logger_nodes()) |
| << "Missing logger nodes on " << StrippedChannelToString(channel); |
| CHECK_GT(channel->logger_nodes()->size(), 0u) |
| << "Missing logger nodes on " << StrippedChannelToString(channel); |
| |
| if (channel->source_node()->string_view() == node_name) { |
| return true; |
| } |
| |
| [[fallthrough]]; |
| case LoggerConfig::REMOTE_LOGGER: |
| CHECK(channel->has_logger_nodes()) |
| << "Missing logger nodes on " << StrippedChannelToString(channel); |
| CHECK_GT(channel->logger_nodes()->size(), 0u) |
| << "Missing logger nodes on " << StrippedChannelToString(channel); |
| for (const flatbuffers::String *logger_node : *channel->logger_nodes()) { |
| if (logger_node->string_view() == node_name) { |
| return true; |
| } |
| } |
| |
| return false; |
| case LoggerConfig::NOT_LOGGED: |
| return false; |
| } |
| |
| LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger()); |
| } |
| |
| size_t ConnectionCount(const Channel *channel) { |
| if (!channel->has_destination_nodes()) { |
| return 0; |
| } |
| return channel->destination_nodes()->size(); |
| } |
| |
| const Connection *ConnectionToNode(const Channel *channel, const Node *node) { |
| if (!channel->has_destination_nodes()) { |
| return nullptr; |
| } |
| for (const Connection *connection : *channel->destination_nodes()) { |
| if (connection->name()->string_view() == node->name()->string_view()) { |
| return connection; |
| } |
| } |
| return nullptr; |
| } |
| |
| bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel, |
| const Node *node, |
| const Node *logger_node) { |
| return ConnectionDeliveryTimeIsLoggedOnNode(ConnectionToNode(channel, node), |
| logger_node); |
| } |
| |
| bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection, |
| const Node *node) { |
| if (connection == nullptr) { |
| return false; |
| } |
| switch (connection->timestamp_logger()) { |
| case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: |
| CHECK(connection->has_timestamp_logger_nodes()); |
| CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u); |
| if (connection->name()->string_view() == node->name()->string_view()) { |
| return true; |
| } |
| |
| [[fallthrough]]; |
| case LoggerConfig::REMOTE_LOGGER: |
| CHECK(connection->has_timestamp_logger_nodes()); |
| CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u); |
| for (const flatbuffers::String *timestamp_logger_node : |
| *connection->timestamp_logger_nodes()) { |
| if (timestamp_logger_node->string_view() == |
| node->name()->string_view()) { |
| return true; |
| } |
| } |
| |
| return false; |
| case LoggerConfig::LOCAL_LOGGER: |
| return connection->name()->string_view() == node->name()->string_view(); |
| case LoggerConfig::NOT_LOGGED: |
| return false; |
| } |
| |
| LOG(FATAL) << "Unknown logger config " |
| << static_cast<int>(connection->timestamp_logger()); |
| } |
| |
| std::vector<std::string_view> SourceNodeNames(const Configuration *config, |
| const Node *my_node) { |
| std::set<std::string_view> result_set; |
| |
| for (const Channel *channel : *config->channels()) { |
| if (channel->has_destination_nodes()) { |
| for (const Connection *connection : *channel->destination_nodes()) { |
| if (connection->name()->string_view() == |
| my_node->name()->string_view()) { |
| result_set.insert(channel->source_node()->string_view()); |
| } |
| } |
| } |
| } |
| |
| std::vector<std::string_view> result; |
| for (const std::string_view source : result_set) { |
| VLOG(1) << "Found a source node of " << source; |
| result.emplace_back(source); |
| } |
| return result; |
| } |
| |
| std::vector<std::string_view> DestinationNodeNames(const Configuration *config, |
| const Node *my_node) { |
| std::vector<std::string_view> result; |
| |
| for (const Channel *channel : *config->channels()) { |
| if (channel->has_source_node() && channel->source_node()->string_view() == |
| my_node->name()->string_view()) { |
| if (!channel->has_destination_nodes()) continue; |
| |
| if (channel->source_node()->string_view() != |
| my_node->name()->string_view()) { |
| continue; |
| } |
| |
| for (const Connection *connection : *channel->destination_nodes()) { |
| if (std::find(result.begin(), result.end(), |
| connection->name()->string_view()) == result.end()) { |
| result.emplace_back(connection->name()->string_view()); |
| } |
| } |
| } |
| } |
| |
| for (const std::string_view destination : result) { |
| VLOG(1) << "Found a destination node of " << destination; |
| } |
| return result; |
| } |
| |
| std::vector<const Node *> TimestampNodes(const Configuration *config, |
| const Node *my_node) { |
| if (!configuration::MultiNode(config)) { |
| CHECK(my_node == nullptr); |
| return std::vector<const Node *>{}; |
| } |
| |
| std::set<const Node *> timestamp_logger_nodes; |
| for (const Channel *channel : *config->channels()) { |
| if (!configuration::ChannelIsSendableOnNode(channel, my_node)) { |
| continue; |
| } |
| if (!channel->has_destination_nodes()) { |
| continue; |
| } |
| for (const Connection *connection : *channel->destination_nodes()) { |
| const Node *other_node = |
| configuration::GetNode(config, connection->name()->string_view()); |
| |
| if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection, |
| my_node)) { |
| VLOG(1) << "Timestamps are logged from " |
| << FlatbufferToJson(other_node); |
| timestamp_logger_nodes.insert(other_node); |
| } |
| } |
| } |
| |
| std::vector<const Node *> result; |
| for (const Node *node : timestamp_logger_nodes) { |
| result.emplace_back(node); |
| } |
| return result; |
| } |
| |
| bool ApplicationShouldStart(const Configuration *config, const Node *my_node, |
| const Application *application) { |
| if (MultiNode(config)) { |
| // Ok, we need |
| CHECK(application->has_nodes()); |
| CHECK(my_node != nullptr); |
| for (const flatbuffers::String *str : *application->nodes()) { |
| if (str->string_view() == my_node->name()->string_view()) { |
| return true; |
| } |
| } |
| return false; |
| } else { |
| return true; |
| } |
| } |
| |
| const Application *GetApplication(const Configuration *config, |
| const Node *my_node, |
| std::string_view application_name) { |
| if (config->has_applications()) { |
| auto application_iterator = std::lower_bound( |
| config->applications()->cbegin(), config->applications()->cend(), |
| application_name, CompareApplications); |
| if (application_iterator != config->applications()->cend() && |
| EqualsApplications(*application_iterator, application_name)) { |
| if (ApplicationShouldStart(config, my_node, *application_iterator)) { |
| return *application_iterator; |
| } |
| } |
| } |
| return nullptr; |
| } |
| |
| std::vector<size_t> SourceNodeIndex(const Configuration *config) { |
| CHECK(config->has_channels()); |
| std::vector<size_t> result; |
| result.resize(config->channels()->size(), 0u); |
| if (MultiNode(config)) { |
| for (size_t i = 0; i < config->channels()->size(); ++i) { |
| result[i] = GetNodeIndex( |
| config, config->channels()->Get(i)->source_node()->string_view()); |
| } |
| } |
| return result; |
| } |
| |
| int QueueSize(const Configuration *config, const Channel *channel) { |
| return QueueSize(channel->frequency(), |
| chrono::nanoseconds(config->channel_storage_duration())); |
| } |
| |
| int QueueSize(size_t frequency, chrono::nanoseconds channel_storage_duration) { |
| // Use integer arithmetic and round up at all cost. |
| return static_cast<int>( |
| (999999999 + static_cast<int64_t>(frequency) * |
| static_cast<int64_t>(channel_storage_duration.count())) / |
| static_cast<int64_t>(1000000000)); |
| } |
| |
| int QueueScratchBufferSize(const Channel *channel) { |
| return channel->num_readers() + channel->num_senders(); |
| } |
| |
| // Searches through configurations for schemas that include a certain type |
| const reflection::Schema *GetSchema(const Configuration *config, |
| std::string_view schema_type) { |
| if (config->has_channels()) { |
| std::vector<flatbuffers::Offset<Channel>> channel_offsets; |
| for (const Channel *c : *config->channels()) { |
| if (schema_type == c->type()->string_view()) { |
| return c->schema(); |
| } |
| } |
| } |
| return nullptr; |
| } |
| |
| // Copy schema reflection into detached flatbuffer |
| std::optional<FlatbufferDetachedBuffer<reflection::Schema>> |
| GetSchemaDetachedBuffer(const Configuration *config, |
| std::string_view schema_type) { |
| const reflection::Schema *found_schema = GetSchema(config, schema_type); |
| if (found_schema == nullptr) { |
| return std::nullopt; |
| } |
| return RecursiveCopyFlatBuffer(found_schema); |
| } |
| |
| aos::FlatbufferDetachedBuffer<Configuration> AddChannelToConfiguration( |
| const Configuration *config, std::string_view name, |
| aos::FlatbufferVector<reflection::Schema> schema, const aos::Node *node, |
| ChannelT overrides) { |
| overrides.name = name; |
| CHECK(schema.message().has_root_table()); |
| overrides.type = schema.message().root_table()->name()->string_view(); |
| if (node != nullptr) { |
| CHECK(node->has_name()); |
| overrides.source_node = node->name()->string_view(); |
| } |
| flatbuffers::FlatBufferBuilder fbb; |
| // Don't populate fields from overrides that the user doesn't explicitly |
| // override. |
| fbb.ForceDefaults(false); |
| const flatbuffers::Offset<Channel> channel_offset = |
| Channel::Pack(fbb, &overrides); |
| const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>> |
| channels_offset = fbb.CreateVector({channel_offset}); |
| Configuration::Builder config_builder(fbb); |
| config_builder.add_channels(channels_offset); |
| fbb.Finish(config_builder.Finish()); |
| FlatbufferDetachedBuffer<Configuration> new_channel_config = fbb.Release(); |
| new_channel_config = MergeConfiguration(new_channel_config, {schema}); |
| return MergeWithConfig(config, new_channel_config); |
| } |
| |
| } // namespace configuration |
| } // namespace aos |