blob: 1ba140873dffb696cba7966a547d6e0df56f66b4 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
Austin Schuh0de30f32020-12-06 12:44:28 -08003#include "absl/strings/str_cat.h"
4#include "aos/configuration.h"
Austin Schuh898f4972020-01-11 17:21:25 -08005#include "aos/events/event_loop.h"
6#include "aos/events/simulated_event_loop.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08007#include "aos/network/remote_message_generated.h"
Austin Schuh898f4972020-01-11 17:21:25 -08008
9namespace aos {
10namespace message_bridge {
11
12// This class delays messages forwarded between two factories.
13//
14// The basic design is that we need to use the distributed_clock to convert
15// monotonic times from the source to the destination node. We also use a
16// fetcher to manage the queue of data, and a timer to schedule the sends.
17class RawMessageDelayer {
18 public:
19 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
20 aos::NodeEventLoopFactory *send_node_factory,
21 aos::EventLoop *send_event_loop,
22 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070023 std::unique_ptr<aos::RawSender> sender,
24 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070025 MessageBridgeClientStatus *client_status,
26 size_t channel_index,
Austin Schuh0de30f32020-12-06 12:44:28 -080027 aos::Sender<RemoteMessage> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080028 : fetch_node_factory_(fetch_node_factory),
29 send_node_factory_(send_node_factory),
30 send_event_loop_(send_event_loop),
31 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070032 sender_(std::move(sender)),
33 server_connection_(server_connection),
34 client_status_(client_status),
35 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070036 client_connection_(client_status_->GetClientConnection(client_index)),
37 channel_index_(channel_index),
38 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080039 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
40
41 Schedule();
42 }
43
Austin Schuh6f3babe2020-01-26 20:34:50 -080044 const Channel *channel() const { return fetcher_->channel(); }
45
Austin Schuh4c570ea2020-11-19 23:13:24 -080046 uint32_t time_to_live() {
47 return configuration::ConnectionToNode(sender_->channel(),
48 send_node_factory_->node())
49 ->time_to_live();
50 }
51
Austin Schuh898f4972020-01-11 17:21:25 -080052 // Kicks us to re-fetch and schedule the timer.
53 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080054 // Keep pulling messages out of the fetcher until we find one in the future.
55 while (true) {
56 if (fetcher_->context().data == nullptr || sent_) {
57 sent_ = !fetcher_->FetchNext();
58 }
59 if (sent_) {
60 break;
61 }
62 if (fetcher_->context().monotonic_event_time +
63 send_node_factory_->network_delay() +
64 send_node_factory_->send_delay() >
65 fetch_node_factory_->monotonic_now()) {
66 break;
67 }
68
69 // TODO(austin): Not cool. We want to actually forward these. This means
70 // we need a more sophisticated concept of what is running.
71 LOG(WARNING) << "Not forwarding message on "
72 << configuration::CleanedChannelToString(fetcher_->channel())
73 << " because we aren't running. Set at "
74 << fetcher_->context().monotonic_event_time << " now is "
75 << fetch_node_factory_->monotonic_now();
76 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070077 server_connection_->mutate_dropped_packets(
78 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080079 }
80
81 if (fetcher_->context().data == nullptr) {
82 return;
83 }
84
85 if (sent_) {
86 return;
87 }
88
89 // Compute the time to publish this message.
90 const monotonic_clock::time_point monotonic_delivered_time =
91 DeliveredTime(fetcher_->context());
92
93 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -070094 << ": Trying to deliver message in the past on channel "
95 << configuration::StrippedChannelToString(fetcher_->channel())
96 << " to node " << send_event_loop_->node()->name()->string_view()
97 << " sent from " << fetcher_->channel()->source_node()->string_view()
98 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -080099
Austin Schuh4c3b9702020-08-30 11:34:55 -0700100 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
101 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800102 timer_->Setup(monotonic_delivered_time);
103 }
104
105 private:
106 // Acutally sends the message, and reschedules.
107 void Send() {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700108 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800109 sender_->Send(fetcher_->context().data, fetcher_->context().size,
110 fetcher_->context().monotonic_event_time,
111 fetcher_->context().realtime_event_time,
112 fetcher_->context().queue_index);
113
Austin Schuh4c3b9702020-08-30 11:34:55 -0700114 // And simulate message_bridge's offset recovery.
115 client_status_->SampleFilter(client_index_,
116 fetcher_->context().monotonic_event_time,
117 sender_->monotonic_sent_time());
118
119 client_connection_->mutate_received_packets(
120 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700121
122 if (timestamp_logger_) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800123 aos::Sender<RemoteMessage>::Builder builder =
Austin Schuh2f8fd752020-09-01 22:38:28 -0700124 timestamp_logger_->MakeBuilder();
125
Austin Schuh0de30f32020-12-06 12:44:28 -0800126 RemoteMessage::Builder message_header_builder =
127 builder.MakeBuilder<RemoteMessage>();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700128
129 message_header_builder.add_channel_index(channel_index_);
130
131 // Swap the remote and sent metrics. They are from the sender's
132 // perspective, not the receiver's perspective.
133 message_header_builder.add_monotonic_remote_time(
134 fetcher_->context().monotonic_event_time.time_since_epoch().count());
135 message_header_builder.add_realtime_remote_time(
136 fetcher_->context().realtime_event_time.time_since_epoch().count());
137 message_header_builder.add_remote_queue_index(
138 fetcher_->context().queue_index);
139
140 message_header_builder.add_monotonic_sent_time(
141 sender_->monotonic_sent_time().time_since_epoch().count());
142 message_header_builder.add_realtime_sent_time(
143 sender_->realtime_sent_time().time_since_epoch().count());
144 message_header_builder.add_queue_index(sender_->sent_queue_index());
145
146 builder.Send(message_header_builder.Finish());
147 }
148
Austin Schuh898f4972020-01-11 17:21:25 -0800149 sent_ = true;
150 Schedule();
151 }
152
153 // Converts from time on the sending node to time on the receiving node.
154 monotonic_clock::time_point DeliveredTime(const Context &context) const {
155 const distributed_clock::time_point distributed_sent_time =
156 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
157
Austin Schuh2febf0d2020-09-21 22:24:30 -0700158 return send_node_factory_->FromDistributedClock(
159 distributed_sent_time + send_node_factory_->network_delay() +
160 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800161 }
162
163 // Factories used for time conversion.
164 aos::NodeEventLoopFactory *fetch_node_factory_;
165 aos::NodeEventLoopFactory *send_node_factory_;
166
167 // Event loop which sending is scheduled on.
168 aos::EventLoop *send_event_loop_;
169 // Timer used to send.
170 aos::TimerHandler *timer_;
171 // Fetcher used to receive messages.
172 std::unique_ptr<aos::RawFetcher> fetcher_;
173 // Sender to send them back out.
174 std::unique_ptr<aos::RawSender> sender_;
175 // True if we have sent the message in the fetcher.
176 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700177
178 ServerConnection *server_connection_ = nullptr;
179 MessageBridgeClientStatus *client_status_ = nullptr;
180 int client_index_;
181 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700182
183 size_t channel_index_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800184 aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800185};
186
187SimulatedMessageBridge::SimulatedMessageBridge(
188 SimulatedEventLoopFactory *simulated_event_loop_factory) {
189 CHECK(
190 configuration::MultiNode(simulated_event_loop_factory->configuration()));
191
192 // Pre-build up event loops for every node. They are pretty cheap anyways.
193 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700194 auto it = event_loop_map_.emplace(std::make_pair(
195 node,
196 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800197
198 CHECK(it.second);
199
Austin Schuh4c3b9702020-08-30 11:34:55 -0700200 it.first->second.event_loop->SkipTimingReport();
201 it.first->second.event_loop->SkipAosLog();
202
203 for (ServerConnection *connection :
204 it.first->second.server_status.server_connection()) {
205 if (connection == nullptr) continue;
206
207 connection->mutate_state(message_bridge::State::CONNECTED);
208 }
209
210 for (size_t i = 0;
211 i < it.first->second.client_status.mutable_client_statistics()
212 ->mutable_connections()
213 ->size();
214 ++i) {
215 ClientConnection *connection =
216 it.first->second.client_status.mutable_client_statistics()
217 ->mutable_connections()
218 ->GetMutableObject(i);
219 if (connection == nullptr) continue;
220
221 connection->mutate_state(message_bridge::State::CONNECTED);
222 }
Austin Schuh898f4972020-01-11 17:21:25 -0800223 }
224
225 for (const Channel *channel :
226 *simulated_event_loop_factory->configuration()->channels()) {
227 if (!channel->has_destination_nodes()) {
228 continue;
229 }
230
231 // Find the sending node.
232 const Node *node =
233 configuration::GetNode(simulated_event_loop_factory->configuration(),
234 channel->source_node()->string_view());
235 auto source_event_loop = event_loop_map_.find(node);
236 CHECK(source_event_loop != event_loop_map_.end());
237
238 std::unique_ptr<DelayersVector> delayers =
239 std::make_unique<DelayersVector>();
240
241 // And then build up a RawMessageDelayer for each destination.
242 for (const Connection *connection : *channel->destination_nodes()) {
243 const Node *destination_node =
244 configuration::GetNode(simulated_event_loop_factory->configuration(),
245 connection->name()->string_view());
246 auto destination_event_loop = event_loop_map_.find(destination_node);
247 CHECK(destination_event_loop != event_loop_map_.end());
248
Austin Schuh4c3b9702020-08-30 11:34:55 -0700249 ServerConnection *server_connection =
250 source_event_loop->second.server_status.FindServerConnection(
251 connection->name()->string_view());
252
253 int client_index =
254 destination_event_loop->second.client_status.FindClientIndex(
255 channel->source_node()->string_view());
256
Austin Schuh2f8fd752020-09-01 22:38:28 -0700257 const size_t destination_node_index = configuration::GetNodeIndex(
258 simulated_event_loop_factory->configuration(), destination_node);
259
260 const bool delivery_time_is_logged =
261 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
262 connection, source_event_loop->second.event_loop->node());
263
Austin Schuh898f4972020-01-11 17:21:25 -0800264 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
265 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
266 simulated_event_loop_factory->GetNodeEventLoopFactory(
267 destination_node),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700268 destination_event_loop->second.event_loop.get(),
269 source_event_loop->second.event_loop->MakeRawFetcher(channel),
270 destination_event_loop->second.event_loop->MakeRawSender(channel),
271 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700272 &destination_event_loop->second.client_status,
273 configuration::ChannelIndex(
274 source_event_loop->second.event_loop->configuration(), channel),
275 delivery_time_is_logged
276 ? &source_event_loop->second
277 .timestamp_loggers[destination_node_index]
278 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800279 }
280
Austin Schuh4c3b9702020-08-30 11:34:55 -0700281 const Channel *const timestamp_channel = configuration::GetChannel(
282 simulated_event_loop_factory->configuration(), "/aos",
283 Timestamp::GetFullyQualifiedName(),
284 source_event_loop->second.event_loop->name(), node);
285
286 if (channel == timestamp_channel) {
287 source_event_loop->second.server_status.set_send_data(
288 [captured_delayers = delayers.get()](const Context &) {
289 for (std::unique_ptr<RawMessageDelayer> &delayer :
290 *captured_delayers) {
291 delayer->Schedule();
292 }
293 });
294 } else {
295 // And register every delayer to be poked when a new message shows up.
Austin Schuh4c570ea2020-11-19 23:13:24 -0800296
297 source_event_loop->second.event_loop->OnRun([captured_delayers =
298 delayers.get()]() {
299 // Poke all the reliable delayers so they send any queued messages.
300 for (std::unique_ptr<RawMessageDelayer> &delayer : *captured_delayers) {
301 if (delayer->time_to_live() == 0) {
302 delayer->Schedule();
303 }
304 }
305 });
Austin Schuh4c3b9702020-08-30 11:34:55 -0700306 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
307 channel, [captured_delayers = delayers.get()](const Context &) {
308 for (std::unique_ptr<RawMessageDelayer> &delayer :
309 *captured_delayers) {
310 delayer->Schedule();
311 }
312 });
313 }
Austin Schuh898f4972020-01-11 17:21:25 -0800314 delayers_list_.emplace_back(std::move(delayers));
315 }
316}
317
318SimulatedMessageBridge::~SimulatedMessageBridge() {}
319
Austin Schuh6f3babe2020-01-26 20:34:50 -0800320void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
321 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
322 &delayers : delayers_list_) {
323 if (delayers->size() > 0) {
324 if ((*delayers)[0]->channel() == channel) {
325 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
326 CHECK(delayer->channel() == channel);
327 }
328
329 // If we clear the delayers list, nothing will be scheduled. Which is a
330 // success!
331 delayers->clear();
332 }
333 }
334 }
335}
336
Austin Schuh4c3b9702020-08-30 11:34:55 -0700337void SimulatedMessageBridge::DisableStatistics() {
338 for (std::pair<const Node *const, State> &state : event_loop_map_) {
339 state.second.server_status.DisableStatistics();
340 state.second.client_status.DisableStatistics();
341 }
342}
343
Austin Schuh2f8fd752020-09-01 22:38:28 -0700344SimulatedMessageBridge::State::State(
345 std::unique_ptr<aos::EventLoop> &&new_event_loop)
346 : event_loop(std::move(new_event_loop)),
347 server_status(event_loop.get()),
348 client_status(event_loop.get()) {
349 timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
350
351 // Find all nodes which log timestamps back to us (from us).
352 for (const Channel *channel : *event_loop->configuration()->channels()) {
353 CHECK(channel->has_source_node());
354
355 // Sent by us.
356 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
357 channel->has_destination_nodes()) {
358 for (const Connection *connection : *channel->destination_nodes()) {
359 const bool delivery_time_is_logged =
360 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
361 connection, event_loop->node());
362
363 // And the timestamps are then logged back by us again.
364 if (!delivery_time_is_logged) {
365 continue;
366 }
367
368 // (And only construct the sender if it hasn't been constructed)
369 const Node *other_node = configuration::GetNode(
370 event_loop->configuration(), connection->name()->string_view());
371 const size_t other_node_index = configuration::GetNodeIndex(
372 event_loop->configuration(), other_node);
373
374 if (!timestamp_loggers[other_node_index]) {
375 timestamp_loggers[other_node_index] =
Austin Schuh0de30f32020-12-06 12:44:28 -0800376 event_loop->MakeSender<RemoteMessage>(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700377 absl::StrCat("/aos/remote_timestamps/",
378 connection->name()->string_view()));
379 }
380 }
381 }
382 }
383}
384
Austin Schuh898f4972020-01-11 17:21:25 -0800385} // namespace message_bridge
386} // namespace aos