blob: e070270975eb3bca7ca8379c74e91c59e6832e64 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
Austin Schuhac6d89e2024-03-27 14:56:09 -07004#include "glog/logging.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07005
Austin Schuh0de30f32020-12-06 12:44:28 -08006#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08007#include "aos/events/event_loop.h"
8#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08009#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
Stephan Pleinesf63bde82024-01-13 15:59:33 -080011namespace aos::message_bridge {
Austin Schuh898f4972020-01-11 17:21:25 -080012
13// This class delays messages forwarded between two factories.
14//
15// The basic design is that we need to use the distributed_clock to convert
16// monotonic times from the source to the destination node. We also use a
17// fetcher to manage the queue of data, and a timer to schedule the sends.
18class RawMessageDelayer {
19 public:
Austin Schuh58646e22021-08-23 23:51:46 -070020 RawMessageDelayer(const Channel *channel, const Connection *connection,
21 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080022 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070023 size_t destination_node_index, bool delivery_time_is_logged)
24 : channel_(channel),
25 connection_(connection),
26 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080027 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080028 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070029 channel_index_(configuration::ChannelIndex(
30 fetch_node_factory_->configuration(), channel_)),
31 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080032
Austin Schuh58646e22021-08-23 23:51:46 -070033 bool forwarding_disabled() const { return forwarding_disabled_; }
34 void set_forwarding_disabled(bool forwarding_disabled) {
35 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070036 if (!forwarding_disabled_) {
37 CHECK(timestamp_logger_ == nullptr);
38 CHECK(sender_ == nullptr);
39 }
Austin Schuh898f4972020-01-11 17:21:25 -080040 }
41
Austin Schuh58646e22021-08-23 23:51:46 -070042 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
43 MessageBridgeServerStatus *server_status,
44 ChannelTimestampSender *timestamp_loggers) {
45 sent_ = false;
Austin Schuhac6d89e2024-03-27 14:56:09 -070046 reliable_scheduled_ = false;
47 published_ = false;
Austin Schuh58646e22021-08-23 23:51:46 -070048 fetch_event_loop_ = fetch_event_loop;
49 if (fetch_event_loop_) {
50 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
51 } else {
52 fetcher_ = nullptr;
53 }
54
55 server_status_ = server_status;
Austin Schuhac6d89e2024-03-27 14:56:09 -070056 if (server_status_) {
Austin Schuh58646e22021-08-23 23:51:46 -070057 server_connection_ =
58 server_status_->FindServerConnection(send_node_factory_->node());
James Kuszmaula6681e22023-05-26 11:20:40 -070059 server_index_ = configuration::GetNodeIndex(
60 send_node_factory_->configuration(), send_node_factory_->node());
Austin Schuh58646e22021-08-23 23:51:46 -070061 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070062 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
63 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070064 timestamp_logger_ =
65 timestamp_loggers->SenderForChannel(channel_, connection_);
66 } else {
67 timestamp_logger_ = nullptr;
68 }
69
70 if (fetch_event_loop_) {
71 timestamp_timer_ =
72 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
73 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070074 std::string timer_name =
75 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
76 fetcher_->channel()->name()->string_view(), " ",
77 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070078 if (timer_) {
79 timer_->set_name(timer_name);
80 }
81 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
82 }
83 } else {
84 timestamp_timer_ = nullptr;
85 }
86 }
87
88 void SetSendEventLoop(aos::EventLoop *send_event_loop,
89 MessageBridgeClientStatus *client_status) {
Austin Schuh58646e22021-08-23 23:51:46 -070090 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070091 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070092 sender_ = send_event_loop_->MakeRawSender(channel_);
93 } else {
94 sender_ = nullptr;
95 }
96
97 client_status_ = client_status;
98 if (client_status_) {
99 client_index_ = client_status_->FindClientIndex(
100 channel_->source_node()->string_view());
101 client_connection_ = client_status_->GetClientConnection(client_index_);
102 } else {
103 client_index_ = -1;
104 client_connection_ = nullptr;
105 }
106
107 if (send_event_loop_) {
108 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
109 if (fetcher_) {
110 std::string timer_name =
111 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
112 fetcher_->channel()->name()->string_view(), " ",
113 fetcher_->channel()->type()->string_view());
114 timer_->set_name(timer_name);
115 if (timestamp_timer_) {
116 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
117 }
118 }
119 } else {
120 timer_ = nullptr;
121 }
122 }
123
124 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800125
Austin Schuh4c570ea2020-11-19 23:13:24 -0800126 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700127 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800128 ->time_to_live();
129 }
130
Austin Schuhac6d89e2024-03-27 14:56:09 -0700131 std::string Name() {
132 std::string result;
133 result +=
134 (fetch_event_loop_ ? fetch_event_loop_->node()->name()->string_view()
135 : std::string_view("?"));
136 result += " -> ";
137 result +=
138 (send_event_loop_ ? send_event_loop_->node()->name()->string_view()
139 : std::string_view("?"));
140 result += " ";
141 result += aos::configuration::StrippedChannelToString(channel());
142 return result;
143 }
144
Austin Schuh58646e22021-08-23 23:51:46 -0700145 void ScheduleReliable() {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700146 if (forwarding_disabled()) {
147 return;
148 }
Austin Schuh58646e22021-08-23 23:51:46 -0700149
150 if (!fetcher_) {
151 return;
152 }
153 if (fetcher_->context().data == nullptr || sent_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700154 fetcher_->Fetch();
155 sent_ = fetcher_->context().data == nullptr;
156 published_ = sent_;
157 reliable_scheduled_ = true;
158 }
159
160 if (!timer_) {
161 return;
162 }
163
164 if (server_connection_->state() != State::CONNECTED) {
165 reliable_scheduled_ = false;
166 sent_ = true;
167 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700168 }
169
170 FetchNext();
171 if (fetcher_->context().data == nullptr || sent_) {
172 return;
173 }
Austin Schuh58646e22021-08-23 23:51:46 -0700174
175 // Send at startup. It is the best we can do.
176 const monotonic_clock::time_point monotonic_delivered_time =
177 send_node_factory_->monotonic_now() +
178 send_node_factory_->network_delay();
179
180 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
181 << ": Trying to deliver message in the past on channel "
182 << configuration::StrippedChannelToString(fetcher_->channel())
183 << " to node " << send_event_loop_->node()->name()->string_view()
184 << " sent from " << fetcher_->channel()->source_node()->string_view()
185 << " at " << fetch_node_factory_->monotonic_now();
186
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700187 if (!timer_scheduled_) {
188 server_status_->AddSentPacket(server_index_, channel_);
189 timer_->Schedule(monotonic_delivered_time);
190 timer_scheduled_ = true;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700191
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700192 QueueTransmitTimestamp(fetcher_->context().queue_index,
193 fetcher_->context().monotonic_event_time,
194 fetch_event_loop_->monotonic_now());
Austin Schuh58646e22021-08-23 23:51:46 -0700195 }
196 }
197
198 bool timer_scheduled_ = false;
199
Austin Schuhac6d89e2024-03-27 14:56:09 -0700200 void MessageWatcherCallback(uint32_t sent_queue_index,
201 monotonic_clock::time_point monotonic_sent_time,
202 monotonic_clock::time_point transmit_time) {
203 if (!reliable_scheduled_) {
204 QueueTransmitTimestamp(sent_queue_index, monotonic_sent_time,
205 transmit_time);
206 } else {
207 reliable_scheduled_ = false;
208 }
209 Schedule();
210 }
211
212 void QueueTransmitTimestamp(uint32_t sent_queue_index,
213 monotonic_clock::time_point monotonic_sent_time,
214 monotonic_clock::time_point transmit_time) {
215 if (forwarding_disabled()) return;
216
217 if (monotonic_remote_transmit_times_.size() > 0u) {
218 // FetchNext can discover messages before we do in the same nanosecond. In
219 // that case, make sure the contents match and don't add it a second time.
220 auto back = monotonic_remote_transmit_times_
221 [monotonic_remote_transmit_times_.size() - 1];
222 if (back.sent_queue_index == sent_queue_index) {
223 CHECK_EQ(back.monotonic_sent_time, monotonic_sent_time) << this;
224 CHECK_EQ(back.transmit_time, transmit_time) << this;
225 return;
226 }
227 }
228
229 // Capture the time this message was published over the network on the
230 // remote node
231 monotonic_remote_transmit_times_.push_back(TransmitTime{
232 .monotonic_sent_time = monotonic_sent_time,
233 .sent_queue_index = sent_queue_index,
234 .transmit_time = transmit_time,
235 });
236 }
237
238 void Connect() {
239 if (time_to_live() == 0 && published_ == false) {
240 if (forwarding_disabled()) {
241 return;
242 }
243 CHECK(fetcher_);
244
245 fetcher_->Fetch();
246 sent_ = fetcher_->context().data == nullptr;
247 reliable_scheduled_ = true;
248
249 QueueTransmitTimestamp(fetcher_->context().queue_index,
250 fetcher_->context().monotonic_event_time,
251 fetch_event_loop_->monotonic_now());
252 Schedule();
253 }
254 }
255
256 bool SendingTo(const Node *destination) {
257 return send_event_loop_ && send_event_loop_->node() == destination;
258 }
259
Austin Schuh898f4972020-01-11 17:21:25 -0800260 // Kicks us to re-fetch and schedule the timer.
261 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700262 CHECK(!forwarding_disabled());
263 if (!fetcher_) {
264 return;
265 }
266 if (timer_scheduled_) {
267 return;
268 }
269 FetchNext();
270 if (fetcher_->context().data == nullptr || sent_) {
271 return;
272 }
273
Austin Schuhac6d89e2024-03-27 14:56:09 -0700274 CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
275 const monotonic_clock::time_point transmit_time =
276 monotonic_remote_transmit_times_[0].transmit_time;
277
Austin Schuh58646e22021-08-23 23:51:46 -0700278 // Compute the time to publish this message.
279 const monotonic_clock::time_point monotonic_delivered_time =
Austin Schuhac6d89e2024-03-27 14:56:09 -0700280 DeliveredTime(transmit_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700281
282 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
283 << ": Trying to deliver message in the past on channel "
284 << configuration::StrippedChannelToString(fetcher_->channel())
285 << " to node " << send_event_loop_->node()->name()->string_view()
286 << " sent from " << fetcher_->channel()->source_node()->string_view()
287 << " at " << fetch_node_factory_->monotonic_now();
288
Austin Schuh6adbc1e2024-04-01 16:17:27 -0700289 CHECK(timer_);
290 server_status_->AddSentPacket(server_index_, channel_);
291 timer_->Schedule(monotonic_delivered_time);
292 timer_scheduled_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700293 }
294
295 private:
296 void FetchNext() {
297 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800298 // Keep pulling messages out of the fetcher until we find one in the future.
299 while (true) {
300 if (fetcher_->context().data == nullptr || sent_) {
301 sent_ = !fetcher_->FetchNext();
Austin Schuhac6d89e2024-03-27 14:56:09 -0700302 if (!sent_) {
303 published_ = false;
304 }
305 if (!sent_) {
306 if (monotonic_remote_transmit_times_.size() == 0u) {
307 QueueTransmitTimestamp(fetcher_->context().queue_index,
308 fetcher_->context().monotonic_event_time,
309 fetch_event_loop_->monotonic_now());
310 }
311 }
Austin Schuh6aa77be2020-02-22 21:06:40 -0800312 }
313 if (sent_) {
314 break;
315 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800316
317 if (server_connection_->state() != State::CONNECTED) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700318 CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
319 CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
320 fetcher_->context().monotonic_event_time)
321 << this << " " << Name();
322 CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
323 fetcher_->context().queue_index)
324 << this << " " << Name();
325
326 monotonic_remote_transmit_times_.erase(
327 monotonic_remote_transmit_times_.begin());
Austin Schuhc0b0f722020-12-12 18:36:06 -0800328 sent_ = true;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700329 reliable_scheduled_ = false;
330 published_ = false;
James Kuszmaula6681e22023-05-26 11:20:40 -0700331 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800332 continue;
333 }
334
Austin Schuh6aa77be2020-02-22 21:06:40 -0800335 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700336 send_node_factory_->network_delay() +
Austin Schuhac6d89e2024-03-27 14:56:09 -0700337 send_node_factory_->send_delay() >=
Austin Schuh58646e22021-08-23 23:51:46 -0700338 fetch_node_factory_->monotonic_now() ||
339 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800340 break;
341 }
342
343 // TODO(austin): Not cool. We want to actually forward these. This means
344 // we need a more sophisticated concept of what is running.
345 LOG(WARNING) << "Not forwarding message on "
346 << configuration::CleanedChannelToString(fetcher_->channel())
Philipp Schrader00f117c2023-09-21 14:00:37 -0700347 << " because we aren't running. Sent at "
Austin Schuh6aa77be2020-02-22 21:06:40 -0800348 << fetcher_->context().monotonic_event_time << " now is "
349 << fetch_node_factory_->monotonic_now();
350 sent_ = true;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700351 reliable_scheduled_ = false;
352 published_ = false;
James Kuszmaula6681e22023-05-26 11:20:40 -0700353 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh898f4972020-01-11 17:21:25 -0800354 }
Austin Schuh898f4972020-01-11 17:21:25 -0800355 }
356
Austin Schuh58646e22021-08-23 23:51:46 -0700357 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800358 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700359 timer_scheduled_ = false;
360 CHECK(sender_);
361 CHECK(client_status_);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700362
363 // Confirm that the first element in the times list is ours, and pull the
364 // transmit time out of it.
365 CHECK(!monotonic_remote_transmit_times_.empty());
366 CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
367 fetcher_->context().monotonic_event_time);
368 CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
369 fetcher_->context().queue_index);
370
371 const monotonic_clock::time_point monotonic_remote_transmit_time =
372 monotonic_remote_transmit_times_[0].transmit_time;
373
374 monotonic_remote_transmit_times_.erase(
375 monotonic_remote_transmit_times_.begin());
376
Austin Schuhc0b0f722020-12-12 18:36:06 -0800377 if (server_connection_->state() != State::CONNECTED) {
378 sent_ = true;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700379 reliable_scheduled_ = false;
380 published_ = false;
Austin Schuhc0b0f722020-12-12 18:36:06 -0800381 Schedule();
382 return;
383 }
Austin Schuhac6d89e2024-03-27 14:56:09 -0700384
Austin Schuh4c3b9702020-08-30 11:34:55 -0700385 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700386 sender_->CheckOk(sender_->Send(
387 fetcher_->context().data, fetcher_->context().size,
388 fetcher_->context().monotonic_event_time,
Austin Schuhac6d89e2024-03-27 14:56:09 -0700389 fetcher_->context().realtime_event_time, monotonic_remote_transmit_time,
milind1f1dca32021-07-03 13:50:07 -0700390 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800391
Austin Schuh4c3b9702020-08-30 11:34:55 -0700392 // And simulate message_bridge's offset recovery.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700393 client_status_->SampleFilter(client_index_, monotonic_remote_transmit_time,
394 sender_->monotonic_sent_time(),
395 fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700396
397 client_connection_->mutate_received_packets(
398 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700399
400 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800401 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800402 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800403 // Reset the filter every time the UUID changes. There's probably a more
404 // clever way to do this, but that means a better concept of rebooting.
James Kuszmaulbedbb342023-05-26 11:19:27 -0700405 if (!server_status_->BootUUID(destination_node_index_).has_value() ||
406 (server_status_->BootUUID(destination_node_index_).value() !=
407 send_node_factory_->boot_uuid())) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800408 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800409 server_status_->SetBootUUID(destination_node_index_,
410 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800411 }
412
Austin Schuhcdd90272021-03-15 12:46:16 -0700413 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
414 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800415
Austin Schuheeaa2022021-01-02 21:52:03 -0800416 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700417
418 message_header_builder.add_channel_index(channel_index_);
419
420 // Swap the remote and sent metrics. They are from the sender's
421 // perspective, not the receiver's perspective.
422 message_header_builder.add_monotonic_remote_time(
423 fetcher_->context().monotonic_event_time.time_since_epoch().count());
424 message_header_builder.add_realtime_remote_time(
425 fetcher_->context().realtime_event_time.time_since_epoch().count());
426 message_header_builder.add_remote_queue_index(
427 fetcher_->context().queue_index);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700428 message_header_builder.add_monotonic_remote_transmit_time(
429 monotonic_remote_transmit_time.time_since_epoch().count());
Austin Schuh2f8fd752020-09-01 22:38:28 -0700430 message_header_builder.add_monotonic_sent_time(
431 sender_->monotonic_sent_time().time_since_epoch().count());
432 message_header_builder.add_realtime_sent_time(
433 sender_->realtime_sent_time().time_since_epoch().count());
434 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800435 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700436
Austin Schuheeaa2022021-01-02 21:52:03 -0800437 fbb.Finish(message_header_builder.Finish());
438
439 remote_timestamps_.emplace_back(
440 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
441 fetch_node_factory_->monotonic_now() +
442 send_node_factory_->network_delay());
443 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700444 }
445
Austin Schuh898f4972020-01-11 17:21:25 -0800446 sent_ = true;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700447 reliable_scheduled_ = false;
448 published_ = true;
Austin Schuh898f4972020-01-11 17:21:25 -0800449 Schedule();
450 }
451
Austin Schuheeaa2022021-01-02 21:52:03 -0800452 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
453 void ScheduleTimestamp() {
454 if (remote_timestamps_.empty()) {
455 timestamp_timer_->Disable();
456 return;
457 }
458
459 if (scheduled_time_ !=
460 remote_timestamps_.front().monotonic_timestamp_time) {
Philipp Schradera6712522023-07-05 20:25:11 -0700461 timestamp_timer_->Schedule(
Austin Schuheeaa2022021-01-02 21:52:03 -0800462 remote_timestamps_.front().monotonic_timestamp_time);
463 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
464 return;
465 } else {
466 scheduled_time_ = monotonic_clock::min_time;
467 }
468 }
469
470 // Sends the next timestamp in remote_timestamps_.
471 void SendTimestamp() {
472 CHECK(!remote_timestamps_.empty());
473
474 // Send out all timestamps at the currently scheduled time.
475 while (remote_timestamps_.front().monotonic_timestamp_time ==
476 scheduled_time_) {
477 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700478 timestamp_logger_->CheckOk(timestamp_logger_->Send(
479 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800480 }
481 remote_timestamps_.pop_front();
482 if (remote_timestamps_.empty()) {
483 break;
484 }
485 }
486
487 ScheduleTimestamp();
488 }
489
Austin Schuh898f4972020-01-11 17:21:25 -0800490 // Converts from time on the sending node to time on the receiving node.
Austin Schuhac6d89e2024-03-27 14:56:09 -0700491 monotonic_clock::time_point DeliveredTime(
492 const monotonic_clock::time_point transmit_time) const {
Austin Schuh898f4972020-01-11 17:21:25 -0800493 const distributed_clock::time_point distributed_sent_time =
Austin Schuhac6d89e2024-03-27 14:56:09 -0700494 fetch_node_factory_->ToDistributedClock(transmit_time);
Austin Schuh898f4972020-01-11 17:21:25 -0800495
Austin Schuh58646e22021-08-23 23:51:46 -0700496 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700497 distributed_sent_time + send_node_factory_->network_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700498 CHECK_EQ(t.boot, send_node_factory_->boot_count());
499 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800500 }
501
Austin Schuh58646e22021-08-23 23:51:46 -0700502 const Channel *channel_;
503 const Connection *connection_;
504
Austin Schuh898f4972020-01-11 17:21:25 -0800505 // Factories used for time conversion.
506 aos::NodeEventLoopFactory *fetch_node_factory_;
507 aos::NodeEventLoopFactory *send_node_factory_;
508
Austin Schuheeaa2022021-01-02 21:52:03 -0800509 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700510 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800511 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700512 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800513 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700514 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800515 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700516 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800517 // Time that the timer is scheduled for. Used to track if it needs to be
518 // rescheduled.
519 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
520
Austin Schuh898f4972020-01-11 17:21:25 -0800521 // Fetcher used to receive messages.
522 std::unique_ptr<aos::RawFetcher> fetcher_;
523 // Sender to send them back out.
524 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800525
Austin Schuh58646e22021-08-23 23:51:46 -0700526 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800527 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800528 // True if we have sent the message in the fetcher.
529 bool sent_ = false;
Austin Schuhac6d89e2024-03-27 14:56:09 -0700530 bool published_ = false;
531 bool reliable_scheduled_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700532
533 ServerConnection *server_connection_ = nullptr;
James Kuszmaula6681e22023-05-26 11:20:40 -0700534 int server_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700535 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700536 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700537 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700538
539 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800540 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800541
Austin Schuhac6d89e2024-03-27 14:56:09 -0700542 struct TransmitTime {
543 monotonic_clock::time_point monotonic_sent_time;
544 uint32_t sent_queue_index;
545 monotonic_clock::time_point transmit_time;
546 };
547
548 // Stores tthe time the message was handed to the kernel to be published on
549 // the remote node over the network for all forwarded relevant messages.
550 std::vector<TransmitTime> monotonic_remote_transmit_times_;
551
Austin Schuheeaa2022021-01-02 21:52:03 -0800552 struct Timestamp {
553 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
554 monotonic_clock::time_point new_monotonic_timestamp_time)
555 : remote_message(std::move(new_remote_message)),
556 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
557 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
558 monotonic_clock::time_point monotonic_timestamp_time;
559 };
560
561 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700562
563 bool delivery_time_is_logged_;
564
565 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800566};
567
568SimulatedMessageBridge::SimulatedMessageBridge(
569 SimulatedEventLoopFactory *simulated_event_loop_factory) {
570 CHECK(
571 configuration::MultiNode(simulated_event_loop_factory->configuration()));
572
573 // Pre-build up event loops for every node. They are pretty cheap anyways.
574 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700575 NodeEventLoopFactory *node_factory =
576 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
577 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800578 CHECK(it.second);
579
Austin Schuh58646e22021-08-23 23:51:46 -0700580 node_factory->OnStartup(
581 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
582 node_state->MakeEventLoop();
583 const size_t my_node_index = configuration::GetNodeIndex(
584 simulated_event_loop_factory->configuration(),
585 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700586
Austin Schuh58646e22021-08-23 23:51:46 -0700587 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700588 for (const std::optional<MessageBridgeServerStatus::NodeState>
Austin Schuhac6d89e2024-03-27 14:56:09 -0700589 &connection : node_state->server_status_->nodes()) {
James Kuszmaulbedbb342023-05-26 11:19:27 -0700590 if (connection.has_value()) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700591 node_state->server_status_->ResetFilter(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700592 }
593 ++node_index;
594 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700595
Austin Schuh58646e22021-08-23 23:51:46 -0700596 for (const ClientConnection *client_connections :
597 *node_state->client_status->mutable_client_statistics()
598 ->connections()) {
599 const Node *client_node = configuration::GetNode(
600 simulated_event_loop_factory->configuration(),
601 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700602
Austin Schuh58646e22021-08-23 23:51:46 -0700603 auto client_event_loop = event_loop_map_.find(client_node);
604 client_event_loop->second.SetBootUUID(
605 my_node_index, node_state->event_loop->boot_uuid());
606 }
607 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700608
Austin Schuh58646e22021-08-23 23:51:46 -0700609 node_factory->OnShutdown([node_state = &it.first->second]() {
610 node_state->SetEventLoop(nullptr);
611 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800612 }
613
Austin Schuh898f4972020-01-11 17:21:25 -0800614 for (const Channel *channel :
615 *simulated_event_loop_factory->configuration()->channels()) {
616 if (!channel->has_destination_nodes()) {
617 continue;
618 }
619
620 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700621 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800622 configuration::GetNode(simulated_event_loop_factory->configuration(),
623 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700624 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800625 CHECK(source_event_loop != event_loop_map_.end());
626
627 std::unique_ptr<DelayersVector> delayers =
628 std::make_unique<DelayersVector>();
629
630 // And then build up a RawMessageDelayer for each destination.
631 for (const Connection *connection : *channel->destination_nodes()) {
632 const Node *destination_node =
633 configuration::GetNode(simulated_event_loop_factory->configuration(),
634 connection->name()->string_view());
635 auto destination_event_loop = event_loop_map_.find(destination_node);
636 CHECK(destination_event_loop != event_loop_map_.end());
637
Austin Schuh2f8fd752020-09-01 22:38:28 -0700638 const size_t destination_node_index = configuration::GetNodeIndex(
639 simulated_event_loop_factory->configuration(), destination_node);
640
641 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700642 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
643 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700644
Austin Schuh58646e22021-08-23 23:51:46 -0700645 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
646 channel, connection,
647 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800648 simulated_event_loop_factory->GetNodeEventLoopFactory(
649 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700650 destination_node_index, delivery_time_is_logged));
651
652 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
653 destination_event_loop->second.AddDestinationDelayer(
654 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800655 }
656
Austin Schuh4c3b9702020-08-30 11:34:55 -0700657 const Channel *const timestamp_channel = configuration::GetChannel(
658 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700659 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700660
661 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700662 source_event_loop->second.SetSendData(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700663 [source_event_loop, captured_delayers = delayers.get()](
664 uint32_t sent_queue_index,
665 monotonic_clock::time_point monotonic_sent_time) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700666 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700667 captured_delayers->v) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700668 delayer->QueueTransmitTimestamp(
669 sent_queue_index, monotonic_sent_time,
670 source_event_loop->second.event_loop->monotonic_now());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700671 delayer->Schedule();
672 }
673 });
674 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700675 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700676 }
Austin Schuh898f4972020-01-11 17:21:25 -0800677 delayers_list_.emplace_back(std::move(delayers));
678 }
679}
680
681SimulatedMessageBridge::~SimulatedMessageBridge() {}
682
Austin Schuh6f3babe2020-01-26 20:34:50 -0800683void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700684 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
685 if (delayers->v.size() > 0) {
686 if (delayers->v[0]->channel() == channel) {
687 delayers->disable_forwarding = true;
688 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
689 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800690 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800691 }
692 }
693 }
694}
695
Austin Schuhc0b0f722020-12-12 18:36:06 -0800696void SimulatedMessageBridge::Disconnect(const Node *source,
697 const Node *destination) {
698 SetState(source, destination, message_bridge::State::DISCONNECTED);
699}
700
701void SimulatedMessageBridge::Connect(const Node *source,
702 const Node *destination) {
703 SetState(source, destination, message_bridge::State::CONNECTED);
704}
705void SimulatedMessageBridge::SetState(const Node *source,
706 const Node *destination,
707 message_bridge::State state) {
708 auto source_state = event_loop_map_.find(source);
709 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700710 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800711
712 auto destination_state = event_loop_map_.find(destination);
713 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700714 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800715}
716
James Kuszmaul94ca5132022-07-19 09:11:08 -0700717void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700718 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700719 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700720 }
721}
722
James Kuszmaul94ca5132022-07-19 09:11:08 -0700723void SimulatedMessageBridge::DisableStatistics(const Node *node,
724 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800725 auto it = event_loop_map_.find(node);
726 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700727 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800728}
729
730void SimulatedMessageBridge::EnableStatistics() {
731 for (std::pair<const Node *const, State> &state : event_loop_map_) {
732 state.second.EnableStatistics();
733 }
734}
735
736void SimulatedMessageBridge::EnableStatistics(const Node *node) {
737 auto it = event_loop_map_.find(node);
738 CHECK(it != event_loop_map_.end());
739 it->second.EnableStatistics();
740}
741
Austin Schuhac6d89e2024-03-27 14:56:09 -0700742void SimulatedMessageBridge::State::MakeEventLoop() {
743 // Message bridge isn't the thing that should be catching sent-too-fast,
744 // and may need to be able to forward too-fast messages replayed from old
745 // logfiles.
746 SetEventLoop(node_factory_->MakeEventLoop(
747 "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
748 NodeEventLoopFactory::ExclusiveSenders::kNo,
749 {}}));
750}
751
Austin Schuh58646e22021-08-23 23:51:46 -0700752void SimulatedMessageBridge::State::SetEventLoop(
753 std::unique_ptr<aos::EventLoop> loop) {
754 if (!loop) {
755 timestamp_loggers = ChannelTimestampSender(nullptr);
Austin Schuhac6d89e2024-03-27 14:56:09 -0700756 server_status_.reset();
Austin Schuh58646e22021-08-23 23:51:46 -0700757 client_status.reset();
758 for (RawMessageDelayer *source_delayer : source_delayers_) {
759 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
760 }
761 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
762 destination_delayer->SetSendEventLoop(nullptr, nullptr);
763 }
764 event_loop = std::move(loop);
765 return;
766 } else {
767 CHECK(!event_loop);
768 }
769 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700770
Austin Schuh58646e22021-08-23 23:51:46 -0700771 event_loop->SkipTimingReport();
772 event_loop->SkipAosLog();
773
774 for (std::pair<const Channel *, DelayersVector *> &watcher :
775 delayer_watchers_) {
776 // Don't register watchers if we know we aren't forwarding.
777 if (watcher.second->disable_forwarding) continue;
778 event_loop->MakeRawNoArgWatcher(
Austin Schuhac6d89e2024-03-27 14:56:09 -0700779 watcher.first,
780 [this, captured_delayers = watcher.second](const Context &context) {
Austin Schuh58646e22021-08-23 23:51:46 -0700781 // We might get told after registering, so don't forward at that point
782 // too.
783 for (std::unique_ptr<RawMessageDelayer> &delayer :
784 captured_delayers->v) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700785 delayer->MessageWatcherCallback(context.queue_index,
786 context.monotonic_event_time,
787 event_loop->monotonic_now());
Austin Schuh58646e22021-08-23 23:51:46 -0700788 }
789 });
790 }
791
792 timestamp_loggers = ChannelTimestampSender(event_loop.get());
Austin Schuhac6d89e2024-03-27 14:56:09 -0700793 server_status_ =
794 std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800795 if (disable_statistics_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700796 server_status_->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800797 }
Austin Schuh58646e22021-08-23 23:51:46 -0700798
799 {
800 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700801 for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
Austin Schuhac6d89e2024-03-27 14:56:09 -0700802 server_status_->nodes()) {
James Kuszmaulbedbb342023-05-26 11:19:27 -0700803 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700804 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800805 switch (server_state_[node_index]) {
806 case message_bridge::State::DISCONNECTED:
Austin Schuhac6d89e2024-03-27 14:56:09 -0700807 server_status_->Disconnect(node_index);
Austin Schuh367a7f42021-11-23 23:04:36 -0800808 break;
809 case message_bridge::State::CONNECTED:
Austin Schuhac6d89e2024-03-27 14:56:09 -0700810 server_status_->Connect(node_index, event_loop->monotonic_now());
Austin Schuh367a7f42021-11-23 23:04:36 -0800811 break;
812 }
Austin Schuh58646e22021-08-23 23:51:46 -0700813 } else {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700814 server_status_->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700815 }
816 }
817 ++node_index;
818 }
819 }
820
821 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
822 if (boot_uuids_[i] != UUID::Zero()) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700823 server_status_->SetBootUUID(i, boot_uuids_[i]);
Austin Schuh58646e22021-08-23 23:51:46 -0700824 }
825 }
Austin Schuh58646e22021-08-23 23:51:46 -0700826 if (fn_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700827 server_status_->set_send_data(fn_);
Austin Schuh58646e22021-08-23 23:51:46 -0700828 }
829 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
830 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700831 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700832 }
833
834 for (size_t i = 0;
835 i < client_status->mutable_client_statistics()->connections()->size();
836 ++i) {
837 ClientConnection *client_connection =
838 client_status->mutable_client_statistics()
839 ->mutable_connections()
840 ->GetMutableObject(i);
841 const Node *client_node = configuration::GetNode(
842 node_factory_->configuration(),
843 client_connection->node()->name()->string_view());
844 const size_t client_node_index = configuration::GetNodeIndex(
845 node_factory_->configuration(), client_node);
846 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800847 if (client_connection->state() != client_state_[client_node_index]) {
848 switch (client_state_[client_node_index]) {
849 case message_bridge::State::DISCONNECTED:
850 client_status->Disconnect(i);
851 break;
852 case message_bridge::State::CONNECTED:
853 client_status->Connect(i);
854 break;
855 }
856 }
Austin Schuh58646e22021-08-23 23:51:46 -0700857 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800858 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700859 }
860 }
861
Austin Schuh2f8fd752020-09-01 22:38:28 -0700862 for (const Channel *channel : *event_loop->configuration()->channels()) {
863 CHECK(channel->has_source_node());
864
865 // Sent by us.
866 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
867 channel->has_destination_nodes()) {
868 for (const Connection *connection : *channel->destination_nodes()) {
869 const bool delivery_time_is_logged =
870 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
871 connection, event_loop->node());
872
James Kuszmaul94ca5132022-07-19 09:11:08 -0700873 const RawMessageDelayer *delayer = nullptr;
874 for (const RawMessageDelayer *candidate : source_delayers_) {
875 if (candidate->channel() == channel) {
876 delayer = candidate;
877 }
878 }
879
Austin Schuh2f8fd752020-09-01 22:38:28 -0700880 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700881 if (!delivery_time_is_logged ||
882 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700883 continue;
884 }
885
Austin Schuh89c9b812021-02-20 14:42:10 -0800886 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700887 }
888 }
889 }
Austin Schuh58646e22021-08-23 23:51:46 -0700890
891 for (RawMessageDelayer *source_delayer : source_delayers_) {
Austin Schuhac6d89e2024-03-27 14:56:09 -0700892 source_delayer->SetFetchEventLoop(event_loop.get(), server_status_.get(),
Austin Schuh58646e22021-08-23 23:51:46 -0700893 &timestamp_loggers);
894 }
895 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
896 destination_delayer->SetSendEventLoop(event_loop.get(),
897 client_status.get());
898 }
899 event_loop->OnRun([this]() {
900 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
901 if (destination_delayer->time_to_live() == 0) {
902 destination_delayer->ScheduleReliable();
903 }
904 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700905 // Note: This exists to work around the fact that some users like to be able
906 // to send reliable messages while execution is stopped, creating a
907 // situation where the following sequencing can occur:
908 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
909 // Node B).
910 // 2) Node B starts up.
911 // 3) Anywhere from 0 to N seconds later, Node A starts up.
912 //
913 // In this case, we need the reliable message to make it to Node B, but it
914 // also shouldn't make it to Node B until Node A has started up.
915 //
916 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
917 // the message, then that would trigger the watchers in the delayers.
918 // However, we so far have continued to support Sending while stopped....
919 for (RawMessageDelayer *source_delayer : source_delayers_) {
920 if (source_delayer->time_to_live() == 0) {
921 source_delayer->ScheduleReliable();
922 }
923 }
Austin Schuh58646e22021-08-23 23:51:46 -0700924 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700925}
926
Austin Schuhac6d89e2024-03-27 14:56:09 -0700927void SimulatedMessageBridge::State::SetSendData(
928 std::function<void(uint32_t, monotonic_clock::time_point)> fn) {
929 CHECK(!fn_);
930 fn_ = std::move(fn);
931 if (server_status_) {
932 server_status_->set_send_data(fn_);
933 }
934}
935
936void SimulatedMessageBridge::State::SetBootUUID(size_t node_index,
937 const UUID &boot_uuid) {
938 boot_uuids_[node_index] = boot_uuid;
939 const Node *node = node_factory_->configuration()->nodes()->Get(node_index);
940 if (server_status_) {
941 ServerConnection *connection = server_status_->FindServerConnection(node);
942 if (connection) {
943 if (boot_uuid == UUID::Zero()) {
944 server_status_->Disconnect(node_index);
945 server_status_->ResetFilter(node_index);
946 } else {
947 switch (server_state_[node_index]) {
948 case message_bridge::State::DISCONNECTED:
949 server_status_->Disconnect(node_index);
950 break;
951 case message_bridge::State::CONNECTED:
952 server_status_->Connect(node_index, event_loop->monotonic_now());
953 break;
954 }
955 server_status_->ResetFilter(node_index);
956 server_status_->SetBootUUID(node_index, boot_uuid);
957 }
958 }
959 }
960 if (client_status) {
961 const int client_index =
962 client_status->FindClientIndex(node->name()->string_view());
963 client_status->SampleReset(client_index);
964 if (boot_uuid == UUID::Zero()) {
965 client_status->Disconnect(client_index);
966 } else {
967 switch (client_state_[node_index]) {
968 case message_bridge::State::CONNECTED:
969 client_status->Connect(client_index);
970 break;
971 case message_bridge::State::DISCONNECTED:
972 client_status->Disconnect(client_index);
973 break;
974 }
975 }
976 }
977}
978
979void SimulatedMessageBridge::State::SetServerState(
980 const Node *destination, message_bridge::State state) {
981 const size_t node_index =
982 configuration::GetNodeIndex(node_factory_->configuration(), destination);
983 server_state_[node_index] = state;
984 if (server_status_) {
985 ServerConnection *connection =
986 server_status_->FindServerConnection(destination);
987 if (connection == nullptr) return;
988
989 if (state == connection->state()) {
990 return;
991 }
992 switch (state) {
993 case message_bridge::State::DISCONNECTED:
994 server_status_->Disconnect(node_index);
995 break;
996 case message_bridge::State::CONNECTED:
997 server_status_->Connect(node_index, event_loop->monotonic_now());
998 for (RawMessageDelayer *delayer : source_delayers_) {
999 if (delayer->SendingTo(destination)) {
1000 delayer->Connect();
1001 }
1002 }
1003 break;
1004 }
1005 }
1006}
1007
1008void SimulatedMessageBridge::State::SetClientState(
1009 const Node *source, message_bridge::State state) {
1010 const size_t node_index =
1011 configuration::GetNodeIndex(node_factory_->configuration(), source);
1012 client_state_[node_index] = state;
1013 if (client_status) {
1014 const int client_index =
1015 client_status->FindClientIndex(source->name()->string_view());
1016 ClientConnection *connection = client_status->GetClientConnection(source);
1017
1018 // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
1019 // calls?
1020 if (connection->state() != state) {
1021 switch (state) {
1022 case message_bridge::State::CONNECTED:
1023 client_status->Connect(client_index);
1024 break;
1025 case message_bridge::State::DISCONNECTED:
1026 client_status->Disconnect(client_index);
1027 break;
1028 }
1029 }
1030 }
1031}
1032
Stephan Pleinesf63bde82024-01-13 15:59:33 -08001033} // namespace aos::message_bridge