blob: c6ea811bc800061138d335547d0e557472076be8 [file] [log] [blame]
#include "aos/events/simulated_network_bridge.h"

#include <deque>
#include <memory>
#include <string>
#include <utility>

#include "absl/strings/str_cat.h"
#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
Austin Schuh58646e22021-08-23 23:51:46 -070019 RawMessageDelayer(const Channel *channel, const Connection *connection,
20 aos::NodeEventLoopFactory *fetch_node_factory,
Austin Schuh898f4972020-01-11 17:21:25 -080021 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuh58646e22021-08-23 23:51:46 -070022 size_t destination_node_index, bool delivery_time_is_logged)
23 : channel_(channel),
24 connection_(connection),
25 fetch_node_factory_(fetch_node_factory),
Austin Schuh898f4972020-01-11 17:21:25 -080026 send_node_factory_(send_node_factory),
Austin Schuh20ac95d2020-12-05 17:24:19 -080027 destination_node_index_(destination_node_index),
Austin Schuh58646e22021-08-23 23:51:46 -070028 channel_index_(configuration::ChannelIndex(
29 fetch_node_factory_->configuration(), channel_)),
30 delivery_time_is_logged_(delivery_time_is_logged) {}
Austin Schuh898f4972020-01-11 17:21:25 -080031
Austin Schuh58646e22021-08-23 23:51:46 -070032 bool forwarding_disabled() const { return forwarding_disabled_; }
33 void set_forwarding_disabled(bool forwarding_disabled) {
34 forwarding_disabled_ = forwarding_disabled;
Austin Schuh898f4972020-01-11 17:21:25 -080035 }
36
Austin Schuh58646e22021-08-23 23:51:46 -070037 void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
38 MessageBridgeServerStatus *server_status,
39 ChannelTimestampSender *timestamp_loggers) {
40 sent_ = false;
41 fetch_event_loop_ = fetch_event_loop;
42 if (fetch_event_loop_) {
43 fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
44 } else {
45 fetcher_ = nullptr;
46 }
47
48 server_status_ = server_status;
49 if (server_status) {
50 server_connection_ =
51 server_status_->FindServerConnection(send_node_factory_->node());
52 }
53 if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
54 timestamp_logger_ =
55 timestamp_loggers->SenderForChannel(channel_, connection_);
56 } else {
57 timestamp_logger_ = nullptr;
58 }
59
60 if (fetch_event_loop_) {
61 timestamp_timer_ =
62 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
63 if (send_event_loop_) {
milind1f1dca32021-07-03 13:50:07 -070064 std::string timer_name =
65 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
66 fetcher_->channel()->name()->string_view(), " ",
67 fetcher_->channel()->type()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -070068 if (timer_) {
69 timer_->set_name(timer_name);
70 }
71 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
72 }
73 } else {
74 timestamp_timer_ = nullptr;
75 }
76 }
77
78 void SetSendEventLoop(aos::EventLoop *send_event_loop,
79 MessageBridgeClientStatus *client_status) {
80 sent_ = false;
81 send_event_loop_ = send_event_loop;
82 if (send_event_loop_) {
83 sender_ = send_event_loop_->MakeRawSender(channel_);
84 } else {
85 sender_ = nullptr;
86 }
87
88 client_status_ = client_status;
89 if (client_status_) {
90 client_index_ = client_status_->FindClientIndex(
91 channel_->source_node()->string_view());
92 client_connection_ = client_status_->GetClientConnection(client_index_);
93 } else {
94 client_index_ = -1;
95 client_connection_ = nullptr;
96 }
97
98 if (send_event_loop_) {
99 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
100 if (fetcher_) {
101 std::string timer_name =
102 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
103 fetcher_->channel()->name()->string_view(), " ",
104 fetcher_->channel()->type()->string_view());
105 timer_->set_name(timer_name);
106 if (timestamp_timer_) {
107 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
108 }
109 }
110 } else {
111 timer_ = nullptr;
112 }
113 }
114
115 const Channel *channel() const { return channel_; }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800116
Austin Schuh4c570ea2020-11-19 23:13:24 -0800117 uint32_t time_to_live() {
Austin Schuh58646e22021-08-23 23:51:46 -0700118 return configuration::ConnectionToNode(channel_, send_node_factory_->node())
Austin Schuh4c570ea2020-11-19 23:13:24 -0800119 ->time_to_live();
120 }
121
Austin Schuh58646e22021-08-23 23:51:46 -0700122 void ScheduleReliable() {
123 if (forwarding_disabled()) return;
124
125 if (!fetcher_) {
126 return;
127 }
128 if (fetcher_->context().data == nullptr || sent_) {
129 sent_ = !fetcher_->Fetch();
130 }
131
132 FetchNext();
133 if (fetcher_->context().data == nullptr || sent_) {
134 return;
135 }
136 CHECK(!timer_scheduled_);
137
138 // Send at startup. It is the best we can do.
139 const monotonic_clock::time_point monotonic_delivered_time =
140 send_node_factory_->monotonic_now() +
141 send_node_factory_->network_delay();
142
143 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
144 << ": Trying to deliver message in the past on channel "
145 << configuration::StrippedChannelToString(fetcher_->channel())
146 << " to node " << send_event_loop_->node()->name()->string_view()
147 << " sent from " << fetcher_->channel()->source_node()->string_view()
148 << " at " << fetch_node_factory_->monotonic_now();
149
150 if (timer_) {
151 server_connection_->mutate_sent_packets(
152 server_connection_->sent_packets() + 1);
153 timer_->Setup(monotonic_delivered_time);
154 timer_scheduled_ = true;
155 } else {
156 server_connection_->mutate_dropped_packets(
157 server_connection_->dropped_packets() + 1);
158 sent_ = true;
159 }
160 }
161
162 bool timer_scheduled_ = false;
163
Austin Schuh898f4972020-01-11 17:21:25 -0800164 // Kicks us to re-fetch and schedule the timer.
165 void Schedule() {
Austin Schuh58646e22021-08-23 23:51:46 -0700166 CHECK(!forwarding_disabled());
167 if (!fetcher_) {
168 return;
169 }
170 if (timer_scheduled_) {
171 return;
172 }
173 FetchNext();
174 if (fetcher_->context().data == nullptr || sent_) {
175 return;
176 }
177
178 // Compute the time to publish this message.
179 const monotonic_clock::time_point monotonic_delivered_time =
180 DeliveredTime(fetcher_->context());
181
182 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
183 << ": Trying to deliver message in the past on channel "
184 << configuration::StrippedChannelToString(fetcher_->channel())
185 << " to node " << send_event_loop_->node()->name()->string_view()
186 << " sent from " << fetcher_->channel()->source_node()->string_view()
187 << " at " << fetch_node_factory_->monotonic_now();
188
189 if (timer_) {
190 server_connection_->mutate_sent_packets(
191 server_connection_->sent_packets() + 1);
192 timer_->Setup(monotonic_delivered_time);
193 timer_scheduled_ = true;
194 } else {
195 server_connection_->mutate_dropped_packets(
196 server_connection_->dropped_packets() + 1);
197 sent_ = true;
198 Schedule();
199 }
200 }
201
202 private:
203 void FetchNext() {
204 CHECK(server_connection_);
Austin Schuh6aa77be2020-02-22 21:06:40 -0800205 // Keep pulling messages out of the fetcher until we find one in the future.
206 while (true) {
207 if (fetcher_->context().data == nullptr || sent_) {
208 sent_ = !fetcher_->FetchNext();
209 }
210 if (sent_) {
211 break;
212 }
Austin Schuhc0b0f722020-12-12 18:36:06 -0800213
214 if (server_connection_->state() != State::CONNECTED) {
215 sent_ = true;
216 server_connection_->mutate_dropped_packets(
217 server_connection_->dropped_packets() + 1);
218 continue;
219 }
220
Austin Schuh6aa77be2020-02-22 21:06:40 -0800221 if (fetcher_->context().monotonic_event_time +
Austin Schuh58646e22021-08-23 23:51:46 -0700222 send_node_factory_->network_delay() +
223 send_node_factory_->send_delay() >
224 fetch_node_factory_->monotonic_now() ||
225 time_to_live() == 0) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800226 break;
227 }
228
229 // TODO(austin): Not cool. We want to actually forward these. This means
230 // we need a more sophisticated concept of what is running.
James Kuszmaul29c59522022-02-12 16:44:26 -0800231 // TODO(james): This fails if multiple messages are sent on the same channel
232 // within the same callback.
Austin Schuh6aa77be2020-02-22 21:06:40 -0800233 LOG(WARNING) << "Not forwarding message on "
234 << configuration::CleanedChannelToString(fetcher_->channel())
235 << " because we aren't running. Set at "
236 << fetcher_->context().monotonic_event_time << " now is "
237 << fetch_node_factory_->monotonic_now();
238 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700239 server_connection_->mutate_dropped_packets(
240 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800241 }
Austin Schuh898f4972020-01-11 17:21:25 -0800242 }
243
Austin Schuh58646e22021-08-23 23:51:46 -0700244 // Actually sends the message, and reschedules.
Austin Schuh898f4972020-01-11 17:21:25 -0800245 void Send() {
Austin Schuh58646e22021-08-23 23:51:46 -0700246 timer_scheduled_ = false;
247 CHECK(sender_);
248 CHECK(client_status_);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800249 if (server_connection_->state() != State::CONNECTED) {
250 sent_ = true;
251 Schedule();
252 return;
253 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700254 // Fill out the send times.
milind1f1dca32021-07-03 13:50:07 -0700255 sender_->CheckOk(sender_->Send(
256 fetcher_->context().data, fetcher_->context().size,
257 fetcher_->context().monotonic_event_time,
258 fetcher_->context().realtime_event_time,
259 fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
Austin Schuh898f4972020-01-11 17:21:25 -0800260
Austin Schuh4c3b9702020-08-30 11:34:55 -0700261 // And simulate message_bridge's offset recovery.
Austin Schuh367a7f42021-11-23 23:04:36 -0800262 client_status_->SampleFilter(
263 client_index_, fetcher_->context().monotonic_event_time,
264 sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700265
266 client_connection_->mutate_received_packets(
267 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700268
269 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800270 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800271 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800272 // Reset the filter every time the UUID changes. There's probably a more
273 // clever way to do this, but that means a better concept of rebooting.
274 if (server_status_->BootUUID(destination_node_index_) !=
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800275 send_node_factory_->boot_uuid()) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800276 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800277 server_status_->SetBootUUID(destination_node_index_,
278 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800279 }
280
Austin Schuhcdd90272021-03-15 12:46:16 -0700281 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
282 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800283
Austin Schuheeaa2022021-01-02 21:52:03 -0800284 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700285
286 message_header_builder.add_channel_index(channel_index_);
287
288 // Swap the remote and sent metrics. They are from the sender's
289 // perspective, not the receiver's perspective.
290 message_header_builder.add_monotonic_remote_time(
291 fetcher_->context().monotonic_event_time.time_since_epoch().count());
292 message_header_builder.add_realtime_remote_time(
293 fetcher_->context().realtime_event_time.time_since_epoch().count());
294 message_header_builder.add_remote_queue_index(
295 fetcher_->context().queue_index);
296
297 message_header_builder.add_monotonic_sent_time(
298 sender_->monotonic_sent_time().time_since_epoch().count());
299 message_header_builder.add_realtime_sent_time(
300 sender_->realtime_sent_time().time_since_epoch().count());
301 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800302 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700303
Austin Schuheeaa2022021-01-02 21:52:03 -0800304 fbb.Finish(message_header_builder.Finish());
305
306 remote_timestamps_.emplace_back(
307 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
308 fetch_node_factory_->monotonic_now() +
309 send_node_factory_->network_delay());
310 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700311 }
312
Austin Schuh898f4972020-01-11 17:21:25 -0800313 sent_ = true;
314 Schedule();
315 }
316
Austin Schuheeaa2022021-01-02 21:52:03 -0800317 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
318 void ScheduleTimestamp() {
319 if (remote_timestamps_.empty()) {
320 timestamp_timer_->Disable();
321 return;
322 }
323
324 if (scheduled_time_ !=
325 remote_timestamps_.front().monotonic_timestamp_time) {
326 timestamp_timer_->Setup(
327 remote_timestamps_.front().monotonic_timestamp_time);
328 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
329 return;
330 } else {
331 scheduled_time_ = monotonic_clock::min_time;
332 }
333 }
334
335 // Sends the next timestamp in remote_timestamps_.
336 void SendTimestamp() {
337 CHECK(!remote_timestamps_.empty());
338
339 // Send out all timestamps at the currently scheduled time.
340 while (remote_timestamps_.front().monotonic_timestamp_time ==
341 scheduled_time_) {
342 if (server_connection_->state() == State::CONNECTED) {
milind1f1dca32021-07-03 13:50:07 -0700343 timestamp_logger_->CheckOk(timestamp_logger_->Send(
344 std::move(remote_timestamps_.front().remote_message)));
Austin Schuheeaa2022021-01-02 21:52:03 -0800345 }
346 remote_timestamps_.pop_front();
347 if (remote_timestamps_.empty()) {
348 break;
349 }
350 }
351
352 ScheduleTimestamp();
353 }
354
Austin Schuh898f4972020-01-11 17:21:25 -0800355 // Converts from time on the sending node to time on the receiving node.
356 monotonic_clock::time_point DeliveredTime(const Context &context) const {
357 const distributed_clock::time_point distributed_sent_time =
358 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
359
Austin Schuh58646e22021-08-23 23:51:46 -0700360 const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
Austin Schuh2febf0d2020-09-21 22:24:30 -0700361 distributed_sent_time + send_node_factory_->network_delay() +
362 send_node_factory_->send_delay());
Austin Schuh58646e22021-08-23 23:51:46 -0700363 CHECK_EQ(t.boot, send_node_factory_->boot_count());
364 return t.time;
Austin Schuh898f4972020-01-11 17:21:25 -0800365 }
366
Austin Schuh58646e22021-08-23 23:51:46 -0700367 const Channel *channel_;
368 const Connection *connection_;
369
Austin Schuh898f4972020-01-11 17:21:25 -0800370 // Factories used for time conversion.
371 aos::NodeEventLoopFactory *fetch_node_factory_;
372 aos::NodeEventLoopFactory *send_node_factory_;
373
Austin Schuheeaa2022021-01-02 21:52:03 -0800374 // Event loop which fetching and sending timestamps are scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700375 aos::EventLoop *fetch_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800376 // Event loop which sending is scheduled on.
Austin Schuh58646e22021-08-23 23:51:46 -0700377 aos::EventLoop *send_event_loop_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800378 // Timer used to send.
Austin Schuh58646e22021-08-23 23:51:46 -0700379 aos::TimerHandler *timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800380 // Timer used to send timestamps out.
Austin Schuh58646e22021-08-23 23:51:46 -0700381 aos::TimerHandler *timestamp_timer_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800382 // Time that the timer is scheduled for. Used to track if it needs to be
383 // rescheduled.
384 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
385
Austin Schuh898f4972020-01-11 17:21:25 -0800386 // Fetcher used to receive messages.
387 std::unique_ptr<aos::RawFetcher> fetcher_;
388 // Sender to send them back out.
389 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800390
Austin Schuh58646e22021-08-23 23:51:46 -0700391 MessageBridgeServerStatus *server_status_ = nullptr;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800392 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800393 // True if we have sent the message in the fetcher.
394 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700395
396 ServerConnection *server_connection_ = nullptr;
397 MessageBridgeClientStatus *client_status_ = nullptr;
Austin Schuh58646e22021-08-23 23:51:46 -0700398 int client_index_ = -1;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700399 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700400
401 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800402 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800403
404 struct Timestamp {
405 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
406 monotonic_clock::time_point new_monotonic_timestamp_time)
407 : remote_message(std::move(new_remote_message)),
408 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
409 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
410 monotonic_clock::time_point monotonic_timestamp_time;
411 };
412
413 std::deque<Timestamp> remote_timestamps_;
Austin Schuh58646e22021-08-23 23:51:46 -0700414
415 bool delivery_time_is_logged_;
416
417 bool forwarding_disabled_ = false;
Austin Schuh898f4972020-01-11 17:21:25 -0800418};
419
420SimulatedMessageBridge::SimulatedMessageBridge(
421 SimulatedEventLoopFactory *simulated_event_loop_factory) {
422 CHECK(
423 configuration::MultiNode(simulated_event_loop_factory->configuration()));
424
425 // Pre-build up event loops for every node. They are pretty cheap anyways.
426 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700427 NodeEventLoopFactory *node_factory =
428 simulated_event_loop_factory->GetNodeEventLoopFactory(node);
429 auto it = event_loop_map_.emplace(node, node_factory);
Austin Schuhcde938c2020-02-02 17:30:07 -0800430 CHECK(it.second);
431
Austin Schuh58646e22021-08-23 23:51:46 -0700432 node_factory->OnStartup(
433 [this, simulated_event_loop_factory, node_state = &it.first->second]() {
434 node_state->MakeEventLoop();
435 const size_t my_node_index = configuration::GetNodeIndex(
436 simulated_event_loop_factory->configuration(),
437 node_state->event_loop->node());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700438
Austin Schuh58646e22021-08-23 23:51:46 -0700439 size_t node_index = 0;
440 for (ServerConnection *connection :
441 node_state->server_status->server_connection()) {
442 if (connection != nullptr) {
443 node_state->server_status->ResetFilter(node_index);
444 }
445 ++node_index;
446 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700447
Austin Schuh58646e22021-08-23 23:51:46 -0700448 for (const ClientConnection *client_connections :
449 *node_state->client_status->mutable_client_statistics()
450 ->connections()) {
451 const Node *client_node = configuration::GetNode(
452 simulated_event_loop_factory->configuration(),
453 client_connections->node()->name()->string_view());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700454
Austin Schuh58646e22021-08-23 23:51:46 -0700455 auto client_event_loop = event_loop_map_.find(client_node);
456 client_event_loop->second.SetBootUUID(
457 my_node_index, node_state->event_loop->boot_uuid());
458 }
459 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700460
Austin Schuh58646e22021-08-23 23:51:46 -0700461 node_factory->OnShutdown([node_state = &it.first->second]() {
462 node_state->SetEventLoop(nullptr);
463 });
Austin Schuh20ac95d2020-12-05 17:24:19 -0800464 }
465
Austin Schuh898f4972020-01-11 17:21:25 -0800466 for (const Channel *channel :
467 *simulated_event_loop_factory->configuration()->channels()) {
468 if (!channel->has_destination_nodes()) {
469 continue;
470 }
471
472 // Find the sending node.
Austin Schuh58646e22021-08-23 23:51:46 -0700473 const Node *source_node =
Austin Schuh898f4972020-01-11 17:21:25 -0800474 configuration::GetNode(simulated_event_loop_factory->configuration(),
475 channel->source_node()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700476 auto source_event_loop = event_loop_map_.find(source_node);
Austin Schuh898f4972020-01-11 17:21:25 -0800477 CHECK(source_event_loop != event_loop_map_.end());
478
479 std::unique_ptr<DelayersVector> delayers =
480 std::make_unique<DelayersVector>();
481
482 // And then build up a RawMessageDelayer for each destination.
483 for (const Connection *connection : *channel->destination_nodes()) {
484 const Node *destination_node =
485 configuration::GetNode(simulated_event_loop_factory->configuration(),
486 connection->name()->string_view());
487 auto destination_event_loop = event_loop_map_.find(destination_node);
488 CHECK(destination_event_loop != event_loop_map_.end());
489
Austin Schuh2f8fd752020-09-01 22:38:28 -0700490 const size_t destination_node_index = configuration::GetNodeIndex(
491 simulated_event_loop_factory->configuration(), destination_node);
492
493 const bool delivery_time_is_logged =
Austin Schuh58646e22021-08-23 23:51:46 -0700494 configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
495 source_node);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700496
Austin Schuh58646e22021-08-23 23:51:46 -0700497 delayers->v.emplace_back(std::make_unique<RawMessageDelayer>(
498 channel, connection,
499 simulated_event_loop_factory->GetNodeEventLoopFactory(source_node),
Austin Schuh898f4972020-01-11 17:21:25 -0800500 simulated_event_loop_factory->GetNodeEventLoopFactory(
501 destination_node),
Austin Schuh58646e22021-08-23 23:51:46 -0700502 destination_node_index, delivery_time_is_logged));
503
504 source_event_loop->second.AddSourceDelayer(delayers->v.back().get());
505 destination_event_loop->second.AddDestinationDelayer(
506 delayers->v.back().get());
Austin Schuh898f4972020-01-11 17:21:25 -0800507 }
508
Austin Schuh4c3b9702020-08-30 11:34:55 -0700509 const Channel *const timestamp_channel = configuration::GetChannel(
510 simulated_event_loop_factory->configuration(), "/aos",
Austin Schuh58646e22021-08-23 23:51:46 -0700511 Timestamp::GetFullyQualifiedName(), "message_bridge", source_node);
Austin Schuh4c3b9702020-08-30 11:34:55 -0700512
513 if (channel == timestamp_channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700514 source_event_loop->second.SetSendData(
Austin Schuh4c3b9702020-08-30 11:34:55 -0700515 [captured_delayers = delayers.get()](const Context &) {
516 for (std::unique_ptr<RawMessageDelayer> &delayer :
Austin Schuh58646e22021-08-23 23:51:46 -0700517 captured_delayers->v) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700518 delayer->Schedule();
519 }
520 });
521 } else {
Austin Schuh58646e22021-08-23 23:51:46 -0700522 source_event_loop->second.AddDelayerWatcher(channel, delayers.get());
Austin Schuh4c3b9702020-08-30 11:34:55 -0700523 }
Austin Schuh898f4972020-01-11 17:21:25 -0800524 delayers_list_.emplace_back(std::move(delayers));
525 }
526}
527
528SimulatedMessageBridge::~SimulatedMessageBridge() {}
529
Austin Schuh6f3babe2020-01-26 20:34:50 -0800530void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
Austin Schuh58646e22021-08-23 23:51:46 -0700531 for (std::unique_ptr<DelayersVector> &delayers : delayers_list_) {
532 if (delayers->v.size() > 0) {
533 if (delayers->v[0]->channel() == channel) {
534 delayers->disable_forwarding = true;
535 for (std::unique_ptr<RawMessageDelayer> &delayer : delayers->v) {
536 delayer->set_forwarding_disabled(true);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800537 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800538 }
539 }
540 }
541}
542
Austin Schuhc0b0f722020-12-12 18:36:06 -0800543void SimulatedMessageBridge::Disconnect(const Node *source,
544 const Node *destination) {
545 SetState(source, destination, message_bridge::State::DISCONNECTED);
546}
547
548void SimulatedMessageBridge::Connect(const Node *source,
549 const Node *destination) {
550 SetState(source, destination, message_bridge::State::CONNECTED);
551}
552void SimulatedMessageBridge::SetState(const Node *source,
553 const Node *destination,
554 message_bridge::State state) {
555 auto source_state = event_loop_map_.find(source);
556 CHECK(source_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700557 source_state->second.SetServerState(destination, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800558
559 auto destination_state = event_loop_map_.find(destination);
560 CHECK(destination_state != event_loop_map_.end());
Austin Schuh58646e22021-08-23 23:51:46 -0700561 destination_state->second.SetClientState(source, state);
Austin Schuhc0b0f722020-12-12 18:36:06 -0800562}
563
Austin Schuh4c3b9702020-08-30 11:34:55 -0700564void SimulatedMessageBridge::DisableStatistics() {
565 for (std::pair<const Node *const, State> &state : event_loop_map_) {
Austin Schuh58646e22021-08-23 23:51:46 -0700566 state.second.DisableStatistics();
Austin Schuh4c3b9702020-08-30 11:34:55 -0700567 }
568}
569
Austin Schuh48205e62021-11-12 14:13:18 -0800570void SimulatedMessageBridge::DisableStatistics(const Node *node) {
571 auto it = event_loop_map_.find(node);
572 CHECK(it != event_loop_map_.end());
573 it->second.DisableStatistics();
574}
575
576void SimulatedMessageBridge::EnableStatistics() {
577 for (std::pair<const Node *const, State> &state : event_loop_map_) {
578 state.second.EnableStatistics();
579 }
580}
581
582void SimulatedMessageBridge::EnableStatistics(const Node *node) {
583 auto it = event_loop_map_.find(node);
584 CHECK(it != event_loop_map_.end());
585 it->second.EnableStatistics();
586}
587
Austin Schuh58646e22021-08-23 23:51:46 -0700588void SimulatedMessageBridge::State::SetEventLoop(
589 std::unique_ptr<aos::EventLoop> loop) {
590 if (!loop) {
591 timestamp_loggers = ChannelTimestampSender(nullptr);
592 server_status.reset();
593 client_status.reset();
594 for (RawMessageDelayer *source_delayer : source_delayers_) {
595 source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
596 }
597 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
598 destination_delayer->SetSendEventLoop(nullptr, nullptr);
599 }
600 event_loop = std::move(loop);
601 return;
602 } else {
603 CHECK(!event_loop);
604 }
605 event_loop = std::move(loop);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700606
Austin Schuh58646e22021-08-23 23:51:46 -0700607 event_loop->SkipTimingReport();
608 event_loop->SkipAosLog();
609
610 for (std::pair<const Channel *, DelayersVector *> &watcher :
611 delayer_watchers_) {
612 // Don't register watchers if we know we aren't forwarding.
613 if (watcher.second->disable_forwarding) continue;
614 event_loop->MakeRawNoArgWatcher(
615 watcher.first, [captured_delayers = watcher.second](const Context &) {
616 // We might get told after registering, so don't forward at that point
617 // too.
618 for (std::unique_ptr<RawMessageDelayer> &delayer :
619 captured_delayers->v) {
620 delayer->Schedule();
621 }
622 });
623 }
624
625 timestamp_loggers = ChannelTimestampSender(event_loop.get());
626 server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
Austin Schuh48205e62021-11-12 14:13:18 -0800627 if (disable_statistics_) {
628 server_status->DisableStatistics();
629 }
Austin Schuh58646e22021-08-23 23:51:46 -0700630
631 {
632 size_t node_index = 0;
633 for (ServerConnection *connection : server_status->server_connection()) {
634 if (connection) {
635 if (boot_uuids_[node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800636 switch (server_state_[node_index]) {
637 case message_bridge::State::DISCONNECTED:
638 server_status->Disconnect(node_index);
639 break;
640 case message_bridge::State::CONNECTED:
641 server_status->Connect(node_index, event_loop->monotonic_now());
642 break;
643 }
Austin Schuh58646e22021-08-23 23:51:46 -0700644 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800645 server_status->Disconnect(node_index);
Austin Schuh58646e22021-08-23 23:51:46 -0700646 }
647 }
648 ++node_index;
649 }
650 }
651
652 for (size_t i = 0; i < boot_uuids_.size(); ++i) {
653 if (boot_uuids_[i] != UUID::Zero()) {
654 server_status->SetBootUUID(i, boot_uuids_[i]);
655 }
656 }
Austin Schuh58646e22021-08-23 23:51:46 -0700657 if (fn_) {
658 server_status->set_send_data(fn_);
659 }
660 client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
661 if (disable_statistics_) {
662 client_status->DisableStatistics();
663 }
664
665 for (size_t i = 0;
666 i < client_status->mutable_client_statistics()->connections()->size();
667 ++i) {
668 ClientConnection *client_connection =
669 client_status->mutable_client_statistics()
670 ->mutable_connections()
671 ->GetMutableObject(i);
672 const Node *client_node = configuration::GetNode(
673 node_factory_->configuration(),
674 client_connection->node()->name()->string_view());
675 const size_t client_node_index = configuration::GetNodeIndex(
676 node_factory_->configuration(), client_node);
677 if (boot_uuids_[client_node_index] != UUID::Zero()) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800678 if (client_connection->state() != client_state_[client_node_index]) {
679 switch (client_state_[client_node_index]) {
680 case message_bridge::State::DISCONNECTED:
681 client_status->Disconnect(i);
682 break;
683 case message_bridge::State::CONNECTED:
684 client_status->Connect(i);
685 break;
686 }
687 }
Austin Schuh58646e22021-08-23 23:51:46 -0700688 } else {
Austin Schuh367a7f42021-11-23 23:04:36 -0800689 client_status->Disconnect(i);
Austin Schuh58646e22021-08-23 23:51:46 -0700690 }
691 }
692
Austin Schuh2f8fd752020-09-01 22:38:28 -0700693 for (const Channel *channel : *event_loop->configuration()->channels()) {
694 CHECK(channel->has_source_node());
695
696 // Sent by us.
697 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
698 channel->has_destination_nodes()) {
699 for (const Connection *connection : *channel->destination_nodes()) {
700 const bool delivery_time_is_logged =
701 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
702 connection, event_loop->node());
703
704 // And the timestamps are then logged back by us again.
705 if (!delivery_time_is_logged) {
706 continue;
707 }
708
Austin Schuh89c9b812021-02-20 14:42:10 -0800709 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700710 }
711 }
712 }
Austin Schuh58646e22021-08-23 23:51:46 -0700713
714 for (RawMessageDelayer *source_delayer : source_delayers_) {
715 source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
716 &timestamp_loggers);
717 }
718 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
719 destination_delayer->SetSendEventLoop(event_loop.get(),
720 client_status.get());
721 }
722 event_loop->OnRun([this]() {
723 for (RawMessageDelayer *destination_delayer : destination_delayers_) {
724 if (destination_delayer->time_to_live() == 0) {
725 destination_delayer->ScheduleReliable();
726 }
727 }
728 });
Austin Schuh2f8fd752020-09-01 22:38:28 -0700729}
730
Austin Schuh898f4972020-01-11 17:21:25 -0800731} // namespace message_bridge
732} // namespace aos