blob: 5660331b54c2751e629f3a3722622c82eb5593ef [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070020 std::unique_ptr<aos::RawSender> sender,
21 ServerConnection *server_connection, int client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -070022 MessageBridgeClientStatus *client_status,
23 size_t channel_index,
24 aos::Sender<logger::MessageHeader> *timestamp_logger)
Austin Schuh898f4972020-01-11 17:21:25 -080025 : fetch_node_factory_(fetch_node_factory),
26 send_node_factory_(send_node_factory),
27 send_event_loop_(send_event_loop),
28 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070029 sender_(std::move(sender)),
30 server_connection_(server_connection),
31 client_status_(client_status),
32 client_index_(client_index),
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 client_connection_(client_status_->GetClientConnection(client_index)),
34 channel_index_(channel_index),
35 timestamp_logger_(timestamp_logger) {
Austin Schuh898f4972020-01-11 17:21:25 -080036 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
37
38 Schedule();
39 }
40
Austin Schuh6f3babe2020-01-26 20:34:50 -080041 const Channel *channel() const { return fetcher_->channel(); }
42
Austin Schuh898f4972020-01-11 17:21:25 -080043 // Kicks us to re-fetch and schedule the timer.
44 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080045 // Keep pulling messages out of the fetcher until we find one in the future.
46 while (true) {
47 if (fetcher_->context().data == nullptr || sent_) {
48 sent_ = !fetcher_->FetchNext();
49 }
50 if (sent_) {
51 break;
52 }
53 if (fetcher_->context().monotonic_event_time +
54 send_node_factory_->network_delay() +
55 send_node_factory_->send_delay() >
56 fetch_node_factory_->monotonic_now()) {
57 break;
58 }
59
60 // TODO(austin): Not cool. We want to actually forward these. This means
61 // we need a more sophisticated concept of what is running.
62 LOG(WARNING) << "Not forwarding message on "
63 << configuration::CleanedChannelToString(fetcher_->channel())
64 << " because we aren't running. Set at "
65 << fetcher_->context().monotonic_event_time << " now is "
66 << fetch_node_factory_->monotonic_now();
67 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070068 server_connection_->mutate_dropped_packets(
69 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080070 }
71
72 if (fetcher_->context().data == nullptr) {
73 return;
74 }
75
76 if (sent_) {
77 return;
78 }
79
80 // Compute the time to publish this message.
81 const monotonic_clock::time_point monotonic_delivered_time =
82 DeliveredTime(fetcher_->context());
83
84 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
Austin Schuh2febf0d2020-09-21 22:24:30 -070085 << ": Trying to deliver message in the past on channel "
86 << configuration::StrippedChannelToString(fetcher_->channel())
87 << " to node " << send_event_loop_->node()->name()->string_view()
88 << " sent from " << fetcher_->channel()->source_node()->string_view()
89 << " at " << fetch_node_factory_->monotonic_now();
Austin Schuh898f4972020-01-11 17:21:25 -080090
Austin Schuh4c3b9702020-08-30 11:34:55 -070091 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
92 1);
Austin Schuh898f4972020-01-11 17:21:25 -080093 timer_->Setup(monotonic_delivered_time);
94 }
95
96 private:
97 // Acutally sends the message, and reschedules.
98 void Send() {
Austin Schuh4c3b9702020-08-30 11:34:55 -070099 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -0800100 sender_->Send(fetcher_->context().data, fetcher_->context().size,
101 fetcher_->context().monotonic_event_time,
102 fetcher_->context().realtime_event_time,
103 fetcher_->context().queue_index);
104
Austin Schuh4c3b9702020-08-30 11:34:55 -0700105 // And simulate message_bridge's offset recovery.
106 client_status_->SampleFilter(client_index_,
107 fetcher_->context().monotonic_event_time,
108 sender_->monotonic_sent_time());
109
110 client_connection_->mutate_received_packets(
111 client_connection_->received_packets() + 1);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700112
113 if (timestamp_logger_) {
114 aos::Sender<logger::MessageHeader>::Builder builder =
115 timestamp_logger_->MakeBuilder();
116
117 logger::MessageHeader::Builder message_header_builder =
118 builder.MakeBuilder<logger::MessageHeader>();
119
120 message_header_builder.add_channel_index(channel_index_);
121
122 // Swap the remote and sent metrics. They are from the sender's
123 // perspective, not the receiver's perspective.
124 message_header_builder.add_monotonic_remote_time(
125 fetcher_->context().monotonic_event_time.time_since_epoch().count());
126 message_header_builder.add_realtime_remote_time(
127 fetcher_->context().realtime_event_time.time_since_epoch().count());
128 message_header_builder.add_remote_queue_index(
129 fetcher_->context().queue_index);
130
131 message_header_builder.add_monotonic_sent_time(
132 sender_->monotonic_sent_time().time_since_epoch().count());
133 message_header_builder.add_realtime_sent_time(
134 sender_->realtime_sent_time().time_since_epoch().count());
135 message_header_builder.add_queue_index(sender_->sent_queue_index());
136
137 builder.Send(message_header_builder.Finish());
138 }
139
Austin Schuh898f4972020-01-11 17:21:25 -0800140 sent_ = true;
141 Schedule();
142 }
143
144 // Converts from time on the sending node to time on the receiving node.
145 monotonic_clock::time_point DeliveredTime(const Context &context) const {
146 const distributed_clock::time_point distributed_sent_time =
147 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
148
Austin Schuh2febf0d2020-09-21 22:24:30 -0700149 return send_node_factory_->FromDistributedClock(
150 distributed_sent_time + send_node_factory_->network_delay() +
151 send_node_factory_->send_delay());
Austin Schuh898f4972020-01-11 17:21:25 -0800152 }
153
154 // Factories used for time conversion.
155 aos::NodeEventLoopFactory *fetch_node_factory_;
156 aos::NodeEventLoopFactory *send_node_factory_;
157
158 // Event loop which sending is scheduled on.
159 aos::EventLoop *send_event_loop_;
160 // Timer used to send.
161 aos::TimerHandler *timer_;
162 // Fetcher used to receive messages.
163 std::unique_ptr<aos::RawFetcher> fetcher_;
164 // Sender to send them back out.
165 std::unique_ptr<aos::RawSender> sender_;
166 // True if we have sent the message in the fetcher.
167 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700168
169 ServerConnection *server_connection_ = nullptr;
170 MessageBridgeClientStatus *client_status_ = nullptr;
171 int client_index_;
172 ClientConnection *client_connection_ = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700173
174 size_t channel_index_;
175 aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800176};
177
178SimulatedMessageBridge::SimulatedMessageBridge(
179 SimulatedEventLoopFactory *simulated_event_loop_factory) {
180 CHECK(
181 configuration::MultiNode(simulated_event_loop_factory->configuration()));
182
183 // Pre-build up event loops for every node. They are pretty cheap anyways.
184 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700185 auto it = event_loop_map_.emplace(std::make_pair(
186 node,
187 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800188
189 CHECK(it.second);
190
Austin Schuh4c3b9702020-08-30 11:34:55 -0700191 it.first->second.event_loop->SkipTimingReport();
192 it.first->second.event_loop->SkipAosLog();
193
194 for (ServerConnection *connection :
195 it.first->second.server_status.server_connection()) {
196 if (connection == nullptr) continue;
197
198 connection->mutate_state(message_bridge::State::CONNECTED);
199 }
200
201 for (size_t i = 0;
202 i < it.first->second.client_status.mutable_client_statistics()
203 ->mutable_connections()
204 ->size();
205 ++i) {
206 ClientConnection *connection =
207 it.first->second.client_status.mutable_client_statistics()
208 ->mutable_connections()
209 ->GetMutableObject(i);
210 if (connection == nullptr) continue;
211
212 connection->mutate_state(message_bridge::State::CONNECTED);
213 }
Austin Schuh898f4972020-01-11 17:21:25 -0800214 }
215
216 for (const Channel *channel :
217 *simulated_event_loop_factory->configuration()->channels()) {
218 if (!channel->has_destination_nodes()) {
219 continue;
220 }
221
222 // Find the sending node.
223 const Node *node =
224 configuration::GetNode(simulated_event_loop_factory->configuration(),
225 channel->source_node()->string_view());
226 auto source_event_loop = event_loop_map_.find(node);
227 CHECK(source_event_loop != event_loop_map_.end());
228
229 std::unique_ptr<DelayersVector> delayers =
230 std::make_unique<DelayersVector>();
231
232 // And then build up a RawMessageDelayer for each destination.
233 for (const Connection *connection : *channel->destination_nodes()) {
234 const Node *destination_node =
235 configuration::GetNode(simulated_event_loop_factory->configuration(),
236 connection->name()->string_view());
237 auto destination_event_loop = event_loop_map_.find(destination_node);
238 CHECK(destination_event_loop != event_loop_map_.end());
239
Austin Schuh4c3b9702020-08-30 11:34:55 -0700240 ServerConnection *server_connection =
241 source_event_loop->second.server_status.FindServerConnection(
242 connection->name()->string_view());
243
244 int client_index =
245 destination_event_loop->second.client_status.FindClientIndex(
246 channel->source_node()->string_view());
247
Austin Schuh2f8fd752020-09-01 22:38:28 -0700248 const size_t destination_node_index = configuration::GetNodeIndex(
249 simulated_event_loop_factory->configuration(), destination_node);
250
251 const bool delivery_time_is_logged =
252 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
253 connection, source_event_loop->second.event_loop->node());
254
Austin Schuh898f4972020-01-11 17:21:25 -0800255 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
256 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
257 simulated_event_loop_factory->GetNodeEventLoopFactory(
258 destination_node),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700259 destination_event_loop->second.event_loop.get(),
260 source_event_loop->second.event_loop->MakeRawFetcher(channel),
261 destination_event_loop->second.event_loop->MakeRawSender(channel),
262 server_connection, client_index,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700263 &destination_event_loop->second.client_status,
264 configuration::ChannelIndex(
265 source_event_loop->second.event_loop->configuration(), channel),
266 delivery_time_is_logged
267 ? &source_event_loop->second
268 .timestamp_loggers[destination_node_index]
269 : nullptr));
Austin Schuh898f4972020-01-11 17:21:25 -0800270 }
271
Austin Schuh4c3b9702020-08-30 11:34:55 -0700272 const Channel *const timestamp_channel = configuration::GetChannel(
273 simulated_event_loop_factory->configuration(), "/aos",
274 Timestamp::GetFullyQualifiedName(),
275 source_event_loop->second.event_loop->name(), node);
276
277 if (channel == timestamp_channel) {
278 source_event_loop->second.server_status.set_send_data(
279 [captured_delayers = delayers.get()](const Context &) {
280 for (std::unique_ptr<RawMessageDelayer> &delayer :
281 *captured_delayers) {
282 delayer->Schedule();
283 }
284 });
285 } else {
286 // And register every delayer to be poked when a new message shows up.
287 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
288 channel, [captured_delayers = delayers.get()](const Context &) {
289 for (std::unique_ptr<RawMessageDelayer> &delayer :
290 *captured_delayers) {
291 delayer->Schedule();
292 }
293 });
294 }
Austin Schuh898f4972020-01-11 17:21:25 -0800295 delayers_list_.emplace_back(std::move(delayers));
296 }
297}
298
299SimulatedMessageBridge::~SimulatedMessageBridge() {}
300
Austin Schuh6f3babe2020-01-26 20:34:50 -0800301void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
302 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
303 &delayers : delayers_list_) {
304 if (delayers->size() > 0) {
305 if ((*delayers)[0]->channel() == channel) {
306 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
307 CHECK(delayer->channel() == channel);
308 }
309
310 // If we clear the delayers list, nothing will be scheduled. Which is a
311 // success!
312 delayers->clear();
313 }
314 }
315 }
316}
317
Austin Schuh4c3b9702020-08-30 11:34:55 -0700318void SimulatedMessageBridge::DisableStatistics() {
319 for (std::pair<const Node *const, State> &state : event_loop_map_) {
320 state.second.server_status.DisableStatistics();
321 state.second.client_status.DisableStatistics();
322 }
323}
324
Austin Schuh2f8fd752020-09-01 22:38:28 -0700325SimulatedMessageBridge::State::State(
326 std::unique_ptr<aos::EventLoop> &&new_event_loop)
327 : event_loop(std::move(new_event_loop)),
328 server_status(event_loop.get()),
329 client_status(event_loop.get()) {
330 timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
331
332 // Find all nodes which log timestamps back to us (from us).
333 for (const Channel *channel : *event_loop->configuration()->channels()) {
334 CHECK(channel->has_source_node());
335
336 // Sent by us.
337 if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
338 channel->has_destination_nodes()) {
339 for (const Connection *connection : *channel->destination_nodes()) {
340 const bool delivery_time_is_logged =
341 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
342 connection, event_loop->node());
343
344 // And the timestamps are then logged back by us again.
345 if (!delivery_time_is_logged) {
346 continue;
347 }
348
349 // (And only construct the sender if it hasn't been constructed)
350 const Node *other_node = configuration::GetNode(
351 event_loop->configuration(), connection->name()->string_view());
352 const size_t other_node_index = configuration::GetNodeIndex(
353 event_loop->configuration(), other_node);
354
355 if (!timestamp_loggers[other_node_index]) {
356 timestamp_loggers[other_node_index] =
357 event_loop->MakeSender<logger::MessageHeader>(
358 absl::StrCat("/aos/remote_timestamps/",
359 connection->name()->string_view()));
360 }
361 }
362 }
363 }
364}
365
Austin Schuh898f4972020-01-11 17:21:25 -0800366} // namespace message_bridge
367} // namespace aos