blob: 4c9208ea050229e7078daf6a976de818836aa7d2 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
4#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08005#include "aos/events/event_loop.h"
6#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08007#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
Austin Schuh58646e22021-08-23 23:51:46 -070019 RawMessageDelayer(const Channel *channel, const Connection *connection,
20 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080021 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070022 size_t destination_node_index, bool delivery_time_is_logged)
23 : channel_(channel),
24 connection_(connection),
25 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080026 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080027 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070028 channel_index_(configuration::ChannelIndex(
29 fetch_node_factory_->configuration(), channel_)),
30 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080031
Austin Schuh58646e22021-08-23 23:51:46 -070032 bool forwarding_disabled() const { return forwarding_disabled_; }
33 void set_forwarding_disabled(bool forwarding_disabled) {
34 forwarding_disabled_ = forwarding_disabled;
Austin Schuh898f4972020-01-11 17:21:25 -080035 }
36
Austin Schuh58646e22021-08-23 23:51:46 -070037 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
38 MessageBridgeServerStatus *server_status,
39 ChannelTimestampSender *timestamp_loggers) {
40 sent_ = false;
41 fetch_event_loop_ = fetch_event_loop;
42 if (fetch_event_loop_) {
43 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
44 } else {
45 fetcher_ = nullptr;
46 }
47
48 server_status_ = server_status;
49 if (server_status) {
50 server_connection_ =
51 server_status_->FindServerConnection(send_node_factory_->node());
52 }
53 if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
54 timestamp_logger_ =
55 timestamp_loggers->SenderForChannel(channel_, connection_);
56 } else {
57 timestamp_logger_ = nullptr;
58 }
59
60 if (fetch_event_loop_) {
61 timestamp_timer_ =
62 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
63 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070064 std::string timer_name =
65 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
66 fetcher_->channel()->name()->string_view(), " ",
67 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070068 if (timer_) {
69 timer_->set_name(timer_name);
70 }
71 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
72 }
73 } else {
74 timestamp_timer_ = nullptr;
75 }
76 }
77
78 void SetSendEventLoop(aos::EventLoop *send_event_loop,
79 MessageBridgeClientStatus *client_status) {
80 sent_ = false;
81 send_event_loop_ = send_event_loop;
82 if (send_event_loop_) {
83 sender_ = send_event_loop_->MakeRawSender(channel_);
84 } else {
85 sender_ = nullptr;
86 }
87
88 client_status_ = client_status;
89 if (client_status_) {
90 client_index_ = client_status_->FindClientIndex(
91 channel_->source_node()->string_view());
92 client_connection_ = client_status_->GetClientConnection(client_index_);
93 } else {
94 client_index_ = -1;
95 client_connection_ = nullptr;
96 }
97
98 if (send_event_loop_) {
99 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
100 if (fetcher_) {
101 std::string timer_name =
102 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
103 fetcher_->channel()->name()->string_view(), " ",
104 fetcher_->channel()->type()->string_view());
105 timer_->set_name(timer_name);
106 if (timestamp_timer_) {
107 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
108 }
109 }
110 } else {
111 timer_ = nullptr;
112 }
113 }
114
115 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800116
Austin Schuh4c570ea2020-11-19 23:13:24 -0800117 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700118 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800119 ->time_to_live();
120 }
121
Austin Schuh58646e22021-08-23 23:51:46 -0700122 void ScheduleReliable() {
123 if (forwarding_disabled()) return;
124
125 if (!fetcher_) {
126 return;
127 }
128 if (fetcher_->context().data == nullptr || sent_) {
129 sent_ = !fetcher_->Fetch();
130 }
131
132 FetchNext();
133 if (fetcher_->context().data == nullptr || sent_) {
134 return;
135 }
136 CHECK(!timer_scheduled_);
137
138 // Send at startup. It is the best we can do.
139 const monotonic_clock::time_point monotonic_delivered_time =
140 send_node_factory_->monotonic_now() +
141 send_node_factory_->network_delay();
142
143 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
144 << ": Trying to deliver message in the past on channel "
145 << configuration::StrippedChannelToString(fetcher_->channel())
146 << " to node " << send_event_loop_->node()->name()->string_view()
147 << " sent from " << fetcher_->channel()->source_node()->string_view()
148 << " at " << fetch_node_factory_->monotonic_now();
149
150 if (timer_) {
151 server_connection_->mutate_sent_packets(
152 server_connection_->sent_packets() + 1);
153 timer_->Setup(monotonic_delivered_time);
154 timer_scheduled_ = true;
155 } else {
156 server_connection_->mutate_dropped_packets(
157 server_connection_->dropped_packets() + 1);
158 sent_ = true;
159 }
160 }
161
162 bool timer_scheduled_ = false;
163
Austin Schuh898f4972020-01-11 17:21:25 -0800164 // Kicks us to re-fetch and schedule the timer.
165 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700166 CHECK(!forwarding_disabled());
167 if (!fetcher_) {
168 return;
169 }
170 if (timer_scheduled_) {
171 return;
172 }
173 FetchNext();
174 if (fetcher_->context().data == nullptr || sent_) {
175 return;
176 }
177
178 // Compute the time to publish this message.
179 const monotonic_clock::time_point monotonic_delivered_time =
180 DeliveredTime(fetcher_->context());
181
182 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
183 << ": Trying to deliver message in the past on channel "
184 << configuration::StrippedChannelToString(fetcher_->channel())
185 << " to node " << send_event_loop_->node()->name()->string_view()
186 << " sent from " << fetcher_->channel()->source_node()->string_view()
187 << " at " << fetch_node_factory_->monotonic_now();
188
189 if (timer_) {
190 server_connection_->mutate_sent_packets(
191 server_connection_->sent_packets() + 1);
192 timer_->Setup(monotonic_delivered_time);
193 timer_scheduled_ = true;
194 } else {
195 server_connection_->mutate_dropped_packets(
196 server_connection_->dropped_packets() + 1);
197 sent_ = true;
198 Schedule();
199 }
200 }
201
202 private:
203 void FetchNext() {
204 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800205 // Keep pulling messages out of the fetcher until we find one in the future.
206 while (true) {
207 if (fetcher_->context().data == nullptr || sent_) {
208 sent_ = !fetcher_->FetchNext();
209 }
210 if (sent_) {
211 break;
212 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800213
214 if (server_connection_->state() != State::CONNECTED) {
215 sent_ = true;
216 server_connection_->mutate_dropped_packets(
217 server_connection_->dropped_packets() + 1);
218 continue;
219 }
220
Austin Schuh6aa77be2020-02-22 21:06:40 -0800221 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700222 send_node_factory_->network_delay() +
223 send_node_factory_->send_delay() >
224 fetch_node_factory_->monotonic_now() ||
225 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800226 break;
227 }
228
229 // TODO(austin): Not cool. We want to actually forward these. This means
230 // we need a more sophisticated concept of what is running.
231 LOG(WARNING) << "Not forwarding message on "
232 << configuration::CleanedChannelToString(fetcher_->channel())
233 << " because we aren't running. Set at "
234 << fetcher_->context().monotonic_event_time << " now is "
235 << fetch_node_factory_->monotonic_now();
236 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700237 server_connection_->mutate_dropped_packets(
238 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800239 }
Austin Schuh898f4972020-01-11 17:21:25 -0800240 }
241
Austin Schuh58646e22021-08-23 23:51:46 -0700242 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800243 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700244 timer_scheduled_ = false;
245 CHECK(sender_);
246 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800247 if (server_connection_->state() != State::CONNECTED) {
248 sent_ = true;
249 Schedule();
250 return;
251 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700252 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700253 sender_->CheckOk(sender_->Send(
254 fetcher_->context().data, fetcher_->context().size,
255 fetcher_->context().monotonic_event_time,
256 fetcher_->context().realtime_event_time,
257 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800258
Austin Schuh4c3b9702020-08-30 11:34:55 -0700259 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800260 client_status_->SampleFilter(
261 client_index_, fetcher_->context().monotonic_event_time,
262 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700263
264 client_connection_->mutate_received_packets(
265 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700266
267 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800268 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800269 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800270 // Reset the filter every time the UUID changes. There's probably a more
271 // clever way to do this, but that means a better concept of rebooting.
272 if (server_status_->BootUUID(destination_node_index_) !=
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800273 send_node_factory_->boot_uuid()) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800274 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800275 server_status_->SetBootUUID(destination_node_index_,
276 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800277 }
278
Austin Schuhcdd90272021-03-15 12:46:16 -0700279 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
280 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800281
Austin Schuheeaa2022021-01-02 21:52:03 -0800282 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700283
284 message_header_builder.add_channel_index(channel_index_);
285
286 // Swap the remote and sent metrics. They are from the sender's
287 // perspective, not the receiver's perspective.
288 message_header_builder.add_monotonic_remote_time(
289 fetcher_->context().monotonic_event_time.time_since_epoch().count());
290 message_header_builder.add_realtime_remote_time(
291 fetcher_->context().realtime_event_time.time_since_epoch().count());
292 message_header_builder.add_remote_queue_index(
293 fetcher_->context().queue_index);
294
295 message_header_builder.add_monotonic_sent_time(
296 sender_->monotonic_sent_time().time_since_epoch().count());
297 message_header_builder.add_realtime_sent_time(
298 sender_->realtime_sent_time().time_since_epoch().count());
299 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800300 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700301
Austin Schuheeaa2022021-01-02 21:52:03 -0800302 fbb.Finish(message_header_builder.Finish());
303
304 remote_timestamps_.emplace_back(
305 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
306 fetch_node_factory_->monotonic_now() +
307 send_node_factory_->network_delay());
308 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700309 }
310
Austin Schuh898f4972020-01-11 17:21:25 -0800311 sent_ = true;
312 Schedule();
313 }
314
Austin Schuheeaa2022021-01-02 21:52:03 -0800315 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
316 void ScheduleTimestamp() {
317 if (remote_timestamps_.empty()) {
318 timestamp_timer_->Disable();
319 return;
320 }
321
322 if (scheduled_time_ !=
323 remote_timestamps_.front().monotonic_timestamp_time) {
324 timestamp_timer_->Setup(
325 remote_timestamps_.front().monotonic_timestamp_time);
326 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
327 return;
328 } else {
329 scheduled_time_ = monotonic_clock::min_time;
330 }
331 }
332
333 // Sends the next timestamp in remote_timestamps_.
334 void SendTimestamp() {
335 CHECK(!remote_timestamps_.empty());
336
337 // Send out all timestamps at the currently scheduled time.
338 while (remote_timestamps_.front().monotonic_timestamp_time ==
339 scheduled_time_) {
340 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700341 timestamp_logger_->CheckOk(timestamp_logger_->Send(
342 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800343 }
344 remote_timestamps_.pop_front();
345 if (remote_timestamps_.empty()) {
346 break;
347 }
348 }
349
350 ScheduleTimestamp();
351 }
352
Austin Schuh898f4972020-01-11 17:21:25 -0800353 // Converts from time on the sending node to time on the receiving node.
354 monotonic_clock::time_point DeliveredTime(const Context &context) const {
355 const distributed_clock::time_point distributed_sent_time =
356 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
357
Austin Schuh58646e22021-08-23 23:51:46 -0700358 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700359 distributed_sent_time + send_node_factory_->network_delay() +
360 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700361 CHECK_EQ(t.boot, send_node_factory_->boot_count());
362 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800363 }
364
Austin Schuh58646e22021-08-23 23:51:46 -0700365 const Channel *channel_;
366 const Connection *connection_;
367
Austin Schuh898f4972020-01-11 17:21:25 -0800368 // Factories used for time conversion.
369 aos::NodeEventLoopFactory *fetch_node_factory_;
370 aos::NodeEventLoopFactory *send_node_factory_;
371
Austin Schuheeaa2022021-01-02 21:52:03 -0800372 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700373 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800374 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700375 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800376 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700377 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800378 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700379 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800380 // Time that the timer is scheduled for. Used to track if it needs to be
381 // rescheduled.
382 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
383
Austin Schuh898f4972020-01-11 17:21:25 -0800384 // Fetcher used to receive messages.
385 std::unique_ptr<aos::RawFetcher> fetcher_;
386 // Sender to send them back out.
387 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800388
Austin Schuh58646e22021-08-23 23:51:46 -0700389 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800390 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800391 // True if we have sent the message in the fetcher.
392 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700393
394 ServerConnection *server_connection_ = nullptr;
395 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700396 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700397 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700398
399 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800400 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800401
402 struct Timestamp {
403 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
404 monotonic_clock::time_point new_monotonic_timestamp_time)
405 : remote_message(std::move(new_remote_message)),
406 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
407 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
408 monotonic_clock::time_point monotonic_timestamp_time;
409 };
410
411 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700412
413 bool delivery_time_is_logged_;
414
415 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800416};
417
418SimulatedMessageBridge::SimulatedMessageBridge(
419 SimulatedEventLoopFactory *simulated_event_loop_factory) {
420 CHECK(
421 configuration::MultiNode(simulated_event_loop_factory->configuration()));
422
423 // Pre-build up event loops for every node. They are pretty cheap anyways.
424 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700425 NodeEventLoopFactory *node_factory =
426 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
427 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800428 CHECK(it.second);
429
Austin Schuh58646e22021-08-23 23:51:46 -0700430 node_factory->OnStartup(
431 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
432 node_state->MakeEventLoop();
433 const size_t my_node_index = configuration::GetNodeIndex(
434 simulated_event_loop_factory->configuration(),
435 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700436
Austin Schuh58646e22021-08-23 23:51:46 -0700437 size_t node_index = 0;
438 for (ServerConnection *connection :
439 node_state->server_status->server_connection()) {
440 if (connection != nullptr) {
441 node_state->server_status->ResetFilter(node_index);
442 }
443 ++node_index;
444 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700445
Austin Schuh58646e22021-08-23 23:51:46 -0700446 for (const ClientConnection *client_connections :
447 *node_state->client_status->mutable_client_statistics()
448 ->connections()) {
449 const Node *client_node = configuration::GetNode(
450 simulated_event_loop_factory->configuration(),
451 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700452
Austin Schuh58646e22021-08-23 23:51:46 -0700453 auto client_event_loop = event_loop_map_.find(client_node);
454 client_event_loop->second.SetBootUUID(
455 my_node_index, node_state->event_loop->boot_uuid());
456 }
457 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700458
Austin Schuh58646e22021-08-23 23:51:46 -0700459 node_factory->OnShutdown([node_state = &it.first->second]() {
460 node_state->SetEventLoop(nullptr);
461 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800462 }
463
Austin Schuh898f4972020-01-11 17:21:25 -0800464 for (const Channel *channel :
465 *simulated_event_loop_factory->configuration()->channels()) {
466 if (!channel->has_destination_nodes()) {
467 continue;
468 }
469
470 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700471 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800472 configuration::GetNode(simulated_event_loop_factory->configuration(),
473 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700474 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800475 CHECK(source_event_loop != event_loop_map_.end());
476
477 std::unique_ptr<DelayersVector> delayers =
478 std::make_unique<DelayersVector>();
479
480 // And then build up a RawMessageDelayer for each destination.
481 for (const Connection *connection : *channel->destination_nodes()) {
482 const Node *destination_node =
483 configuration::GetNode(simulated_event_loop_factory->configuration(),
484 connection->name()->string_view());
485 auto destination_event_loop = event_loop_map_.find(destination_node);
486 CHECK(destination_event_loop != event_loop_map_.end());
487
Austin Schuh2f8fd752020-09-01 22:38:28 -0700488 const size_t destination_node_index = configuration::GetNodeIndex(
489 simulated_event_loop_factory->configuration(), destination_node);
490
491 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700492 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
493 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700494
Austin Schuh58646e22021-08-23 23:51:46 -0700495 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
496 channel, connection,
497 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800498 simulated_event_loop_factory->GetNodeEventLoopFactory(
499 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700500 destination_node_index, delivery_time_is_logged));
501
502 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
503 destination_event_loop->second.AddDestinationDelayer(
504 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800505 }
506
Austin Schuh4c3b9702020-08-30 11:34:55 -0700507 const Channel *const timestamp_channel = configuration::GetChannel(
508 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700509 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700510
511 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700512 source_event_loop->second.SetSendData(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700513 [captured_delayers = delayers.get()](const Context &) {
514 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700515 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700516 delayer->Schedule();
517 }
518 });
519 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700520 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700521 }
Austin Schuh898f4972020-01-11 17:21:25 -0800522 delayers_list_.emplace_back(std::move(delayers));
523 }
524}
525
526SimulatedMessageBridge::~SimulatedMessageBridge() {}
527
Austin Schuh6f3babe2020-01-26 20:34:50 -0800528void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700529 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
530 if (delayers->v.size() > 0) {
531 if (delayers->v[0]->channel() == channel) {
532 delayers->disable_forwarding = true;
533 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
534 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800535 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800536 }
537 }
538 }
539}
540
Austin Schuhc0b0f722020-12-12 18:36:06 -0800541void SimulatedMessageBridge::Disconnect(const Node *source,
542 const Node *destination) {
543 SetState(source, destination, message_bridge::State::DISCONNECTED);
544}
545
546void SimulatedMessageBridge::Connect(const Node *source,
547 const Node *destination) {
548 SetState(source, destination, message_bridge::State::CONNECTED);
549}
550void SimulatedMessageBridge::SetState(const Node *source,
551 const Node *destination,
552 message_bridge::State state) {
553 auto source_state = event_loop_map_.find(source);
554 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700555 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800556
557 auto destination_state = event_loop_map_.find(destination);
558 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700559 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800560}
561
Austin Schuh4c3b9702020-08-30 11:34:55 -0700562void SimulatedMessageBridge::DisableStatistics() {
563 for (std::pair<const Node *const, State> &state : event_loop_map_) {
Austin Schuh58646e22021-08-23 23:51:46 -0700564 state.second.DisableStatistics();
Austin Schuh4c3b9702020-08-30 11:34:55 -0700565 }
566}
567
Austin Schuh48205e62021-11-12 14:13:18 -0800568void SimulatedMessageBridge::DisableStatistics(const Node *node) {
569 auto it = event_loop_map_.find(node);
570 CHECK(it != event_loop_map_.end());
571 it->second.DisableStatistics();
572}
573
574void SimulatedMessageBridge::EnableStatistics() {
575 for (std::pair<const Node *const, State> &state : event_loop_map_) {
576 state.second.EnableStatistics();
577 }
578}
579
580void SimulatedMessageBridge::EnableStatistics(const Node *node) {
581 auto it = event_loop_map_.find(node);
582 CHECK(it != event_loop_map_.end());
583 it->second.EnableStatistics();
584}
585
Austin Schuh58646e22021-08-23 23:51:46 -0700586void SimulatedMessageBridge::State::SetEventLoop(
587 std::unique_ptr<aos::EventLoop> loop) {
588 if (!loop) {
589 timestamp_loggers = ChannelTimestampSender(nullptr);
590 server_status.reset();
591 client_status.reset();
592 for (RawMessageDelayer *source_delayer : source_delayers_) {
593 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
594 }
595 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
596 destination_delayer->SetSendEventLoop(nullptr, nullptr);
597 }
598 event_loop = std::move(loop);
599 return;
600 } else {
601 CHECK(!event_loop);
602 }
603 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700604
Austin Schuh58646e22021-08-23 23:51:46 -0700605 event_loop->SkipTimingReport();
606 event_loop->SkipAosLog();
607
608 for (std::pair<const Channel *, DelayersVector *> &watcher :
609 delayer_watchers_) {
610 // Don't register watchers if we know we aren't forwarding.
611 if (watcher.second->disable_forwarding) continue;
612 event_loop->MakeRawNoArgWatcher(
613 watcher.first, [captured_delayers = watcher.second](const Context &) {
614 // We might get told after registering, so don't forward at that point
615 // too.
616 for (std::unique_ptr<RawMessageDelayer> &delayer :
617 captured_delayers->v) {
618 delayer->Schedule();
619 }
620 });
621 }
622
623 timestamp_loggers = ChannelTimestampSender(event_loop.get());
624 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800625 if (disable_statistics_) {
626 server_status->DisableStatistics();
627 }
Austin Schuh58646e22021-08-23 23:51:46 -0700628
629 {
630 size_t node_index = 0;
631 for (ServerConnection *connection : server_status->server_connection()) {
632 if (connection) {
633 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800634 switch (server_state_[node_index]) {
635 case message_bridge::State::DISCONNECTED:
636 server_status->Disconnect(node_index);
637 break;
638 case message_bridge::State::CONNECTED:
639 server_status->Connect(node_index, event_loop->monotonic_now());
640 break;
641 }
Austin Schuh58646e22021-08-23 23:51:46 -0700642 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800643 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700644 }
645 }
646 ++node_index;
647 }
648 }
649
650 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
651 if (boot_uuids_[i] != UUID::Zero()) {
652 server_status->SetBootUUID(i, boot_uuids_[i]);
653 }
654 }
Austin Schuh58646e22021-08-23 23:51:46 -0700655 if (fn_) {
656 server_status->set_send_data(fn_);
657 }
658 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
659 if (disable_statistics_) {
660 client_status->DisableStatistics();
661 }
662
663 for (size_t i = 0;
664 i < client_status->mutable_client_statistics()->connections()->size();
665 ++i) {
666 ClientConnection *client_connection =
667 client_status->mutable_client_statistics()
668 ->mutable_connections()
669 ->GetMutableObject(i);
670 const Node *client_node = configuration::GetNode(
671 node_factory_->configuration(),
672 client_connection->node()->name()->string_view());
673 const size_t client_node_index = configuration::GetNodeIndex(
674 node_factory_->configuration(), client_node);
675 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800676 if (client_connection->state() != client_state_[client_node_index]) {
677 switch (client_state_[client_node_index]) {
678 case message_bridge::State::DISCONNECTED:
679 client_status->Disconnect(i);
680 break;
681 case message_bridge::State::CONNECTED:
682 client_status->Connect(i);
683 break;
684 }
685 }
Austin Schuh58646e22021-08-23 23:51:46 -0700686 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800687 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700688 }
689 }
690
Austin Schuh2f8fd752020-09-01 22:38:28 -0700691 for (const Channel *channel : *event_loop->configuration()->channels()) {
692 CHECK(channel->has_source_node());
693
694 // Sent by us.
695 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
696 channel->has_destination_nodes()) {
697 for (const Connection *connection : *channel->destination_nodes()) {
698 const bool delivery_time_is_logged =
699 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
700 connection, event_loop->node());
701
702 // And the timestamps are then logged back by us again.
703 if (!delivery_time_is_logged) {
704 continue;
705 }
706
Austin Schuh89c9b812021-02-20 14:42:10 -0800707 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700708 }
709 }
710 }
Austin Schuh58646e22021-08-23 23:51:46 -0700711
712 for (RawMessageDelayer *source_delayer : source_delayers_) {
713 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
714 &timestamp_loggers);
715 }
716 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
717 destination_delayer->SetSendEventLoop(event_loop.get(),
718 client_status.get());
719 }
720 event_loop->OnRun([this]() {
721 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
722 if (destination_delayer->time_to_live() == 0) {
723 destination_delayer->ScheduleReliable();
724 }
725 }
726 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700727}
728
Austin Schuh898f4972020-01-11 17:21:25 -0800729} // namespace message_bridge
730} // namespace aos