blob: 40b468fe17d70f24711090d5e1a1e5c756a2187d [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
4#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08005#include "aos/events/event_loop.h"
6#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08007#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
19 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
20 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuheeaa2022021-01-02 21:52:03 -080021 aos::EventLoop *fetch_event_loop,
Austin Schuh898f4972020-01-11 17:21:25 -080022 aos::EventLoop *send_event_loop,
23 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070024 std::unique_ptr<aos::RawSender> sender,
Austin Schuh20ac95d2020-12-05 17:24:19 -080025 MessageBridgeServerStatus *server_status,
26 size_t destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -070027 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 MessageBridgeClientStatus *client_status,
29 size_t channel_index,
Austin Schuh0de30f32020-12-06 12:44:28 -080030 aos::Sender<RemoteMessage> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080031 : fetch_node_factory_(fetch_node_factory),
32 send_node_factory_(send_node_factory),
Austin Schuheeaa2022021-01-02 21:52:03 -080033 fetch_event_loop_(fetch_event_loop),
Austin Schuh898f4972020-01-11 17:21:25 -080034 send_event_loop_(send_event_loop),
35 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070036 sender_(std::move(sender)),
Austin Schuh20ac95d2020-12-05 17:24:19 -080037 server_status_(server_status),
38 destination_node_index_(destination_node_index),
Austin Schuh4c3b9702020-08-30 11:34:55 -070039 server_connection_(server_connection),
40 client_status_(client_status),
41 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070042 client_connection_(client_status_->GetClientConnection(client_index)),
43 channel_index_(channel_index),
44 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080045 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
Austin Schuheaf54272021-02-07 22:51:34 -080046 std::string timer_name =
47 absl::StrCat(send_event_loop_->node()->name()->string_view(), " ",
48 fetcher_->channel()->name()->string_view(), " ",
49 fetcher_->channel()->type()->string_view());
50 timer_->set_name(timer_name);
Austin Schuheeaa2022021-01-02 21:52:03 -080051 timestamp_timer_ =
52 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
Austin Schuheaf54272021-02-07 22:51:34 -080053 timestamp_timer_->set_name(absl::StrCat(timer_name, " timestamps"));
Austin Schuh898f4972020-01-11 17:21:25 -080054
55 Schedule();
56 }
57
Austin Schuh6f3babe2020-01-26 20:34:50 -080058 const Channel *channel() const { return fetcher_->channel(); }
59
Austin Schuh4c570ea2020-11-19 23:13:24 -080060 uint32_t time_to_live() {
61 return configuration::ConnectionToNode(sender_->channel(),
62 send_node_factory_->node())
63 ->time_to_live();
64 }
65
Austin Schuh898f4972020-01-11 17:21:25 -080066 // Kicks us to re-fetch and schedule the timer.
67 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080068 // Keep pulling messages out of the fetcher until we find one in the future.
69 while (true) {
70 if (fetcher_->context().data == nullptr || sent_) {
71 sent_ = !fetcher_->FetchNext();
72 }
73 if (sent_) {
74 break;
75 }
Austin Schuhc0b0f722020-12-12 18:36:06 -080076
77 if (server_connection_->state() != State::CONNECTED) {
78 sent_ = true;
79 server_connection_->mutate_dropped_packets(
80 server_connection_->dropped_packets() + 1);
81 continue;
82 }
83
Austin Schuh6aa77be2020-02-22 21:06:40 -080084 if (fetcher_->context().monotonic_event_time +
85 send_node_factory_->network_delay() +
86 send_node_factory_->send_delay() >
87 fetch_node_factory_->monotonic_now()) {
88 break;
89 }
90
91 // TODO(austin): Not cool. We want to actually forward these. This means
92 // we need a more sophisticated concept of what is running.
93 LOG(WARNING) << "Not forwarding message on "
94 << configuration::CleanedChannelToString(fetcher_->channel())
95 << " because we aren't running. Set at "
96 << fetcher_->context().monotonic_event_time << " now is "
97 << fetch_node_factory_->monotonic_now();
98 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070099 server_connection_->mutate_dropped_packets(
100 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800101 }
102
103 if (fetcher_->context().data == nullptr) {
104 return;
105 }
106
107 if (sent_) {
108 return;
109 }
110
111 // Compute the time to publish this message.
112 const monotonic_clock::time_point monotonic_delivered_time =
113 DeliveredTime(fetcher_->context());
114
115 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -0700116 << ": Trying to deliver message in the past on channel "
117 << configuration::StrippedChannelToString(fetcher_->channel())
118 << " to node " << send_event_loop_->node()->name()->string_view()
119 << " sent from " << fetcher_->channel()->source_node()->string_view()
120 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -0800121
Austin Schuh4c3b9702020-08-30 11:34:55 -0700122 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
123 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800124 timer_->Setup(monotonic_delivered_time);
125 }
126
127 private:
128 // Acutally sends the message, and reschedules.
129 void Send() {
Austin Schuhc0b0f722020-12-12 18:36:06 -0800130 if (server_connection_->state() != State::CONNECTED) {
131 sent_ = true;
132 Schedule();
133 return;
134 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700135 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800136 sender_->Send(fetcher_->context().data, fetcher_->context().size,
137 fetcher_->context().monotonic_event_time,
138 fetcher_->context().realtime_event_time,
Austin Schuh8902fa52021-03-14 22:39:24 -0700139 fetcher_->context().queue_index,
140 fetcher_->context().remote_boot_uuid);
Austin Schuh898f4972020-01-11 17:21:25 -0800141
Austin Schuh4c3b9702020-08-30 11:34:55 -0700142 // And simulate message_bridge's offset recovery.
143 client_status_->SampleFilter(client_index_,
144 fetcher_->context().monotonic_event_time,
145 sender_->monotonic_sent_time());
146
147 client_connection_->mutate_received_packets(
148 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700149
150 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800151 flatbuffers::FlatBufferBuilder fbb;
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800152 fbb.ForceDefaults(true);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800153 // Reset the filter every time the UUID changes. There's probably a more
154 // clever way to do this, but that means a better concept of rebooting.
155 if (server_status_->BootUUID(destination_node_index_) !=
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800156 send_node_factory_->boot_uuid()) {
Austin Schuh20ac95d2020-12-05 17:24:19 -0800157 server_status_->ResetFilter(destination_node_index_);
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800158 server_status_->SetBootUUID(destination_node_index_,
159 send_node_factory_->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800160 }
161
Austin Schuhcdd90272021-03-15 12:46:16 -0700162 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
163 send_node_factory_->boot_uuid().PackVector(&fbb);
Austin Schuh20ac95d2020-12-05 17:24:19 -0800164
Austin Schuheeaa2022021-01-02 21:52:03 -0800165 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700166
167 message_header_builder.add_channel_index(channel_index_);
168
169 // Swap the remote and sent metrics. They are from the sender's
170 // perspective, not the receiver's perspective.
171 message_header_builder.add_monotonic_remote_time(
172 fetcher_->context().monotonic_event_time.time_since_epoch().count());
173 message_header_builder.add_realtime_remote_time(
174 fetcher_->context().realtime_event_time.time_since_epoch().count());
175 message_header_builder.add_remote_queue_index(
176 fetcher_->context().queue_index);
177
178 message_header_builder.add_monotonic_sent_time(
179 sender_->monotonic_sent_time().time_since_epoch().count());
180 message_header_builder.add_realtime_sent_time(
181 sender_->realtime_sent_time().time_since_epoch().count());
182 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800183 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700184
Austin Schuheeaa2022021-01-02 21:52:03 -0800185 fbb.Finish(message_header_builder.Finish());
186
187 remote_timestamps_.emplace_back(
188 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
189 fetch_node_factory_->monotonic_now() +
190 send_node_factory_->network_delay());
191 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700192 }
193
Austin Schuh898f4972020-01-11 17:21:25 -0800194 sent_ = true;
195 Schedule();
196 }
197
Austin Schuheeaa2022021-01-02 21:52:03 -0800198 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
199 void ScheduleTimestamp() {
200 if (remote_timestamps_.empty()) {
201 timestamp_timer_->Disable();
202 return;
203 }
204
205 if (scheduled_time_ !=
206 remote_timestamps_.front().monotonic_timestamp_time) {
207 timestamp_timer_->Setup(
208 remote_timestamps_.front().monotonic_timestamp_time);
209 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
210 return;
211 } else {
212 scheduled_time_ = monotonic_clock::min_time;
213 }
214 }
215
216 // Sends the next timestamp in remote_timestamps_.
217 void SendTimestamp() {
218 CHECK(!remote_timestamps_.empty());
219
220 // Send out all timestamps at the currently scheduled time.
221 while (remote_timestamps_.front().monotonic_timestamp_time ==
222 scheduled_time_) {
223 if (server_connection_->state() == State::CONNECTED) {
224 timestamp_logger_->Send(
225 std::move(remote_timestamps_.front().remote_message));
226 }
227 remote_timestamps_.pop_front();
228 if (remote_timestamps_.empty()) {
229 break;
230 }
231 }
232
233 ScheduleTimestamp();
234 }
235
Austin Schuh898f4972020-01-11 17:21:25 -0800236 // Converts from time on the sending node to time on the receiving node.
237 monotonic_clock::time_point DeliveredTime(const Context &context) const {
238 const distributed_clock::time_point distributed_sent_time =
239 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
240
Austin Schuh2febf0d2020-09-21 22:24:30 -0700241 return send_node_factory_->FromDistributedClock(
242 distributed_sent_time + send_node_factory_->network_delay() +
243 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800244 }
245
246 // Factories used for time conversion.
247 aos::NodeEventLoopFactory *fetch_node_factory_;
248 aos::NodeEventLoopFactory *send_node_factory_;
249
Austin Schuheeaa2022021-01-02 21:52:03 -0800250 // Event loop which fetching and sending timestamps are scheduled on.
251 aos::EventLoop *fetch_event_loop_;
Austin Schuh898f4972020-01-11 17:21:25 -0800252 // Event loop which sending is scheduled on.
253 aos::EventLoop *send_event_loop_;
254 // Timer used to send.
255 aos::TimerHandler *timer_;
Austin Schuheeaa2022021-01-02 21:52:03 -0800256 // Timer used to send timestamps out.
257 aos::TimerHandler *timestamp_timer_;
258 // Time that the timer is scheduled for. Used to track if it needs to be
259 // rescheduled.
260 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
261
Austin Schuh898f4972020-01-11 17:21:25 -0800262 // Fetcher used to receive messages.
263 std::unique_ptr<aos::RawFetcher> fetcher_;
264 // Sender to send them back out.
265 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800266
267 MessageBridgeServerStatus *server_status_;
268 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800269 // True if we have sent the message in the fetcher.
270 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700271
272 ServerConnection *server_connection_ = nullptr;
273 MessageBridgeClientStatus *client_status_ = nullptr;
274 int client_index_;
275 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700276
277 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800278 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800279
280 struct Timestamp {
281 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
282 monotonic_clock::time_point new_monotonic_timestamp_time)
283 : remote_message(std::move(new_remote_message)),
284 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
285 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
286 monotonic_clock::time_point monotonic_timestamp_time;
287 };
288
289 std::deque<Timestamp> remote_timestamps_;
Austin Schuh898f4972020-01-11 17:21:25 -0800290};
291
292SimulatedMessageBridge::SimulatedMessageBridge(
293 SimulatedEventLoopFactory *simulated_event_loop_factory) {
294 CHECK(
295 configuration::MultiNode(simulated_event_loop_factory->configuration()));
296
297 // Pre-build up event loops for every node. They are pretty cheap anyways.
298 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700299 auto it = event_loop_map_.emplace(std::make_pair(
300 node,
301 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800302
303 CHECK(it.second);
304
Austin Schuh4c3b9702020-08-30 11:34:55 -0700305 it.first->second.event_loop->SkipTimingReport();
306 it.first->second.event_loop->SkipAosLog();
307
308 for (ServerConnection *connection :
309 it.first->second.server_status.server_connection()) {
310 if (connection == nullptr) continue;
311
312 connection->mutate_state(message_bridge::State::CONNECTED);
313 }
314
315 for (size_t i = 0;
316 i < it.first->second.client_status.mutable_client_statistics()
317 ->mutable_connections()
318 ->size();
319 ++i) {
320 ClientConnection *connection =
321 it.first->second.client_status.mutable_client_statistics()
322 ->mutable_connections()
323 ->GetMutableObject(i);
324 if (connection == nullptr) continue;
325
326 connection->mutate_state(message_bridge::State::CONNECTED);
327 }
Austin Schuh898f4972020-01-11 17:21:25 -0800328 }
329
Austin Schuh20ac95d2020-12-05 17:24:19 -0800330 for (const Node *node : simulated_event_loop_factory->nodes()) {
331 auto it = event_loop_map_.find(node);
332
333 CHECK(it != event_loop_map_.end());
334
335 size_t node_index = 0;
336 for (ServerConnection *connection :
337 it->second.server_status.server_connection()) {
338 if (connection != nullptr) {
339 const Node *client_node =
340 simulated_event_loop_factory->configuration()->nodes()->Get(
341 node_index);
342 auto client_event_loop = event_loop_map_.find(client_node);
343 it->second.server_status.ResetFilter(node_index);
344 it->second.server_status.SetBootUUID(
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800345 node_index, client_event_loop->second.event_loop->boot_uuid());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800346 }
347 ++node_index;
348 }
349 }
350
Austin Schuh898f4972020-01-11 17:21:25 -0800351 for (const Channel *channel :
352 *simulated_event_loop_factory->configuration()->channels()) {
353 if (!channel->has_destination_nodes()) {
354 continue;
355 }
356
357 // Find the sending node.
358 const Node *node =
359 configuration::GetNode(simulated_event_loop_factory->configuration(),
360 channel->source_node()->string_view());
361 auto source_event_loop = event_loop_map_.find(node);
362 CHECK(source_event_loop != event_loop_map_.end());
363
364 std::unique_ptr<DelayersVector> delayers =
365 std::make_unique<DelayersVector>();
366
367 // And then build up a RawMessageDelayer for each destination.
368 for (const Connection *connection : *channel->destination_nodes()) {
369 const Node *destination_node =
370 configuration::GetNode(simulated_event_loop_factory->configuration(),
371 connection->name()->string_view());
372 auto destination_event_loop = event_loop_map_.find(destination_node);
373 CHECK(destination_event_loop != event_loop_map_.end());
374
Austin Schuh4c3b9702020-08-30 11:34:55 -0700375 ServerConnection *server_connection =
376 source_event_loop->second.server_status.FindServerConnection(
377 connection->name()->string_view());
378
379 int client_index =
380 destination_event_loop->second.client_status.FindClientIndex(
381 channel->source_node()->string_view());
382
Austin Schuh2f8fd752020-09-01 22:38:28 -0700383 const size_t destination_node_index = configuration::GetNodeIndex(
384 simulated_event_loop_factory->configuration(), destination_node);
385
386 const bool delivery_time_is_logged =
387 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
388 connection, source_event_loop->second.event_loop->node());
389
Austin Schuh898f4972020-01-11 17:21:25 -0800390 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
391 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
392 simulated_event_loop_factory->GetNodeEventLoopFactory(
393 destination_node),
Austin Schuheeaa2022021-01-02 21:52:03 -0800394 source_event_loop->second.event_loop.get(),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700395 destination_event_loop->second.event_loop.get(),
396 source_event_loop->second.event_loop->MakeRawFetcher(channel),
397 destination_event_loop->second.event_loop->MakeRawSender(channel),
Austin Schuh20ac95d2020-12-05 17:24:19 -0800398 &source_event_loop->second.server_status, destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -0700399 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700400 &destination_event_loop->second.client_status,
401 configuration::ChannelIndex(
402 source_event_loop->second.event_loop->configuration(), channel),
403 delivery_time_is_logged
Austin Schuh89c9b812021-02-20 14:42:10 -0800404 ? source_event_loop->second.timestamp_loggers.SenderForChannel(
405 channel, connection)
Austin Schuh2f8fd752020-09-01 22:38:28 -0700406 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800407 }
408
Austin Schuh4c3b9702020-08-30 11:34:55 -0700409 const Channel *const timestamp_channel = configuration::GetChannel(
410 simulated_event_loop_factory->configuration(), "/aos",
411 Timestamp::GetFullyQualifiedName(),
412 source_event_loop->second.event_loop->name(), node);
413
414 if (channel == timestamp_channel) {
415 source_event_loop->second.server_status.set_send_data(
416 [captured_delayers = delayers.get()](const Context &) {
417 for (std::unique_ptr<RawMessageDelayer> &delayer :
418 *captured_delayers) {
419 delayer->Schedule();
420 }
421 });
422 } else {
423 // And register every delayer to be poked when a new message shows up.
Austin Schuh4c570ea2020-11-19 23:13:24 -0800424
425 source_event_loop->second.event_loop->OnRun([captured_delayers =
426 delayers.get()]() {
427 // Poke all the reliable delayers so they send any queued messages.
428 for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
429 if (delayer->time_to_live() == 0) {
430 delayer->Schedule();
431 }
432 }
433 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700434 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
435 channel, [captured_delayers = delayers.get()](const Context &) {
436 for (std::unique_ptr<RawMessageDelayer> &delayer :
437 *captured_delayers) {
438 delayer->Schedule();
439 }
440 });
441 }
Austin Schuh898f4972020-01-11 17:21:25 -0800442 delayers_list_.emplace_back(std::move(delayers));
443 }
444}
445
446SimulatedMessageBridge::~SimulatedMessageBridge() {}
447
Austin Schuh6f3babe2020-01-26 20:34:50 -0800448void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
449 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
450 &delayers : delayers_list_) {
451 if (delayers->size() > 0) {
452 if ((*delayers)[0]->channel() == channel) {
453 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
454 CHECK(delayer->channel() == channel);
455 }
456
457 // If we clear the delayers list, nothing will be scheduled. Which is a
458 // success!
459 delayers->clear();
460 }
461 }
462 }
463}
464
Austin Schuhc0b0f722020-12-12 18:36:06 -0800465void SimulatedMessageBridge::Disconnect(const Node *source,
466 const Node *destination) {
467 SetState(source, destination, message_bridge::State::DISCONNECTED);
468}
469
470void SimulatedMessageBridge::Connect(const Node *source,
471 const Node *destination) {
472 SetState(source, destination, message_bridge::State::CONNECTED);
473}
474void SimulatedMessageBridge::SetState(const Node *source,
475 const Node *destination,
476 message_bridge::State state) {
477 auto source_state = event_loop_map_.find(source);
478 CHECK(source_state != event_loop_map_.end());
479
480 ServerConnection *server_connection =
481 source_state->second.server_status.FindServerConnection(destination);
482 if (!server_connection) {
483 return;
484 }
485 server_connection->mutate_state(state);
486
487 auto destination_state = event_loop_map_.find(destination);
488 CHECK(destination_state != event_loop_map_.end());
489 ClientConnection *client_connection =
490 destination_state->second.client_status.GetClientConnection(source);
491 if (!client_connection) {
492 return;
493 }
494 client_connection->mutate_state(state);
495}
496
Austin Schuh4c3b9702020-08-30 11:34:55 -0700497void SimulatedMessageBridge::DisableStatistics() {
498 for (std::pair<const Node *const, State> &state : event_loop_map_) {
499 state.second.server_status.DisableStatistics();
500 state.second.client_status.DisableStatistics();
501 }
502}
503
Austin Schuh2928ebe2021-02-07 22:10:27 -0800504void SimulatedMessageBridge::SkipTimingReport() {
505 for (std::pair<const Node *const, State> &state : event_loop_map_) {
506 state.second.event_loop->SkipTimingReport();
507 }
508}
509
Austin Schuh2f8fd752020-09-01 22:38:28 -0700510SimulatedMessageBridge::State::State(
511 std::unique_ptr<aos::EventLoop> &&new_event_loop)
512 : event_loop(std::move(new_event_loop)),
Austin Schuh89c9b812021-02-20 14:42:10 -0800513 timestamp_loggers(event_loop.get()),
Austin Schuh2f8fd752020-09-01 22:38:28 -0700514 server_status(event_loop.get()),
515 client_status(event_loop.get()) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700516
517 // Find all nodes which log timestamps back to us (from us).
518 for (const Channel *channel : *event_loop->configuration()->channels()) {
519 CHECK(channel->has_source_node());
520
521 // Sent by us.
522 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
523 channel->has_destination_nodes()) {
524 for (const Connection *connection : *channel->destination_nodes()) {
525 const bool delivery_time_is_logged =
526 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
527 connection, event_loop->node());
528
529 // And the timestamps are then logged back by us again.
530 if (!delivery_time_is_logged) {
531 continue;
532 }
533
Austin Schuh89c9b812021-02-20 14:42:10 -0800534 timestamp_loggers.SenderForChannel(channel, connection);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700535 }
536 }
537 }
538}
539
Austin Schuh898f4972020-01-11 17:21:25 -0800540} // namespace message_bridge
541} // namespace aos