blob: b9ff86188c43f7cd7139785341b4299c70e8640f [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070020 std::unique_ptr<aos::RawSender> sender,
21 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070022 MessageBridgeClientStatus *client_status,
23 size_t channel_index,
24 aos::Sender<logger::MessageHeader> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080025 : fetch_node_factory_(fetch_node_factory),
26 send_node_factory_(send_node_factory),
27 send_event_loop_(send_event_loop),
28 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070029 sender_(std::move(sender)),
30 server_connection_(server_connection),
31 client_status_(client_status),
32 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 client_connection_(client_status_->GetClientConnection(client_index)),
34 channel_index_(channel_index),
35 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080036 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
37
38 Schedule();
39 }
40
Austin Schuh6f3babe2020-01-26 20:34:50 -080041 const Channel *channel() const { return fetcher_->channel(); }
42
Austin Schuh4c570ea2020-11-19 23:13:24 -080043 uint32_t time_to_live() {
44 return configuration::ConnectionToNode(sender_->channel(),
45 send_node_factory_->node())
46 ->time_to_live();
47 }
48
Austin Schuh898f4972020-01-11 17:21:25 -080049 // Kicks us to re-fetch and schedule the timer.
50 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080051 // Keep pulling messages out of the fetcher until we find one in the future.
52 while (true) {
53 if (fetcher_->context().data == nullptr || sent_) {
54 sent_ = !fetcher_->FetchNext();
55 }
56 if (sent_) {
57 break;
58 }
59 if (fetcher_->context().monotonic_event_time +
60 send_node_factory_->network_delay() +
61 send_node_factory_->send_delay() >
62 fetch_node_factory_->monotonic_now()) {
63 break;
64 }
65
66 // TODO(austin): Not cool. We want to actually forward these. This means
67 // we need a more sophisticated concept of what is running.
68 LOG(WARNING) << "Not forwarding message on "
69 << configuration::CleanedChannelToString(fetcher_->channel())
70 << " because we aren't running. Set at "
71 << fetcher_->context().monotonic_event_time << " now is "
72 << fetch_node_factory_->monotonic_now();
73 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070074 server_connection_->mutate_dropped_packets(
75 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080076 }
77
78 if (fetcher_->context().data == nullptr) {
79 return;
80 }
81
82 if (sent_) {
83 return;
84 }
85
86 // Compute the time to publish this message.
87 const monotonic_clock::time_point monotonic_delivered_time =
88 DeliveredTime(fetcher_->context());
89
90 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -070091 << ": Trying to deliver message in the past on channel "
92 << configuration::StrippedChannelToString(fetcher_->channel())
93 << " to node " << send_event_loop_->node()->name()->string_view()
94 << " sent from " << fetcher_->channel()->source_node()->string_view()
95 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -080096
Austin Schuh4c3b9702020-08-30 11:34:55 -070097 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
98 1);
Austin Schuh898f4972020-01-11 17:21:25 -080099 timer_->Setup(monotonic_delivered_time);
100 }
101
102 private:
103 // Acutally sends the message, and reschedules.
104 void Send() {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700105 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800106 sender_->Send(fetcher_->context().data, fetcher_->context().size,
107 fetcher_->context().monotonic_event_time,
108 fetcher_->context().realtime_event_time,
109 fetcher_->context().queue_index);
110
Austin Schuh4c3b9702020-08-30 11:34:55 -0700111 // And simulate message_bridge's offset recovery.
112 client_status_->SampleFilter(client_index_,
113 fetcher_->context().monotonic_event_time,
114 sender_->monotonic_sent_time());
115
116 client_connection_->mutate_received_packets(
117 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700118
119 if (timestamp_logger_) {
120 aos::Sender<logger::MessageHeader>::Builder builder =
121 timestamp_logger_->MakeBuilder();
122
123 logger::MessageHeader::Builder message_header_builder =
124 builder.MakeBuilder<logger::MessageHeader>();
125
126 message_header_builder.add_channel_index(channel_index_);
127
128 // Swap the remote and sent metrics. They are from the sender's
129 // perspective, not the receiver's perspective.
130 message_header_builder.add_monotonic_remote_time(
131 fetcher_->context().monotonic_event_time.time_since_epoch().count());
132 message_header_builder.add_realtime_remote_time(
133 fetcher_->context().realtime_event_time.time_since_epoch().count());
134 message_header_builder.add_remote_queue_index(
135 fetcher_->context().queue_index);
136
137 message_header_builder.add_monotonic_sent_time(
138 sender_->monotonic_sent_time().time_since_epoch().count());
139 message_header_builder.add_realtime_sent_time(
140 sender_->realtime_sent_time().time_since_epoch().count());
141 message_header_builder.add_queue_index(sender_->sent_queue_index());
142
143 builder.Send(message_header_builder.Finish());
144 }
145
Austin Schuh898f4972020-01-11 17:21:25 -0800146 sent_ = true;
147 Schedule();
148 }
149
150 // Converts from time on the sending node to time on the receiving node.
151 monotonic_clock::time_point DeliveredTime(const Context &context) const {
152 const distributed_clock::time_point distributed_sent_time =
153 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
154
Austin Schuh2febf0d2020-09-21 22:24:30 -0700155 return send_node_factory_->FromDistributedClock(
156 distributed_sent_time + send_node_factory_->network_delay() +
157 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800158 }
159
160 // Factories used for time conversion.
161 aos::NodeEventLoopFactory *fetch_node_factory_;
162 aos::NodeEventLoopFactory *send_node_factory_;
163
164 // Event loop which sending is scheduled on.
165 aos::EventLoop *send_event_loop_;
166 // Timer used to send.
167 aos::TimerHandler *timer_;
168 // Fetcher used to receive messages.
169 std::unique_ptr<aos::RawFetcher> fetcher_;
170 // Sender to send them back out.
171 std::unique_ptr<aos::RawSender> sender_;
172 // True if we have sent the message in the fetcher.
173 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700174
175 ServerConnection *server_connection_ = nullptr;
176 MessageBridgeClientStatus *client_status_ = nullptr;
177 int client_index_;
178 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700179
180 size_t channel_index_;
181 aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800182};
183
184SimulatedMessageBridge::SimulatedMessageBridge(
185 SimulatedEventLoopFactory *simulated_event_loop_factory) {
186 CHECK(
187 configuration::MultiNode(simulated_event_loop_factory->configuration()));
188
189 // Pre-build up event loops for every node. They are pretty cheap anyways.
190 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700191 auto it = event_loop_map_.emplace(std::make_pair(
192 node,
193 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800194
195 CHECK(it.second);
196
Austin Schuh4c3b9702020-08-30 11:34:55 -0700197 it.first->second.event_loop->SkipTimingReport();
198 it.first->second.event_loop->SkipAosLog();
199
200 for (ServerConnection *connection :
201 it.first->second.server_status.server_connection()) {
202 if (connection == nullptr) continue;
203
204 connection->mutate_state(message_bridge::State::CONNECTED);
205 }
206
207 for (size_t i = 0;
208 i < it.first->second.client_status.mutable_client_statistics()
209 ->mutable_connections()
210 ->size();
211 ++i) {
212 ClientConnection *connection =
213 it.first->second.client_status.mutable_client_statistics()
214 ->mutable_connections()
215 ->GetMutableObject(i);
216 if (connection == nullptr) continue;
217
218 connection->mutate_state(message_bridge::State::CONNECTED);
219 }
Austin Schuh898f4972020-01-11 17:21:25 -0800220 }
221
222 for (const Channel *channel :
223 *simulated_event_loop_factory->configuration()->channels()) {
224 if (!channel->has_destination_nodes()) {
225 continue;
226 }
227
228 // Find the sending node.
229 const Node *node =
230 configuration::GetNode(simulated_event_loop_factory->configuration(),
231 channel->source_node()->string_view());
232 auto source_event_loop = event_loop_map_.find(node);
233 CHECK(source_event_loop != event_loop_map_.end());
234
235 std::unique_ptr<DelayersVector> delayers =
236 std::make_unique<DelayersVector>();
237
238 // And then build up a RawMessageDelayer for each destination.
239 for (const Connection *connection : *channel->destination_nodes()) {
240 const Node *destination_node =
241 configuration::GetNode(simulated_event_loop_factory->configuration(),
242 connection->name()->string_view());
243 auto destination_event_loop = event_loop_map_.find(destination_node);
244 CHECK(destination_event_loop != event_loop_map_.end());
245
Austin Schuh4c3b9702020-08-30 11:34:55 -0700246 ServerConnection *server_connection =
247 source_event_loop->second.server_status.FindServerConnection(
248 connection->name()->string_view());
249
250 int client_index =
251 destination_event_loop->second.client_status.FindClientIndex(
252 channel->source_node()->string_view());
253
Austin Schuh2f8fd752020-09-01 22:38:28 -0700254 const size_t destination_node_index = configuration::GetNodeIndex(
255 simulated_event_loop_factory->configuration(), destination_node);
256
257 const bool delivery_time_is_logged =
258 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
259 connection, source_event_loop->second.event_loop->node());
260
Austin Schuh898f4972020-01-11 17:21:25 -0800261 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
262 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
263 simulated_event_loop_factory->GetNodeEventLoopFactory(
264 destination_node),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700265 destination_event_loop->second.event_loop.get(),
266 source_event_loop->second.event_loop->MakeRawFetcher(channel),
267 destination_event_loop->second.event_loop->MakeRawSender(channel),
268 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700269 &destination_event_loop->second.client_status,
270 configuration::ChannelIndex(
271 source_event_loop->second.event_loop->configuration(), channel),
272 delivery_time_is_logged
273 ? &source_event_loop->second
274 .timestamp_loggers[destination_node_index]
275 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800276 }
277
Austin Schuh4c3b9702020-08-30 11:34:55 -0700278 const Channel *const timestamp_channel = configuration::GetChannel(
279 simulated_event_loop_factory->configuration(), "/aos",
280 Timestamp::GetFullyQualifiedName(),
281 source_event_loop->second.event_loop->name(), node);
282
283 if (channel == timestamp_channel) {
284 source_event_loop->second.server_status.set_send_data(
285 [captured_delayers = delayers.get()](const Context &) {
286 for (std::unique_ptr<RawMessageDelayer> &delayer :
287 *captured_delayers) {
288 delayer->Schedule();
289 }
290 });
291 } else {
292 // And register every delayer to be poked when a new message shows up.
Austin Schuh4c570ea2020-11-19 23:13:24 -0800293
294 source_event_loop->second.event_loop->OnRun([captured_delayers =
295 delayers.get()]() {
296 // Poke all the reliable delayers so they send any queued messages.
297 for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
298 if (delayer->time_to_live() == 0) {
299 delayer->Schedule();
300 }
301 }
302 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700303 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
304 channel, [captured_delayers = delayers.get()](const Context &) {
305 for (std::unique_ptr<RawMessageDelayer> &delayer :
306 *captured_delayers) {
307 delayer->Schedule();
308 }
309 });
310 }
Austin Schuh898f4972020-01-11 17:21:25 -0800311 delayers_list_.emplace_back(std::move(delayers));
312 }
313}
314
315SimulatedMessageBridge::~SimulatedMessageBridge() {}
316
Austin Schuh6f3babe2020-01-26 20:34:50 -0800317void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
318 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
319 &delayers : delayers_list_) {
320 if (delayers->size() > 0) {
321 if ((*delayers)[0]->channel() == channel) {
322 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
323 CHECK(delayer->channel() == channel);
324 }
325
326 // If we clear the delayers list, nothing will be scheduled. Which is a
327 // success!
328 delayers->clear();
329 }
330 }
331 }
332}
333
Austin Schuh4c3b9702020-08-30 11:34:55 -0700334void SimulatedMessageBridge::DisableStatistics() {
335 for (std::pair<const Node *const, State> &state : event_loop_map_) {
336 state.second.server_status.DisableStatistics();
337 state.second.client_status.DisableStatistics();
338 }
339}
340
Austin Schuh2f8fd752020-09-01 22:38:28 -0700341SimulatedMessageBridge::State::State(
342 std::unique_ptr<aos::EventLoop> &&new_event_loop)
343 : event_loop(std::move(new_event_loop)),
344 server_status(event_loop.get()),
345 client_status(event_loop.get()) {
346 timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
347
348 // Find all nodes which log timestamps back to us (from us).
349 for (const Channel *channel : *event_loop->configuration()->channels()) {
350 CHECK(channel->has_source_node());
351
352 // Sent by us.
353 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
354 channel->has_destination_nodes()) {
355 for (const Connection *connection : *channel->destination_nodes()) {
356 const bool delivery_time_is_logged =
357 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
358 connection, event_loop->node());
359
360 // And the timestamps are then logged back by us again.
361 if (!delivery_time_is_logged) {
362 continue;
363 }
364
365 // (And only construct the sender if it hasn't been constructed)
366 const Node *other_node = configuration::GetNode(
367 event_loop->configuration(), connection->name()->string_view());
368 const size_t other_node_index = configuration::GetNodeIndex(
369 event_loop->configuration(), other_node);
370
371 if (!timestamp_loggers[other_node_index]) {
372 timestamp_loggers[other_node_index] =
373 event_loop->MakeSender<logger::MessageHeader>(
374 absl::StrCat("/aos/remote_timestamps/",
375 connection->name()->string_view()));
376 }
377 }
378 }
379 }
380}
381
Austin Schuh898f4972020-01-11 17:21:25 -0800382} // namespace message_bridge
383} // namespace aos