blob: 31f3f0e0c204e416911e6f2019ceb2a966512d15 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
4#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08005#include "aos/events/event_loop.h"
6#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08007#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
Austin Schuh58646e22021-08-23 23:51:46 -070019 RawMessageDelayer(const Channel *channel, const Connection *connection,
20 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080021 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070022 size_t destination_node_index, bool delivery_time_is_logged)
23 : channel_(channel),
24 connection_(connection),
25 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080026 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080027 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070028 channel_index_(configuration::ChannelIndex(
29 fetch_node_factory_->configuration(), channel_)),
30 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080031
Austin Schuh58646e22021-08-23 23:51:46 -070032 bool forwarding_disabled() const { return forwarding_disabled_; }
33 void set_forwarding_disabled(bool forwarding_disabled) {
34 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070035 if (!forwarding_disabled_) {
36 CHECK(timestamp_logger_ == nullptr);
37 CHECK(sender_ == nullptr);
38 }
Austin Schuh898f4972020-01-11 17:21:25 -080039 }
40
Austin Schuh58646e22021-08-23 23:51:46 -070041 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
42 MessageBridgeServerStatus *server_status,
43 ChannelTimestampSender *timestamp_loggers) {
44 sent_ = false;
45 fetch_event_loop_ = fetch_event_loop;
46 if (fetch_event_loop_) {
47 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
48 } else {
49 fetcher_ = nullptr;
50 }
51
52 server_status_ = server_status;
53 if (server_status) {
54 server_connection_ =
55 server_status_->FindServerConnection(send_node_factory_->node());
56 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070057 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
58 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070059 timestamp_logger_ =
60 timestamp_loggers->SenderForChannel(channel_, connection_);
61 } else {
62 timestamp_logger_ = nullptr;
63 }
64
65 if (fetch_event_loop_) {
66 timestamp_timer_ =
67 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
68 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070069 std::string timer_name =
70 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
71 fetcher_->channel()->name()->string_view(), " ",
72 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070073 if (timer_) {
74 timer_->set_name(timer_name);
75 }
76 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
77 }
78 } else {
79 timestamp_timer_ = nullptr;
80 }
81 }
82
83 void SetSendEventLoop(aos::EventLoop *send_event_loop,
84 MessageBridgeClientStatus *client_status) {
85 sent_ = false;
86 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070087 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070088 sender_ = send_event_loop_->MakeRawSender(channel_);
89 } else {
90 sender_ = nullptr;
91 }
92
93 client_status_ = client_status;
94 if (client_status_) {
95 client_index_ = client_status_->FindClientIndex(
96 channel_->source_node()->string_view());
97 client_connection_ = client_status_->GetClientConnection(client_index_);
98 } else {
99 client_index_ = -1;
100 client_connection_ = nullptr;
101 }
102
103 if (send_event_loop_) {
104 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
105 if (fetcher_) {
106 std::string timer_name =
107 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
108 fetcher_->channel()->name()->string_view(), " ",
109 fetcher_->channel()->type()->string_view());
110 timer_->set_name(timer_name);
111 if (timestamp_timer_) {
112 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
113 }
114 }
115 } else {
116 timer_ = nullptr;
117 }
118 }
119
120 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800121
Austin Schuh4c570ea2020-11-19 23:13:24 -0800122 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700123 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800124 ->time_to_live();
125 }
126
Austin Schuh58646e22021-08-23 23:51:46 -0700127 void ScheduleReliable() {
128 if (forwarding_disabled()) return;
129
130 if (!fetcher_) {
131 return;
132 }
133 if (fetcher_->context().data == nullptr || sent_) {
134 sent_ = !fetcher_->Fetch();
135 }
136
137 FetchNext();
138 if (fetcher_->context().data == nullptr || sent_) {
139 return;
140 }
Austin Schuh58646e22021-08-23 23:51:46 -0700141
142 // Send at startup. It is the best we can do.
143 const monotonic_clock::time_point monotonic_delivered_time =
144 send_node_factory_->monotonic_now() +
145 send_node_factory_->network_delay();
146
147 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
148 << ": Trying to deliver message in the past on channel "
149 << configuration::StrippedChannelToString(fetcher_->channel())
150 << " to node " << send_event_loop_->node()->name()->string_view()
151 << " sent from " << fetcher_->channel()->source_node()->string_view()
152 << " at " << fetch_node_factory_->monotonic_now();
153
154 if (timer_) {
155 server_connection_->mutate_sent_packets(
156 server_connection_->sent_packets() + 1);
157 timer_->Setup(monotonic_delivered_time);
158 timer_scheduled_ = true;
159 } else {
160 server_connection_->mutate_dropped_packets(
161 server_connection_->dropped_packets() + 1);
162 sent_ = true;
163 }
164 }
165
166 bool timer_scheduled_ = false;
167
Austin Schuh898f4972020-01-11 17:21:25 -0800168 // Kicks us to re-fetch and schedule the timer.
169 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700170 CHECK(!forwarding_disabled());
171 if (!fetcher_) {
172 return;
173 }
174 if (timer_scheduled_) {
175 return;
176 }
177 FetchNext();
178 if (fetcher_->context().data == nullptr || sent_) {
179 return;
180 }
181
182 // Compute the time to publish this message.
183 const monotonic_clock::time_point monotonic_delivered_time =
184 DeliveredTime(fetcher_->context());
185
186 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
187 << ": Trying to deliver message in the past on channel "
188 << configuration::StrippedChannelToString(fetcher_->channel())
189 << " to node " << send_event_loop_->node()->name()->string_view()
190 << " sent from " << fetcher_->channel()->source_node()->string_view()
191 << " at " << fetch_node_factory_->monotonic_now();
192
193 if (timer_) {
194 server_connection_->mutate_sent_packets(
195 server_connection_->sent_packets() + 1);
196 timer_->Setup(monotonic_delivered_time);
197 timer_scheduled_ = true;
198 } else {
199 server_connection_->mutate_dropped_packets(
200 server_connection_->dropped_packets() + 1);
201 sent_ = true;
202 Schedule();
203 }
204 }
205
206 private:
207 void FetchNext() {
208 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800209 // Keep pulling messages out of the fetcher until we find one in the future.
210 while (true) {
211 if (fetcher_->context().data == nullptr || sent_) {
212 sent_ = !fetcher_->FetchNext();
213 }
214 if (sent_) {
215 break;
216 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800217
218 if (server_connection_->state() != State::CONNECTED) {
219 sent_ = true;
220 server_connection_->mutate_dropped_packets(
221 server_connection_->dropped_packets() + 1);
222 continue;
223 }
224
Austin Schuh6aa77be2020-02-22 21:06:40 -0800225 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700226 send_node_factory_->network_delay() +
227 send_node_factory_->send_delay() >
228 fetch_node_factory_->monotonic_now() ||
229 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800230 break;
231 }
232
233 // TODO(austin): Not cool. We want to actually forward these. This means
234 // we need a more sophisticated concept of what is running.
Austin Schuh60e77942022-05-16 17:48:24 -0700235 // TODO(james): This fails if multiple messages are sent on the same
236 // channel within the same callback.
Austin Schuh6aa77be2020-02-22 21:06:40 -0800237 LOG(WARNING) << "Not forwarding message on "
238 << configuration::CleanedChannelToString(fetcher_->channel())
239 << " because we aren't running. Set at "
240 << fetcher_->context().monotonic_event_time << " now is "
241 << fetch_node_factory_->monotonic_now();
242 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700243 server_connection_->mutate_dropped_packets(
244 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800245 }
Austin Schuh898f4972020-01-11 17:21:25 -0800246 }
247
Austin Schuh58646e22021-08-23 23:51:46 -0700248 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800249 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700250 timer_scheduled_ = false;
251 CHECK(sender_);
252 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800253 if (server_connection_->state() != State::CONNECTED) {
254 sent_ = true;
255 Schedule();
256 return;
257 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700258 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700259 sender_->CheckOk(sender_->Send(
260 fetcher_->context().data, fetcher_->context().size,
261 fetcher_->context().monotonic_event_time,
262 fetcher_->context().realtime_event_time,
263 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800264
Austin Schuh4c3b9702020-08-30 11:34:55 -0700265 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800266 client_status_->SampleFilter(
267 client_index_, fetcher_->context().monotonic_event_time,
268 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700269
270 client_connection_->mutate_received_packets(
271 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700272
273 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800274 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800275 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800276 // Reset the filter every time the UUID changes. There's probably a more
277 // clever way to do this, but that means a better concept of rebooting.
278 if (server_status_->BootUUID(destination_node_index_) !=
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800279 send_node_factory_->boot_uuid()) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800280 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800281 server_status_->SetBootUUID(destination_node_index_,
282 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800283 }
284
Austin Schuhcdd90272021-03-15 12:46:16 -0700285 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
286 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800287
Austin Schuheeaa2022021-01-02 21:52:03 -0800288 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700289
290 message_header_builder.add_channel_index(channel_index_);
291
292 // Swap the remote and sent metrics. They are from the sender's
293 // perspective, not the receiver's perspective.
294 message_header_builder.add_monotonic_remote_time(
295 fetcher_->context().monotonic_event_time.time_since_epoch().count());
296 message_header_builder.add_realtime_remote_time(
297 fetcher_->context().realtime_event_time.time_since_epoch().count());
298 message_header_builder.add_remote_queue_index(
299 fetcher_->context().queue_index);
300
301 message_header_builder.add_monotonic_sent_time(
302 sender_->monotonic_sent_time().time_since_epoch().count());
303 message_header_builder.add_realtime_sent_time(
304 sender_->realtime_sent_time().time_since_epoch().count());
305 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800306 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700307
Austin Schuheeaa2022021-01-02 21:52:03 -0800308 fbb.Finish(message_header_builder.Finish());
309
310 remote_timestamps_.emplace_back(
311 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
312 fetch_node_factory_->monotonic_now() +
313 send_node_factory_->network_delay());
314 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700315 }
316
Austin Schuh898f4972020-01-11 17:21:25 -0800317 sent_ = true;
318 Schedule();
319 }
320
Austin Schuheeaa2022021-01-02 21:52:03 -0800321 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
322 void ScheduleTimestamp() {
323 if (remote_timestamps_.empty()) {
324 timestamp_timer_->Disable();
325 return;
326 }
327
328 if (scheduled_time_ !=
329 remote_timestamps_.front().monotonic_timestamp_time) {
330 timestamp_timer_->Setup(
331 remote_timestamps_.front().monotonic_timestamp_time);
332 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
333 return;
334 } else {
335 scheduled_time_ = monotonic_clock::min_time;
336 }
337 }
338
339 // Sends the next timestamp in remote_timestamps_.
340 void SendTimestamp() {
341 CHECK(!remote_timestamps_.empty());
342
343 // Send out all timestamps at the currently scheduled time.
344 while (remote_timestamps_.front().monotonic_timestamp_time ==
345 scheduled_time_) {
346 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700347 timestamp_logger_->CheckOk(timestamp_logger_->Send(
348 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800349 }
350 remote_timestamps_.pop_front();
351 if (remote_timestamps_.empty()) {
352 break;
353 }
354 }
355
356 ScheduleTimestamp();
357 }
358
Austin Schuh898f4972020-01-11 17:21:25 -0800359 // Converts from time on the sending node to time on the receiving node.
360 monotonic_clock::time_point DeliveredTime(const Context &context) const {
361 const distributed_clock::time_point distributed_sent_time =
362 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
363
Austin Schuh58646e22021-08-23 23:51:46 -0700364 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700365 distributed_sent_time + send_node_factory_->network_delay() +
366 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700367 CHECK_EQ(t.boot, send_node_factory_->boot_count());
368 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800369 }
370
Austin Schuh58646e22021-08-23 23:51:46 -0700371 const Channel *channel_;
372 const Connection *connection_;
373
Austin Schuh898f4972020-01-11 17:21:25 -0800374 // Factories used for time conversion.
375 aos::NodeEventLoopFactory *fetch_node_factory_;
376 aos::NodeEventLoopFactory *send_node_factory_;
377
Austin Schuheeaa2022021-01-02 21:52:03 -0800378 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700379 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800380 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700381 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800382 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700383 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800384 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700385 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800386 // Time that the timer is scheduled for. Used to track if it needs to be
387 // rescheduled.
388 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
389
Austin Schuh898f4972020-01-11 17:21:25 -0800390 // Fetcher used to receive messages.
391 std::unique_ptr<aos::RawFetcher> fetcher_;
392 // Sender to send them back out.
393 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800394
Austin Schuh58646e22021-08-23 23:51:46 -0700395 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800396 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800397 // True if we have sent the message in the fetcher.
398 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700399
400 ServerConnection *server_connection_ = nullptr;
401 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700402 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700403 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700404
405 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800406 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800407
408 struct Timestamp {
409 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
410 monotonic_clock::time_point new_monotonic_timestamp_time)
411 : remote_message(std::move(new_remote_message)),
412 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
413 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
414 monotonic_clock::time_point monotonic_timestamp_time;
415 };
416
417 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700418
419 bool delivery_time_is_logged_;
420
421 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800422};
423
424SimulatedMessageBridge::SimulatedMessageBridge(
425 SimulatedEventLoopFactory *simulated_event_loop_factory) {
426 CHECK(
427 configuration::MultiNode(simulated_event_loop_factory->configuration()));
428
429 // Pre-build up event loops for every node. They are pretty cheap anyways.
430 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700431 NodeEventLoopFactory *node_factory =
432 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
433 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800434 CHECK(it.second);
435
Austin Schuh58646e22021-08-23 23:51:46 -0700436 node_factory->OnStartup(
437 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
438 node_state->MakeEventLoop();
439 const size_t my_node_index = configuration::GetNodeIndex(
440 simulated_event_loop_factory->configuration(),
441 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700442
Austin Schuh58646e22021-08-23 23:51:46 -0700443 size_t node_index = 0;
444 for (ServerConnection *connection :
445 node_state->server_status->server_connection()) {
446 if (connection != nullptr) {
447 node_state->server_status->ResetFilter(node_index);
448 }
449 ++node_index;
450 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700451
Austin Schuh58646e22021-08-23 23:51:46 -0700452 for (const ClientConnection *client_connections :
453 *node_state->client_status->mutable_client_statistics()
454 ->connections()) {
455 const Node *client_node = configuration::GetNode(
456 simulated_event_loop_factory->configuration(),
457 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700458
Austin Schuh58646e22021-08-23 23:51:46 -0700459 auto client_event_loop = event_loop_map_.find(client_node);
460 client_event_loop->second.SetBootUUID(
461 my_node_index, node_state->event_loop->boot_uuid());
462 }
463 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700464
Austin Schuh58646e22021-08-23 23:51:46 -0700465 node_factory->OnShutdown([node_state = &it.first->second]() {
466 node_state->SetEventLoop(nullptr);
467 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800468 }
469
Austin Schuh898f4972020-01-11 17:21:25 -0800470 for (const Channel *channel :
471 *simulated_event_loop_factory->configuration()->channels()) {
472 if (!channel->has_destination_nodes()) {
473 continue;
474 }
475
476 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700477 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800478 configuration::GetNode(simulated_event_loop_factory->configuration(),
479 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700480 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800481 CHECK(source_event_loop != event_loop_map_.end());
482
483 std::unique_ptr<DelayersVector> delayers =
484 std::make_unique<DelayersVector>();
485
486 // And then build up a RawMessageDelayer for each destination.
487 for (const Connection *connection : *channel->destination_nodes()) {
488 const Node *destination_node =
489 configuration::GetNode(simulated_event_loop_factory->configuration(),
490 connection->name()->string_view());
491 auto destination_event_loop = event_loop_map_.find(destination_node);
492 CHECK(destination_event_loop != event_loop_map_.end());
493
Austin Schuh2f8fd752020-09-01 22:38:28 -0700494 const size_t destination_node_index = configuration::GetNodeIndex(
495 simulated_event_loop_factory->configuration(), destination_node);
496
497 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700498 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
499 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700500
Austin Schuh58646e22021-08-23 23:51:46 -0700501 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
502 channel, connection,
503 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800504 simulated_event_loop_factory->GetNodeEventLoopFactory(
505 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700506 destination_node_index, delivery_time_is_logged));
507
508 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
509 destination_event_loop->second.AddDestinationDelayer(
510 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800511 }
512
Austin Schuh4c3b9702020-08-30 11:34:55 -0700513 const Channel *const timestamp_channel = configuration::GetChannel(
514 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700515 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700516
517 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700518 source_event_loop->second.SetSendData(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700519 [captured_delayers = delayers.get()](const Context &) {
520 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700521 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700522 delayer->Schedule();
523 }
524 });
525 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700526 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700527 }
Austin Schuh898f4972020-01-11 17:21:25 -0800528 delayers_list_.emplace_back(std::move(delayers));
529 }
530}
531
532SimulatedMessageBridge::~SimulatedMessageBridge() {}
533
Austin Schuh6f3babe2020-01-26 20:34:50 -0800534void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700535 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
536 if (delayers->v.size() > 0) {
537 if (delayers->v[0]->channel() == channel) {
538 delayers->disable_forwarding = true;
539 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
540 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800541 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800542 }
543 }
544 }
545}
546
Austin Schuhc0b0f722020-12-12 18:36:06 -0800547void SimulatedMessageBridge::Disconnect(const Node *source,
548 const Node *destination) {
549 SetState(source, destination, message_bridge::State::DISCONNECTED);
550}
551
552void SimulatedMessageBridge::Connect(const Node *source,
553 const Node *destination) {
554 SetState(source, destination, message_bridge::State::CONNECTED);
555}
556void SimulatedMessageBridge::SetState(const Node *source,
557 const Node *destination,
558 message_bridge::State state) {
559 auto source_state = event_loop_map_.find(source);
560 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700561 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800562
563 auto destination_state = event_loop_map_.find(destination);
564 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700565 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800566}
567
James Kuszmaul94ca5132022-07-19 09:11:08 -0700568void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700569 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700570 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700571 }
572}
573
James Kuszmaul94ca5132022-07-19 09:11:08 -0700574void SimulatedMessageBridge::DisableStatistics(const Node *node,
575 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800576 auto it = event_loop_map_.find(node);
577 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700578 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800579}
580
581void SimulatedMessageBridge::EnableStatistics() {
582 for (std::pair<const Node *const, State> &state : event_loop_map_) {
583 state.second.EnableStatistics();
584 }
585}
586
587void SimulatedMessageBridge::EnableStatistics(const Node *node) {
588 auto it = event_loop_map_.find(node);
589 CHECK(it != event_loop_map_.end());
590 it->second.EnableStatistics();
591}
592
Austin Schuh58646e22021-08-23 23:51:46 -0700593void SimulatedMessageBridge::State::SetEventLoop(
594 std::unique_ptr<aos::EventLoop> loop) {
595 if (!loop) {
596 timestamp_loggers = ChannelTimestampSender(nullptr);
597 server_status.reset();
598 client_status.reset();
599 for (RawMessageDelayer *source_delayer : source_delayers_) {
600 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
601 }
602 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
603 destination_delayer->SetSendEventLoop(nullptr, nullptr);
604 }
605 event_loop = std::move(loop);
606 return;
607 } else {
608 CHECK(!event_loop);
609 }
610 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700611
Austin Schuh58646e22021-08-23 23:51:46 -0700612 event_loop->SkipTimingReport();
613 event_loop->SkipAosLog();
614
615 for (std::pair<const Channel *, DelayersVector *> &watcher :
616 delayer_watchers_) {
617 // Don't register watchers if we know we aren't forwarding.
618 if (watcher.second->disable_forwarding) continue;
619 event_loop->MakeRawNoArgWatcher(
620 watcher.first, [captured_delayers = watcher.second](const Context &) {
621 // We might get told after registering, so don't forward at that point
622 // too.
623 for (std::unique_ptr<RawMessageDelayer> &delayer :
624 captured_delayers->v) {
625 delayer->Schedule();
626 }
627 });
628 }
629
630 timestamp_loggers = ChannelTimestampSender(event_loop.get());
631 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800632 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700633 server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800634 }
Austin Schuh58646e22021-08-23 23:51:46 -0700635
636 {
637 size_t node_index = 0;
638 for (ServerConnection *connection : server_status->server_connection()) {
639 if (connection) {
640 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800641 switch (server_state_[node_index]) {
642 case message_bridge::State::DISCONNECTED:
643 server_status->Disconnect(node_index);
644 break;
645 case message_bridge::State::CONNECTED:
646 server_status->Connect(node_index, event_loop->monotonic_now());
647 break;
648 }
Austin Schuh58646e22021-08-23 23:51:46 -0700649 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800650 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700651 }
652 }
653 ++node_index;
654 }
655 }
656
657 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
658 if (boot_uuids_[i] != UUID::Zero()) {
659 server_status->SetBootUUID(i, boot_uuids_[i]);
660 }
661 }
Austin Schuh58646e22021-08-23 23:51:46 -0700662 if (fn_) {
663 server_status->set_send_data(fn_);
664 }
665 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
666 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700667 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700668 }
669
670 for (size_t i = 0;
671 i < client_status->mutable_client_statistics()->connections()->size();
672 ++i) {
673 ClientConnection *client_connection =
674 client_status->mutable_client_statistics()
675 ->mutable_connections()
676 ->GetMutableObject(i);
677 const Node *client_node = configuration::GetNode(
678 node_factory_->configuration(),
679 client_connection->node()->name()->string_view());
680 const size_t client_node_index = configuration::GetNodeIndex(
681 node_factory_->configuration(), client_node);
682 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800683 if (client_connection->state() != client_state_[client_node_index]) {
684 switch (client_state_[client_node_index]) {
685 case message_bridge::State::DISCONNECTED:
686 client_status->Disconnect(i);
687 break;
688 case message_bridge::State::CONNECTED:
689 client_status->Connect(i);
690 break;
691 }
692 }
Austin Schuh58646e22021-08-23 23:51:46 -0700693 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800694 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700695 }
696 }
697
Austin Schuh2f8fd752020-09-01 22:38:28 -0700698 for (const Channel *channel : *event_loop->configuration()->channels()) {
699 CHECK(channel->has_source_node());
700
701 // Sent by us.
702 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
703 channel->has_destination_nodes()) {
704 for (const Connection *connection : *channel->destination_nodes()) {
705 const bool delivery_time_is_logged =
706 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
707 connection, event_loop->node());
708
James Kuszmaul94ca5132022-07-19 09:11:08 -0700709 const RawMessageDelayer *delayer = nullptr;
710 for (const RawMessageDelayer *candidate : source_delayers_) {
711 if (candidate->channel() == channel) {
712 delayer = candidate;
713 }
714 }
715
Austin Schuh2f8fd752020-09-01 22:38:28 -0700716 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700717 if (!delivery_time_is_logged ||
718 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700719 continue;
720 }
721
Austin Schuh89c9b812021-02-20 14:42:10 -0800722 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700723 }
724 }
725 }
Austin Schuh58646e22021-08-23 23:51:46 -0700726
727 for (RawMessageDelayer *source_delayer : source_delayers_) {
728 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
729 &timestamp_loggers);
730 }
731 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
732 destination_delayer->SetSendEventLoop(event_loop.get(),
733 client_status.get());
734 }
735 event_loop->OnRun([this]() {
736 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
737 if (destination_delayer->time_to_live() == 0) {
738 destination_delayer->ScheduleReliable();
739 }
740 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700741 // Note: This exists to work around the fact that some users like to be able
742 // to send reliable messages while execution is stopped, creating a
743 // situation where the following sequencing can occur:
744 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
745 // Node B).
746 // 2) Node B starts up.
747 // 3) Anywhere from 0 to N seconds later, Node A starts up.
748 //
749 // In this case, we need the reliable message to make it to Node B, but it
750 // also shouldn't make it to Node B until Node A has started up.
751 //
752 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
753 // the message, then that would trigger the watchers in the delayers.
754 // However, we so far have continued to support Sending while stopped....
755 for (RawMessageDelayer *source_delayer : source_delayers_) {
756 if (source_delayer->time_to_live() == 0) {
757 source_delayer->ScheduleReliable();
758 }
759 }
Austin Schuh58646e22021-08-23 23:51:46 -0700760 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700761}
762
Austin Schuh898f4972020-01-11 17:21:25 -0800763} // namespace message_bridge
764} // namespace aos