blob: 5c97292527d75958fd69b12311edae53b7b8d358 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07004
Austin Schuh0de30f32020-12-06 12:44:28 -08005#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08006#include "aos/events/event_loop.h"
7#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009
Stephan Pleinesf63bde82024-01-13 15:59:33 -080010namespace aos::message_bridge {
Austin Schuh898f4972020-01-11 17:21:25 -080011
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
Austin Schuh58646e22021-08-23 23:51:46 -070019 RawMessageDelayer(const Channel *channel, const Connection *connection,
20 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080021 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070022 size_t destination_node_index, bool delivery_time_is_logged)
23 : channel_(channel),
24 connection_(connection),
25 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080026 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080027 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070028 channel_index_(configuration::ChannelIndex(
29 fetch_node_factory_->configuration(), channel_)),
30 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080031
Austin Schuh58646e22021-08-23 23:51:46 -070032 bool forwarding_disabled() const { return forwarding_disabled_; }
33 void set_forwarding_disabled(bool forwarding_disabled) {
34 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070035 if (!forwarding_disabled_) {
36 CHECK(timestamp_logger_ == nullptr);
37 CHECK(sender_ == nullptr);
38 }
Austin Schuh898f4972020-01-11 17:21:25 -080039 }
40
Austin Schuh58646e22021-08-23 23:51:46 -070041 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
42 MessageBridgeServerStatus *server_status,
43 ChannelTimestampSender *timestamp_loggers) {
44 sent_ = false;
45 fetch_event_loop_ = fetch_event_loop;
46 if (fetch_event_loop_) {
47 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
48 } else {
49 fetcher_ = nullptr;
50 }
51
52 server_status_ = server_status;
53 if (server_status) {
54 server_connection_ =
55 server_status_->FindServerConnection(send_node_factory_->node());
James Kuszmaula6681e22023-05-26 11:20:40 -070056 server_index_ = configuration::GetNodeIndex(
57 send_node_factory_->configuration(), send_node_factory_->node());
Austin Schuh58646e22021-08-23 23:51:46 -070058 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070059 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
60 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070061 timestamp_logger_ =
62 timestamp_loggers->SenderForChannel(channel_, connection_);
63 } else {
64 timestamp_logger_ = nullptr;
65 }
66
67 if (fetch_event_loop_) {
68 timestamp_timer_ =
69 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
70 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070071 std::string timer_name =
72 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
73 fetcher_->channel()->name()->string_view(), " ",
74 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070075 if (timer_) {
76 timer_->set_name(timer_name);
77 }
78 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
79 }
80 } else {
81 timestamp_timer_ = nullptr;
82 }
83 }
84
85 void SetSendEventLoop(aos::EventLoop *send_event_loop,
86 MessageBridgeClientStatus *client_status) {
87 sent_ = false;
88 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070089 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070090 sender_ = send_event_loop_->MakeRawSender(channel_);
91 } else {
92 sender_ = nullptr;
93 }
94
95 client_status_ = client_status;
96 if (client_status_) {
97 client_index_ = client_status_->FindClientIndex(
98 channel_->source_node()->string_view());
99 client_connection_ = client_status_->GetClientConnection(client_index_);
100 } else {
101 client_index_ = -1;
102 client_connection_ = nullptr;
103 }
104
105 if (send_event_loop_) {
106 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
107 if (fetcher_) {
108 std::string timer_name =
109 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
110 fetcher_->channel()->name()->string_view(), " ",
111 fetcher_->channel()->type()->string_view());
112 timer_->set_name(timer_name);
113 if (timestamp_timer_) {
114 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
115 }
116 }
117 } else {
118 timer_ = nullptr;
119 }
120 }
121
122 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800123
Austin Schuh4c570ea2020-11-19 23:13:24 -0800124 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700125 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800126 ->time_to_live();
127 }
128
Austin Schuh58646e22021-08-23 23:51:46 -0700129 void ScheduleReliable() {
130 if (forwarding_disabled()) return;
131
132 if (!fetcher_) {
133 return;
134 }
135 if (fetcher_->context().data == nullptr || sent_) {
136 sent_ = !fetcher_->Fetch();
137 }
138
139 FetchNext();
140 if (fetcher_->context().data == nullptr || sent_) {
141 return;
142 }
Austin Schuh58646e22021-08-23 23:51:46 -0700143
144 // Send at startup. It is the best we can do.
145 const monotonic_clock::time_point monotonic_delivered_time =
146 send_node_factory_->monotonic_now() +
147 send_node_factory_->network_delay();
148
149 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
150 << ": Trying to deliver message in the past on channel "
151 << configuration::StrippedChannelToString(fetcher_->channel())
152 << " to node " << send_event_loop_->node()->name()->string_view()
153 << " sent from " << fetcher_->channel()->source_node()->string_view()
154 << " at " << fetch_node_factory_->monotonic_now();
155
156 if (timer_) {
James Kuszmaula6681e22023-05-26 11:20:40 -0700157 server_status_->AddSentPacket(server_index_, channel_);
Philipp Schradera6712522023-07-05 20:25:11 -0700158 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700159 timer_scheduled_ = true;
160 } else {
James Kuszmaula6681e22023-05-26 11:20:40 -0700161 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh58646e22021-08-23 23:51:46 -0700162 sent_ = true;
163 }
164 }
165
166 bool timer_scheduled_ = false;
167
Austin Schuh898f4972020-01-11 17:21:25 -0800168 // Kicks us to re-fetch and schedule the timer.
169 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700170 CHECK(!forwarding_disabled());
171 if (!fetcher_) {
172 return;
173 }
174 if (timer_scheduled_) {
175 return;
176 }
177 FetchNext();
178 if (fetcher_->context().data == nullptr || sent_) {
179 return;
180 }
181
182 // Compute the time to publish this message.
183 const monotonic_clock::time_point monotonic_delivered_time =
184 DeliveredTime(fetcher_->context());
185
186 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
187 << ": Trying to deliver message in the past on channel "
188 << configuration::StrippedChannelToString(fetcher_->channel())
189 << " to node " << send_event_loop_->node()->name()->string_view()
190 << " sent from " << fetcher_->channel()->source_node()->string_view()
191 << " at " << fetch_node_factory_->monotonic_now();
192
193 if (timer_) {
James Kuszmaula6681e22023-05-26 11:20:40 -0700194 server_status_->AddSentPacket(server_index_, channel_);
Philipp Schradera6712522023-07-05 20:25:11 -0700195 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700196 timer_scheduled_ = true;
197 } else {
James Kuszmaula6681e22023-05-26 11:20:40 -0700198 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh58646e22021-08-23 23:51:46 -0700199 sent_ = true;
200 Schedule();
201 }
202 }
203
204 private:
205 void FetchNext() {
206 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800207 // Keep pulling messages out of the fetcher until we find one in the future.
208 while (true) {
209 if (fetcher_->context().data == nullptr || sent_) {
210 sent_ = !fetcher_->FetchNext();
211 }
212 if (sent_) {
213 break;
214 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800215
216 if (server_connection_->state() != State::CONNECTED) {
217 sent_ = true;
James Kuszmaula6681e22023-05-26 11:20:40 -0700218 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800219 continue;
220 }
221
Austin Schuh6aa77be2020-02-22 21:06:40 -0800222 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700223 send_node_factory_->network_delay() +
224 send_node_factory_->send_delay() >
225 fetch_node_factory_->monotonic_now() ||
226 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800227 break;
228 }
229
230 // TODO(austin): Not cool. We want to actually forward these. This means
231 // we need a more sophisticated concept of what is running.
Austin Schuh60e77942022-05-16 17:48:24 -0700232 // TODO(james): This fails if multiple messages are sent on the same
233 // channel within the same callback.
Austin Schuh6aa77be2020-02-22 21:06:40 -0800234 LOG(WARNING) << "Not forwarding message on "
235 << configuration::CleanedChannelToString(fetcher_->channel())
Philipp Schrader00f117c2023-09-21 14:00:37 -0700236 << " because we aren't running. Sent at "
Austin Schuh6aa77be2020-02-22 21:06:40 -0800237 << fetcher_->context().monotonic_event_time << " now is "
238 << fetch_node_factory_->monotonic_now();
239 sent_ = true;
James Kuszmaula6681e22023-05-26 11:20:40 -0700240 server_status_->AddDroppedPacket(server_index_, channel_);
Austin Schuh898f4972020-01-11 17:21:25 -0800241 }
Austin Schuh898f4972020-01-11 17:21:25 -0800242 }
243
Austin Schuh58646e22021-08-23 23:51:46 -0700244 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800245 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700246 timer_scheduled_ = false;
247 CHECK(sender_);
248 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800249 if (server_connection_->state() != State::CONNECTED) {
250 sent_ = true;
251 Schedule();
252 return;
253 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700254 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700255 sender_->CheckOk(sender_->Send(
256 fetcher_->context().data, fetcher_->context().size,
257 fetcher_->context().monotonic_event_time,
258 fetcher_->context().realtime_event_time,
259 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800260
Austin Schuh4c3b9702020-08-30 11:34:55 -0700261 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800262 client_status_->SampleFilter(
263 client_index_, fetcher_->context().monotonic_event_time,
264 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700265
266 client_connection_->mutate_received_packets(
267 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700268
269 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800270 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800271 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800272 // Reset the filter every time the UUID changes. There's probably a more
273 // clever way to do this, but that means a better concept of rebooting.
James Kuszmaulbedbb342023-05-26 11:19:27 -0700274 if (!server_status_->BootUUID(destination_node_index_).has_value() ||
275 (server_status_->BootUUID(destination_node_index_).value() !=
276 send_node_factory_->boot_uuid())) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800277 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800278 server_status_->SetBootUUID(destination_node_index_,
279 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800280 }
281
Austin Schuhcdd90272021-03-15 12:46:16 -0700282 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
283 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800284
Austin Schuheeaa2022021-01-02 21:52:03 -0800285 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700286
287 message_header_builder.add_channel_index(channel_index_);
288
289 // Swap the remote and sent metrics. They are from the sender's
290 // perspective, not the receiver's perspective.
291 message_header_builder.add_monotonic_remote_time(
292 fetcher_->context().monotonic_event_time.time_since_epoch().count());
293 message_header_builder.add_realtime_remote_time(
294 fetcher_->context().realtime_event_time.time_since_epoch().count());
295 message_header_builder.add_remote_queue_index(
296 fetcher_->context().queue_index);
297
298 message_header_builder.add_monotonic_sent_time(
299 sender_->monotonic_sent_time().time_since_epoch().count());
300 message_header_builder.add_realtime_sent_time(
301 sender_->realtime_sent_time().time_since_epoch().count());
302 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800303 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700304
Austin Schuheeaa2022021-01-02 21:52:03 -0800305 fbb.Finish(message_header_builder.Finish());
306
307 remote_timestamps_.emplace_back(
308 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
309 fetch_node_factory_->monotonic_now() +
310 send_node_factory_->network_delay());
311 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700312 }
313
Austin Schuh898f4972020-01-11 17:21:25 -0800314 sent_ = true;
315 Schedule();
316 }
317
Austin Schuheeaa2022021-01-02 21:52:03 -0800318 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
319 void ScheduleTimestamp() {
320 if (remote_timestamps_.empty()) {
321 timestamp_timer_->Disable();
322 return;
323 }
324
325 if (scheduled_time_ !=
326 remote_timestamps_.front().monotonic_timestamp_time) {
Philipp Schradera6712522023-07-05 20:25:11 -0700327 timestamp_timer_->Schedule(
Austin Schuheeaa2022021-01-02 21:52:03 -0800328 remote_timestamps_.front().monotonic_timestamp_time);
329 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
330 return;
331 } else {
332 scheduled_time_ = monotonic_clock::min_time;
333 }
334 }
335
336 // Sends the next timestamp in remote_timestamps_.
337 void SendTimestamp() {
338 CHECK(!remote_timestamps_.empty());
339
340 // Send out all timestamps at the currently scheduled time.
341 while (remote_timestamps_.front().monotonic_timestamp_time ==
342 scheduled_time_) {
343 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700344 timestamp_logger_->CheckOk(timestamp_logger_->Send(
345 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800346 }
347 remote_timestamps_.pop_front();
348 if (remote_timestamps_.empty()) {
349 break;
350 }
351 }
352
353 ScheduleTimestamp();
354 }
355
Austin Schuh898f4972020-01-11 17:21:25 -0800356 // Converts from time on the sending node to time on the receiving node.
357 monotonic_clock::time_point DeliveredTime(const Context &context) const {
358 const distributed_clock::time_point distributed_sent_time =
359 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
360
Austin Schuh58646e22021-08-23 23:51:46 -0700361 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700362 distributed_sent_time + send_node_factory_->network_delay() +
363 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700364 CHECK_EQ(t.boot, send_node_factory_->boot_count());
365 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800366 }
367
Austin Schuh58646e22021-08-23 23:51:46 -0700368 const Channel *channel_;
369 const Connection *connection_;
370
Austin Schuh898f4972020-01-11 17:21:25 -0800371 // Factories used for time conversion.
372 aos::NodeEventLoopFactory *fetch_node_factory_;
373 aos::NodeEventLoopFactory *send_node_factory_;
374
Austin Schuheeaa2022021-01-02 21:52:03 -0800375 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700376 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800377 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700378 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800379 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700380 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800381 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700382 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800383 // Time that the timer is scheduled for. Used to track if it needs to be
384 // rescheduled.
385 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
386
Austin Schuh898f4972020-01-11 17:21:25 -0800387 // Fetcher used to receive messages.
388 std::unique_ptr<aos::RawFetcher> fetcher_;
389 // Sender to send them back out.
390 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800391
Austin Schuh58646e22021-08-23 23:51:46 -0700392 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800393 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800394 // True if we have sent the message in the fetcher.
395 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700396
397 ServerConnection *server_connection_ = nullptr;
James Kuszmaula6681e22023-05-26 11:20:40 -0700398 int server_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700399 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700400 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700401 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700402
403 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800404 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800405
406 struct Timestamp {
407 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
408 monotonic_clock::time_point new_monotonic_timestamp_time)
409 : remote_message(std::move(new_remote_message)),
410 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
411 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
412 monotonic_clock::time_point monotonic_timestamp_time;
413 };
414
415 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700416
417 bool delivery_time_is_logged_;
418
419 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800420};
421
422SimulatedMessageBridge::SimulatedMessageBridge(
423 SimulatedEventLoopFactory *simulated_event_loop_factory) {
424 CHECK(
425 configuration::MultiNode(simulated_event_loop_factory->configuration()));
426
427 // Pre-build up event loops for every node. They are pretty cheap anyways.
428 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700429 NodeEventLoopFactory *node_factory =
430 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
431 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800432 CHECK(it.second);
433
Austin Schuh58646e22021-08-23 23:51:46 -0700434 node_factory->OnStartup(
435 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
436 node_state->MakeEventLoop();
437 const size_t my_node_index = configuration::GetNodeIndex(
438 simulated_event_loop_factory->configuration(),
439 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700440
Austin Schuh58646e22021-08-23 23:51:46 -0700441 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700442 for (const std::optional<MessageBridgeServerStatus::NodeState>
443 &connection : node_state->server_status->nodes()) {
444 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700445 node_state->server_status->ResetFilter(node_index);
446 }
447 ++node_index;
448 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700449
Austin Schuh58646e22021-08-23 23:51:46 -0700450 for (const ClientConnection *client_connections :
451 *node_state->client_status->mutable_client_statistics()
452 ->connections()) {
453 const Node *client_node = configuration::GetNode(
454 simulated_event_loop_factory->configuration(),
455 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700456
Austin Schuh58646e22021-08-23 23:51:46 -0700457 auto client_event_loop = event_loop_map_.find(client_node);
458 client_event_loop->second.SetBootUUID(
459 my_node_index, node_state->event_loop->boot_uuid());
460 }
461 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700462
Austin Schuh58646e22021-08-23 23:51:46 -0700463 node_factory->OnShutdown([node_state = &it.first->second]() {
464 node_state->SetEventLoop(nullptr);
465 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800466 }
467
Austin Schuh898f4972020-01-11 17:21:25 -0800468 for (const Channel *channel :
469 *simulated_event_loop_factory->configuration()->channels()) {
470 if (!channel->has_destination_nodes()) {
471 continue;
472 }
473
474 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700475 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800476 configuration::GetNode(simulated_event_loop_factory->configuration(),
477 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700478 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800479 CHECK(source_event_loop != event_loop_map_.end());
480
481 std::unique_ptr<DelayersVector> delayers =
482 std::make_unique<DelayersVector>();
483
484 // And then build up a RawMessageDelayer for each destination.
485 for (const Connection *connection : *channel->destination_nodes()) {
486 const Node *destination_node =
487 configuration::GetNode(simulated_event_loop_factory->configuration(),
488 connection->name()->string_view());
489 auto destination_event_loop = event_loop_map_.find(destination_node);
490 CHECK(destination_event_loop != event_loop_map_.end());
491
Austin Schuh2f8fd752020-09-01 22:38:28 -0700492 const size_t destination_node_index = configuration::GetNodeIndex(
493 simulated_event_loop_factory->configuration(), destination_node);
494
495 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700496 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
497 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700498
Austin Schuh58646e22021-08-23 23:51:46 -0700499 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
500 channel, connection,
501 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800502 simulated_event_loop_factory->GetNodeEventLoopFactory(
503 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700504 destination_node_index, delivery_time_is_logged));
505
506 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
507 destination_event_loop->second.AddDestinationDelayer(
508 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800509 }
510
Austin Schuh4c3b9702020-08-30 11:34:55 -0700511 const Channel *const timestamp_channel = configuration::GetChannel(
512 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700513 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700514
515 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700516 source_event_loop->second.SetSendData(
James Kuszmaul79b2f032023-06-02 21:02:27 -0700517 [captured_delayers = delayers.get()]() {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700518 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700519 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700520 delayer->Schedule();
521 }
522 });
523 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700524 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700525 }
Austin Schuh898f4972020-01-11 17:21:25 -0800526 delayers_list_.emplace_back(std::move(delayers));
527 }
528}
529
530SimulatedMessageBridge::~SimulatedMessageBridge() {}
531
Austin Schuh6f3babe2020-01-26 20:34:50 -0800532void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700533 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
534 if (delayers->v.size() > 0) {
535 if (delayers->v[0]->channel() == channel) {
536 delayers->disable_forwarding = true;
537 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
538 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800539 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800540 }
541 }
542 }
543}
544
Austin Schuhc0b0f722020-12-12 18:36:06 -0800545void SimulatedMessageBridge::Disconnect(const Node *source,
546 const Node *destination) {
547 SetState(source, destination, message_bridge::State::DISCONNECTED);
548}
549
550void SimulatedMessageBridge::Connect(const Node *source,
551 const Node *destination) {
552 SetState(source, destination, message_bridge::State::CONNECTED);
553}
554void SimulatedMessageBridge::SetState(const Node *source,
555 const Node *destination,
556 message_bridge::State state) {
557 auto source_state = event_loop_map_.find(source);
558 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700559 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800560
561 auto destination_state = event_loop_map_.find(destination);
562 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700563 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800564}
565
James Kuszmaul94ca5132022-07-19 09:11:08 -0700566void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700567 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700568 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700569 }
570}
571
James Kuszmaul94ca5132022-07-19 09:11:08 -0700572void SimulatedMessageBridge::DisableStatistics(const Node *node,
573 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800574 auto it = event_loop_map_.find(node);
575 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700576 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800577}
578
579void SimulatedMessageBridge::EnableStatistics() {
580 for (std::pair<const Node *const, State> &state : event_loop_map_) {
581 state.second.EnableStatistics();
582 }
583}
584
585void SimulatedMessageBridge::EnableStatistics(const Node *node) {
586 auto it = event_loop_map_.find(node);
587 CHECK(it != event_loop_map_.end());
588 it->second.EnableStatistics();
589}
590
Austin Schuh58646e22021-08-23 23:51:46 -0700591void SimulatedMessageBridge::State::SetEventLoop(
592 std::unique_ptr<aos::EventLoop> loop) {
593 if (!loop) {
594 timestamp_loggers = ChannelTimestampSender(nullptr);
595 server_status.reset();
596 client_status.reset();
597 for (RawMessageDelayer *source_delayer : source_delayers_) {
598 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
599 }
600 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
601 destination_delayer->SetSendEventLoop(nullptr, nullptr);
602 }
603 event_loop = std::move(loop);
604 return;
605 } else {
606 CHECK(!event_loop);
607 }
608 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700609
Austin Schuh58646e22021-08-23 23:51:46 -0700610 event_loop->SkipTimingReport();
611 event_loop->SkipAosLog();
612
613 for (std::pair<const Channel *, DelayersVector *> &watcher :
614 delayer_watchers_) {
615 // Don't register watchers if we know we aren't forwarding.
616 if (watcher.second->disable_forwarding) continue;
617 event_loop->MakeRawNoArgWatcher(
618 watcher.first, [captured_delayers = watcher.second](const Context &) {
619 // We might get told after registering, so don't forward at that point
620 // too.
621 for (std::unique_ptr<RawMessageDelayer> &delayer :
622 captured_delayers->v) {
623 delayer->Schedule();
624 }
625 });
626 }
627
628 timestamp_loggers = ChannelTimestampSender(event_loop.get());
629 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800630 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700631 server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800632 }
Austin Schuh58646e22021-08-23 23:51:46 -0700633
634 {
635 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700636 for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
637 server_status->nodes()) {
638 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700639 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800640 switch (server_state_[node_index]) {
641 case message_bridge::State::DISCONNECTED:
642 server_status->Disconnect(node_index);
643 break;
644 case message_bridge::State::CONNECTED:
645 server_status->Connect(node_index, event_loop->monotonic_now());
646 break;
647 }
Austin Schuh58646e22021-08-23 23:51:46 -0700648 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800649 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700650 }
651 }
652 ++node_index;
653 }
654 }
655
656 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
657 if (boot_uuids_[i] != UUID::Zero()) {
658 server_status->SetBootUUID(i, boot_uuids_[i]);
659 }
660 }
Austin Schuh58646e22021-08-23 23:51:46 -0700661 if (fn_) {
662 server_status->set_send_data(fn_);
663 }
664 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
665 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700666 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700667 }
668
669 for (size_t i = 0;
670 i < client_status->mutable_client_statistics()->connections()->size();
671 ++i) {
672 ClientConnection *client_connection =
673 client_status->mutable_client_statistics()
674 ->mutable_connections()
675 ->GetMutableObject(i);
676 const Node *client_node = configuration::GetNode(
677 node_factory_->configuration(),
678 client_connection->node()->name()->string_view());
679 const size_t client_node_index = configuration::GetNodeIndex(
680 node_factory_->configuration(), client_node);
681 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800682 if (client_connection->state() != client_state_[client_node_index]) {
683 switch (client_state_[client_node_index]) {
684 case message_bridge::State::DISCONNECTED:
685 client_status->Disconnect(i);
686 break;
687 case message_bridge::State::CONNECTED:
688 client_status->Connect(i);
689 break;
690 }
691 }
Austin Schuh58646e22021-08-23 23:51:46 -0700692 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800693 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700694 }
695 }
696
Austin Schuh2f8fd752020-09-01 22:38:28 -0700697 for (const Channel *channel : *event_loop->configuration()->channels()) {
698 CHECK(channel->has_source_node());
699
700 // Sent by us.
701 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
702 channel->has_destination_nodes()) {
703 for (const Connection *connection : *channel->destination_nodes()) {
704 const bool delivery_time_is_logged =
705 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
706 connection, event_loop->node());
707
James Kuszmaul94ca5132022-07-19 09:11:08 -0700708 const RawMessageDelayer *delayer = nullptr;
709 for (const RawMessageDelayer *candidate : source_delayers_) {
710 if (candidate->channel() == channel) {
711 delayer = candidate;
712 }
713 }
714
Austin Schuh2f8fd752020-09-01 22:38:28 -0700715 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700716 if (!delivery_time_is_logged ||
717 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700718 continue;
719 }
720
Austin Schuh89c9b812021-02-20 14:42:10 -0800721 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700722 }
723 }
724 }
Austin Schuh58646e22021-08-23 23:51:46 -0700725
726 for (RawMessageDelayer *source_delayer : source_delayers_) {
727 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
728 &timestamp_loggers);
729 }
730 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
731 destination_delayer->SetSendEventLoop(event_loop.get(),
732 client_status.get());
733 }
734 event_loop->OnRun([this]() {
735 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
736 if (destination_delayer->time_to_live() == 0) {
737 destination_delayer->ScheduleReliable();
738 }
739 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700740 // Note: This exists to work around the fact that some users like to be able
741 // to send reliable messages while execution is stopped, creating a
742 // situation where the following sequencing can occur:
743 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
744 // Node B).
745 // 2) Node B starts up.
746 // 3) Anywhere from 0 to N seconds later, Node A starts up.
747 //
748 // In this case, we need the reliable message to make it to Node B, but it
749 // also shouldn't make it to Node B until Node A has started up.
750 //
751 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
752 // the message, then that would trigger the watchers in the delayers.
753 // However, we so far have continued to support Sending while stopped....
754 for (RawMessageDelayer *source_delayer : source_delayers_) {
755 if (source_delayer->time_to_live() == 0) {
756 source_delayer->ScheduleReliable();
757 }
758 }
Austin Schuh58646e22021-08-23 23:51:46 -0700759 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700760}
761
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800762} // namespace aos::message_bridge