blob: 0f6e8bc28a9b5b3c37525524fad9a2e7e7662970 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
Austin Schuh4c3b9702020-08-30 11:34:55 -070020 std::unique_ptr<aos::RawSender> sender,
21 ServerConnection *server_connection, int client_index,
22 MessageBridgeClientStatus *client_status)
Austin Schuh898f4972020-01-11 17:21:25 -080023 : fetch_node_factory_(fetch_node_factory),
24 send_node_factory_(send_node_factory),
25 send_event_loop_(send_event_loop),
26 fetcher_(std::move(fetcher)),
Austin Schuh4c3b9702020-08-30 11:34:55 -070027 sender_(std::move(sender)),
28 server_connection_(server_connection),
29 client_status_(client_status),
30 client_index_(client_index),
31 client_connection_(client_status_->GetClientConnection(client_index)) {
Austin Schuh898f4972020-01-11 17:21:25 -080032 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
33
34 Schedule();
35 }
36
Austin Schuh6f3babe2020-01-26 20:34:50 -080037 const Channel *channel() const { return fetcher_->channel(); }
38
Austin Schuh898f4972020-01-11 17:21:25 -080039 // Kicks us to re-fetch and schedule the timer.
40 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080041 // Keep pulling messages out of the fetcher until we find one in the future.
42 while (true) {
43 if (fetcher_->context().data == nullptr || sent_) {
44 sent_ = !fetcher_->FetchNext();
45 }
46 if (sent_) {
47 break;
48 }
49 if (fetcher_->context().monotonic_event_time +
50 send_node_factory_->network_delay() +
51 send_node_factory_->send_delay() >
52 fetch_node_factory_->monotonic_now()) {
53 break;
54 }
55
56 // TODO(austin): Not cool. We want to actually forward these. This means
57 // we need a more sophisticated concept of what is running.
58 LOG(WARNING) << "Not forwarding message on "
59 << configuration::CleanedChannelToString(fetcher_->channel())
60 << " because we aren't running. Set at "
61 << fetcher_->context().monotonic_event_time << " now is "
62 << fetch_node_factory_->monotonic_now();
63 sent_ = true;
Austin Schuh4c3b9702020-08-30 11:34:55 -070064 server_connection_->mutate_dropped_packets(
65 server_connection_->dropped_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -080066 }
67
68 if (fetcher_->context().data == nullptr) {
69 return;
70 }
71
72 if (sent_) {
73 return;
74 }
75
76 // Compute the time to publish this message.
77 const monotonic_clock::time_point monotonic_delivered_time =
78 DeliveredTime(fetcher_->context());
79
80 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
81 << ": Trying to deliver message in the past...";
82
Austin Schuh4c3b9702020-08-30 11:34:55 -070083 server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
84 1);
Austin Schuh898f4972020-01-11 17:21:25 -080085 timer_->Setup(monotonic_delivered_time);
86 }
87
88 private:
89 // Acutally sends the message, and reschedules.
90 void Send() {
Austin Schuh4c3b9702020-08-30 11:34:55 -070091 // Fill out the send times.
Austin Schuh898f4972020-01-11 17:21:25 -080092 sender_->Send(fetcher_->context().data, fetcher_->context().size,
93 fetcher_->context().monotonic_event_time,
94 fetcher_->context().realtime_event_time,
95 fetcher_->context().queue_index);
96
Austin Schuh4c3b9702020-08-30 11:34:55 -070097 // And simulate message_bridge's offset recovery.
98 client_status_->SampleFilter(client_index_,
99 fetcher_->context().monotonic_event_time,
100 sender_->monotonic_sent_time());
101
102 client_connection_->mutate_received_packets(
103 client_connection_->received_packets() + 1);
Austin Schuh898f4972020-01-11 17:21:25 -0800104 sent_ = true;
105 Schedule();
106 }
107
108 // Converts from time on the sending node to time on the receiving node.
109 monotonic_clock::time_point DeliveredTime(const Context &context) const {
110 const distributed_clock::time_point distributed_sent_time =
111 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
112
113 return aos::monotonic_clock::epoch() +
114 (distributed_sent_time - send_node_factory_->ToDistributedClock(
115 aos::monotonic_clock::epoch())) +
116 send_node_factory_->network_delay() +
117 send_node_factory_->send_delay();
118 }
119
120 // Factories used for time conversion.
121 aos::NodeEventLoopFactory *fetch_node_factory_;
122 aos::NodeEventLoopFactory *send_node_factory_;
123
124 // Event loop which sending is scheduled on.
125 aos::EventLoop *send_event_loop_;
126 // Timer used to send.
127 aos::TimerHandler *timer_;
128 // Fetcher used to receive messages.
129 std::unique_ptr<aos::RawFetcher> fetcher_;
130 // Sender to send them back out.
131 std::unique_ptr<aos::RawSender> sender_;
132 // True if we have sent the message in the fetcher.
133 bool sent_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700134
135 ServerConnection *server_connection_ = nullptr;
136 MessageBridgeClientStatus *client_status_ = nullptr;
137 int client_index_;
138 ClientConnection *client_connection_ = nullptr;
Austin Schuh898f4972020-01-11 17:21:25 -0800139};
140
141SimulatedMessageBridge::SimulatedMessageBridge(
142 SimulatedEventLoopFactory *simulated_event_loop_factory) {
143 CHECK(
144 configuration::MultiNode(simulated_event_loop_factory->configuration()));
145
146 // Pre-build up event loops for every node. They are pretty cheap anyways.
147 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuh4c3b9702020-08-30 11:34:55 -0700148 auto it = event_loop_map_.emplace(std::make_pair(
149 node,
150 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
Austin Schuhcde938c2020-02-02 17:30:07 -0800151
152 CHECK(it.second);
153
Austin Schuh4c3b9702020-08-30 11:34:55 -0700154 it.first->second.event_loop->SkipTimingReport();
155 it.first->second.event_loop->SkipAosLog();
156
157 for (ServerConnection *connection :
158 it.first->second.server_status.server_connection()) {
159 if (connection == nullptr) continue;
160
161 connection->mutate_state(message_bridge::State::CONNECTED);
162 }
163
164 for (size_t i = 0;
165 i < it.first->second.client_status.mutable_client_statistics()
166 ->mutable_connections()
167 ->size();
168 ++i) {
169 ClientConnection *connection =
170 it.first->second.client_status.mutable_client_statistics()
171 ->mutable_connections()
172 ->GetMutableObject(i);
173 if (connection == nullptr) continue;
174
175 connection->mutate_state(message_bridge::State::CONNECTED);
176 }
Austin Schuh898f4972020-01-11 17:21:25 -0800177 }
178
179 for (const Channel *channel :
180 *simulated_event_loop_factory->configuration()->channels()) {
181 if (!channel->has_destination_nodes()) {
182 continue;
183 }
184
185 // Find the sending node.
186 const Node *node =
187 configuration::GetNode(simulated_event_loop_factory->configuration(),
188 channel->source_node()->string_view());
189 auto source_event_loop = event_loop_map_.find(node);
190 CHECK(source_event_loop != event_loop_map_.end());
191
192 std::unique_ptr<DelayersVector> delayers =
193 std::make_unique<DelayersVector>();
194
195 // And then build up a RawMessageDelayer for each destination.
196 for (const Connection *connection : *channel->destination_nodes()) {
197 const Node *destination_node =
198 configuration::GetNode(simulated_event_loop_factory->configuration(),
199 connection->name()->string_view());
200 auto destination_event_loop = event_loop_map_.find(destination_node);
201 CHECK(destination_event_loop != event_loop_map_.end());
202
Austin Schuh4c3b9702020-08-30 11:34:55 -0700203 ServerConnection *server_connection =
204 source_event_loop->second.server_status.FindServerConnection(
205 connection->name()->string_view());
206
207 int client_index =
208 destination_event_loop->second.client_status.FindClientIndex(
209 channel->source_node()->string_view());
210
Austin Schuh898f4972020-01-11 17:21:25 -0800211 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
212 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
213 simulated_event_loop_factory->GetNodeEventLoopFactory(
214 destination_node),
Austin Schuh4c3b9702020-08-30 11:34:55 -0700215 destination_event_loop->second.event_loop.get(),
216 source_event_loop->second.event_loop->MakeRawFetcher(channel),
217 destination_event_loop->second.event_loop->MakeRawSender(channel),
218 server_connection, client_index,
219 &destination_event_loop->second.client_status));
Austin Schuh898f4972020-01-11 17:21:25 -0800220 }
221
Austin Schuh4c3b9702020-08-30 11:34:55 -0700222 const Channel *const timestamp_channel = configuration::GetChannel(
223 simulated_event_loop_factory->configuration(), "/aos",
224 Timestamp::GetFullyQualifiedName(),
225 source_event_loop->second.event_loop->name(), node);
226
227 if (channel == timestamp_channel) {
228 source_event_loop->second.server_status.set_send_data(
229 [captured_delayers = delayers.get()](const Context &) {
230 for (std::unique_ptr<RawMessageDelayer> &delayer :
231 *captured_delayers) {
232 delayer->Schedule();
233 }
234 });
235 } else {
236 // And register every delayer to be poked when a new message shows up.
237 source_event_loop->second.event_loop->MakeRawNoArgWatcher(
238 channel, [captured_delayers = delayers.get()](const Context &) {
239 for (std::unique_ptr<RawMessageDelayer> &delayer :
240 *captured_delayers) {
241 delayer->Schedule();
242 }
243 });
244 }
Austin Schuh898f4972020-01-11 17:21:25 -0800245 delayers_list_.emplace_back(std::move(delayers));
246 }
247}
248
249SimulatedMessageBridge::~SimulatedMessageBridge() {}
250
Austin Schuh6f3babe2020-01-26 20:34:50 -0800251void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
252 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
253 &delayers : delayers_list_) {
254 if (delayers->size() > 0) {
255 if ((*delayers)[0]->channel() == channel) {
256 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
257 CHECK(delayer->channel() == channel);
258 }
259
260 // If we clear the delayers list, nothing will be scheduled. Which is a
261 // success!
262 delayers->clear();
263 }
264 }
265 }
266}
267
Austin Schuh4c3b9702020-08-30 11:34:55 -0700268void SimulatedMessageBridge::DisableStatistics() {
269 for (std::pair<const Node *const, State> &state : event_loop_map_) {
270 state.second.server_status.DisableStatistics();
271 state.second.client_status.DisableStatistics();
272 }
273}
274
Austin Schuh898f4972020-01-11 17:21:25 -0800275} // namespace message_bridge
276} // namespace aos