Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 1 | #include "aos/network/timestamp_channel.h" |
| 2 | |
Austin Schuh | 99f7c6a | 2024-06-25 22:07:44 -0700 | [diff] [blame^] | 3 | #include "absl/flags/flag.h" |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 4 | #include "absl/strings/str_cat.h" |
| 5 | |
Austin Schuh | 99f7c6a | 2024-06-25 22:07:44 -0700 | [diff] [blame^] | 6 | ABSL_FLAG(bool, combined_timestamp_channel_fallback, true, |
| 7 | "If true, fall back to using the combined timestamp channel if the " |
| 8 | "single timestamp channel doesn't exist for a timestamp."); |
| 9 | ABSL_FLAG(bool, check_timestamp_channel_frequencies, true, |
| 10 | "If true, include a debug CHECK to ensure that remote timestamp " |
| 11 | "channels are configured to have at least as great a frequency as " |
| 12 | "the corresponding data channel."); |
Austin Schuh | 349e7ad | 2022-04-02 21:12:26 -0700 | [diff] [blame] | 13 | |
Stephan Pleines | f63bde8 | 2024-01-13 15:59:33 -0800 | [diff] [blame] | 14 | namespace aos::message_bridge { |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 15 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 16 | ChannelTimestampFinder::ChannelTimestampFinder( |
| 17 | const Configuration *configuration, const std::string_view name, |
| 18 | const Node *node) |
| 19 | : configuration_(configuration), name_(name), node_(node) {} |
| 20 | |
| 21 | std::string ChannelTimestampFinder::SplitChannelName( |
| 22 | const Channel *channel, const Connection *connection) { |
Austin Schuh | 349e7ad | 2022-04-02 21:12:26 -0700 | [diff] [blame] | 23 | return SplitChannelName(channel->name()->string_view(), |
| 24 | channel->type()->str(), connection); |
Austin Schuh | 006a9f5 | 2021-04-07 16:24:18 -0700 | [diff] [blame] | 25 | } |
| 26 | |
| 27 | std::string ChannelTimestampFinder::SplitChannelName( |
| 28 | std::string_view name, std::string type, const Connection *connection) { |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 29 | std::replace(type.begin(), type.end(), '.', '-'); |
| 30 | return absl::StrCat("/aos/remote_timestamps/", |
Austin Schuh | 006a9f5 | 2021-04-07 16:24:18 -0700 | [diff] [blame] | 31 | connection->name()->string_view(), name, "/", type); |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 32 | } |
| 33 | |
| 34 | std::string ChannelTimestampFinder::CombinedChannelName( |
| 35 | std::string_view remote_node) { |
| 36 | return absl::StrCat("/aos/remote_timestamps/", remote_node); |
| 37 | } |
| 38 | |
Austin Schuh | 01f3b39 | 2022-01-25 20:03:09 -0800 | [diff] [blame] | 39 | const Channel *ChannelTimestampFinder::SplitChannelForChannel( |
| 40 | const Channel *channel, const Connection *connection) { |
| 41 | const std::string split_timestamp_channel_name = |
| 42 | SplitChannelName(channel, connection); |
| 43 | return configuration::GetChannel(configuration_, split_timestamp_channel_name, |
| 44 | RemoteMessage::GetFullyQualifiedName(), |
| 45 | name_, node_, true); |
| 46 | } |
| 47 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 48 | const Channel *ChannelTimestampFinder::ForChannel( |
| 49 | const Channel *channel, const Connection *connection) { |
| 50 | const std::string split_timestamp_channel_name = |
| 51 | SplitChannelName(channel, connection); |
| 52 | const Channel *split_timestamp_channel = configuration::GetChannel( |
| 53 | configuration_, split_timestamp_channel_name, |
| 54 | RemoteMessage::GetFullyQualifiedName(), name_, node_, true); |
| 55 | if (split_timestamp_channel != nullptr) { |
| 56 | return split_timestamp_channel; |
| 57 | } |
| 58 | |
Austin Schuh | 99f7c6a | 2024-06-25 22:07:44 -0700 | [diff] [blame^] | 59 | if (!absl::GetFlag(FLAGS_combined_timestamp_channel_fallback)) { |
Austin Schuh | 349e7ad | 2022-04-02 21:12:26 -0700 | [diff] [blame] | 60 | LOG(FATAL) << "Failed to find new timestamp channel {\"name\": \"" |
| 61 | << split_timestamp_channel_name << "\", \"type\": \"" |
| 62 | << RemoteMessage::GetFullyQualifiedName() << "\"} for " |
| 63 | << configuration::CleanedChannelToString(channel) |
| 64 | << " connection " << aos::FlatbufferToJson(connection) |
| 65 | << " and --nocombined_timestamp_channel_fallback is set"; |
| 66 | } |
| 67 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 68 | const std::string shared_timestamp_channel_name = |
| 69 | CombinedChannelName(connection->name()->string_view()); |
| 70 | const Channel *shared_timestamp_channel = configuration::GetChannel( |
| 71 | configuration_, shared_timestamp_channel_name, |
| 72 | RemoteMessage::GetFullyQualifiedName(), name_, node_, true); |
| 73 | if (shared_timestamp_channel != nullptr) { |
| 74 | LOG(WARNING) << "Failed to find timestamp channel {\"name\": \"" |
James Kuszmaul | 9c12812 | 2021-03-22 22:24:36 -0700 | [diff] [blame] | 75 | << split_timestamp_channel_name << "\", \"type\": \"" |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 76 | << RemoteMessage::GetFullyQualifiedName() |
| 77 | << "\"}, falling back to old version."; |
| 78 | return shared_timestamp_channel; |
| 79 | } |
| 80 | |
| 81 | CHECK(shared_timestamp_channel != nullptr) |
| 82 | << ": Remote timestamp channel { \"name\": \"" |
| 83 | << split_timestamp_channel_name << "\", \"type\": \"" |
| 84 | << RemoteMessage::GetFullyQualifiedName() |
| 85 | << "\" } not found in config for " << name_ |
| 86 | << (configuration::MultiNode(configuration_) |
| 87 | ? absl::StrCat(" on node ", node_->name()->string_view()) |
| 88 | : "."); |
| 89 | |
| 90 | return nullptr; |
| 91 | } |
| 92 | |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 93 | ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop) |
| 94 | : event_loop_(event_loop) { |
Austin Schuh | 58646e2 | 2021-08-23 23:51:46 -0700 | [diff] [blame] | 95 | if (event_loop_) { |
| 96 | CHECK(configuration::MultiNode(event_loop_->configuration())); |
| 97 | } |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 98 | } |
| 99 | |
| 100 | aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel( |
| 101 | const Channel *channel, const Connection *connection) { |
Austin Schuh | 58646e2 | 2021-08-23 23:51:46 -0700 | [diff] [blame] | 102 | CHECK(event_loop_); |
| 103 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 104 | ChannelTimestampFinder finder(event_loop_); |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 105 | // Look at any pre-created channel/connection pairs. |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 106 | { |
| 107 | auto it = |
| 108 | channel_timestamp_loggers_.find(std::make_pair(channel, connection)); |
| 109 | if (it != channel_timestamp_loggers_.end()) { |
| 110 | return it->second.get(); |
| 111 | } |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 112 | } |
| 113 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 114 | const Channel *timestamp_channel = finder.ForChannel(channel, connection); |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 115 | |
Austin Schuh | 99f7c6a | 2024-06-25 22:07:44 -0700 | [diff] [blame^] | 116 | if (absl::GetFlag(FLAGS_check_timestamp_channel_frequencies)) { |
James Kuszmaul | e339c85 | 2023-02-13 14:54:03 -0800 | [diff] [blame] | 117 | // Sanity-check that the timestamp channel can actually support full-rate |
| 118 | // messages coming through on the source channel. |
| 119 | CHECK_GE(timestamp_channel->frequency(), channel->frequency()) |
| 120 | << ": Timestamp channel " |
| 121 | << configuration::StrippedChannelToString(timestamp_channel) |
| 122 | << "'s rate is lower than the source channel."; |
| 123 | } |
James Kuszmaul | 839c8aa | 2023-01-10 15:27:57 -0800 | [diff] [blame] | 124 | |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 125 | { |
| 126 | auto it = timestamp_loggers_.find(timestamp_channel); |
| 127 | if (it != timestamp_loggers_.end()) { |
| 128 | CHECK(channel_timestamp_loggers_ |
| 129 | .try_emplace(std::make_pair(channel, connection), it->second) |
| 130 | .second); |
| 131 | return it->second.get(); |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 132 | } |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 133 | } |
Austin Schuh | 61e973f | 2021-02-21 21:43:56 -0800 | [diff] [blame] | 134 | |
| 135 | auto result = channel_timestamp_loggers_.try_emplace( |
| 136 | std::make_pair(channel, connection), |
| 137 | std::make_shared<aos::Sender<RemoteMessage>>( |
| 138 | event_loop_->MakeSender<RemoteMessage>( |
| 139 | timestamp_channel->name()->string_view()))); |
| 140 | |
| 141 | CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second) |
| 142 | .second); |
| 143 | return result.first->second.get(); |
Austin Schuh | 36a2c3e | 2021-02-18 22:28:38 -0800 | [diff] [blame] | 144 | } |
| 145 | |
Stephan Pleines | f63bde8 | 2024-01-13 15:59:33 -0800 | [diff] [blame] | 146 | } // namespace aos::message_bridge |