blob: 0b79504432579950748101fce0a517305dd0b9db [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
4#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08005#include "aos/events/event_loop.h"
6#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08007#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
19 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
20 aos::NodeEventLoopFactory *send_node_factory,
21 aos::EventLoop *send_event_loop,
22 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070023 std::unique_ptr<aos::RawSender> sender,
Austin Schuh20ac95d2020-12-05 17:24:19 -080024 MessageBridgeServerStatus *server_status,
25 size_t destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -070026 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070027 MessageBridgeClientStatus *client_status,
28 size_t channel_index,
Austin Schuh0de30f32020-12-06 12:44:28 -080029 aos::Sender<RemoteMessage> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080030 : fetch_node_factory_(fetch_node_factory),
31 send_node_factory_(send_node_factory),
32 send_event_loop_(send_event_loop),
33 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070034 sender_(std::move(sender)),
Austin Schuh20ac95d2020-12-05 17:24:19 -080035 server_status_(server_status),
36 destination_node_index_(destination_node_index),
Austin Schuh4c3b9702020-08-30 11:34:55 -070037 server_connection_(server_connection),
38 client_status_(client_status),
39 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070040 client_connection_(client_status_->GetClientConnection(client_index)),
41 channel_index_(channel_index),
42 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080043 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
44
45 Schedule();
46 }
47
Austin Schuh6f3babe2020-01-26 20:34:50 -080048 const Channel *channel() const { return fetcher_->channel(); }
49
Austin Schuh4c570ea2020-11-19 23:13:24 -080050 uint32_t time_to_live() {
51 return configuration::ConnectionToNode(sender_->channel(),
52 send_node_factory_->node())
53 ->time_to_live();
54 }
55
Austin Schuh898f4972020-01-11 17:21:25 -080056 // Kicks us to re-fetch and schedule the timer.
57 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080058 // Keep pulling messages out of the fetcher until we find one in the future.
59 while (true) {
60 if (fetcher_->context().data == nullptr || sent_) {
61 sent_ = !fetcher_->FetchNext();
62 }
63 if (sent_) {
64 break;
65 }
Austin Schuhc0b0f722020-12-12 18:36:06 -080066
67 if (server_connection_->state() != State::CONNECTED) {
68 sent_ = true;
69 server_connection_->mutate_dropped_packets(
70 server_connection_->dropped_packets() + 1);
71 continue;
72 }
73
Austin Schuh6aa77be2020-02-22 21:06:40 -080074 if (fetcher_->context().monotonic_event_time +
75 send_node_factory_->network_delay() +
76 send_node_factory_->send_delay() >
77 fetch_node_factory_->monotonic_now()) {
78 break;
79 }
80
81 // TODO(austin): Not cool. We want to actually forward these. This means
82 // we need a more sophisticated concept of what is running.
83 LOG(WARNING) << "Not forwarding message on "
84 << configuration::CleanedChannelToString(fetcher_->channel())
85 << " because we aren't running. Set at "
86 << fetcher_->context().monotonic_event_time << " now is "
87 << fetch_node_factory_->monotonic_now();
88 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070089 server_connection_->mutate_dropped_packets(
90 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080091 }
92
93 if (fetcher_->context().data == nullptr) {
94 return;
95 }
96
97 if (sent_) {
98 return;
99 }
100
101 // Compute the time to publish this message.
102 const monotonic_clock::time_point monotonic_delivered_time =
103 DeliveredTime(fetcher_->context());
104
105 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -0700106 << ": Trying to deliver message in the past on channel "
107 << configuration::StrippedChannelToString(fetcher_->channel())
108 << " to node " << send_event_loop_->node()->name()->string_view()
109 << " sent from " << fetcher_->channel()->source_node()->string_view()
110 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -0800111
Austin Schuh4c3b9702020-08-30 11:34:55 -0700112 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
113 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800114 timer_->Setup(monotonic_delivered_time);
115 }
116
117 private:
118 // Acutally sends the message, and reschedules.
119 void Send() {
Austin Schuhc0b0f722020-12-12 18:36:06 -0800120 if (server_connection_->state() != State::CONNECTED) {
121 sent_ = true;
122 Schedule();
123 return;
124 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700125 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800126 sender_->Send(fetcher_->context().data, fetcher_->context().size,
127 fetcher_->context().monotonic_event_time,
128 fetcher_->context().realtime_event_time,
129 fetcher_->context().queue_index);
130
Austin Schuh4c3b9702020-08-30 11:34:55 -0700131 // And simulate message_bridge's offset recovery.
132 client_status_->SampleFilter(client_index_,
133 fetcher_->context().monotonic_event_time,
134 sender_->monotonic_sent_time());
135
136 client_connection_->mutate_received_packets(
137 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700138
139 if (timestamp_logger_) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800140 aos::Sender<RemoteMessage>::Builder builder =
Austin Schuh2f8fd752020-09-01 22:38:28 -0700141 timestamp_logger_->MakeBuilder();
142
Austin Schuh20ac95d2020-12-05 17:24:19 -0800143 // Reset the filter every time the UUID changes. There's probably a more
144 // clever way to do this, but that means a better concept of rebooting.
145 if (server_status_->BootUUID(destination_node_index_) !=
146 send_node_factory_->boot_uuid().string_view()) {
147 server_status_->ResetFilter(destination_node_index_);
148 server_status_->SetBootUUID(
149 destination_node_index_,
150 send_node_factory_->boot_uuid().string_view());
151 }
152
153 flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
154 builder.fbb()->CreateString(
155 send_node_factory_->boot_uuid().string_view());
156
Austin Schuh0de30f32020-12-06 12:44:28 -0800157 RemoteMessage::Builder message_header_builder =
158 builder.MakeBuilder<RemoteMessage>();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700159
160 message_header_builder.add_channel_index(channel_index_);
161
162 // Swap the remote and sent metrics. They are from the sender's
163 // perspective, not the receiver's perspective.
164 message_header_builder.add_monotonic_remote_time(
165 fetcher_->context().monotonic_event_time.time_since_epoch().count());
166 message_header_builder.add_realtime_remote_time(
167 fetcher_->context().realtime_event_time.time_since_epoch().count());
168 message_header_builder.add_remote_queue_index(
169 fetcher_->context().queue_index);
170
171 message_header_builder.add_monotonic_sent_time(
172 sender_->monotonic_sent_time().time_since_epoch().count());
173 message_header_builder.add_realtime_sent_time(
174 sender_->realtime_sent_time().time_since_epoch().count());
175 message_header_builder.add_queue_index(sender_->sent_queue_index());
Austin Schuh20ac95d2020-12-05 17:24:19 -0800176 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700177
178 builder.Send(message_header_builder.Finish());
179 }
180
Austin Schuh898f4972020-01-11 17:21:25 -0800181 sent_ = true;
182 Schedule();
183 }
184
185 // Converts from time on the sending node to time on the receiving node.
186 monotonic_clock::time_point DeliveredTime(const Context &context) const {
187 const distributed_clock::time_point distributed_sent_time =
188 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
189
Austin Schuh2febf0d2020-09-21 22:24:30 -0700190 return send_node_factory_->FromDistributedClock(
191 distributed_sent_time + send_node_factory_->network_delay() +
192 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800193 }
194
195 // Factories used for time conversion.
196 aos::NodeEventLoopFactory *fetch_node_factory_;
197 aos::NodeEventLoopFactory *send_node_factory_;
198
199 // Event loop which sending is scheduled on.
200 aos::EventLoop *send_event_loop_;
201 // Timer used to send.
202 aos::TimerHandler *timer_;
203 // Fetcher used to receive messages.
204 std::unique_ptr<aos::RawFetcher> fetcher_;
205 // Sender to send them back out.
206 std::unique_ptr<aos::RawSender> sender_;
Austin Schuh20ac95d2020-12-05 17:24:19 -0800207
208 MessageBridgeServerStatus *server_status_;
209 const size_t destination_node_index_;
Austin Schuh898f4972020-01-11 17:21:25 -0800210 // True if we have sent the message in the fetcher.
211 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700212
213 ServerConnection *server_connection_ = nullptr;
214 MessageBridgeClientStatus *client_status_ = nullptr;
215 int client_index_;
216 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700217
218 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800219 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800220};
221
222SimulatedMessageBridge::SimulatedMessageBridge(
223 SimulatedEventLoopFactory *simulated_event_loop_factory) {
224 CHECK(
225 configuration::MultiNode(simulated_event_loop_factory->configuration()));
226
227 // Pre-build up event loops for every node. They are pretty cheap anyways.
228 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700229 auto it = event_loop_map_.emplace(std::make_pair(
230 node,
231 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800232
233 CHECK(it.second);
234
Austin Schuh4c3b9702020-08-30 11:34:55 -0700235 it.first->second.event_loop->SkipTimingReport();
236 it.first->second.event_loop->SkipAosLog();
237
238 for (ServerConnection *connection :
239 it.first->second.server_status.server_connection()) {
240 if (connection == nullptr) continue;
241
242 connection->mutate_state(message_bridge::State::CONNECTED);
243 }
244
245 for (size_t i = 0;
246 i < it.first->second.client_status.mutable_client_statistics()
247 ->mutable_connections()
248 ->size();
249 ++i) {
250 ClientConnection *connection =
251 it.first->second.client_status.mutable_client_statistics()
252 ->mutable_connections()
253 ->GetMutableObject(i);
254 if (connection == nullptr) continue;
255
256 connection->mutate_state(message_bridge::State::CONNECTED);
257 }
Austin Schuh898f4972020-01-11 17:21:25 -0800258 }
259
Austin Schuh20ac95d2020-12-05 17:24:19 -0800260 for (const Node *node : simulated_event_loop_factory->nodes()) {
261 auto it = event_loop_map_.find(node);
262
263 CHECK(it != event_loop_map_.end());
264
265 size_t node_index = 0;
266 for (ServerConnection *connection :
267 it->second.server_status.server_connection()) {
268 if (connection != nullptr) {
269 const Node *client_node =
270 simulated_event_loop_factory->configuration()->nodes()->Get(
271 node_index);
272 auto client_event_loop = event_loop_map_.find(client_node);
273 it->second.server_status.ResetFilter(node_index);
274 it->second.server_status.SetBootUUID(
275 node_index,
276 client_event_loop->second.event_loop->boot_uuid().string_view());
277 }
278 ++node_index;
279 }
280 }
281
Austin Schuh898f4972020-01-11 17:21:25 -0800282 for (const Channel *channel :
283 *simulated_event_loop_factory->configuration()->channels()) {
284 if (!channel->has_destination_nodes()) {
285 continue;
286 }
287
288 // Find the sending node.
289 const Node *node =
290 configuration::GetNode(simulated_event_loop_factory->configuration(),
291 channel->source_node()->string_view());
292 auto source_event_loop = event_loop_map_.find(node);
293 CHECK(source_event_loop != event_loop_map_.end());
294
295 std::unique_ptr<DelayersVector> delayers =
296 std::make_unique<DelayersVector>();
297
298 // And then build up a RawMessageDelayer for each destination.
299 for (const Connection *connection : *channel->destination_nodes()) {
300 const Node *destination_node =
301 configuration::GetNode(simulated_event_loop_factory->configuration(),
302 connection->name()->string_view());
303 auto destination_event_loop = event_loop_map_.find(destination_node);
304 CHECK(destination_event_loop != event_loop_map_.end());
305
Austin Schuh4c3b9702020-08-30 11:34:55 -0700306 ServerConnection *server_connection =
307 source_event_loop->second.server_status.FindServerConnection(
308 connection->name()->string_view());
309
310 int client_index =
311 destination_event_loop->second.client_status.FindClientIndex(
312 channel->source_node()->string_view());
313
Austin Schuh2f8fd752020-09-01 22:38:28 -0700314 const size_t destination_node_index = configuration::GetNodeIndex(
315 simulated_event_loop_factory->configuration(), destination_node);
316
317 const bool delivery_time_is_logged =
318 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
319 connection, source_event_loop->second.event_loop->node());
320
Austin Schuh898f4972020-01-11 17:21:25 -0800321 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
322 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
323 simulated_event_loop_factory->GetNodeEventLoopFactory(
324 destination_node),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700325 destination_event_loop->second.event_loop.get(),
326 source_event_loop->second.event_loop->MakeRawFetcher(channel),
327 destination_event_loop->second.event_loop->MakeRawSender(channel),
Austin Schuh20ac95d2020-12-05 17:24:19 -0800328 &source_event_loop->second.server_status, destination_node_index,
Austin Schuh4c3b9702020-08-30 11:34:55 -0700329 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700330 &destination_event_loop->second.client_status,
331 configuration::ChannelIndex(
332 source_event_loop->second.event_loop->configuration(), channel),
333 delivery_time_is_logged
334 ? &source_event_loop->second
335 .timestamp_loggers[destination_node_index]
336 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800337 }
338
Austin Schuh4c3b9702020-08-30 11:34:55 -0700339 const Channel *const timestamp_channel = configuration::GetChannel(
340 simulated_event_loop_factory->configuration(), "/aos",
341 Timestamp::GetFullyQualifiedName(),
342 source_event_loop->second.event_loop->name(), node);
343
344 if (channel == timestamp_channel) {
345 source_event_loop->second.server_status.set_send_data(
346 [captured_delayers = delayers.get()](const Context &) {
347 for (std::unique_ptr<RawMessageDelayer> &delayer :
348 *captured_delayers) {
349 delayer->Schedule();
350 }
351 });
352 } else {
353 // And register every delayer to be poked when a new message shows up.
Austin Schuh4c570ea2020-11-19 23:13:24 -0800354
355 source_event_loop->second.event_loop->OnRun([captured_delayers =
356 delayers.get()]() {
357 // Poke all the reliable delayers so they send any queued messages.
358 for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
359 if (delayer->time_to_live() == 0) {
360 delayer->Schedule();
361 }
362 }
363 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700364 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
365 channel, [captured_delayers = delayers.get()](const Context &) {
366 for (std::unique_ptr<RawMessageDelayer> &delayer :
367 *captured_delayers) {
368 delayer->Schedule();
369 }
370 });
371 }
Austin Schuh898f4972020-01-11 17:21:25 -0800372 delayers_list_.emplace_back(std::move(delayers));
373 }
374}
375
376SimulatedMessageBridge::~SimulatedMessageBridge() {}
377
Austin Schuh6f3babe2020-01-26 20:34:50 -0800378void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
379 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
380 &delayers : delayers_list_) {
381 if (delayers->size() > 0) {
382 if ((*delayers)[0]->channel() == channel) {
383 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
384 CHECK(delayer->channel() == channel);
385 }
386
387 // If we clear the delayers list, nothing will be scheduled. Which is a
388 // success!
389 delayers->clear();
390 }
391 }
392 }
393}
394
Austin Schuhc0b0f722020-12-12 18:36:06 -0800395void SimulatedMessageBridge::Disconnect(const Node *source,
396 const Node *destination) {
397 SetState(source, destination, message_bridge::State::DISCONNECTED);
398}
399
400void SimulatedMessageBridge::Connect(const Node *source,
401 const Node *destination) {
402 SetState(source, destination, message_bridge::State::CONNECTED);
403}
404void SimulatedMessageBridge::SetState(const Node *source,
405 const Node *destination,
406 message_bridge::State state) {
407 auto source_state = event_loop_map_.find(source);
408 CHECK(source_state != event_loop_map_.end());
409
410 ServerConnection *server_connection =
411 source_state->second.server_status.FindServerConnection(destination);
412 if (!server_connection) {
413 return;
414 }
415 server_connection->mutate_state(state);
416
417 auto destination_state = event_loop_map_.find(destination);
418 CHECK(destination_state != event_loop_map_.end());
419 ClientConnection *client_connection =
420 destination_state->second.client_status.GetClientConnection(source);
421 if (!client_connection) {
422 return;
423 }
424 client_connection->mutate_state(state);
425}
426
Austin Schuh4c3b9702020-08-30 11:34:55 -0700427void SimulatedMessageBridge::DisableStatistics() {
428 for (std::pair<const Node *const, State> &state : event_loop_map_) {
429 state.second.server_status.DisableStatistics();
430 state.second.client_status.DisableStatistics();
431 }
432}
433
Austin Schuh2f8fd752020-09-01 22:38:28 -0700434SimulatedMessageBridge::State::State(
435 std::unique_ptr<aos::EventLoop> &&new_event_loop)
436 : event_loop(std::move(new_event_loop)),
437 server_status(event_loop.get()),
438 client_status(event_loop.get()) {
439 timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
440
441 // Find all nodes which log timestamps back to us (from us).
442 for (const Channel *channel : *event_loop->configuration()->channels()) {
443 CHECK(channel->has_source_node());
444
445 // Sent by us.
446 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
447 channel->has_destination_nodes()) {
448 for (const Connection *connection : *channel->destination_nodes()) {
449 const bool delivery_time_is_logged =
450 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
451 connection, event_loop->node());
452
453 // And the timestamps are then logged back by us again.
454 if (!delivery_time_is_logged) {
455 continue;
456 }
457
458 // (And only construct the sender if it hasn't been constructed)
459 const Node *other_node = configuration::GetNode(
460 event_loop->configuration(), connection->name()->string_view());
461 const size_t other_node_index = configuration::GetNodeIndex(
462 event_loop->configuration(), other_node);
463
464 if (!timestamp_loggers[other_node_index]) {
465 timestamp_loggers[other_node_index] =
Austin Schuh0de30f32020-12-06 12:44:28 -0800466 event_loop->MakeSender<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700467 absl::StrCat("/aos/remote_timestamps/",
468 connection->name()->string_view()));
469 }
470 }
471 }
472 }
473}
474
Austin Schuh898f4972020-01-11 17:21:25 -0800475} // namespace message_bridge
476} // namespace aos