Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 1 | #include "aos/events/logging/config_remapper.h" |
| 2 | |
| 3 | #include <vector> |
| 4 | |
Austin Schuh | 99f7c6a | 2024-06-25 22:07:44 -0700 | [diff] [blame] | 5 | #include "absl/log/check.h" |
| 6 | #include "absl/log/log.h" |
Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 7 | #include "absl/strings/escaping.h" |
| 8 | #include "flatbuffers/flatbuffers.h" |
| 9 | |
| 10 | #include "aos/events/logging/logger_generated.h" |
| 11 | #include "aos/flatbuffer_merge.h" |
| 12 | #include "aos/json_to_flatbuffer.h" |
| 13 | #include "aos/network/multinode_timestamp_filter.h" |
| 14 | #include "aos/network/remote_message_generated.h" |
| 15 | #include "aos/network/remote_message_schema.h" |
| 16 | #include "aos/network/team_number.h" |
| 17 | #include "aos/network/timestamp_channel.h" |
| 18 | |
| 19 | namespace aos { |
| 20 | using message_bridge::RemoteMessage; |
| 21 | |
| 22 | namespace { |
| 23 | // Checks if the specified channel name/type exists in the config and, depending |
| 24 | // on the value of conflict_handling, calls conflict_handler or just dies. |
| 25 | template <typename F> |
| 26 | void CheckAndHandleRemapConflict( |
| 27 | std::string_view new_name, std::string_view new_type, |
| 28 | const Configuration *config, |
| 29 | ConfigRemapper::RemapConflict conflict_handling, F conflict_handler) { |
| 30 | const Channel *existing_channel = |
| 31 | configuration::GetChannel(config, new_name, new_type, "", nullptr, true); |
| 32 | if (existing_channel != nullptr) { |
| 33 | switch (conflict_handling) { |
| 34 | case ConfigRemapper::RemapConflict::kDisallow: |
| 35 | LOG(FATAL) |
| 36 | << "Channel " |
| 37 | << configuration::StrippedChannelToString(existing_channel) |
| 38 | << " is already used--you can't remap an original channel to it."; |
| 39 | break; |
| 40 | case ConfigRemapper::RemapConflict::kCascade: |
| 41 | VLOG(1) << "Automatically remapping " |
| 42 | << configuration::StrippedChannelToString(existing_channel) |
| 43 | << " to avoid conflicts."; |
| 44 | conflict_handler(); |
| 45 | break; |
| 46 | } |
| 47 | } |
| 48 | } |
| 49 | } // namespace |
| 50 | |
| 51 | namespace configuration { |
| 52 | // We don't really want to expose this publicly, but log reader doesn't really |
| 53 | // want to re-implement it. |
| 54 | void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<Map>> *maps, |
| 55 | std::string *name, std::string_view type, const Node *node); |
| 56 | } // namespace configuration |
| 57 | |
| 58 | bool CompareChannels(const Channel *c, |
| 59 | ::std::pair<std::string_view, std::string_view> p) { |
| 60 | int name_compare = c->name()->string_view().compare(p.first); |
| 61 | if (name_compare == 0) { |
| 62 | return c->type()->string_view() < p.second; |
| 63 | } else if (name_compare < 0) { |
| 64 | return true; |
| 65 | } else { |
| 66 | return false; |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | bool EqualsChannels(const Channel *c, |
| 71 | ::std::pair<std::string_view, std::string_view> p) { |
| 72 | return c->name()->string_view() == p.first && |
| 73 | c->type()->string_view() == p.second; |
| 74 | } |
| 75 | // Copies the channel, removing the schema as we go. If new_name is provided, |
| 76 | // it is used instead of the name inside the channel. If new_type is provided, |
| 77 | // it is used instead of the type in the channel. |
| 78 | flatbuffers::Offset<Channel> CopyChannel(const Channel *c, |
| 79 | std::string_view new_name, |
| 80 | std::string_view new_type, |
| 81 | flatbuffers::FlatBufferBuilder *fbb) { |
| 82 | CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u) |
| 83 | << ": Merging logic needs to be updated when the number of channel " |
| 84 | "fields changes."; |
| 85 | |
| 86 | flatbuffers::Offset<flatbuffers::String> name_offset = |
| 87 | fbb->CreateSharedString(new_name.empty() ? c->name()->string_view() |
| 88 | : new_name); |
| 89 | flatbuffers::Offset<flatbuffers::String> type_offset = |
| 90 | fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type); |
| 91 | flatbuffers::Offset<flatbuffers::String> source_node_offset = |
| 92 | c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str()) |
| 93 | : 0; |
| 94 | |
| 95 | flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>> |
| 96 | destination_nodes_offset = |
| 97 | RecursiveCopyVectorTable(c->destination_nodes(), fbb); |
| 98 | |
| 99 | flatbuffers::Offset< |
| 100 | flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> |
| 101 | logger_nodes_offset = CopyVectorSharedString(c->logger_nodes(), fbb); |
| 102 | |
| 103 | Channel::Builder channel_builder(*fbb); |
| 104 | channel_builder.add_name(name_offset); |
| 105 | channel_builder.add_type(type_offset); |
| 106 | if (c->has_frequency()) { |
| 107 | channel_builder.add_frequency(c->frequency()); |
| 108 | } |
| 109 | if (c->has_max_size()) { |
| 110 | channel_builder.add_max_size(c->max_size()); |
| 111 | } |
| 112 | if (c->has_num_senders()) { |
| 113 | channel_builder.add_num_senders(c->num_senders()); |
| 114 | } |
| 115 | if (c->has_num_watchers()) { |
| 116 | channel_builder.add_num_watchers(c->num_watchers()); |
| 117 | } |
| 118 | if (!source_node_offset.IsNull()) { |
| 119 | channel_builder.add_source_node(source_node_offset); |
| 120 | } |
| 121 | if (!destination_nodes_offset.IsNull()) { |
| 122 | channel_builder.add_destination_nodes(destination_nodes_offset); |
| 123 | } |
| 124 | if (c->has_logger()) { |
| 125 | channel_builder.add_logger(c->logger()); |
| 126 | } |
| 127 | if (!logger_nodes_offset.IsNull()) { |
| 128 | channel_builder.add_logger_nodes(logger_nodes_offset); |
| 129 | } |
| 130 | if (c->has_read_method()) { |
| 131 | channel_builder.add_read_method(c->read_method()); |
| 132 | } |
| 133 | if (c->has_num_readers()) { |
| 134 | channel_builder.add_num_readers(c->num_readers()); |
| 135 | } |
| 136 | if (c->has_channel_storage_duration()) { |
| 137 | channel_builder.add_channel_storage_duration(c->channel_storage_duration()); |
| 138 | } |
| 139 | return channel_builder.Finish(); |
| 140 | } |
| 141 | |
| 142 | ConfigRemapper::ConfigRemapper(const Configuration *config, |
| 143 | const Configuration *replay_config, |
| 144 | const logger::ReplayChannels *replay_channels) |
| 145 | : remapped_configuration_(config), |
| 146 | original_configuration_(config), |
| 147 | replay_configuration_(replay_config), |
| 148 | replay_channels_(replay_channels) { |
| 149 | MakeRemappedConfig(); |
| 150 | |
| 151 | // If any remote timestamp channel was not marked NOT_LOGGED, then remap that |
| 152 | // channel to avoid the redundant logged data. Also, this loop handles the |
| 153 | // MessageHeader to RemoteMessae name change. |
| 154 | // Note: This path is mainly for backwards compatibility reasons, and should |
| 155 | // not be necessary for any new logs. |
| 156 | for (const Node *node : configuration::GetNodes(original_configuration())) { |
| 157 | message_bridge::ChannelTimestampFinder finder(original_configuration(), |
| 158 | "log_reader", node); |
| 159 | |
| 160 | absl::btree_set<std::string_view> remote_nodes; |
| 161 | |
| 162 | for (const Channel *channel : *original_configuration()->channels()) { |
| 163 | if (!configuration::ChannelIsSendableOnNode(channel, node)) { |
| 164 | continue; |
| 165 | } |
| 166 | if (!channel->has_destination_nodes()) { |
| 167 | continue; |
| 168 | } |
| 169 | for (const Connection *connection : *channel->destination_nodes()) { |
| 170 | if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection, |
| 171 | node)) { |
| 172 | // Start by seeing if the split timestamp channels are being used for |
| 173 | // this message. |
| 174 | const Channel *timestamp_channel = configuration::GetChannel( |
| 175 | original_configuration(), |
| 176 | finder.SplitChannelName(channel, connection), |
| 177 | RemoteMessage::GetFullyQualifiedName(), "", node, true); |
| 178 | |
| 179 | if (timestamp_channel != nullptr) { |
| 180 | // If for some reason a timestamp channel is not NOT_LOGGED (which |
| 181 | // is unusual), then remap the channel so that the replayed channel |
| 182 | // doesn't overlap with the special separate replay we do for |
| 183 | // timestamps. |
| 184 | if (timestamp_channel->logger() != LoggerConfig::NOT_LOGGED) { |
| 185 | RemapOriginalChannel<RemoteMessage>( |
| 186 | timestamp_channel->name()->string_view(), node); |
| 187 | } |
| 188 | continue; |
| 189 | } |
| 190 | |
| 191 | // Otherwise collect this one up as a node to look for a combined |
| 192 | // channel from. It is more efficient to compare nodes than channels. |
| 193 | LOG(WARNING) << "Failed to find channel " |
| 194 | << finder.SplitChannelName(channel, connection) |
| 195 | << " on node " << FlatbufferToJson(node); |
| 196 | remote_nodes.insert(connection->name()->string_view()); |
| 197 | } |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | std::vector<const Node *> timestamp_logger_nodes = |
| 202 | configuration::TimestampNodes(original_configuration(), node); |
| 203 | for (const std::string_view remote_node : remote_nodes) { |
| 204 | const std::string channel = finder.CombinedChannelName(remote_node); |
| 205 | |
| 206 | // See if the log file is an old log with logger::MessageHeader channels |
| 207 | // in it, or a newer log with RemoteMessage. If we find an older log, |
| 208 | // rename the type too along with the name. |
| 209 | if (HasChannel<logger::MessageHeader>(channel, node)) { |
| 210 | CHECK(!HasChannel<RemoteMessage>(channel, node)) |
| 211 | << ": Can't have both a logger::MessageHeader and RemoteMessage " |
| 212 | "remote " |
| 213 | "timestamp channel."; |
| 214 | // In theory, we should check NOT_LOGGED like RemoteMessage and be more |
| 215 | // careful about updating the config, but there are fewer and fewer logs |
| 216 | // with logger::MessageHeader remote messages, so it isn't worth the |
| 217 | // effort. |
| 218 | RemapOriginalChannel<logger::MessageHeader>( |
| 219 | channel, node, "/original", "aos.message_bridge.RemoteMessage"); |
| 220 | } else { |
| 221 | CHECK(HasChannel<RemoteMessage>(channel, node)) |
| 222 | << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \"" |
| 223 | << RemoteMessage::GetFullyQualifiedName() << "\"} for node " |
| 224 | << node->name()->string_view(); |
| 225 | // Only bother to remap if there's something on the channel. We can |
| 226 | // tell if the channel was marked NOT_LOGGED or not. This makes the |
| 227 | // config not change un-necesarily when we replay a log with NOT_LOGGED |
| 228 | // messages. |
| 229 | if (HasOriginalChannel<RemoteMessage>(channel, node)) { |
| 230 | RemapOriginalChannel<RemoteMessage>(channel, node); |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 | if (replay_configuration_) { |
| 236 | CHECK_EQ(configuration::MultiNode(remapped_configuration()), |
| 237 | configuration::MultiNode(replay_configuration_)) |
| 238 | << ": Log file and replay config need to both be multi or single " |
| 239 | "node."; |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | ConfigRemapper::~ConfigRemapper() { |
| 244 | // Zero out some buffers. It's easy to do use-after-frees on these, so make |
| 245 | // it more obvious. |
| 246 | if (remapped_configuration_buffer_) { |
| 247 | remapped_configuration_buffer_->Wipe(); |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | const Configuration *ConfigRemapper::original_configuration() const { |
| 252 | return original_configuration_; |
| 253 | } |
| 254 | |
| 255 | const Configuration *ConfigRemapper::remapped_configuration() const { |
| 256 | return remapped_configuration_; |
| 257 | } |
| 258 | |
| 259 | void ConfigRemapper::set_configuration(const Configuration *configuration) { |
| 260 | remapped_configuration_ = configuration; |
| 261 | } |
| 262 | |
| 263 | std::vector<const Channel *> ConfigRemapper::RemappedChannels() const { |
| 264 | std::vector<const Channel *> result; |
| 265 | result.reserve(remapped_channels_.size()); |
| 266 | for (auto &pair : remapped_channels_) { |
| 267 | const Channel *const original_channel = |
Austin Schuh | 6bdcc37 | 2024-06-27 14:49:11 -0700 | [diff] [blame] | 268 | original_configuration()->channels()->Get(pair.first); |
| 269 | CHECK(original_channel != nullptr); |
Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 270 | |
| 271 | auto channel_iterator = std::lower_bound( |
| 272 | remapped_configuration_->channels()->cbegin(), |
| 273 | remapped_configuration_->channels()->cend(), |
| 274 | std::make_pair(std::string_view(pair.second.remapped_name), |
| 275 | original_channel->type()->string_view()), |
| 276 | CompareChannels); |
| 277 | |
| 278 | CHECK(channel_iterator != remapped_configuration_->channels()->cend()); |
| 279 | CHECK(EqualsChannels( |
| 280 | *channel_iterator, |
| 281 | std::make_pair(std::string_view(pair.second.remapped_name), |
| 282 | original_channel->type()->string_view()))); |
| 283 | result.push_back(*channel_iterator); |
| 284 | } |
| 285 | return result; |
| 286 | } |
| 287 | |
| 288 | const Channel *ConfigRemapper::RemapChannel(const EventLoop *event_loop, |
| 289 | const Node *node, |
| 290 | const Channel *channel) { |
| 291 | std::string_view channel_name = channel->name()->string_view(); |
| 292 | std::string_view channel_type = channel->type()->string_view(); |
| 293 | const int channel_index = |
| 294 | configuration::ChannelIndex(original_configuration(), channel); |
| 295 | // If the channel is remapped, find the correct channel name to use. |
| 296 | if (remapped_channels_.count(channel_index) > 0) { |
| 297 | VLOG(3) << "Got remapped channel on " |
| 298 | << configuration::CleanedChannelToString(channel); |
| 299 | channel_name = remapped_channels_[channel_index].remapped_name; |
| 300 | } |
| 301 | |
| 302 | VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type; |
| 303 | const Channel *remapped_channel = configuration::GetChannel( |
| 304 | remapped_configuration(), channel_name, channel_type, |
| 305 | event_loop ? event_loop->name() : "log_reader", node); |
| 306 | |
| 307 | CHECK(remapped_channel != nullptr) |
| 308 | << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \"" |
| 309 | << channel_type << "\"} because it is not in the provided configuration."; |
| 310 | |
| 311 | return remapped_channel; |
| 312 | } |
| 313 | |
| 314 | void ConfigRemapper::RemapOriginalChannel(std::string_view name, |
| 315 | std::string_view type, |
| 316 | std::string_view add_prefix, |
| 317 | std::string_view new_type, |
| 318 | RemapConflict conflict_handling) { |
| 319 | RemapOriginalChannel(name, type, nullptr, add_prefix, new_type, |
| 320 | conflict_handling); |
| 321 | } |
| 322 | |
| 323 | void ConfigRemapper::RemapOriginalChannel(std::string_view name, |
| 324 | std::string_view type, |
| 325 | const Node *node, |
| 326 | std::string_view add_prefix, |
| 327 | std::string_view new_type, |
| 328 | RemapConflict conflict_handling) { |
| 329 | if (node != nullptr) { |
| 330 | VLOG(1) << "Node is " << FlatbufferToJson(node); |
| 331 | } |
| 332 | if (replay_channels_ != nullptr) { |
| 333 | CHECK(std::find(replay_channels_->begin(), replay_channels_->end(), |
| 334 | std::make_pair(std::string{name}, std::string{type})) != |
| 335 | replay_channels_->end()) |
| 336 | << "Attempted to remap channel " << name << " " << type |
| 337 | << " which is not included in the replay channels passed to " |
| 338 | "ConfigRemapper."; |
| 339 | } |
| 340 | const Channel *remapped_channel = |
| 341 | configuration::GetChannel(original_configuration(), name, type, "", node); |
| 342 | CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name |
| 343 | << "\", \"type\": \"" << type << "\"}"; |
| 344 | VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type |
| 345 | << "\"}"; |
| 346 | VLOG(1) << "Remapped " |
| 347 | << configuration::StrippedChannelToString(remapped_channel); |
| 348 | |
| 349 | // We want to make /spray on node 0 go to /0/spray by snooping the maps. And |
| 350 | // we want it to degrade if the heuristics fail to just work. |
| 351 | // |
| 352 | // The easiest way to do this is going to be incredibly specific and verbose. |
| 353 | // Look up /spray, to /0/spray. Then, prefix the result with /original to get |
| 354 | // /original/0/spray. Then, create a map from /original/spray to |
| 355 | // /original/0/spray for just the type we were asked for. |
| 356 | if (name != remapped_channel->name()->string_view()) { |
| 357 | MapT new_map; |
| 358 | new_map.match = std::make_unique<ChannelT>(); |
| 359 | new_map.match->name = absl::StrCat(add_prefix, name); |
| 360 | new_map.match->type = type; |
| 361 | if (node != nullptr) { |
| 362 | new_map.match->source_node = node->name()->str(); |
| 363 | } |
| 364 | new_map.rename = std::make_unique<ChannelT>(); |
| 365 | new_map.rename->name = |
| 366 | absl::StrCat(add_prefix, remapped_channel->name()->string_view()); |
| 367 | maps_.emplace_back(std::move(new_map)); |
| 368 | } |
| 369 | |
| 370 | // Then remap the original channel to the prefixed channel. |
| 371 | const size_t channel_index = |
| 372 | configuration::ChannelIndex(original_configuration(), remapped_channel); |
| 373 | CHECK_EQ(0u, remapped_channels_.count(channel_index)) |
| 374 | << "Already remapped channel " |
| 375 | << configuration::CleanedChannelToString(remapped_channel); |
| 376 | |
| 377 | RemappedChannel remapped_channel_struct; |
| 378 | remapped_channel_struct.remapped_name = |
| 379 | std::string(add_prefix) + |
| 380 | std::string(remapped_channel->name()->string_view()); |
| 381 | remapped_channel_struct.new_type = new_type; |
| 382 | const std::string_view remapped_type = new_type.empty() ? type : new_type; |
| 383 | CheckAndHandleRemapConflict( |
| 384 | remapped_channel_struct.remapped_name, remapped_type, |
| 385 | remapped_configuration_, conflict_handling, |
| 386 | [this, &remapped_channel_struct, remapped_type, node, add_prefix, |
| 387 | conflict_handling]() { |
| 388 | RemapOriginalChannel(remapped_channel_struct.remapped_name, |
| 389 | remapped_type, node, add_prefix, "", |
| 390 | conflict_handling); |
| 391 | }); |
| 392 | remapped_channels_[channel_index] = std::move(remapped_channel_struct); |
| 393 | MakeRemappedConfig(); |
| 394 | } |
| 395 | |
| 396 | void ConfigRemapper::RenameOriginalChannel(const std::string_view name, |
| 397 | const std::string_view type, |
| 398 | const std::string_view new_name, |
| 399 | const std::vector<MapT> &add_maps) { |
| 400 | RenameOriginalChannel(name, type, nullptr, new_name, add_maps); |
| 401 | } |
| 402 | |
| 403 | void ConfigRemapper::RenameOriginalChannel(const std::string_view name, |
| 404 | const std::string_view type, |
| 405 | const Node *const node, |
| 406 | const std::string_view new_name, |
| 407 | const std::vector<MapT> &add_maps) { |
| 408 | if (node != nullptr) { |
| 409 | VLOG(1) << "Node is " << FlatbufferToJson(node); |
| 410 | } |
| 411 | // First find the channel and rename it. |
| 412 | const Channel *remapped_channel = |
| 413 | configuration::GetChannel(original_configuration(), name, type, "", node); |
| 414 | CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name |
| 415 | << "\", \"type\": \"" << type << "\"}"; |
| 416 | VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type |
| 417 | << "\"}"; |
| 418 | VLOG(1) << "Remapped " |
| 419 | << configuration::StrippedChannelToString(remapped_channel); |
| 420 | |
| 421 | const size_t channel_index = |
| 422 | configuration::ChannelIndex(original_configuration(), remapped_channel); |
| 423 | CHECK_EQ(0u, remapped_channels_.count(channel_index)) |
| 424 | << "Already remapped channel " |
| 425 | << configuration::CleanedChannelToString(remapped_channel); |
| 426 | |
| 427 | RemappedChannel remapped_channel_struct; |
| 428 | remapped_channel_struct.remapped_name = new_name; |
| 429 | remapped_channel_struct.new_type.clear(); |
| 430 | remapped_channels_[channel_index] = std::move(remapped_channel_struct); |
| 431 | |
| 432 | // Then add any provided maps. |
| 433 | for (const MapT &map : add_maps) { |
| 434 | maps_.push_back(map); |
| 435 | } |
| 436 | |
| 437 | // Finally rewrite the config. |
| 438 | MakeRemappedConfig(); |
| 439 | } |
| 440 | |
| 441 | void ConfigRemapper::MakeRemappedConfig() { |
| 442 | // If no remapping occurred and we are using the original config, then there |
| 443 | // is nothing interesting to do here. |
| 444 | if (remapped_channels_.empty() && replay_configuration_ == nullptr) { |
| 445 | remapped_configuration_ = original_configuration(); |
| 446 | return; |
| 447 | } |
| 448 | // Config to copy Channel definitions from. Use the specified |
| 449 | // replay_configuration_ if it has been provided. |
| 450 | const Configuration *const base_config = replay_configuration_ == nullptr |
| 451 | ? original_configuration() |
| 452 | : replay_configuration_; |
| 453 | |
| 454 | // Create a config with all the channels, but un-sorted/merged. Collect up |
| 455 | // the schemas while we do this. Call MergeConfiguration to sort everything, |
| 456 | // and then merge it all in together. |
| 457 | |
| 458 | // This is the builder that we use for the config containing all the new |
| 459 | // channels. |
| 460 | flatbuffers::FlatBufferBuilder fbb; |
| 461 | fbb.ForceDefaults(true); |
| 462 | std::vector<flatbuffers::Offset<Channel>> channel_offsets; |
| 463 | |
| 464 | CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u) |
| 465 | << ": Merging logic needs to be updated when the number of channel " |
| 466 | "fields changes."; |
| 467 | |
| 468 | // List of schemas. |
| 469 | std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map; |
| 470 | // Make sure our new RemoteMessage schema is in there for old logs without it. |
| 471 | schema_map.insert(std::make_pair( |
| 472 | message_bridge::RemoteMessage::GetFullyQualifiedName(), |
| 473 | FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>( |
| 474 | message_bridge::RemoteMessageSchema())))); |
| 475 | |
| 476 | // Reconstruct the remapped channels. |
| 477 | for (auto &pair : remapped_channels_) { |
Austin Schuh | 6bdcc37 | 2024-06-27 14:49:11 -0700 | [diff] [blame] | 478 | const Channel *const c = configuration::GetChannel( |
Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 479 | base_config, original_configuration()->channels()->Get(pair.first), "", |
Austin Schuh | 6bdcc37 | 2024-06-27 14:49:11 -0700 | [diff] [blame] | 480 | nullptr); |
| 481 | CHECK(c != nullptr); |
Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 482 | channel_offsets.emplace_back( |
| 483 | CopyChannel(c, pair.second.remapped_name, "", &fbb)); |
| 484 | |
| 485 | if (c->has_destination_nodes()) { |
| 486 | for (const Connection *connection : *c->destination_nodes()) { |
| 487 | switch (connection->timestamp_logger()) { |
| 488 | case LoggerConfig::LOCAL_LOGGER: |
| 489 | case LoggerConfig::NOT_LOGGED: |
| 490 | // There is no timestamp channel associated with this, so ignore it. |
| 491 | break; |
| 492 | |
| 493 | case LoggerConfig::REMOTE_LOGGER: |
| 494 | case LoggerConfig::LOCAL_AND_REMOTE_LOGGER: |
| 495 | // We want to make a split timestamp channel regardless of what type |
| 496 | // of log this used to be. No sense propagating the single |
| 497 | // timestamp channel. |
| 498 | |
| 499 | CHECK(connection->has_timestamp_logger_nodes()); |
| 500 | for (const flatbuffers::String *timestamp_logger_node : |
| 501 | *connection->timestamp_logger_nodes()) { |
| 502 | const Node *node = |
| 503 | configuration::GetNode(original_configuration(), |
| 504 | timestamp_logger_node->string_view()); |
| 505 | message_bridge::ChannelTimestampFinder finder( |
| 506 | original_configuration(), "log_reader", node); |
| 507 | |
| 508 | // We are assuming here that all the maps are setup correctly to |
| 509 | // handle arbitrary timestamps. Apply the maps for this node to |
| 510 | // see what name this ends up with. |
| 511 | std::string name = finder.SplitChannelName( |
| 512 | pair.second.remapped_name, c->type()->str(), connection); |
| 513 | std::string unmapped_name = name; |
| 514 | configuration::HandleMaps(original_configuration()->maps(), &name, |
| 515 | "aos.message_bridge.RemoteMessage", |
| 516 | node); |
| 517 | CHECK_NE(name, unmapped_name) |
| 518 | << ": Remote timestamp channel was not remapped, this is " |
| 519 | "very fishy"; |
| 520 | flatbuffers::Offset<flatbuffers::String> channel_name_offset = |
| 521 | fbb.CreateString(name); |
| 522 | flatbuffers::Offset<flatbuffers::String> channel_type_offset = |
| 523 | fbb.CreateString("aos.message_bridge.RemoteMessage"); |
| 524 | flatbuffers::Offset<flatbuffers::String> source_node_offset = |
| 525 | fbb.CreateString(timestamp_logger_node->string_view()); |
| 526 | |
| 527 | // Now, build a channel. Don't log it, 2 senders, and match the |
| 528 | // source frequency. |
| 529 | Channel::Builder channel_builder(fbb); |
| 530 | channel_builder.add_name(channel_name_offset); |
| 531 | channel_builder.add_type(channel_type_offset); |
| 532 | channel_builder.add_source_node(source_node_offset); |
| 533 | channel_builder.add_logger(LoggerConfig::NOT_LOGGED); |
| 534 | channel_builder.add_num_senders(2); |
| 535 | if (c->has_frequency()) { |
| 536 | channel_builder.add_frequency(c->frequency()); |
| 537 | } |
| 538 | if (c->has_channel_storage_duration()) { |
| 539 | channel_builder.add_channel_storage_duration( |
| 540 | c->channel_storage_duration()); |
| 541 | } |
| 542 | channel_offsets.emplace_back(channel_builder.Finish()); |
| 543 | } |
| 544 | break; |
| 545 | } |
| 546 | } |
| 547 | } |
| 548 | } |
| 549 | |
| 550 | // Now reconstruct the original channels, translating types as needed |
| 551 | for (const Channel *c : *base_config->channels()) { |
| 552 | // Search for a mapping channel. |
| 553 | std::string_view new_type = ""; |
| 554 | for (auto &pair : remapped_channels_) { |
| 555 | const Channel *const remapped_channel = |
| 556 | original_configuration()->channels()->Get(pair.first); |
| 557 | if (remapped_channel->name()->string_view() == c->name()->string_view() && |
| 558 | remapped_channel->type()->string_view() == c->type()->string_view()) { |
| 559 | new_type = pair.second.new_type; |
| 560 | break; |
| 561 | } |
| 562 | } |
| 563 | |
| 564 | // Copy everything over. |
| 565 | channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb)); |
| 566 | |
| 567 | // Add the schema if it doesn't exist. |
| 568 | if (schema_map.find(c->type()->string_view()) == schema_map.end()) { |
James (Peilun) Li | 8769b98 | 2024-10-19 12:34:25 -0700 | [diff] [blame^] | 569 | if (!c->has_schema()) { |
| 570 | LOG(FATAL) << "Could not find schema for " << c->type()->string_view(); |
| 571 | } |
Eric Schmiedeberg | e279b53 | 2023-04-19 16:36:02 -0600 | [diff] [blame] | 572 | schema_map.insert(std::make_pair(c->type()->string_view(), |
| 573 | RecursiveCopyFlatBuffer(c->schema()))); |
| 574 | } |
| 575 | } |
| 576 | |
| 577 | // The MergeConfiguration API takes a vector, not a map. Convert. |
| 578 | std::vector<FlatbufferVector<reflection::Schema>> schemas; |
| 579 | while (!schema_map.empty()) { |
| 580 | schemas.emplace_back(std::move(schema_map.begin()->second)); |
| 581 | schema_map.erase(schema_map.begin()); |
| 582 | } |
| 583 | |
| 584 | // Create the Configuration containing the new channels that we want to add. |
| 585 | const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>> |
| 586 | channels_offset = |
| 587 | channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets); |
| 588 | |
| 589 | // Copy over the old maps. |
| 590 | std::vector<flatbuffers::Offset<Map>> map_offsets; |
| 591 | if (base_config->maps()) { |
| 592 | for (const Map *map : *base_config->maps()) { |
| 593 | map_offsets.emplace_back(RecursiveCopyFlatBuffer(map, &fbb)); |
| 594 | } |
| 595 | } |
| 596 | |
| 597 | // Now create the new maps. These are second so they take effect first. |
| 598 | for (const MapT &map : maps_) { |
| 599 | CHECK(!map.match->name.empty()); |
| 600 | const flatbuffers::Offset<flatbuffers::String> match_name_offset = |
| 601 | fbb.CreateString(map.match->name); |
| 602 | flatbuffers::Offset<flatbuffers::String> match_type_offset; |
| 603 | if (!map.match->type.empty()) { |
| 604 | match_type_offset = fbb.CreateString(map.match->type); |
| 605 | } |
| 606 | flatbuffers::Offset<flatbuffers::String> match_source_node_offset; |
| 607 | if (!map.match->source_node.empty()) { |
| 608 | match_source_node_offset = fbb.CreateString(map.match->source_node); |
| 609 | } |
| 610 | CHECK(!map.rename->name.empty()); |
| 611 | const flatbuffers::Offset<flatbuffers::String> rename_name_offset = |
| 612 | fbb.CreateString(map.rename->name); |
| 613 | Channel::Builder match_builder(fbb); |
| 614 | match_builder.add_name(match_name_offset); |
| 615 | if (!match_type_offset.IsNull()) { |
| 616 | match_builder.add_type(match_type_offset); |
| 617 | } |
| 618 | if (!match_source_node_offset.IsNull()) { |
| 619 | match_builder.add_source_node(match_source_node_offset); |
| 620 | } |
| 621 | const flatbuffers::Offset<Channel> match_offset = match_builder.Finish(); |
| 622 | |
| 623 | Channel::Builder rename_builder(fbb); |
| 624 | rename_builder.add_name(rename_name_offset); |
| 625 | const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish(); |
| 626 | |
| 627 | Map::Builder map_builder(fbb); |
| 628 | map_builder.add_match(match_offset); |
| 629 | map_builder.add_rename(rename_offset); |
| 630 | map_offsets.emplace_back(map_builder.Finish()); |
| 631 | } |
| 632 | |
| 633 | flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>> |
| 634 | maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets); |
| 635 | |
| 636 | // And copy everything else over. |
| 637 | flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>> |
| 638 | nodes_offset = RecursiveCopyVectorTable(base_config->nodes(), &fbb); |
| 639 | |
| 640 | flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>> |
| 641 | applications_offset = |
| 642 | RecursiveCopyVectorTable(base_config->applications(), &fbb); |
| 643 | |
| 644 | // Now insert everything else in unmodified. |
| 645 | ConfigurationBuilder configuration_builder(fbb); |
| 646 | if (!channels_offset.IsNull()) { |
| 647 | configuration_builder.add_channels(channels_offset); |
| 648 | } |
| 649 | if (!maps_offsets.IsNull()) { |
| 650 | configuration_builder.add_maps(maps_offsets); |
| 651 | } |
| 652 | if (!nodes_offset.IsNull()) { |
| 653 | configuration_builder.add_nodes(nodes_offset); |
| 654 | } |
| 655 | if (!applications_offset.IsNull()) { |
| 656 | configuration_builder.add_applications(applications_offset); |
| 657 | } |
| 658 | |
| 659 | if (base_config->has_channel_storage_duration()) { |
| 660 | configuration_builder.add_channel_storage_duration( |
| 661 | base_config->channel_storage_duration()); |
| 662 | } |
| 663 | |
| 664 | CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u) |
| 665 | << ": Merging logic needs to be updated when the number of configuration " |
| 666 | "fields changes."; |
| 667 | |
| 668 | fbb.Finish(configuration_builder.Finish()); |
| 669 | |
| 670 | // Clean it up and return it! By using MergeConfiguration here, we'll |
| 671 | // actually get a deduplicated config for free too. |
| 672 | FlatbufferDetachedBuffer<Configuration> new_merged_config = |
| 673 | configuration::MergeConfiguration( |
| 674 | FlatbufferDetachedBuffer<Configuration>(fbb.Release())); |
| 675 | |
| 676 | remapped_configuration_buffer_ = |
| 677 | std::make_unique<FlatbufferDetachedBuffer<Configuration>>( |
| 678 | configuration::MergeConfiguration(new_merged_config, schemas)); |
| 679 | |
| 680 | remapped_configuration_ = &remapped_configuration_buffer_->message(); |
| 681 | |
| 682 | // TODO(austin): Lazily re-build to save CPU? |
| 683 | } |
| 684 | |
| 685 | } // namespace aos |