blob: 66c902fa4a1e6ffadec667e7011e8c42aa73e687 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
Austin Schuhac6d89e2024-03-27 14:56:09 -07004#include "glog/logging.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005
Austin Schuh0de30f32020-12-06 12:44:28 -08006#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08007#include "aos/events/event_loop.h"
8#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08009#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
Stephan Pleinesf63bde82024-01-13 15:59:33 -080011namespace aos::message_bridge {
Austin Schuh898f4972020-01-11 17:21:25 -080012
13// This class delays messages forwarded between two factories.
14//
15// The basic design is that we need to use the distributed_clock to convert
Austin Schuh9cce6842024-04-02 18:55:44 -070016// monotonic times from the source to the destination node. We use a list of
17// timestamps added each time a message is delivered to the server side to drive
18// the client side publishing. This pulls the data from the fetcher to match
19// with the timestamps queued.
Austin Schuh898f4972020-01-11 17:21:25 -080020class RawMessageDelayer {
21 public:
Austin Schuh58646e22021-08-23 23:51:46 -070022 RawMessageDelayer(const Channel *channel, const Connection *connection,
23 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080024 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070025 size_t destination_node_index, bool delivery_time_is_logged)
26 : channel_(channel),
27 connection_(connection),
28 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080029 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080030 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070031 channel_index_(configuration::ChannelIndex(
32 fetch_node_factory_->configuration(), channel_)),
33 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080034
Austin Schuh58646e22021-08-23 23:51:46 -070035 bool forwarding_disabled() const { return forwarding_disabled_; }
36 void set_forwarding_disabled(bool forwarding_disabled) {
37 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070038 if (!forwarding_disabled_) {
39 CHECK(timestamp_logger_ == nullptr);
40 CHECK(sender_ == nullptr);
41 }
Austin Schuh898f4972020-01-11 17:21:25 -080042 }
43
Austin Schuh58646e22021-08-23 23:51:46 -070044 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
45 MessageBridgeServerStatus *server_status,
46 ChannelTimestampSender *timestamp_loggers) {
Austin Schuh9cce6842024-04-02 18:55:44 -070047 // Clear out state when the source node restarts.
48 last_sent_ = TransmitTime();
Austin Schuh58646e22021-08-23 23:51:46 -070049 fetch_event_loop_ = fetch_event_loop;
50 if (fetch_event_loop_) {
51 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
52 } else {
53 fetcher_ = nullptr;
54 }
55
56 server_status_ = server_status;
Austin Schuhac6d89e2024-03-27 14:56:09 -070057 if (server_status_) {
Austin Schuh58646e22021-08-23 23:51:46 -070058 server_connection_ =
59 server_status_->FindServerConnection(send_node_factory_->node());
James Kuszmaula6681e22023-05-26 11:20:40 -070060 server_index_ = configuration::GetNodeIndex(
61 send_node_factory_->configuration(), send_node_factory_->node());
Austin Schuh58646e22021-08-23 23:51:46 -070062 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070063 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
64 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070065 timestamp_logger_ =
66 timestamp_loggers->SenderForChannel(channel_, connection_);
67 } else {
68 timestamp_logger_ = nullptr;
69 }
70
71 if (fetch_event_loop_) {
72 timestamp_timer_ =
73 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
74 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070075 std::string timer_name =
76 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
77 fetcher_->channel()->name()->string_view(), " ",
78 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070079 if (timer_) {
80 timer_->set_name(timer_name);
81 }
82 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
83 }
84 } else {
85 timestamp_timer_ = nullptr;
86 }
87 }
88
89 void SetSendEventLoop(aos::EventLoop *send_event_loop,
90 MessageBridgeClientStatus *client_status) {
Austin Schuh58646e22021-08-23 23:51:46 -070091 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070092 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070093 sender_ = send_event_loop_->MakeRawSender(channel_);
94 } else {
95 sender_ = nullptr;
96 }
97
98 client_status_ = client_status;
99 if (client_status_) {
100 client_index_ = client_status_->FindClientIndex(
101 channel_->source_node()->string_view());
102 client_connection_ = client_status_->GetClientConnection(client_index_);
103 } else {
104 client_index_ = -1;
105 client_connection_ = nullptr;
106 }
107
108 if (send_event_loop_) {
109 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
110 if (fetcher_) {
111 std::string timer_name =
112 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
113 fetcher_->channel()->name()->string_view(), " ",
114 fetcher_->channel()->type()->string_view());
115 timer_->set_name(timer_name);
116 if (timestamp_timer_) {
117 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
118 }
119 }
120 } else {
121 timer_ = nullptr;
122 }
123 }
124
125 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800126
Austin Schuh9cce6842024-04-02 18:55:44 -0700127 // Returns true if the connection is reliable.
128 bool reliable() const { return time_to_live() == 0; }
129
130 uint32_t time_to_live() const {
Austin Schuh58646e22021-08-23 23:51:46 -0700131 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800132 ->time_to_live();
133 }
134
Austin Schuh9cce6842024-04-02 18:55:44 -0700135 std::string Name() const {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700136 std::string result;
137 result +=
138 (fetch_event_loop_ ? fetch_event_loop_->node()->name()->string_view()
139 : std::string_view("?"));
140 result += " -> ";
141 result +=
142 (send_event_loop_ ? send_event_loop_->node()->name()->string_view()
143 : std::string_view("?"));
144 result += " ";
145 result += aos::configuration::StrippedChannelToString(channel());
146 return result;
147 }
148
Austin Schuh9cce6842024-04-02 18:55:44 -0700149 // Schedules forwarding any reliable messages when a node boots.
Austin Schuh58646e22021-08-23 23:51:46 -0700150 void ScheduleReliable() {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700151 if (forwarding_disabled()) {
152 return;
153 }
Austin Schuh58646e22021-08-23 23:51:46 -0700154
Austin Schuh9cce6842024-04-02 18:55:44 -0700155 // There is no sending side awake, don't do work.
Austin Schuh58646e22021-08-23 23:51:46 -0700156 if (!fetcher_) {
157 return;
158 }
Austin Schuh9cce6842024-04-02 18:55:44 -0700159
160 // The network connection is disconnected, forget about this message. If
161 // this is a reliable message, it will get picked up in Connect() so we
162 // don't need to follow it here.
163 if (server_connection_->state() != State::CONNECTED) {
164 return;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700165 }
166
Austin Schuh9cce6842024-04-02 18:55:44 -0700167 // If there is no receiving side, bail.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700168 if (!timer_) {
169 return;
170 }
171
Austin Schuh9cce6842024-04-02 18:55:44 -0700172 // We only want the newest message, grab it and see if there's anything to
173 // do.
174 fetcher_->Fetch();
175
176 // No data, bail.
177 if (fetcher_->context().data == nullptr) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700178 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700179 }
180
Austin Schuh9cce6842024-04-02 18:55:44 -0700181 // Now, we know we've got a message we need to deliver, mark it down.
182 QueueMessage(fetcher_->context().queue_index,
183 fetcher_->context().monotonic_event_time,
184 fetch_event_loop_->monotonic_now());
Austin Schuh58646e22021-08-23 23:51:46 -0700185
186 // Send at startup. It is the best we can do.
Austin Schuh9cce6842024-04-02 18:55:44 -0700187 const logger::BootTimestamp monotonic_delivery_time =
188 DeliveredTime(monotonic_remote_transmit_times_.front().transmit_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700189
Austin Schuh9cce6842024-04-02 18:55:44 -0700190 // This can only happen if a node reboots in under 100 uS. That's crazy,
191 // CHECK for now and handle it if someone actually has a good need.
192 CHECK_EQ(monotonic_delivery_time.boot, send_node_factory_->boot_count());
193
194 CHECK_GE(monotonic_delivery_time.time, send_node_factory_->monotonic_now())
Austin Schuh58646e22021-08-23 23:51:46 -0700195 << ": Trying to deliver message in the past on channel "
196 << configuration::StrippedChannelToString(fetcher_->channel())
197 << " to node " << send_event_loop_->node()->name()->string_view()
198 << " sent from " << fetcher_->channel()->source_node()->string_view()
199 << " at " << fetch_node_factory_->monotonic_now();
200
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700201 if (!timer_scheduled_) {
202 server_status_->AddSentPacket(server_index_, channel_);
Austin Schuh9cce6842024-04-02 18:55:44 -0700203 timer_->Schedule(monotonic_delivery_time.time);
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700204 timer_scheduled_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700205 }
206 }
207
Austin Schuh9cce6842024-04-02 18:55:44 -0700208 // Handles a message begin delivered to message_bridge_server, and either
209 // drops it or queues it up.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700210 void MessageWatcherCallback(uint32_t sent_queue_index,
211 monotonic_clock::time_point monotonic_sent_time,
212 monotonic_clock::time_point transmit_time) {
Austin Schuh9cce6842024-04-02 18:55:44 -0700213 if (server_connection_->state() != State::CONNECTED) {
214 server_status_->AddDroppedPacket(server_index_, channel_);
215 return;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700216 }
Austin Schuh9cce6842024-04-02 18:55:44 -0700217
218 QueueMessage(sent_queue_index, monotonic_sent_time, transmit_time);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700219 Schedule();
220 }
221
Austin Schuh9cce6842024-04-02 18:55:44 -0700222 void QueueMessage(uint32_t sent_queue_index,
223 monotonic_clock::time_point monotonic_sent_time,
224 monotonic_clock::time_point transmit_time) {
225 CHECK(!forwarding_disabled());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700226
Austin Schuh9cce6842024-04-02 18:55:44 -0700227 // When a reliable message gets queued, we can both receive the wakeup from
228 // the watcher, and from ScheduleReliable. In that case, detect that it is
229 // already in the queue and deduplicate with it.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700230 if (monotonic_remote_transmit_times_.size() > 0u) {
Austin Schuh9cce6842024-04-02 18:55:44 -0700231 const TransmitTime back = monotonic_remote_transmit_times_
Austin Schuhac6d89e2024-03-27 14:56:09 -0700232 [monotonic_remote_transmit_times_.size() - 1];
233 if (back.sent_queue_index == sent_queue_index) {
234 CHECK_EQ(back.monotonic_sent_time, monotonic_sent_time) << this;
Austin Schuh9cce6842024-04-02 18:55:44 -0700235 CHECK(reliable());
236 CHECK_LE(back.transmit_time, transmit_time) << this;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700237 return;
238 }
239 }
240
241 // Capture the time this message was published over the network on the
242 // remote node
243 monotonic_remote_transmit_times_.push_back(TransmitTime{
244 .monotonic_sent_time = monotonic_sent_time,
245 .sent_queue_index = sent_queue_index,
246 .transmit_time = transmit_time,
247 });
248 }
249
Austin Schuh9cce6842024-04-02 18:55:44 -0700250 // Handles this node connecting to the network.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700251 void Connect() {
Austin Schuh9cce6842024-04-02 18:55:44 -0700252 CHECK(fetcher_);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700253
Austin Schuh9cce6842024-04-02 18:55:44 -0700254 // We only send the last message. Point the fetcher to the latest to handle
255 // getting too far behind.
256 fetcher_->Fetch();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700257
Austin Schuh9cce6842024-04-02 18:55:44 -0700258 // Unreliable messages aren't resent on reconnect.
259 if (!reliable()) {
260 return;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700261 }
Austin Schuh9cce6842024-04-02 18:55:44 -0700262
263 if (forwarding_disabled()) {
264 return;
265 }
266
267 // Ignore it if there is no data.
268 if (fetcher_->context().data == nullptr) {
269 return;
270 }
271
272 // See if the newest message got sent already. If it hasn't, queue it up to
273 // be sent.
274 if (fetcher_->context().queue_index != last_sent_.sent_queue_index) {
275 QueueMessage(fetcher_->context().queue_index,
276 fetcher_->context().monotonic_event_time,
277 fetch_event_loop_->monotonic_now());
278 }
279
280 Schedule();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700281 }
282
Austin Schuh9cce6842024-04-02 18:55:44 -0700283 // Returns true if we know that this connection sends to the destination node.
284 // Returns false if the destination hasn't been constructed.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700285 bool SendingTo(const Node *destination) {
286 return send_event_loop_ && send_event_loop_->node() == destination;
287 }
288
Austin Schuh898f4972020-01-11 17:21:25 -0800289 // Kicks us to re-fetch and schedule the timer.
290 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700291 CHECK(!forwarding_disabled());
Austin Schuh9cce6842024-04-02 18:55:44 -0700292 // Can't receive, bail.
Austin Schuh58646e22021-08-23 23:51:46 -0700293 if (!fetcher_) {
294 return;
295 }
Austin Schuh9cce6842024-04-02 18:55:44 -0700296
297 // Already scheduled, nothing to see here.
Austin Schuh58646e22021-08-23 23:51:46 -0700298 if (timer_scheduled_) {
299 return;
300 }
Austin Schuh9cce6842024-04-02 18:55:44 -0700301
302 // We've finally caught up, nothing to do.
303 if (monotonic_remote_transmit_times_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700304 return;
305 }
306
Austin Schuhac6d89e2024-03-27 14:56:09 -0700307 const monotonic_clock::time_point transmit_time =
308 monotonic_remote_transmit_times_[0].transmit_time;
309
Austin Schuh58646e22021-08-23 23:51:46 -0700310 // Compute the time to publish this message.
Austin Schuh9cce6842024-04-02 18:55:44 -0700311 const logger::BootTimestamp monotonic_delivery_time =
Austin Schuhac6d89e2024-03-27 14:56:09 -0700312 DeliveredTime(transmit_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700313
Austin Schuh9cce6842024-04-02 18:55:44 -0700314 // This should be published after the reboot. Forget about it.
315 if (monotonic_delivery_time.boot != send_node_factory_->boot_count()) {
316 CHECK_GT(monotonic_delivery_time.boot, send_node_factory_->boot_count());
317
318 monotonic_remote_transmit_times_.erase(
319 monotonic_remote_transmit_times_.begin());
320 CHECK(monotonic_remote_transmit_times_.empty());
321 return;
322 }
323
324 CHECK_GE(monotonic_delivery_time.time, send_node_factory_->monotonic_now())
325 << ": " << this << " Trying to deliver message in the past on channel "
Austin Schuh58646e22021-08-23 23:51:46 -0700326 << configuration::StrippedChannelToString(fetcher_->channel())
327 << " to node " << send_event_loop_->node()->name()->string_view()
328 << " sent from " << fetcher_->channel()->source_node()->string_view()
329 << " at " << fetch_node_factory_->monotonic_now();
330
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700331 CHECK(timer_);
332 server_status_->AddSentPacket(server_index_, channel_);
Austin Schuh9cce6842024-04-02 18:55:44 -0700333 timer_->Schedule(monotonic_delivery_time.time);
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700334 timer_scheduled_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700335 }
336
337 private:
Austin Schuh58646e22021-08-23 23:51:46 -0700338 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800339 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700340 timer_scheduled_ = false;
Austin Schuh9cce6842024-04-02 18:55:44 -0700341
Austin Schuh58646e22021-08-23 23:51:46 -0700342 CHECK(sender_);
343 CHECK(client_status_);
Austin Schuh9cce6842024-04-02 18:55:44 -0700344 CHECK(fetcher_);
345
346 CHECK(!monotonic_remote_transmit_times_.empty());
347 while (fetcher_->context().queue_index !=
348 monotonic_remote_transmit_times_.front().sent_queue_index) {
349 if (!fetcher_->FetchNext()) {
350 break;
351 }
352 }
Austin Schuhac6d89e2024-03-27 14:56:09 -0700353
354 // Confirm that the first element in the times list is ours, and pull the
355 // transmit time out of it.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700356 CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
357 fetcher_->context().monotonic_event_time);
358 CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
359 fetcher_->context().queue_index);
360
Austin Schuh9cce6842024-04-02 18:55:44 -0700361 const TransmitTime timestamp = monotonic_remote_transmit_times_[0];
Austin Schuhac6d89e2024-03-27 14:56:09 -0700362
363 monotonic_remote_transmit_times_.erase(
364 monotonic_remote_transmit_times_.begin());
365
Austin Schuhc0b0f722020-12-12 18:36:06 -0800366 if (server_connection_->state() != State::CONNECTED) {
Austin Schuhc0b0f722020-12-12 18:36:06 -0800367 Schedule();
368 return;
369 }
Austin Schuhac6d89e2024-03-27 14:56:09 -0700370
Austin Schuh4c3b9702020-08-30 11:34:55 -0700371 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700372 sender_->CheckOk(sender_->Send(
373 fetcher_->context().data, fetcher_->context().size,
374 fetcher_->context().monotonic_event_time,
Austin Schuh9cce6842024-04-02 18:55:44 -0700375 fetcher_->context().realtime_event_time, timestamp.transmit_time,
milind1f1dca32021-07-03 13:50:07 -0700376 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800377
Austin Schuh9cce6842024-04-02 18:55:44 -0700378 // Record that this got sent.
379 last_sent_ = timestamp;
380
Austin Schuh4c3b9702020-08-30 11:34:55 -0700381 // And simulate message_bridge's offset recovery.
Austin Schuh9cce6842024-04-02 18:55:44 -0700382 client_status_->SampleFilter(client_index_, timestamp.transmit_time,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700383 sender_->monotonic_sent_time(),
384 fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700385
386 client_connection_->mutate_received_packets(
387 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700388
389 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800390 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800391 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800392 // Reset the filter every time the UUID changes. There's probably a more
393 // clever way to do this, but that means a better concept of rebooting.
James Kuszmaulbedbb342023-05-26 11:19:27 -0700394 if (!server_status_->BootUUID(destination_node_index_).has_value() ||
395 (server_status_->BootUUID(destination_node_index_).value() !=
396 send_node_factory_->boot_uuid())) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800397 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800398 server_status_->SetBootUUID(destination_node_index_,
399 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800400 }
401
Austin Schuhcdd90272021-03-15 12:46:16 -0700402 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
403 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800404
Austin Schuheeaa2022021-01-02 21:52:03 -0800405 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700406
407 message_header_builder.add_channel_index(channel_index_);
408
409 // Swap the remote and sent metrics. They are from the sender's
410 // perspective, not the receiver's perspective.
411 message_header_builder.add_monotonic_remote_time(
412 fetcher_->context().monotonic_event_time.time_since_epoch().count());
413 message_header_builder.add_realtime_remote_time(
414 fetcher_->context().realtime_event_time.time_since_epoch().count());
415 message_header_builder.add_remote_queue_index(
416 fetcher_->context().queue_index);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700417 message_header_builder.add_monotonic_remote_transmit_time(
Austin Schuh9cce6842024-04-02 18:55:44 -0700418 timestamp.transmit_time.time_since_epoch().count());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700419 message_header_builder.add_monotonic_sent_time(
420 sender_->monotonic_sent_time().time_since_epoch().count());
421 message_header_builder.add_realtime_sent_time(
422 sender_->realtime_sent_time().time_since_epoch().count());
423 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800424 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700425
Austin Schuheeaa2022021-01-02 21:52:03 -0800426 fbb.Finish(message_header_builder.Finish());
427
428 remote_timestamps_.emplace_back(
429 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
430 fetch_node_factory_->monotonic_now() +
431 send_node_factory_->network_delay());
432 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700433 }
434
Austin Schuh898f4972020-01-11 17:21:25 -0800435 Schedule();
436 }
437
Austin Schuheeaa2022021-01-02 21:52:03 -0800438 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
439 void ScheduleTimestamp() {
440 if (remote_timestamps_.empty()) {
441 timestamp_timer_->Disable();
442 return;
443 }
444
445 if (scheduled_time_ !=
446 remote_timestamps_.front().monotonic_timestamp_time) {
Philipp Schradera6712522023-07-05 20:25:11 -0700447 timestamp_timer_->Schedule(
Austin Schuheeaa2022021-01-02 21:52:03 -0800448 remote_timestamps_.front().monotonic_timestamp_time);
449 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
450 return;
451 } else {
452 scheduled_time_ = monotonic_clock::min_time;
453 }
454 }
455
456 // Sends the next timestamp in remote_timestamps_.
457 void SendTimestamp() {
458 CHECK(!remote_timestamps_.empty());
459
460 // Send out all timestamps at the currently scheduled time.
461 while (remote_timestamps_.front().monotonic_timestamp_time ==
462 scheduled_time_) {
463 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700464 timestamp_logger_->CheckOk(timestamp_logger_->Send(
465 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800466 }
467 remote_timestamps_.pop_front();
468 if (remote_timestamps_.empty()) {
469 break;
470 }
471 }
472
473 ScheduleTimestamp();
474 }
475
Austin Schuh898f4972020-01-11 17:21:25 -0800476 // Converts from time on the sending node to time on the receiving node.
Austin Schuh9cce6842024-04-02 18:55:44 -0700477 logger::BootTimestamp DeliveredTime(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700478 const monotonic_clock::time_point transmit_time) const {
Austin Schuh898f4972020-01-11 17:21:25 -0800479 const distributed_clock::time_point distributed_sent_time =
Austin Schuhac6d89e2024-03-27 14:56:09 -0700480 fetch_node_factory_->ToDistributedClock(transmit_time);
Austin Schuh898f4972020-01-11 17:21:25 -0800481
Austin Schuh58646e22021-08-23 23:51:46 -0700482 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700483 distributed_sent_time + send_node_factory_->network_delay());
Austin Schuh9cce6842024-04-02 18:55:44 -0700484 return t;
Austin Schuh898f4972020-01-11 17:21:25 -0800485 }
486
Austin Schuh58646e22021-08-23 23:51:46 -0700487 const Channel *channel_;
488 const Connection *connection_;
489
Austin Schuh898f4972020-01-11 17:21:25 -0800490 // Factories used for time conversion.
491 aos::NodeEventLoopFactory *fetch_node_factory_;
492 aos::NodeEventLoopFactory *send_node_factory_;
493
Austin Schuheeaa2022021-01-02 21:52:03 -0800494 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700495 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800496 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700497 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800498 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700499 aos::TimerHandler *timer_ = nullptr;
Austin Schuh9cce6842024-04-02 18:55:44 -0700500 bool timer_scheduled_ = false;
Austin Schuheeaa2022021-01-02 21:52:03 -0800501 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700502 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800503 // Time that the timer is scheduled for. Used to track if it needs to be
504 // rescheduled.
505 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
506
Austin Schuh898f4972020-01-11 17:21:25 -0800507 // Fetcher used to receive messages.
508 std::unique_ptr<aos::RawFetcher> fetcher_;
509 // Sender to send them back out.
510 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800511
Austin Schuh58646e22021-08-23 23:51:46 -0700512 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800513 const size_t destination_node_index_;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700514
515 ServerConnection *server_connection_ = nullptr;
James Kuszmaula6681e22023-05-26 11:20:40 -0700516 int server_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700517 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700518 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700519 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700520
521 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800522 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800523
Austin Schuhac6d89e2024-03-27 14:56:09 -0700524 struct TransmitTime {
Austin Schuh9cce6842024-04-02 18:55:44 -0700525 monotonic_clock::time_point monotonic_sent_time = monotonic_clock::min_time;
526 uint32_t sent_queue_index = 0xffffffff;
527 monotonic_clock::time_point transmit_time = monotonic_clock::min_time;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700528 };
529
Austin Schuh9cce6842024-04-02 18:55:44 -0700530 // Stores the time the message was handed to the kernel to be published on
Austin Schuhac6d89e2024-03-27 14:56:09 -0700531 // the remote node over the network for all forwarded relevant messages.
532 std::vector<TransmitTime> monotonic_remote_transmit_times_;
533
Austin Schuh9cce6842024-04-02 18:55:44 -0700534 // Stores the last message which was published. This is used to know if we
535 // need to re-transmit something on reconnect or not.
536 TransmitTime last_sent_;
537
Austin Schuheeaa2022021-01-02 21:52:03 -0800538 struct Timestamp {
539 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
540 monotonic_clock::time_point new_monotonic_timestamp_time)
541 : remote_message(std::move(new_remote_message)),
542 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
543 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
544 monotonic_clock::time_point monotonic_timestamp_time;
545 };
546
547 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700548
549 bool delivery_time_is_logged_;
550
551 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800552};
553
554SimulatedMessageBridge::SimulatedMessageBridge(
555 SimulatedEventLoopFactory *simulated_event_loop_factory) {
556 CHECK(
557 configuration::MultiNode(simulated_event_loop_factory->configuration()));
558
559 // Pre-build up event loops for every node. They are pretty cheap anyways.
560 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700561 NodeEventLoopFactory *node_factory =
562 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
563 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800564 CHECK(it.second);
565
Austin Schuh58646e22021-08-23 23:51:46 -0700566 node_factory->OnStartup(
567 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
568 node_state->MakeEventLoop();
569 const size_t my_node_index = configuration::GetNodeIndex(
570 simulated_event_loop_factory->configuration(),
571 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700572
Austin Schuh58646e22021-08-23 23:51:46 -0700573 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700574 for (const std::optional<MessageBridgeServerStatus::NodeState>
Austin Schuhac6d89e2024-03-27 14:56:09 -0700575 &connection : node_state->server_status_->nodes()) {
James Kuszmaulbedbb342023-05-26 11:19:27 -0700576 if (connection.has_value()) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700577 node_state->server_status_->ResetFilter(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700578 }
579 ++node_index;
580 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700581
Austin Schuh58646e22021-08-23 23:51:46 -0700582 for (const ClientConnection *client_connections :
583 *node_state->client_status->mutable_client_statistics()
584 ->connections()) {
585 const Node *client_node = configuration::GetNode(
586 simulated_event_loop_factory->configuration(),
587 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700588
Austin Schuh58646e22021-08-23 23:51:46 -0700589 auto client_event_loop = event_loop_map_.find(client_node);
590 client_event_loop->second.SetBootUUID(
591 my_node_index, node_state->event_loop->boot_uuid());
592 }
593 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700594
Austin Schuh58646e22021-08-23 23:51:46 -0700595 node_factory->OnShutdown([node_state = &it.first->second]() {
596 node_state->SetEventLoop(nullptr);
597 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800598 }
599
Austin Schuh898f4972020-01-11 17:21:25 -0800600 for (const Channel *channel :
601 *simulated_event_loop_factory->configuration()->channels()) {
602 if (!channel->has_destination_nodes()) {
603 continue;
604 }
605
606 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700607 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800608 configuration::GetNode(simulated_event_loop_factory->configuration(),
609 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700610 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800611 CHECK(source_event_loop != event_loop_map_.end());
612
613 std::unique_ptr<DelayersVector> delayers =
614 std::make_unique<DelayersVector>();
615
616 // And then build up a RawMessageDelayer for each destination.
617 for (const Connection *connection : *channel->destination_nodes()) {
618 const Node *destination_node =
619 configuration::GetNode(simulated_event_loop_factory->configuration(),
620 connection->name()->string_view());
621 auto destination_event_loop = event_loop_map_.find(destination_node);
622 CHECK(destination_event_loop != event_loop_map_.end());
623
Austin Schuh2f8fd752020-09-01 22:38:28 -0700624 const size_t destination_node_index = configuration::GetNodeIndex(
625 simulated_event_loop_factory->configuration(), destination_node);
626
627 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700628 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
629 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700630
Austin Schuh58646e22021-08-23 23:51:46 -0700631 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
632 channel, connection,
633 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800634 simulated_event_loop_factory->GetNodeEventLoopFactory(
635 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700636 destination_node_index, delivery_time_is_logged));
637
638 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
639 destination_event_loop->second.AddDestinationDelayer(
640 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800641 }
642
Austin Schuh4c3b9702020-08-30 11:34:55 -0700643 const Channel *const timestamp_channel = configuration::GetChannel(
644 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700645 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700646
647 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700648 source_event_loop->second.SetSendData(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700649 [source_event_loop, captured_delayers = delayers.get()](
650 uint32_t sent_queue_index,
651 monotonic_clock::time_point monotonic_sent_time) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700652 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700653 captured_delayers->v) {
Austin Schuh9cce6842024-04-02 18:55:44 -0700654 delayer->MessageWatcherCallback(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700655 sent_queue_index, monotonic_sent_time,
656 source_event_loop->second.event_loop->monotonic_now());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700657 }
658 });
659 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700660 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700661 }
Austin Schuh898f4972020-01-11 17:21:25 -0800662 delayers_list_.emplace_back(std::move(delayers));
663 }
664}
665
666SimulatedMessageBridge::~SimulatedMessageBridge() {}
667
Austin Schuh6f3babe2020-01-26 20:34:50 -0800668void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700669 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
670 if (delayers->v.size() > 0) {
671 if (delayers->v[0]->channel() == channel) {
672 delayers->disable_forwarding = true;
673 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
674 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800675 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800676 }
677 }
678 }
679}
680
Austin Schuhc0b0f722020-12-12 18:36:06 -0800681void SimulatedMessageBridge::Disconnect(const Node *source,
682 const Node *destination) {
683 SetState(source, destination, message_bridge::State::DISCONNECTED);
684}
685
686void SimulatedMessageBridge::Connect(const Node *source,
687 const Node *destination) {
688 SetState(source, destination, message_bridge::State::CONNECTED);
689}
690void SimulatedMessageBridge::SetState(const Node *source,
691 const Node *destination,
692 message_bridge::State state) {
693 auto source_state = event_loop_map_.find(source);
694 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700695 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800696
697 auto destination_state = event_loop_map_.find(destination);
698 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700699 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800700}
701
James Kuszmaul94ca5132022-07-19 09:11:08 -0700702void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700703 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700704 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700705 }
706}
707
James Kuszmaul94ca5132022-07-19 09:11:08 -0700708void SimulatedMessageBridge::DisableStatistics(const Node *node,
709 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800710 auto it = event_loop_map_.find(node);
711 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700712 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800713}
714
715void SimulatedMessageBridge::EnableStatistics() {
716 for (std::pair<const Node *const, State> &state : event_loop_map_) {
717 state.second.EnableStatistics();
718 }
719}
720
721void SimulatedMessageBridge::EnableStatistics(const Node *node) {
722 auto it = event_loop_map_.find(node);
723 CHECK(it != event_loop_map_.end());
724 it->second.EnableStatistics();
725}
726
Austin Schuhac6d89e2024-03-27 14:56:09 -0700727void SimulatedMessageBridge::State::MakeEventLoop() {
728 // Message bridge isn't the thing that should be catching sent-too-fast,
729 // and may need to be able to forward too-fast messages replayed from old
730 // logfiles.
731 SetEventLoop(node_factory_->MakeEventLoop(
732 "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
733 NodeEventLoopFactory::ExclusiveSenders::kNo,
734 {}}));
735}
736
Austin Schuh58646e22021-08-23 23:51:46 -0700737void SimulatedMessageBridge::State::SetEventLoop(
738 std::unique_ptr<aos::EventLoop> loop) {
739 if (!loop) {
740 timestamp_loggers = ChannelTimestampSender(nullptr);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700741 server_status_.reset();
Austin Schuh58646e22021-08-23 23:51:46 -0700742 client_status.reset();
743 for (RawMessageDelayer *source_delayer : source_delayers_) {
744 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
745 }
746 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
747 destination_delayer->SetSendEventLoop(nullptr, nullptr);
748 }
749 event_loop = std::move(loop);
750 return;
751 } else {
752 CHECK(!event_loop);
753 }
754 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700755
Austin Schuh58646e22021-08-23 23:51:46 -0700756 event_loop->SkipTimingReport();
757 event_loop->SkipAosLog();
758
759 for (std::pair<const Channel *, DelayersVector *> &watcher :
760 delayer_watchers_) {
761 // Don't register watchers if we know we aren't forwarding.
762 if (watcher.second->disable_forwarding) continue;
763 event_loop->MakeRawNoArgWatcher(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700764 watcher.first,
765 [this, captured_delayers = watcher.second](const Context &context) {
Austin Schuh58646e22021-08-23 23:51:46 -0700766 // We might get told after registering, so don't forward at that point
767 // too.
768 for (std::unique_ptr<RawMessageDelayer> &delayer :
769 captured_delayers->v) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700770 delayer->MessageWatcherCallback(context.queue_index,
771 context.monotonic_event_time,
772 event_loop->monotonic_now());
Austin Schuh58646e22021-08-23 23:51:46 -0700773 }
774 });
775 }
776
777 timestamp_loggers = ChannelTimestampSender(event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700778 server_status_ =
779 std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800780 if (disable_statistics_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700781 server_status_->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800782 }
Austin Schuh58646e22021-08-23 23:51:46 -0700783
784 {
785 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700786 for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
Austin Schuhac6d89e2024-03-27 14:56:09 -0700787 server_status_->nodes()) {
James Kuszmaulbedbb342023-05-26 11:19:27 -0700788 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700789 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800790 switch (server_state_[node_index]) {
791 case message_bridge::State::DISCONNECTED:
Austin Schuhac6d89e2024-03-27 14:56:09 -0700792 server_status_->Disconnect(node_index);
Austin Schuh367a7f42021-11-23 23:04:36 -0800793 break;
794 case message_bridge::State::CONNECTED:
Austin Schuhac6d89e2024-03-27 14:56:09 -0700795 server_status_->Connect(node_index, event_loop->monotonic_now());
Austin Schuh367a7f42021-11-23 23:04:36 -0800796 break;
797 }
Austin Schuh58646e22021-08-23 23:51:46 -0700798 } else {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700799 server_status_->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700800 }
801 }
802 ++node_index;
803 }
804 }
805
806 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
807 if (boot_uuids_[i] != UUID::Zero()) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700808 server_status_->SetBootUUID(i, boot_uuids_[i]);
Austin Schuh58646e22021-08-23 23:51:46 -0700809 }
810 }
Austin Schuh58646e22021-08-23 23:51:46 -0700811 if (fn_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700812 server_status_->set_send_data(fn_);
Austin Schuh58646e22021-08-23 23:51:46 -0700813 }
814 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
815 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700816 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700817 }
818
819 for (size_t i = 0;
820 i < client_status->mutable_client_statistics()->connections()->size();
821 ++i) {
822 ClientConnection *client_connection =
823 client_status->mutable_client_statistics()
824 ->mutable_connections()
825 ->GetMutableObject(i);
826 const Node *client_node = configuration::GetNode(
827 node_factory_->configuration(),
828 client_connection->node()->name()->string_view());
829 const size_t client_node_index = configuration::GetNodeIndex(
830 node_factory_->configuration(), client_node);
831 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800832 if (client_connection->state() != client_state_[client_node_index]) {
833 switch (client_state_[client_node_index]) {
834 case message_bridge::State::DISCONNECTED:
835 client_status->Disconnect(i);
836 break;
837 case message_bridge::State::CONNECTED:
838 client_status->Connect(i);
839 break;
840 }
841 }
Austin Schuh58646e22021-08-23 23:51:46 -0700842 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800843 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700844 }
845 }
846
Austin Schuh2f8fd752020-09-01 22:38:28 -0700847 for (const Channel *channel : *event_loop->configuration()->channels()) {
848 CHECK(channel->has_source_node());
849
850 // Sent by us.
851 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
852 channel->has_destination_nodes()) {
853 for (const Connection *connection : *channel->destination_nodes()) {
854 const bool delivery_time_is_logged =
855 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
856 connection, event_loop->node());
857
James Kuszmaul94ca5132022-07-19 09:11:08 -0700858 const RawMessageDelayer *delayer = nullptr;
859 for (const RawMessageDelayer *candidate : source_delayers_) {
860 if (candidate->channel() == channel) {
861 delayer = candidate;
862 }
863 }
864
Austin Schuh2f8fd752020-09-01 22:38:28 -0700865 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700866 if (!delivery_time_is_logged ||
867 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700868 continue;
869 }
870
Austin Schuh89c9b812021-02-20 14:42:10 -0800871 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700872 }
873 }
874 }
Austin Schuh58646e22021-08-23 23:51:46 -0700875
876 for (RawMessageDelayer *source_delayer : source_delayers_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700877 source_delayer->SetFetchEventLoop(event_loop.get(), server_status_.get(),
Austin Schuh58646e22021-08-23 23:51:46 -0700878 &timestamp_loggers);
879 }
880 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
881 destination_delayer->SetSendEventLoop(event_loop.get(),
882 client_status.get());
883 }
884 event_loop->OnRun([this]() {
885 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
Austin Schuh9cce6842024-04-02 18:55:44 -0700886 if (destination_delayer->reliable()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700887 destination_delayer->ScheduleReliable();
888 }
889 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700890 // Note: This exists to work around the fact that some users like to be able
891 // to send reliable messages while execution is stopped, creating a
892 // situation where the following sequencing can occur:
893 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
894 // Node B).
895 // 2) Node B starts up.
896 // 3) Anywhere from 0 to N seconds later, Node A starts up.
897 //
898 // In this case, we need the reliable message to make it to Node B, but it
899 // also shouldn't make it to Node B until Node A has started up.
900 //
901 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
902 // the message, then that would trigger the watchers in the delayers.
903 // However, we so far have continued to support Sending while stopped....
904 for (RawMessageDelayer *source_delayer : source_delayers_) {
Austin Schuh9cce6842024-04-02 18:55:44 -0700905 if (source_delayer->reliable()) {
James Kuszmaul86e86c32022-07-21 17:39:47 -0700906 source_delayer->ScheduleReliable();
907 }
908 }
Austin Schuh58646e22021-08-23 23:51:46 -0700909 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700910}
911
Austin Schuhac6d89e2024-03-27 14:56:09 -0700912void SimulatedMessageBridge::State::SetSendData(
913 std::function<void(uint32_t, monotonic_clock::time_point)> fn) {
914 CHECK(!fn_);
915 fn_ = std::move(fn);
916 if (server_status_) {
917 server_status_->set_send_data(fn_);
918 }
919}
920
921void SimulatedMessageBridge::State::SetBootUUID(size_t node_index,
922 const UUID &boot_uuid) {
923 boot_uuids_[node_index] = boot_uuid;
924 const Node *node = node_factory_->configuration()->nodes()->Get(node_index);
925 if (server_status_) {
926 ServerConnection *connection = server_status_->FindServerConnection(node);
927 if (connection) {
928 if (boot_uuid == UUID::Zero()) {
929 server_status_->Disconnect(node_index);
930 server_status_->ResetFilter(node_index);
931 } else {
932 switch (server_state_[node_index]) {
933 case message_bridge::State::DISCONNECTED:
934 server_status_->Disconnect(node_index);
935 break;
936 case message_bridge::State::CONNECTED:
937 server_status_->Connect(node_index, event_loop->monotonic_now());
938 break;
939 }
940 server_status_->ResetFilter(node_index);
941 server_status_->SetBootUUID(node_index, boot_uuid);
942 }
943 }
944 }
945 if (client_status) {
946 const int client_index =
947 client_status->FindClientIndex(node->name()->string_view());
948 client_status->SampleReset(client_index);
949 if (boot_uuid == UUID::Zero()) {
950 client_status->Disconnect(client_index);
951 } else {
952 switch (client_state_[node_index]) {
953 case message_bridge::State::CONNECTED:
954 client_status->Connect(client_index);
955 break;
956 case message_bridge::State::DISCONNECTED:
957 client_status->Disconnect(client_index);
958 break;
959 }
960 }
961 }
962}
963
964void SimulatedMessageBridge::State::SetServerState(
965 const Node *destination, message_bridge::State state) {
966 const size_t node_index =
967 configuration::GetNodeIndex(node_factory_->configuration(), destination);
968 server_state_[node_index] = state;
969 if (server_status_) {
970 ServerConnection *connection =
971 server_status_->FindServerConnection(destination);
972 if (connection == nullptr) return;
973
974 if (state == connection->state()) {
975 return;
976 }
977 switch (state) {
978 case message_bridge::State::DISCONNECTED:
979 server_status_->Disconnect(node_index);
980 break;
981 case message_bridge::State::CONNECTED:
982 server_status_->Connect(node_index, event_loop->monotonic_now());
983 for (RawMessageDelayer *delayer : source_delayers_) {
984 if (delayer->SendingTo(destination)) {
985 delayer->Connect();
986 }
987 }
988 break;
989 }
990 }
991}
992
993void SimulatedMessageBridge::State::SetClientState(
994 const Node *source, message_bridge::State state) {
995 const size_t node_index =
996 configuration::GetNodeIndex(node_factory_->configuration(), source);
997 client_state_[node_index] = state;
998 if (client_status) {
999 const int client_index =
1000 client_status->FindClientIndex(source->name()->string_view());
1001 ClientConnection *connection = client_status->GetClientConnection(source);
1002
1003 // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
1004 // calls?
1005 if (connection->state() != state) {
1006 switch (state) {
1007 case message_bridge::State::CONNECTED:
1008 client_status->Connect(client_index);
1009 break;
1010 case message_bridge::State::DISCONNECTED:
1011 client_status->Disconnect(client_index);
1012 break;
1013 }
1014 }
1015 }
1016}
1017
Stephan Pleinesf63bde82024-01-13 15:59:33 -08001018} // namespace aos::message_bridge