blob: b363142a6f40d97fbe995ab4db4a66f8541c32a4 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07004
Austin Schuh0de30f32020-12-06 12:44:28 -08005#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08006#include "aos/events/event_loop.h"
7#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08009
10namespace aos {
11namespace message_bridge {
12
// This class delays messages forwarded between two factories.
//
// The basic design is that we need to use the distributed_clock to convert
// monotonic times from the source to the destination node. We also use a
// fetcher to manage the queue of data, and a timer to schedule the sends.
18class RawMessageDelayer {
19 public:
Austin Schuh58646e22021-08-23 23:51:46 -070020 RawMessageDelayer(const Channel *channel, const Connection *connection,
21 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080022 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070023 size_t destination_node_index, bool delivery_time_is_logged)
24 : channel_(channel),
25 connection_(connection),
26 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080027 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080028 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070029 channel_index_(configuration::ChannelIndex(
30 fetch_node_factory_->configuration(), channel_)),
31 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080032
Austin Schuh58646e22021-08-23 23:51:46 -070033 bool forwarding_disabled() const { return forwarding_disabled_; }
34 void set_forwarding_disabled(bool forwarding_disabled) {
35 forwarding_disabled_ = forwarding_disabled;
James Kuszmaul94ca5132022-07-19 09:11:08 -070036 if (!forwarding_disabled_) {
37 CHECK(timestamp_logger_ == nullptr);
38 CHECK(sender_ == nullptr);
39 }
Austin Schuh898f4972020-01-11 17:21:25 -080040 }
41
Austin Schuh58646e22021-08-23 23:51:46 -070042 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
43 MessageBridgeServerStatus *server_status,
44 ChannelTimestampSender *timestamp_loggers) {
45 sent_ = false;
46 fetch_event_loop_ = fetch_event_loop;
47 if (fetch_event_loop_) {
48 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
49 } else {
50 fetcher_ = nullptr;
51 }
52
53 server_status_ = server_status;
54 if (server_status) {
55 server_connection_ =
56 server_status_->FindServerConnection(send_node_factory_->node());
57 }
James Kuszmaul94ca5132022-07-19 09:11:08 -070058 if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
59 !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070060 timestamp_logger_ =
61 timestamp_loggers->SenderForChannel(channel_, connection_);
62 } else {
63 timestamp_logger_ = nullptr;
64 }
65
66 if (fetch_event_loop_) {
67 timestamp_timer_ =
68 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
69 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070070 std::string timer_name =
71 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
72 fetcher_->channel()->name()->string_view(), " ",
73 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070074 if (timer_) {
75 timer_->set_name(timer_name);
76 }
77 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
78 }
79 } else {
80 timestamp_timer_ = nullptr;
81 }
82 }
83
84 void SetSendEventLoop(aos::EventLoop *send_event_loop,
85 MessageBridgeClientStatus *client_status) {
86 sent_ = false;
87 send_event_loop_ = send_event_loop;
James Kuszmaul94ca5132022-07-19 09:11:08 -070088 if (send_event_loop_ && !forwarding_disabled_) {
Austin Schuh58646e22021-08-23 23:51:46 -070089 sender_ = send_event_loop_->MakeRawSender(channel_);
90 } else {
91 sender_ = nullptr;
92 }
93
94 client_status_ = client_status;
95 if (client_status_) {
96 client_index_ = client_status_->FindClientIndex(
97 channel_->source_node()->string_view());
98 client_connection_ = client_status_->GetClientConnection(client_index_);
99 } else {
100 client_index_ = -1;
101 client_connection_ = nullptr;
102 }
103
104 if (send_event_loop_) {
105 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
106 if (fetcher_) {
107 std::string timer_name =
108 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
109 fetcher_->channel()->name()->string_view(), " ",
110 fetcher_->channel()->type()->string_view());
111 timer_->set_name(timer_name);
112 if (timestamp_timer_) {
113 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
114 }
115 }
116 } else {
117 timer_ = nullptr;
118 }
119 }
120
121 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800122
Austin Schuh4c570ea2020-11-19 23:13:24 -0800123 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700124 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800125 ->time_to_live();
126 }
127
Austin Schuh58646e22021-08-23 23:51:46 -0700128 void ScheduleReliable() {
129 if (forwarding_disabled()) return;
130
131 if (!fetcher_) {
132 return;
133 }
134 if (fetcher_->context().data == nullptr || sent_) {
135 sent_ = !fetcher_->Fetch();
136 }
137
138 FetchNext();
139 if (fetcher_->context().data == nullptr || sent_) {
140 return;
141 }
Austin Schuh58646e22021-08-23 23:51:46 -0700142
143 // Send at startup. It is the best we can do.
144 const monotonic_clock::time_point monotonic_delivered_time =
145 send_node_factory_->monotonic_now() +
146 send_node_factory_->network_delay();
147
148 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
149 << ": Trying to deliver message in the past on channel "
150 << configuration::StrippedChannelToString(fetcher_->channel())
151 << " to node " << send_event_loop_->node()->name()->string_view()
152 << " sent from " << fetcher_->channel()->source_node()->string_view()
153 << " at " << fetch_node_factory_->monotonic_now();
154
155 if (timer_) {
156 server_connection_->mutate_sent_packets(
157 server_connection_->sent_packets() + 1);
Philipp Schradera6712522023-07-05 20:25:11 -0700158 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700159 timer_scheduled_ = true;
160 } else {
161 server_connection_->mutate_dropped_packets(
162 server_connection_->dropped_packets() + 1);
163 sent_ = true;
164 }
165 }
166
167 bool timer_scheduled_ = false;
168
Austin Schuh898f4972020-01-11 17:21:25 -0800169 // Kicks us to re-fetch and schedule the timer.
170 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700171 CHECK(!forwarding_disabled());
172 if (!fetcher_) {
173 return;
174 }
175 if (timer_scheduled_) {
176 return;
177 }
178 FetchNext();
179 if (fetcher_->context().data == nullptr || sent_) {
180 return;
181 }
182
183 // Compute the time to publish this message.
184 const monotonic_clock::time_point monotonic_delivered_time =
185 DeliveredTime(fetcher_->context());
186
187 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
188 << ": Trying to deliver message in the past on channel "
189 << configuration::StrippedChannelToString(fetcher_->channel())
190 << " to node " << send_event_loop_->node()->name()->string_view()
191 << " sent from " << fetcher_->channel()->source_node()->string_view()
192 << " at " << fetch_node_factory_->monotonic_now();
193
194 if (timer_) {
195 server_connection_->mutate_sent_packets(
196 server_connection_->sent_packets() + 1);
Philipp Schradera6712522023-07-05 20:25:11 -0700197 timer_->Schedule(monotonic_delivered_time);
Austin Schuh58646e22021-08-23 23:51:46 -0700198 timer_scheduled_ = true;
199 } else {
200 server_connection_->mutate_dropped_packets(
201 server_connection_->dropped_packets() + 1);
202 sent_ = true;
203 Schedule();
204 }
205 }
206
207 private:
208 void FetchNext() {
209 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800210 // Keep pulling messages out of the fetcher until we find one in the future.
211 while (true) {
212 if (fetcher_->context().data == nullptr || sent_) {
213 sent_ = !fetcher_->FetchNext();
214 }
215 if (sent_) {
216 break;
217 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800218
219 if (server_connection_->state() != State::CONNECTED) {
220 sent_ = true;
221 server_connection_->mutate_dropped_packets(
222 server_connection_->dropped_packets() + 1);
223 continue;
224 }
225
Austin Schuh6aa77be2020-02-22 21:06:40 -0800226 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700227 send_node_factory_->network_delay() +
228 send_node_factory_->send_delay() >
229 fetch_node_factory_->monotonic_now() ||
230 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800231 break;
232 }
233
234 // TODO(austin): Not cool. We want to actually forward these. This means
235 // we need a more sophisticated concept of what is running.
Austin Schuh60e77942022-05-16 17:48:24 -0700236 // TODO(james): This fails if multiple messages are sent on the same
237 // channel within the same callback.
Austin Schuh6aa77be2020-02-22 21:06:40 -0800238 LOG(WARNING) << "Not forwarding message on "
239 << configuration::CleanedChannelToString(fetcher_->channel())
240 << " because we aren't running. Set at "
241 << fetcher_->context().monotonic_event_time << " now is "
242 << fetch_node_factory_->monotonic_now();
243 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700244 server_connection_->mutate_dropped_packets(
245 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800246 }
Austin Schuh898f4972020-01-11 17:21:25 -0800247 }
248
Austin Schuh58646e22021-08-23 23:51:46 -0700249 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800250 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700251 timer_scheduled_ = false;
252 CHECK(sender_);
253 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800254 if (server_connection_->state() != State::CONNECTED) {
255 sent_ = true;
256 Schedule();
257 return;
258 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700259 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700260 sender_->CheckOk(sender_->Send(
261 fetcher_->context().data, fetcher_->context().size,
262 fetcher_->context().monotonic_event_time,
263 fetcher_->context().realtime_event_time,
264 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800265
Austin Schuh4c3b9702020-08-30 11:34:55 -0700266 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800267 client_status_->SampleFilter(
268 client_index_, fetcher_->context().monotonic_event_time,
269 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700270
271 client_connection_->mutate_received_packets(
272 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700273
274 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800275 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800276 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800277 // Reset the filter every time the UUID changes. There's probably a more
278 // clever way to do this, but that means a better concept of rebooting.
James Kuszmaulbedbb342023-05-26 11:19:27 -0700279 if (!server_status_->BootUUID(destination_node_index_).has_value() ||
280 (server_status_->BootUUID(destination_node_index_).value() !=
281 send_node_factory_->boot_uuid())) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800282 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800283 server_status_->SetBootUUID(destination_node_index_,
284 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800285 }
286
Austin Schuhcdd90272021-03-15 12:46:16 -0700287 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
288 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800289
Austin Schuheeaa2022021-01-02 21:52:03 -0800290 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700291
292 message_header_builder.add_channel_index(channel_index_);
293
294 // Swap the remote and sent metrics. They are from the sender's
295 // perspective, not the receiver's perspective.
296 message_header_builder.add_monotonic_remote_time(
297 fetcher_->context().monotonic_event_time.time_since_epoch().count());
298 message_header_builder.add_realtime_remote_time(
299 fetcher_->context().realtime_event_time.time_since_epoch().count());
300 message_header_builder.add_remote_queue_index(
301 fetcher_->context().queue_index);
302
303 message_header_builder.add_monotonic_sent_time(
304 sender_->monotonic_sent_time().time_since_epoch().count());
305 message_header_builder.add_realtime_sent_time(
306 sender_->realtime_sent_time().time_since_epoch().count());
307 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800308 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700309
Austin Schuheeaa2022021-01-02 21:52:03 -0800310 fbb.Finish(message_header_builder.Finish());
311
312 remote_timestamps_.emplace_back(
313 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
314 fetch_node_factory_->monotonic_now() +
315 send_node_factory_->network_delay());
316 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700317 }
318
Austin Schuh898f4972020-01-11 17:21:25 -0800319 sent_ = true;
320 Schedule();
321 }
322
Austin Schuheeaa2022021-01-02 21:52:03 -0800323 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
324 void ScheduleTimestamp() {
325 if (remote_timestamps_.empty()) {
326 timestamp_timer_->Disable();
327 return;
328 }
329
330 if (scheduled_time_ !=
331 remote_timestamps_.front().monotonic_timestamp_time) {
Philipp Schradera6712522023-07-05 20:25:11 -0700332 timestamp_timer_->Schedule(
Austin Schuheeaa2022021-01-02 21:52:03 -0800333 remote_timestamps_.front().monotonic_timestamp_time);
334 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
335 return;
336 } else {
337 scheduled_time_ = monotonic_clock::min_time;
338 }
339 }
340
341 // Sends the next timestamp in remote_timestamps_.
342 void SendTimestamp() {
343 CHECK(!remote_timestamps_.empty());
344
345 // Send out all timestamps at the currently scheduled time.
346 while (remote_timestamps_.front().monotonic_timestamp_time ==
347 scheduled_time_) {
348 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700349 timestamp_logger_->CheckOk(timestamp_logger_->Send(
350 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800351 }
352 remote_timestamps_.pop_front();
353 if (remote_timestamps_.empty()) {
354 break;
355 }
356 }
357
358 ScheduleTimestamp();
359 }
360
Austin Schuh898f4972020-01-11 17:21:25 -0800361 // Converts from time on the sending node to time on the receiving node.
362 monotonic_clock::time_point DeliveredTime(const Context &context) const {
363 const distributed_clock::time_point distributed_sent_time =
364 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
365
Austin Schuh58646e22021-08-23 23:51:46 -0700366 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700367 distributed_sent_time + send_node_factory_->network_delay() +
368 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700369 CHECK_EQ(t.boot, send_node_factory_->boot_count());
370 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800371 }
372
Austin Schuh58646e22021-08-23 23:51:46 -0700373 const Channel *channel_;
374 const Connection *connection_;
375
Austin Schuh898f4972020-01-11 17:21:25 -0800376 // Factories used for time conversion.
377 aos::NodeEventLoopFactory *fetch_node_factory_;
378 aos::NodeEventLoopFactory *send_node_factory_;
379
Austin Schuheeaa2022021-01-02 21:52:03 -0800380 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700381 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800382 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700383 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800384 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700385 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800386 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700387 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800388 // Time that the timer is scheduled for. Used to track if it needs to be
389 // rescheduled.
390 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
391
Austin Schuh898f4972020-01-11 17:21:25 -0800392 // Fetcher used to receive messages.
393 std::unique_ptr<aos::RawFetcher> fetcher_;
394 // Sender to send them back out.
395 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800396
Austin Schuh58646e22021-08-23 23:51:46 -0700397 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800398 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800399 // True if we have sent the message in the fetcher.
400 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700401
402 ServerConnection *server_connection_ = nullptr;
403 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700404 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700405 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700406
407 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800408 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800409
410 struct Timestamp {
411 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
412 monotonic_clock::time_point new_monotonic_timestamp_time)
413 : remote_message(std::move(new_remote_message)),
414 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
415 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
416 monotonic_clock::time_point monotonic_timestamp_time;
417 };
418
419 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700420
421 bool delivery_time_is_logged_;
422
423 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800424};
425
426SimulatedMessageBridge::SimulatedMessageBridge(
427 SimulatedEventLoopFactory *simulated_event_loop_factory) {
428 CHECK(
429 configuration::MultiNode(simulated_event_loop_factory->configuration()));
430
431 // Pre-build up event loops for every node. They are pretty cheap anyways.
432 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700433 NodeEventLoopFactory *node_factory =
434 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
435 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800436 CHECK(it.second);
437
Austin Schuh58646e22021-08-23 23:51:46 -0700438 node_factory->OnStartup(
439 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
440 node_state->MakeEventLoop();
441 const size_t my_node_index = configuration::GetNodeIndex(
442 simulated_event_loop_factory->configuration(),
443 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700444
Austin Schuh58646e22021-08-23 23:51:46 -0700445 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700446 for (const std::optional<MessageBridgeServerStatus::NodeState>
447 &connection : node_state->server_status->nodes()) {
448 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700449 node_state->server_status->ResetFilter(node_index);
450 }
451 ++node_index;
452 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700453
Austin Schuh58646e22021-08-23 23:51:46 -0700454 for (const ClientConnection *client_connections :
455 *node_state->client_status->mutable_client_statistics()
456 ->connections()) {
457 const Node *client_node = configuration::GetNode(
458 simulated_event_loop_factory->configuration(),
459 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700460
Austin Schuh58646e22021-08-23 23:51:46 -0700461 auto client_event_loop = event_loop_map_.find(client_node);
462 client_event_loop->second.SetBootUUID(
463 my_node_index, node_state->event_loop->boot_uuid());
464 }
465 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700466
Austin Schuh58646e22021-08-23 23:51:46 -0700467 node_factory->OnShutdown([node_state = &it.first->second]() {
468 node_state->SetEventLoop(nullptr);
469 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800470 }
471
Austin Schuh898f4972020-01-11 17:21:25 -0800472 for (const Channel *channel :
473 *simulated_event_loop_factory->configuration()->channels()) {
474 if (!channel->has_destination_nodes()) {
475 continue;
476 }
477
478 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700479 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800480 configuration::GetNode(simulated_event_loop_factory->configuration(),
481 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700482 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800483 CHECK(source_event_loop != event_loop_map_.end());
484
485 std::unique_ptr<DelayersVector> delayers =
486 std::make_unique<DelayersVector>();
487
488 // And then build up a RawMessageDelayer for each destination.
489 for (const Connection *connection : *channel->destination_nodes()) {
490 const Node *destination_node =
491 configuration::GetNode(simulated_event_loop_factory->configuration(),
492 connection->name()->string_view());
493 auto destination_event_loop = event_loop_map_.find(destination_node);
494 CHECK(destination_event_loop != event_loop_map_.end());
495
Austin Schuh2f8fd752020-09-01 22:38:28 -0700496 const size_t destination_node_index = configuration::GetNodeIndex(
497 simulated_event_loop_factory->configuration(), destination_node);
498
499 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700500 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
501 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700502
Austin Schuh58646e22021-08-23 23:51:46 -0700503 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
504 channel, connection,
505 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800506 simulated_event_loop_factory->GetNodeEventLoopFactory(
507 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700508 destination_node_index, delivery_time_is_logged));
509
510 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
511 destination_event_loop->second.AddDestinationDelayer(
512 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800513 }
514
Austin Schuh4c3b9702020-08-30 11:34:55 -0700515 const Channel *const timestamp_channel = configuration::GetChannel(
516 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700517 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700518
519 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700520 source_event_loop->second.SetSendData(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700521 [captured_delayers = delayers.get()](const Context &) {
522 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700523 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700524 delayer->Schedule();
525 }
526 });
527 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700528 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700529 }
Austin Schuh898f4972020-01-11 17:21:25 -0800530 delayers_list_.emplace_back(std::move(delayers));
531 }
532}
533
534SimulatedMessageBridge::~SimulatedMessageBridge() {}
535
Austin Schuh6f3babe2020-01-26 20:34:50 -0800536void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700537 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
538 if (delayers->v.size() > 0) {
539 if (delayers->v[0]->channel() == channel) {
540 delayers->disable_forwarding = true;
541 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
542 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800543 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800544 }
545 }
546 }
547}
548
Austin Schuhc0b0f722020-12-12 18:36:06 -0800549void SimulatedMessageBridge::Disconnect(const Node *source,
550 const Node *destination) {
551 SetState(source, destination, message_bridge::State::DISCONNECTED);
552}
553
554void SimulatedMessageBridge::Connect(const Node *source,
555 const Node *destination) {
556 SetState(source, destination, message_bridge::State::CONNECTED);
557}
558void SimulatedMessageBridge::SetState(const Node *source,
559 const Node *destination,
560 message_bridge::State state) {
561 auto source_state = event_loop_map_.find(source);
562 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700563 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800564
565 auto destination_state = event_loop_map_.find(destination);
566 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700567 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800568}
569
James Kuszmaul94ca5132022-07-19 09:11:08 -0700570void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700571 for (std::pair<const Node *const, State> &state : event_loop_map_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700572 state.second.DisableStatistics(destroy_senders);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700573 }
574}
575
James Kuszmaul94ca5132022-07-19 09:11:08 -0700576void SimulatedMessageBridge::DisableStatistics(const Node *node,
577 DestroySenders destroy_senders) {
Austin Schuh48205e62021-11-12 14:13:18 -0800578 auto it = event_loop_map_.find(node);
579 CHECK(it != event_loop_map_.end());
James Kuszmaul94ca5132022-07-19 09:11:08 -0700580 it->second.DisableStatistics(destroy_senders);
Austin Schuh48205e62021-11-12 14:13:18 -0800581}
582
583void SimulatedMessageBridge::EnableStatistics() {
584 for (std::pair<const Node *const, State> &state : event_loop_map_) {
585 state.second.EnableStatistics();
586 }
587}
588
589void SimulatedMessageBridge::EnableStatistics(const Node *node) {
590 auto it = event_loop_map_.find(node);
591 CHECK(it != event_loop_map_.end());
592 it->second.EnableStatistics();
593}
594
Austin Schuh58646e22021-08-23 23:51:46 -0700595void SimulatedMessageBridge::State::SetEventLoop(
596 std::unique_ptr<aos::EventLoop> loop) {
597 if (!loop) {
598 timestamp_loggers = ChannelTimestampSender(nullptr);
599 server_status.reset();
600 client_status.reset();
601 for (RawMessageDelayer *source_delayer : source_delayers_) {
602 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
603 }
604 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
605 destination_delayer->SetSendEventLoop(nullptr, nullptr);
606 }
607 event_loop = std::move(loop);
608 return;
609 } else {
610 CHECK(!event_loop);
611 }
612 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700613
Austin Schuh58646e22021-08-23 23:51:46 -0700614 event_loop->SkipTimingReport();
615 event_loop->SkipAosLog();
616
617 for (std::pair<const Channel *, DelayersVector *> &watcher :
618 delayer_watchers_) {
619 // Don't register watchers if we know we aren't forwarding.
620 if (watcher.second->disable_forwarding) continue;
621 event_loop->MakeRawNoArgWatcher(
622 watcher.first, [captured_delayers = watcher.second](const Context &) {
623 // We might get told after registering, so don't forward at that point
624 // too.
625 for (std::unique_ptr<RawMessageDelayer> &delayer :
626 captured_delayers->v) {
627 delayer->Schedule();
628 }
629 });
630 }
631
632 timestamp_loggers = ChannelTimestampSender(event_loop.get());
633 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800634 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700635 server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh48205e62021-11-12 14:13:18 -0800636 }
Austin Schuh58646e22021-08-23 23:51:46 -0700637
638 {
639 size_t node_index = 0;
James Kuszmaulbedbb342023-05-26 11:19:27 -0700640 for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
641 server_status->nodes()) {
642 if (connection.has_value()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700643 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800644 switch (server_state_[node_index]) {
645 case message_bridge::State::DISCONNECTED:
646 server_status->Disconnect(node_index);
647 break;
648 case message_bridge::State::CONNECTED:
649 server_status->Connect(node_index, event_loop->monotonic_now());
650 break;
651 }
Austin Schuh58646e22021-08-23 23:51:46 -0700652 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800653 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700654 }
655 }
656 ++node_index;
657 }
658 }
659
660 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
661 if (boot_uuids_[i] != UUID::Zero()) {
662 server_status->SetBootUUID(i, boot_uuids_[i]);
663 }
664 }
Austin Schuh58646e22021-08-23 23:51:46 -0700665 if (fn_) {
666 server_status->set_send_data(fn_);
667 }
668 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
669 if (disable_statistics_) {
James Kuszmaul94ca5132022-07-19 09:11:08 -0700670 client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -0700671 }
672
673 for (size_t i = 0;
674 i < client_status->mutable_client_statistics()->connections()->size();
675 ++i) {
676 ClientConnection *client_connection =
677 client_status->mutable_client_statistics()
678 ->mutable_connections()
679 ->GetMutableObject(i);
680 const Node *client_node = configuration::GetNode(
681 node_factory_->configuration(),
682 client_connection->node()->name()->string_view());
683 const size_t client_node_index = configuration::GetNodeIndex(
684 node_factory_->configuration(), client_node);
685 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800686 if (client_connection->state() != client_state_[client_node_index]) {
687 switch (client_state_[client_node_index]) {
688 case message_bridge::State::DISCONNECTED:
689 client_status->Disconnect(i);
690 break;
691 case message_bridge::State::CONNECTED:
692 client_status->Connect(i);
693 break;
694 }
695 }
Austin Schuh58646e22021-08-23 23:51:46 -0700696 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800697 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700698 }
699 }
700
Austin Schuh2f8fd752020-09-01 22:38:28 -0700701 for (const Channel *channel : *event_loop->configuration()->channels()) {
702 CHECK(channel->has_source_node());
703
704 // Sent by us.
705 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
706 channel->has_destination_nodes()) {
707 for (const Connection *connection : *channel->destination_nodes()) {
708 const bool delivery_time_is_logged =
709 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
710 connection, event_loop->node());
711
James Kuszmaul94ca5132022-07-19 09:11:08 -0700712 const RawMessageDelayer *delayer = nullptr;
713 for (const RawMessageDelayer *candidate : source_delayers_) {
714 if (candidate->channel() == channel) {
715 delayer = candidate;
716 }
717 }
718
Austin Schuh2f8fd752020-09-01 22:38:28 -0700719 // And the timestamps are then logged back by us again.
James Kuszmaul94ca5132022-07-19 09:11:08 -0700720 if (!delivery_time_is_logged ||
721 CHECK_NOTNULL(delayer)->forwarding_disabled()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700722 continue;
723 }
724
Austin Schuh89c9b812021-02-20 14:42:10 -0800725 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700726 }
727 }
728 }
Austin Schuh58646e22021-08-23 23:51:46 -0700729
730 for (RawMessageDelayer *source_delayer : source_delayers_) {
731 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
732 &timestamp_loggers);
733 }
734 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
735 destination_delayer->SetSendEventLoop(event_loop.get(),
736 client_status.get());
737 }
738 event_loop->OnRun([this]() {
739 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
740 if (destination_delayer->time_to_live() == 0) {
741 destination_delayer->ScheduleReliable();
742 }
743 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700744 // Note: This exists to work around the fact that some users like to be able
745 // to send reliable messages while execution is stopped, creating a
746 // situation where the following sequencing can occur:
747 // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
748 // Node B).
749 // 2) Node B starts up.
750 // 3) Anywhere from 0 to N seconds later, Node A starts up.
751 //
752 // In this case, we need the reliable message to make it to Node B, but it
753 // also shouldn't make it to Node B until Node A has started up.
754 //
755 // Ideally, if the user were to wait for the Node B OnRun callbacks to send
756 // the message, then that would trigger the watchers in the delayers.
757 // However, we so far have continued to support Sending while stopped....
758 for (RawMessageDelayer *source_delayer : source_delayers_) {
759 if (source_delayer->time_to_live() == 0) {
760 source_delayer->ScheduleReliable();
761 }
762 }
Austin Schuh58646e22021-08-23 23:51:46 -0700763 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700764}
765
Austin Schuh898f4972020-01-11 17:21:25 -0800766} // namespace message_bridge
767} // namespace aos