blob: c8a8150d867d3155ec1bf2f57ff456fbf42a3c72 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07004
Austin Schuh0de30f32020-12-06 12:44:28 -08005#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08006#include "aos/events/event_loop.h"
7#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009
10namespace aos {
11namespace message_bridge {
12
13// This class delays messages forwarded between two factories.
14//
15// The basic design is that we need to use the distributed_clock to convert
16// monotonic times from the source to the destination node. We also use a
17// fetcher to manage the queue of data, and a timer to schedule the sends.
18class RawMessageDelayer {
19 public:
Austin Schuh58646e22021-08-23 23:51:46 -070020 RawMessageDelayer(const Channel *channel, const Connection *connection,
21 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080022 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070023 size_t destination_node_index, bool delivery_time_is_logged)
24 : channel_(channel),
25 connection_(connection),
26 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080027 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080028 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070029 channel_index_(configuration::ChannelIndex(
30 fetch_node_factory_->configuration(), channel_)),
31 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080032
Austin Schuh58646e22021-08-23 23:51:46 -070033 bool forwarding_disabled() const { return forwarding_disabled_; }
34 void set_forwarding_disabled(bool forwarding_disabled) {
35 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070036 if (!forwarding_disabled_) {
37 CHECK(timestamp_logger_ == nullptr);
38 CHECK(sender_ == nullptr);
39 }
Austin Schuh898f4972020-01-11 17:21:25 -080040 }
41
Austin Schuh58646e22021-08-23 23:51:46 -070042 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
43 MessageBridgeServerStatus *server_status,
44 ChannelTimestampSender *timestamp_loggers) {
45 sent_ = false;
46 fetch_event_loop_ = fetch_event_loop;
47 if (fetch_event_loop_) {
48 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
49 } else {
50 fetcher_ = nullptr;
51 }
52
53 server_status_ = server_status;
54 if (server_status) {
55 server_connection_ =
56 server_status_->FindServerConnection(send_node_factory_->node());
James Kuszmaula6681e22023-05-26 11:20:40 -070057 server_index_ = configuration::GetNodeIndex(
58 send_node_factory_->configuration(), send_node_factory_->node());
Austin Schuh58646e22021-08-23 23:51:46 -070059 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070060 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
61 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070062 timestamp_logger_ =
63 timestamp_loggers->SenderForChannel(channel_, connection_);
64 } else {
65 timestamp_logger_ = nullptr;
66 }
67
68 if (fetch_event_loop_) {
69 timestamp_timer_ =
70 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
71 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070072 std::string timer_name =
73 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
74 fetcher_->channel()->name()->string_view(), " ",
75 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070076 if (timer_) {
77 timer_->set_name(timer_name);
78 }
79 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
80 }
81 } else {
82 timestamp_timer_ = nullptr;
83 }
84 }
85
86 void SetSendEventLoop(aos::EventLoop *send_event_loop,
87 MessageBridgeClientStatus *client_status) {
88 sent_ = false;
89 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070090 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070091 sender_ = send_event_loop_->MakeRawSender(channel_);
92 } else {
93 sender_ = nullptr;
94 }
95
96 client_status_ = client_status;
97 if (client_status_) {
98 client_index_ = client_status_->FindClientIndex(
99 channel_->source_node()->string_view());
100 client_connection_ = client_status_->GetClientConnection(client_index_);
101 } else {
102 client_index_ = -1;
103 client_connection_ = nullptr;
104 }
105
106 if (send_event_loop_) {
107 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
108 if (fetcher_) {
109 std::string timer_name =
110 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
111 fetcher_->channel()->name()->string_view(), " ",
112 fetcher_->channel()->type()->string_view());
113 timer_->set_name(timer_name);
114 if (timestamp_timer_) {
115 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
116 }
117 }
118 } else {
119 timer_ = nullptr;
120 }
121 }
122
123 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800124
Austin Schuh4c570ea2020-11-19 23:13:24 -0800125 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700126 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800127 ->time_to_live();
128 }
129
Austin Schuh58646e22021-08-23 23:51:46 -0700130 void ScheduleReliable() {
131 if (forwarding_disabled()) return;
132
133 if (!fetcher_) {
134 return;
135 }
136 if (fetcher_->context().data == nullptr || sent_) {
137 sent_ = !fetcher_->Fetch();
138 }
139
140 FetchNext();
141 if (fetcher_->context().data == nullptr || sent_) {
142 return;
143 }
Austin Schuh58646e22021-08-23 23:51:46 -0700144
145 // Send at startup. It is the best we can do.
146 const monotonic_clock::time_point monotonic_delivered_time =
147 send_node_factory_->monotonic_now() +
148 send_node_factory_->network_delay();
149
150 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
151 << ": Trying to deliver message in the past on channel "
152 << configuration::StrippedChannelToString(fetcher_->channel())
153 << " to node " << send_event_loop_->node()->name()->string_view()
154 << " sent from " << fetcher_->channel()->source_node()->string_view()
155 << " at " << fetch_node_factory_->monotonic_now();
156
157 if (timer_) {
James Kuszmaula6681e22023-05-26 11:20:40 -0700158 server_status_->AddSentPacket(server_index_, channel_);
Philipp Schradera6712522023-07-05 20:25:11 -0700159 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700160 timer_scheduled_ = true;
161 } else {
James Kuszmaula6681e22023-05-26 11:20:40 -0700162 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh58646e22021-08-23 23:51:46 -0700163 sent_ = true;
164 }
165 }
166
167 bool timer_scheduled_ = false;
168
Austin Schuh898f4972020-01-11 17:21:25 -0800169 // Kicks us to re-fetch and schedule the timer.
170 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700171 CHECK(!forwarding_disabled());
172 if (!fetcher_) {
173 return;
174 }
175 if (timer_scheduled_) {
176 return;
177 }
178 FetchNext();
179 if (fetcher_->context().data == nullptr || sent_) {
180 return;
181 }
182
183 // Compute the time to publish this message.
184 const monotonic_clock::time_point monotonic_delivered_time =
185 DeliveredTime(fetcher_->context());
186
187 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
188 << ": Trying to deliver message in the past on channel "
189 << configuration::StrippedChannelToString(fetcher_->channel())
190 << " to node " << send_event_loop_->node()->name()->string_view()
191 << " sent from " << fetcher_->channel()->source_node()->string_view()
192 << " at " << fetch_node_factory_->monotonic_now();
193
194 if (timer_) {
James Kuszmaula6681e22023-05-26 11:20:40 -0700195 server_status_->AddSentPacket(server_index_, channel_);
Philipp Schradera6712522023-07-05 20:25:11 -0700196 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700197 timer_scheduled_ = true;
198 } else {
James Kuszmaula6681e22023-05-26 11:20:40 -0700199 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh58646e22021-08-23 23:51:46 -0700200 sent_ = true;
201 Schedule();
202 }
203 }
204
205 private:
206 void FetchNext() {
207 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800208 // Keep pulling messages out of the fetcher until we find one in the future.
209 while (true) {
210 if (fetcher_->context().data == nullptr || sent_) {
211 sent_ = !fetcher_->FetchNext();
212 }
213 if (sent_) {
214 break;
215 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800216
217 if (server_connection_->state() != State::CONNECTED) {
218 sent_ = true;
James Kuszmaula6681e22023-05-26 11:20:40 -0700219 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800220 continue;
221 }
222
Austin Schuh6aa77be2020-02-22 21:06:40 -0800223 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700224 send_node_factory_->network_delay() +
225 send_node_factory_->send_delay() >
226 fetch_node_factory_->monotonic_now() ||
227 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800228 break;
229 }
230
231 // TODO(austin): Not cool. We want to actually forward these. This means
232 // we need a more sophisticated concept of what is running.
Austin Schuh60e77942022-05-16 17:48:24 -0700233 // TODO(james): This fails if multiple messages are sent on the same
234 // channel within the same callback.
Austin Schuh6aa77be2020-02-22 21:06:40 -0800235 LOG(WARNING) << "Not forwarding message on "
236 << configuration::CleanedChannelToString(fetcher_->channel())
237 << " because we aren't running. Set at "
238 << fetcher_->context().monotonic_event_time << " now is "
239 << fetch_node_factory_->monotonic_now();
240 sent_ = true;
James Kuszmaula6681e22023-05-26 11:20:40 -0700241 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh898f4972020-01-11 17:21:25 -0800242 }
Austin Schuh898f4972020-01-11 17:21:25 -0800243 }
244
Austin Schuh58646e22021-08-23 23:51:46 -0700245 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800246 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700247 timer_scheduled_ = false;
248 CHECK(sender_);
249 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800250 if (server_connection_->state() != State::CONNECTED) {
251 sent_ = true;
252 Schedule();
253 return;
254 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700255 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700256 sender_->CheckOk(sender_->Send(
257 fetcher_->context().data, fetcher_->context().size,
258 fetcher_->context().monotonic_event_time,
259 fetcher_->context().realtime_event_time,
260 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800261
Austin Schuh4c3b9702020-08-30 11:34:55 -0700262 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800263 client_status_->SampleFilter(
264 client_index_, fetcher_->context().monotonic_event_time,
265 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700266
267 client_connection_->mutate_received_packets(
268 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700269
270 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800271 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800272 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800273 // Reset the filter every time the UUID changes. There's probably a more
274 // clever way to do this, but that means a better concept of rebooting.
James Kuszmaulbedbb342023-05-26 11:19:27 -0700275 if (!server_status_->BootUUID(destination_node_index_).has_value() ||
276 (server_status_->BootUUID(destination_node_index_).value() !=
277 send_node_factory_->boot_uuid())) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800278 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800279 server_status_->SetBootUUID(destination_node_index_,
280 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800281 }
282
Austin Schuhcdd90272021-03-15 12:46:16 -0700283 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
284 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800285
Austin Schuheeaa2022021-01-02 21:52:03 -0800286 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700287
288 message_header_builder.add_channel_index(channel_index_);
289
290 // Swap the remote and sent metrics. They are from the sender's
291 // perspective, not the receiver's perspective.
292 message_header_builder.add_monotonic_remote_time(
293 fetcher_->context().monotonic_event_time.time_since_epoch().count());
294 message_header_builder.add_realtime_remote_time(
295 fetcher_->context().realtime_event_time.time_since_epoch().count());
296 message_header_builder.add_remote_queue_index(
297 fetcher_->context().queue_index);
298
299 message_header_builder.add_monotonic_sent_time(
300 sender_->monotonic_sent_time().time_since_epoch().count());
301 message_header_builder.add_realtime_sent_time(
302 sender_->realtime_sent_time().time_since_epoch().count());
303 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800304 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700305
Austin Schuheeaa2022021-01-02 21:52:03 -0800306 fbb.Finish(message_header_builder.Finish());
307
308 remote_timestamps_.emplace_back(
309 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
310 fetch_node_factory_->monotonic_now() +
311 send_node_factory_->network_delay());
312 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700313 }
314
Austin Schuh898f4972020-01-11 17:21:25 -0800315 sent_ = true;
316 Schedule();
317 }
318
Austin Schuheeaa2022021-01-02 21:52:03 -0800319 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
320 void ScheduleTimestamp() {
321 if (remote_timestamps_.empty()) {
322 timestamp_timer_->Disable();
323 return;
324 }
325
326 if (scheduled_time_ !=
327 remote_timestamps_.front().monotonic_timestamp_time) {
Philipp Schradera6712522023-07-05 20:25:11 -0700328 timestamp_timer_->Schedule(
Austin Schuheeaa2022021-01-02 21:52:03 -0800329 remote_timestamps_.front().monotonic_timestamp_time);
330 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
331 return;
332 } else {
333 scheduled_time_ = monotonic_clock::min_time;
334 }
335 }
336
337 // Sends the next timestamp in remote_timestamps_.
338 void SendTimestamp() {
339 CHECK(!remote_timestamps_.empty());
340
341 // Send out all timestamps at the currently scheduled time.
342 while (remote_timestamps_.front().monotonic_timestamp_time ==
343 scheduled_time_) {
344 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700345 timestamp_logger_->CheckOk(timestamp_logger_->Send(
346 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800347 }
348 remote_timestamps_.pop_front();
349 if (remote_timestamps_.empty()) {
350 break;
351 }
352 }
353
354 ScheduleTimestamp();
355 }
356
Austin Schuh898f4972020-01-11 17:21:25 -0800357 // Converts from time on the sending node to time on the receiving node.
358 monotonic_clock::time_point DeliveredTime(const Context &context) const {
359 const distributed_clock::time_point distributed_sent_time =
360 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
361
Austin Schuh58646e22021-08-23 23:51:46 -0700362 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700363 distributed_sent_time + send_node_factory_->network_delay() +
364 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700365 CHECK_EQ(t.boot, send_node_factory_->boot_count());
366 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800367 }
368
Austin Schuh58646e22021-08-23 23:51:46 -0700369 const Channel *channel_;
370 const Connection *connection_;
371
Austin Schuh898f4972020-01-11 17:21:25 -0800372 // Factories used for time conversion.
373 aos::NodeEventLoopFactory *fetch_node_factory_;
374 aos::NodeEventLoopFactory *send_node_factory_;
375
Austin Schuheeaa2022021-01-02 21:52:03 -0800376 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700377 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800378 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700379 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800380 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700381 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800382 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700383 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800384 // Time that the timer is scheduled for. Used to track if it needs to be
385 // rescheduled.
386 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
387
Austin Schuh898f4972020-01-11 17:21:25 -0800388 // Fetcher used to receive messages.
389 std::unique_ptr<aos::RawFetcher> fetcher_;
390 // Sender to send them back out.
391 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800392
Austin Schuh58646e22021-08-23 23:51:46 -0700393 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800394 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800395 // True if we have sent the message in the fetcher.
396 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700397
398 ServerConnection *server_connection_ = nullptr;
James Kuszmaula6681e22023-05-26 11:20:40 -0700399 int server_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700400 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700401 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700402 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700403
404 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800405 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800406
407 struct Timestamp {
408 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
409 monotonic_clock::time_point new_monotonic_timestamp_time)
410 : remote_message(std::move(new_remote_message)),
411 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
412 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
413 monotonic_clock::time_point monotonic_timestamp_time;
414 };
415
416 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700417
418 bool delivery_time_is_logged_;
419
420 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800421};
422
423SimulatedMessageBridge::SimulatedMessageBridge(
424 SimulatedEventLoopFactory *simulated_event_loop_factory) {
425 CHECK(
426 configuration::MultiNode(simulated_event_loop_factory->configuration()));
427
428 // Pre-build up event loops for every node. They are pretty cheap anyways.
429 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700430 NodeEventLoopFactory *node_factory =
431 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
432 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800433 CHECK(it.second);
434
Austin Schuh58646e22021-08-23 23:51:46 -0700435 node_factory->OnStartup(
436 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
437 node_state->MakeEventLoop();
438 const size_t my_node_index = configuration::GetNodeIndex(
439 simulated_event_loop_factory->configuration(),
440 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700441
Austin Schuh58646e22021-08-23 23:51:46 -0700442 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700443 for (const std::optional<MessageBridgeServerStatus::NodeState>
444 &connection : node_state->server_status->nodes()) {
445 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700446 node_state->server_status->ResetFilter(node_index);
447 }
448 ++node_index;
449 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700450
Austin Schuh58646e22021-08-23 23:51:46 -0700451 for (const ClientConnection *client_connections :
452 *node_state->client_status->mutable_client_statistics()
453 ->connections()) {
454 const Node *client_node = configuration::GetNode(
455 simulated_event_loop_factory->configuration(),
456 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700457
Austin Schuh58646e22021-08-23 23:51:46 -0700458 auto client_event_loop = event_loop_map_.find(client_node);
459 client_event_loop->second.SetBootUUID(
460 my_node_index, node_state->event_loop->boot_uuid());
461 }
462 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700463
Austin Schuh58646e22021-08-23 23:51:46 -0700464 node_factory->OnShutdown([node_state = &it.first->second]() {
465 node_state->SetEventLoop(nullptr);
466 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800467 }
468
Austin Schuh898f4972020-01-11 17:21:25 -0800469 for (const Channel *channel :
470 *simulated_event_loop_factory->configuration()->channels()) {
471 if (!channel->has_destination_nodes()) {
472 continue;
473 }
474
475 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700476 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800477 configuration::GetNode(simulated_event_loop_factory->configuration(),
478 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700479 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800480 CHECK(source_event_loop != event_loop_map_.end());
481
482 std::unique_ptr<DelayersVector> delayers =
483 std::make_unique<DelayersVector>();
484
485 // And then build up a RawMessageDelayer for each destination.
486 for (const Connection *connection : *channel->destination_nodes()) {
487 const Node *destination_node =
488 configuration::GetNode(simulated_event_loop_factory->configuration(),
489 connection->name()->string_view());
490 auto destination_event_loop = event_loop_map_.find(destination_node);
491 CHECK(destination_event_loop != event_loop_map_.end());
492
Austin Schuh2f8fd752020-09-01 22:38:28 -0700493 const size_t destination_node_index = configuration::GetNodeIndex(
494 simulated_event_loop_factory->configuration(), destination_node);
495
496 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700497 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
498 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700499
Austin Schuh58646e22021-08-23 23:51:46 -0700500 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
501 channel, connection,
502 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800503 simulated_event_loop_factory->GetNodeEventLoopFactory(
504 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700505 destination_node_index, delivery_time_is_logged));
506
507 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
508 destination_event_loop->second.AddDestinationDelayer(
509 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800510 }
511
Austin Schuh4c3b9702020-08-30 11:34:55 -0700512 const Channel *const timestamp_channel = configuration::GetChannel(
513 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700514 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700515
516 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700517 source_event_loop->second.SetSendData(
James Kuszmaul79b2f032023-06-02 21:02:27 -0700518 [captured_delayers = delayers.get()]() {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700519 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700520 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700521 delayer->Schedule();
522 }
523 });
524 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700525 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700526 }
Austin Schuh898f4972020-01-11 17:21:25 -0800527 delayers_list_.emplace_back(std::move(delayers));
528 }
529}
530
531SimulatedMessageBridge::~SimulatedMessageBridge() {}
532
Austin Schuh6f3babe2020-01-26 20:34:50 -0800533void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700534 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
535 if (delayers->v.size() > 0) {
536 if (delayers->v[0]->channel() == channel) {
537 delayers->disable_forwarding = true;
538 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
539 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800540 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800541 }
542 }
543 }
544}
545
Austin Schuhc0b0f722020-12-12 18:36:06 -0800546void SimulatedMessageBridge::Disconnect(const Node *source,
547 const Node *destination) {
548 SetState(source, destination, message_bridge::State::DISCONNECTED);
549}
550
551void SimulatedMessageBridge::Connect(const Node *source,
552 const Node *destination) {
553 SetState(source, destination, message_bridge::State::CONNECTED);
554}
555void SimulatedMessageBridge::SetState(const Node *source,
556 const Node *destination,
557 message_bridge::State state) {
558 auto source_state = event_loop_map_.find(source);
559 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700560 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800561
562 auto destination_state = event_loop_map_.find(destination);
563 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700564 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800565}
566
James Kuszmaul94ca5132022-07-19 09:11:08 -0700567void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700568 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700569 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700570 }
571}
572
James Kuszmaul94ca5132022-07-19 09:11:08 -0700573void SimulatedMessageBridge::DisableStatistics(const Node *node,
574 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800575 auto it = event_loop_map_.find(node);
576 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700577 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800578}
579
580void SimulatedMessageBridge::EnableStatistics() {
581 for (std::pair<const Node *const, State> &state : event_loop_map_) {
582 state.second.EnableStatistics();
583 }
584}
585
586void SimulatedMessageBridge::EnableStatistics(const Node *node) {
587 auto it = event_loop_map_.find(node);
588 CHECK(it != event_loop_map_.end());
589 it->second.EnableStatistics();
590}
591
Austin Schuh58646e22021-08-23 23:51:46 -0700592void SimulatedMessageBridge::State::SetEventLoop(
593 std::unique_ptr<aos::EventLoop> loop) {
594 if (!loop) {
595 timestamp_loggers = ChannelTimestampSender(nullptr);
596 server_status.reset();
597 client_status.reset();
598 for (RawMessageDelayer *source_delayer : source_delayers_) {
599 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
600 }
601 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
602 destination_delayer->SetSendEventLoop(nullptr, nullptr);
603 }
604 event_loop = std::move(loop);
605 return;
606 } else {
607 CHECK(!event_loop);
608 }
609 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700610
Austin Schuh58646e22021-08-23 23:51:46 -0700611 event_loop->SkipTimingReport();
612 event_loop->SkipAosLog();
613
614 for (std::pair<const Channel *, DelayersVector *> &watcher :
615 delayer_watchers_) {
616 // Don't register watchers if we know we aren't forwarding.
617 if (watcher.second->disable_forwarding) continue;
618 event_loop->MakeRawNoArgWatcher(
619 watcher.first, [captured_delayers = watcher.second](const Context &) {
620 // We might get told after registering, so don't forward at that point
621 // too.
622 for (std::unique_ptr<RawMessageDelayer> &delayer :
623 captured_delayers->v) {
624 delayer->Schedule();
625 }
626 });
627 }
628
629 timestamp_loggers = ChannelTimestampSender(event_loop.get());
630 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800631 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700632 server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800633 }
Austin Schuh58646e22021-08-23 23:51:46 -0700634
635 {
636 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700637 for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
638 server_status->nodes()) {
639 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700640 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800641 switch (server_state_[node_index]) {
642 case message_bridge::State::DISCONNECTED:
643 server_status->Disconnect(node_index);
644 break;
645 case message_bridge::State::CONNECTED:
646 server_status->Connect(node_index, event_loop->monotonic_now());
647 break;
648 }
Austin Schuh58646e22021-08-23 23:51:46 -0700649 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800650 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700651 }
652 }
653 ++node_index;
654 }
655 }
656
657 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
658 if (boot_uuids_[i] != UUID::Zero()) {
659 server_status->SetBootUUID(i, boot_uuids_[i]);
660 }
661 }
Austin Schuh58646e22021-08-23 23:51:46 -0700662 if (fn_) {
663 server_status->set_send_data(fn_);
664 }
665 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
666 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700667 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700668 }
669
670 for (size_t i = 0;
671 i < client_status->mutable_client_statistics()->connections()->size();
672 ++i) {
673 ClientConnection *client_connection =
674 client_status->mutable_client_statistics()
675 ->mutable_connections()
676 ->GetMutableObject(i);
677 const Node *client_node = configuration::GetNode(
678 node_factory_->configuration(),
679 client_connection->node()->name()->string_view());
680 const size_t client_node_index = configuration::GetNodeIndex(
681 node_factory_->configuration(), client_node);
682 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800683 if (client_connection->state() != client_state_[client_node_index]) {
684 switch (client_state_[client_node_index]) {
685 case message_bridge::State::DISCONNECTED:
686 client_status->Disconnect(i);
687 break;
688 case message_bridge::State::CONNECTED:
689 client_status->Connect(i);
690 break;
691 }
692 }
Austin Schuh58646e22021-08-23 23:51:46 -0700693 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800694 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700695 }
696 }
697
Austin Schuh2f8fd752020-09-01 22:38:28 -0700698 for (const Channel *channel : *event_loop->configuration()->channels()) {
699 CHECK(channel->has_source_node());
700
701 // Sent by us.
702 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
703 channel->has_destination_nodes()) {
704 for (const Connection *connection : *channel->destination_nodes()) {
705 const bool delivery_time_is_logged =
706 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
707 connection, event_loop->node());
708
James Kuszmaul94ca5132022-07-19 09:11:08 -0700709 const RawMessageDelayer *delayer = nullptr;
710 for (const RawMessageDelayer *candidate : source_delayers_) {
711 if (candidate->channel() == channel) {
712 delayer = candidate;
713 }
714 }
715
Austin Schuh2f8fd752020-09-01 22:38:28 -0700716 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700717 if (!delivery_time_is_logged ||
718 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700719 continue;
720 }
721
Austin Schuh89c9b812021-02-20 14:42:10 -0800722 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700723 }
724 }
725 }
Austin Schuh58646e22021-08-23 23:51:46 -0700726
727 for (RawMessageDelayer *source_delayer : source_delayers_) {
728 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
729 &timestamp_loggers);
730 }
731 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
732 destination_delayer->SetSendEventLoop(event_loop.get(),
733 client_status.get());
734 }
735 event_loop->OnRun([this]() {
736 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
737 if (destination_delayer->time_to_live() == 0) {
738 destination_delayer->ScheduleReliable();
739 }
740 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700741 // Note: This exists to work around the fact that some users like to be able
742 // to send reliable messages while execution is stopped, creating a
743 // situation where the following sequencing can occur:
744 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
745 // Node B).
746 // 2) Node B starts up.
747 // 3) Anywhere from 0 to N seconds later, Node A starts up.
748 //
749 // In this case, we need the reliable message to make it to Node B, but it
750 // also shouldn't make it to Node B until Node A has started up.
751 //
752 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
753 // the message, then that would trigger the watchers in the delayers.
754 // However, we so far have continued to support Sending while stopped....
755 for (RawMessageDelayer *source_delayer : source_delayers_) {
756 if (source_delayer->time_to_live() == 0) {
757 source_delayer->ScheduleReliable();
758 }
759 }
Austin Schuh58646e22021-08-23 23:51:46 -0700760 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700761}
762
Austin Schuh898f4972020-01-11 17:21:25 -0800763} // namespace message_bridge
764} // namespace aos