// Source blob: 17ebcbc4e503942864c7be487e45cfdf935bc0cf
#include "aos/events/simulated_network_bridge.h"

#include <deque>
#include <memory>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/remote_message_generated.h"

namespace aos {
namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
19 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
20 aos::NodeEventLoopFactory *send_node_factory,
Austin Schuheeaa2022021-01-02 21:52:03 -080021 aos::EventLoop *fetch_event_loop,
Austin Schuh898f4972020-01-11 17:21:25 -080022 aos::EventLoop *send_event_loop,
23 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070024 std::unique_ptr<aos::RawSender> sender,
Austin Schuh20ac95d2020-12-05 17:24:19 -080025 MessageBridgeServerStatus *server_status,
26 size_t destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -070027 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070028 MessageBridgeClientStatus *client_status,
29 size_t channel_index,
Austin Schuh0de30f32020-12-06 12:44:28 -080030 aos::Sender<RemoteMessage> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080031 : fetch_node_factory_(fetch_node_factory),
32 send_node_factory_(send_node_factory),
Austin Schuheeaa2022021-01-02 21:52:03 -080033 fetch_event_loop_(fetch_event_loop),
Austin Schuh898f4972020-01-11 17:21:25 -080034 send_event_loop_(send_event_loop),
35 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070036 sender_(std::move(sender)),
Austin Schuh20ac95d2020-12-05 17:24:19 -080037 server_status_(server_status),
38 destination_node_index_(destination_node_index),
Austin Schuh4c3b9702020-08-30 11:34:55 -070039 server_connection_(server_connection),
40 client_status_(client_status),
41 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070042 client_connection_(client_status_->GetClientConnection(client_index)),
43 channel_index_(channel_index),
44 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080045 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
Austin Schuheeaa2022021-01-02 21:52:03 -080046 timestamp_timer_ =
47 fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
Austin Schuh898f4972020-01-11 17:21:25 -080048
49 Schedule();
50 }
51
Austin Schuh6f3babe2020-01-26 20:34:50 -080052 const Channel *channel() const { return fetcher_->channel(); }
53
Austin Schuh4c570ea2020-11-19 23:13:24 -080054 uint32_t time_to_live() {
55 return configuration::ConnectionToNode(sender_->channel(),
56 send_node_factory_->node())
57 ->time_to_live();
58 }
59
Austin Schuh898f4972020-01-11 17:21:25 -080060 // Kicks us to re-fetch and schedule the timer.
61 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080062 // Keep pulling messages out of the fetcher until we find one in the future.
63 while (true) {
64 if (fetcher_->context().data == nullptr || sent_) {
65 sent_ = !fetcher_->FetchNext();
66 }
67 if (sent_) {
68 break;
69 }
Austin Schuhc0b0f722020-12-12 18:36:06 -080070
71 if (server_connection_->state() != State::CONNECTED) {
72 sent_ = true;
73 server_connection_->mutate_dropped_packets(
74 server_connection_->dropped_packets() + 1);
75 continue;
76 }
77
Austin Schuh6aa77be2020-02-22 21:06:40 -080078 if (fetcher_->context().monotonic_event_time +
79 send_node_factory_->network_delay() +
80 send_node_factory_->send_delay() >
81 fetch_node_factory_->monotonic_now()) {
82 break;
83 }
84
85 // TODO(austin): Not cool. We want to actually forward these. This means
86 // we need a more sophisticated concept of what is running.
87 LOG(WARNING) << "Not forwarding message on "
88 << configuration::CleanedChannelToString(fetcher_->channel())
89 << " because we aren't running. Set at "
90 << fetcher_->context().monotonic_event_time << " now is "
91 << fetch_node_factory_->monotonic_now();
92 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070093 server_connection_->mutate_dropped_packets(
94 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080095 }
96
97 if (fetcher_->context().data == nullptr) {
98 return;
99 }
100
101 if (sent_) {
102 return;
103 }
104
105 // Compute the time to publish this message.
106 const monotonic_clock::time_point monotonic_delivered_time =
107 DeliveredTime(fetcher_->context());
108
109 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -0700110 << ": Trying to deliver message in the past on channel "
111 << configuration::StrippedChannelToString(fetcher_->channel())
112 << " to node " << send_event_loop_->node()->name()->string_view()
113 << " sent from " << fetcher_->channel()->source_node()->string_view()
114 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -0800115
Austin Schuh4c3b9702020-08-30 11:34:55 -0700116 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
117 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800118 timer_->Setup(monotonic_delivered_time);
119 }
120
121 private:
122 // Acutally sends the message, and reschedules.
123 void Send() {
Austin Schuhc0b0f722020-12-12 18:36:06 -0800124 if (server_connection_->state() != State::CONNECTED) {
125 sent_ = true;
126 Schedule();
127 return;
128 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700129 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800130 sender_->Send(fetcher_->context().data, fetcher_->context().size,
131 fetcher_->context().monotonic_event_time,
132 fetcher_->context().realtime_event_time,
133 fetcher_->context().queue_index);
134
Austin Schuh4c3b9702020-08-30 11:34:55 -0700135 // And simulate message_bridge's offset recovery.
136 client_status_->SampleFilter(client_index_,
137 fetcher_->context().monotonic_event_time,
138 sender_->monotonic_sent_time());
139
140 client_connection_->mutate_received_packets(
141 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700142
143 if (timestamp_logger_) {
Austin Schuheeaa2022021-01-02 21:52:03 -0800144 flatbuffers::FlatBufferBuilder fbb;
145 fbb.ForceDefaults(true);
Austin Schuh0de30f32020-12-06 12:44:28 -0800146 aos::Sender<RemoteMessage>::Builder builder =
Austin Schuh2f8fd752020-09-01 22:38:28 -0700147 timestamp_logger_->MakeBuilder();
148
Austin Schuh20ac95d2020-12-05 17:24:19 -0800149 // Reset the filter every time the UUID changes. There's probably a more
150 // clever way to do this, but that means a better concept of rebooting.
151 if (server_status_->BootUUID(destination_node_index_) !=
152 send_node_factory_->boot_uuid().string_view()) {
153 server_status_->ResetFilter(destination_node_index_);
154 server_status_->SetBootUUID(
155 destination_node_index_,
156 send_node_factory_->boot_uuid().string_view());
157 }
158
159 flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
Austin Schuheeaa2022021-01-02 21:52:03 -0800160 fbb.CreateString(
Austin Schuh20ac95d2020-12-05 17:24:19 -0800161 send_node_factory_->boot_uuid().string_view());
162
Austin Schuheeaa2022021-01-02 21:52:03 -0800163 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700164
165 message_header_builder.add_channel_index(channel_index_);
166
167 // Swap the remote and sent metrics. They are from the sender's
168 // perspective, not the receiver's perspective.
169 message_header_builder.add_monotonic_remote_time(
170 fetcher_->context().monotonic_event_time.time_since_epoch().count());
171 message_header_builder.add_realtime_remote_time(
172 fetcher_->context().realtime_event_time.time_since_epoch().count());
173 message_header_builder.add_remote_queue_index(
174 fetcher_->context().queue_index);
175
176 message_header_builder.add_monotonic_sent_time(
177 sender_->monotonic_sent_time().time_since_epoch().count());
178 message_header_builder.add_realtime_sent_time(
179 sender_->realtime_sent_time().time_since_epoch().count());
180 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800181 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700182
Austin Schuheeaa2022021-01-02 21:52:03 -0800183 fbb.Finish(message_header_builder.Finish());
184
185 remote_timestamps_.emplace_back(
186 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
187 fetch_node_factory_->monotonic_now() +
188 send_node_factory_->network_delay());
189 ScheduleTimestamp();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700190 }
191
Austin Schuh898f4972020-01-11 17:21:25 -0800192 sent_ = true;
193 Schedule();
194 }
195
Austin Schuheeaa2022021-01-02 21:52:03 -0800196 // Schedules sending the next timestamp in remote_timestamps_ if there is one.
197 void ScheduleTimestamp() {
198 if (remote_timestamps_.empty()) {
199 timestamp_timer_->Disable();
200 return;
201 }
202
203 if (scheduled_time_ !=
204 remote_timestamps_.front().monotonic_timestamp_time) {
205 timestamp_timer_->Setup(
206 remote_timestamps_.front().monotonic_timestamp_time);
207 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
208 return;
209 } else {
210 scheduled_time_ = monotonic_clock::min_time;
211 }
212 }
213
214 // Sends the next timestamp in remote_timestamps_.
215 void SendTimestamp() {
216 CHECK(!remote_timestamps_.empty());
217
218 // Send out all timestamps at the currently scheduled time.
219 while (remote_timestamps_.front().monotonic_timestamp_time ==
220 scheduled_time_) {
221 if (server_connection_->state() == State::CONNECTED) {
222 timestamp_logger_->Send(
223 std::move(remote_timestamps_.front().remote_message));
224 }
225 remote_timestamps_.pop_front();
226 if (remote_timestamps_.empty()) {
227 break;
228 }
229 }
230
231 ScheduleTimestamp();
232 }
233
Austin Schuh898f4972020-01-11 17:21:25 -0800234 // Converts from time on the sending node to time on the receiving node.
235 monotonic_clock::time_point DeliveredTime(const Context &context) const {
236 const distributed_clock::time_point distributed_sent_time =
237 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
238
Austin Schuh2febf0d2020-09-21 22:24:30 -0700239 return send_node_factory_->FromDistributedClock(
240 distributed_sent_time + send_node_factory_->network_delay() +
241 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800242 }
243
244 // Factories used for time conversion.
245 aos::NodeEventLoopFactory *fetch_node_factory_;
246 aos::NodeEventLoopFactory *send_node_factory_;
247
Austin Schuheeaa2022021-01-02 21:52:03 -0800248 // Event loop which fetching and sending timestamps are scheduled on.
249 aos::EventLoop *fetch_event_loop_;
Austin Schuh898f4972020-01-11 17:21:25 -0800250 // Event loop which sending is scheduled on.
251 aos::EventLoop *send_event_loop_;
252 // Timer used to send.
253 aos::TimerHandler *timer_;
Austin Schuheeaa2022021-01-02 21:52:03 -0800254 // Timer used to send timestamps out.
255 aos::TimerHandler *timestamp_timer_;
256 // Time that the timer is scheduled for. Used to track if it needs to be
257 // rescheduled.
258 monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
259
Austin Schuh898f4972020-01-11 17:21:25 -0800260 // Fetcher used to receive messages.
261 std::unique_ptr<aos::RawFetcher> fetcher_;
262 // Sender to send them back out.
263 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800264
265 MessageBridgeServerStatus *server_status_;
266 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800267 // True if we have sent the message in the fetcher.
268 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700269
270 ServerConnection *server_connection_ = nullptr;
271 MessageBridgeClientStatus *client_status_ = nullptr;
272 int client_index_;
273 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700274
275 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800276 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuheeaa2022021-01-02 21:52:03 -0800277
278 struct Timestamp {
279 Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
280 monotonic_clock::time_point new_monotonic_timestamp_time)
281 : remote_message(std::move(new_remote_message)),
282 monotonic_timestamp_time(new_monotonic_timestamp_time) {}
283 FlatbufferDetachedBuffer<RemoteMessage> remote_message;
284 monotonic_clock::time_point monotonic_timestamp_time;
285 };
286
287 std::deque<Timestamp> remote_timestamps_;
Austin Schuh898f4972020-01-11 17:21:25 -0800288};
289
290SimulatedMessageBridge::SimulatedMessageBridge(
291 SimulatedEventLoopFactory *simulated_event_loop_factory) {
292 CHECK(
293 configuration::MultiNode(simulated_event_loop_factory->configuration()));
294
295 // Pre-build up event loops for every node. They are pretty cheap anyways.
296 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700297 auto it = event_loop_map_.emplace(std::make_pair(
298 node,
299 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800300
301 CHECK(it.second);
302
Austin Schuh4c3b9702020-08-30 11:34:55 -0700303 it.first->second.event_loop->SkipTimingReport();
304 it.first->second.event_loop->SkipAosLog();
305
306 for (ServerConnection *connection :
307 it.first->second.server_status.server_connection()) {
308 if (connection == nullptr) continue;
309
310 connection->mutate_state(message_bridge::State::CONNECTED);
311 }
312
313 for (size_t i = 0;
314 i < it.first->second.client_status.mutable_client_statistics()
315 ->mutable_connections()
316 ->size();
317 ++i) {
318 ClientConnection *connection =
319 it.first->second.client_status.mutable_client_statistics()
320 ->mutable_connections()
321 ->GetMutableObject(i);
322 if (connection == nullptr) continue;
323
324 connection->mutate_state(message_bridge::State::CONNECTED);
325 }
Austin Schuh898f4972020-01-11 17:21:25 -0800326 }
327
Austin Schuh20ac95d2020-12-05 17:24:19 -0800328 for (const Node *node : simulated_event_loop_factory->nodes()) {
329 auto it = event_loop_map_.find(node);
330
331 CHECK(it != event_loop_map_.end());
332
333 size_t node_index = 0;
334 for (ServerConnection *connection :
335 it->second.server_status.server_connection()) {
336 if (connection != nullptr) {
337 const Node *client_node =
338 simulated_event_loop_factory->configuration()->nodes()->Get(
339 node_index);
340 auto client_event_loop = event_loop_map_.find(client_node);
341 it->second.server_status.ResetFilter(node_index);
342 it->second.server_status.SetBootUUID(
343 node_index,
344 client_event_loop->second.event_loop->boot_uuid().string_view());
345 }
346 ++node_index;
347 }
348 }
349
Austin Schuh898f4972020-01-11 17:21:25 -0800350 for (const Channel *channel :
351 *simulated_event_loop_factory->configuration()->channels()) {
352 if (!channel->has_destination_nodes()) {
353 continue;
354 }
355
356 // Find the sending node.
357 const Node *node =
358 configuration::GetNode(simulated_event_loop_factory->configuration(),
359 channel->source_node()->string_view());
360 auto source_event_loop = event_loop_map_.find(node);
361 CHECK(source_event_loop != event_loop_map_.end());
362
363 std::unique_ptr<DelayersVector> delayers =
364 std::make_unique<DelayersVector>();
365
366 // And then build up a RawMessageDelayer for each destination.
367 for (const Connection *connection : *channel->destination_nodes()) {
368 const Node *destination_node =
369 configuration::GetNode(simulated_event_loop_factory->configuration(),
370 connection->name()->string_view());
371 auto destination_event_loop = event_loop_map_.find(destination_node);
372 CHECK(destination_event_loop != event_loop_map_.end());
373
Austin Schuh4c3b9702020-08-30 11:34:55 -0700374 ServerConnection *server_connection =
375 source_event_loop->second.server_status.FindServerConnection(
376 connection->name()->string_view());
377
378 int client_index =
379 destination_event_loop->second.client_status.FindClientIndex(
380 channel->source_node()->string_view());
381
Austin Schuh2f8fd752020-09-01 22:38:28 -0700382 const size_t destination_node_index = configuration::GetNodeIndex(
383 simulated_event_loop_factory->configuration(), destination_node);
384
385 const bool delivery_time_is_logged =
386 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
387 connection, source_event_loop->second.event_loop->node());
388
Austin Schuh898f4972020-01-11 17:21:25 -0800389 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
390 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
391 simulated_event_loop_factory->GetNodeEventLoopFactory(
392 destination_node),
Austin Schuheeaa2022021-01-02 21:52:03 -0800393 source_event_loop->second.event_loop.get(),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700394 destination_event_loop->second.event_loop.get(),
395 source_event_loop->second.event_loop->MakeRawFetcher(channel),
396 destination_event_loop->second.event_loop->MakeRawSender(channel),
Austin Schuh20ac95d2020-12-05 17:24:19 -0800397 &source_event_loop->second.server_status, destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -0700398 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700399 &destination_event_loop->second.client_status,
400 configuration::ChannelIndex(
401 source_event_loop->second.event_loop->configuration(), channel),
402 delivery_time_is_logged
403 ? &source_event_loop->second
404 .timestamp_loggers[destination_node_index]
405 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800406 }
407
Austin Schuh4c3b9702020-08-30 11:34:55 -0700408 const Channel *const timestamp_channel = configuration::GetChannel(
409 simulated_event_loop_factory->configuration(), "/aos",
410 Timestamp::GetFullyQualifiedName(),
411 source_event_loop->second.event_loop->name(), node);
412
413 if (channel == timestamp_channel) {
414 source_event_loop->second.server_status.set_send_data(
415 [captured_delayers = delayers.get()](const Context &) {
416 for (std::unique_ptr<RawMessageDelayer> &delayer :
417 *captured_delayers) {
418 delayer->Schedule();
419 }
420 });
421 } else {
422 // And register every delayer to be poked when a new message shows up.
Austin Schuh4c570ea2020-11-19 23:13:24 -0800423
424 source_event_loop->second.event_loop->OnRun([captured_delayers =
425 delayers.get()]() {
426 // Poke all the reliable delayers so they send any queued messages.
427 for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
428 if (delayer->time_to_live() == 0) {
429 delayer->Schedule();
430 }
431 }
432 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700433 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
434 channel, [captured_delayers = delayers.get()](const Context &) {
435 for (std::unique_ptr<RawMessageDelayer> &delayer :
436 *captured_delayers) {
437 delayer->Schedule();
438 }
439 });
440 }
Austin Schuh898f4972020-01-11 17:21:25 -0800441 delayers_list_.emplace_back(std::move(delayers));
442 }
443}
444
445SimulatedMessageBridge::~SimulatedMessageBridge() {}
446
Austin Schuh6f3babe2020-01-26 20:34:50 -0800447void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
448 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
449 &delayers : delayers_list_) {
450 if (delayers->size() > 0) {
451 if ((*delayers)[0]->channel() == channel) {
452 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
453 CHECK(delayer->channel() == channel);
454 }
455
456 // If we clear the delayers list, nothing will be scheduled. Which is a
457 // success!
458 delayers->clear();
459 }
460 }
461 }
462}
463
Austin Schuhc0b0f722020-12-12 18:36:06 -0800464void SimulatedMessageBridge::Disconnect(const Node *source,
465 const Node *destination) {
466 SetState(source, destination, message_bridge::State::DISCONNECTED);
467}
468
469void SimulatedMessageBridge::Connect(const Node *source,
470 const Node *destination) {
471 SetState(source, destination, message_bridge::State::CONNECTED);
472}
473void SimulatedMessageBridge::SetState(const Node *source,
474 const Node *destination,
475 message_bridge::State state) {
476 auto source_state = event_loop_map_.find(source);
477 CHECK(source_state != event_loop_map_.end());
478
479 ServerConnection *server_connection =
480 source_state->second.server_status.FindServerConnection(destination);
481 if (!server_connection) {
482 return;
483 }
484 server_connection->mutate_state(state);
485
486 auto destination_state = event_loop_map_.find(destination);
487 CHECK(destination_state != event_loop_map_.end());
488 ClientConnection *client_connection =
489 destination_state->second.client_status.GetClientConnection(source);
490 if (!client_connection) {
491 return;
492 }
493 client_connection->mutate_state(state);
494}
495
Austin Schuh4c3b9702020-08-30 11:34:55 -0700496void SimulatedMessageBridge::DisableStatistics() {
497 for (std::pair<const Node *const, State> &state : event_loop_map_) {
498 state.second.server_status.DisableStatistics();
499 state.second.client_status.DisableStatistics();
500 }
501}
502
Austin Schuh2f8fd752020-09-01 22:38:28 -0700503SimulatedMessageBridge::State::State(
504 std::unique_ptr<aos::EventLoop> &&new_event_loop)
505 : event_loop(std::move(new_event_loop)),
506 server_status(event_loop.get()),
507 client_status(event_loop.get()) {
508 timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
509
510 // Find all nodes which log timestamps back to us (from us).
511 for (const Channel *channel : *event_loop->configuration()->channels()) {
512 CHECK(channel->has_source_node());
513
514 // Sent by us.
515 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
516 channel->has_destination_nodes()) {
517 for (const Connection *connection : *channel->destination_nodes()) {
518 const bool delivery_time_is_logged =
519 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
520 connection, event_loop->node());
521
522 // And the timestamps are then logged back by us again.
523 if (!delivery_time_is_logged) {
524 continue;
525 }
526
527 // (And only construct the sender if it hasn't been constructed)
528 const Node *other_node = configuration::GetNode(
529 event_loop->configuration(), connection->name()->string_view());
530 const size_t other_node_index = configuration::GetNodeIndex(
531 event_loop->configuration(), other_node);
532
533 if (!timestamp_loggers[other_node_index]) {
534 timestamp_loggers[other_node_index] =
Austin Schuh0de30f32020-12-06 12:44:28 -0800535 event_loop->MakeSender<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700536 absl::StrCat("/aos/remote_timestamps/",
537 connection->name()->string_view()));
538 }
539 }
540 }
541 }
542}
543
}  // namespace message_bridge
}  // namespace aos