blob: 1f56e46cb511dafac7f0913d32c520d6179e4061 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
20 std::unique_ptr<aos::RawSender> sender)
21 : fetch_node_factory_(fetch_node_factory),
22 send_node_factory_(send_node_factory),
23 send_event_loop_(send_event_loop),
24 fetcher_(std::move(fetcher)),
25 sender_(std::move(sender)) {
26 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
27
28 Schedule();
29 }
30
Austin Schuh6f3babe2020-01-26 20:34:50 -080031 const Channel *channel() const { return fetcher_->channel(); }
32
Austin Schuh898f4972020-01-11 17:21:25 -080033 // Kicks us to re-fetch and schedule the timer.
34 void Schedule() {
Austin Schuh6aa77be2020-02-22 21:06:40 -080035 // Keep pulling messages out of the fetcher until we find one in the future.
36 while (true) {
37 if (fetcher_->context().data == nullptr || sent_) {
38 sent_ = !fetcher_->FetchNext();
39 }
40 if (sent_) {
41 break;
42 }
43 if (fetcher_->context().monotonic_event_time +
44 send_node_factory_->network_delay() +
45 send_node_factory_->send_delay() >
46 fetch_node_factory_->monotonic_now()) {
47 break;
48 }
49
50 // TODO(austin): Not cool. We want to actually forward these. This means
51 // we need a more sophisticated concept of what is running.
52 LOG(WARNING) << "Not forwarding message on "
53 << configuration::CleanedChannelToString(fetcher_->channel())
54 << " because we aren't running. Set at "
55 << fetcher_->context().monotonic_event_time << " now is "
56 << fetch_node_factory_->monotonic_now();
57 sent_ = true;
Austin Schuh898f4972020-01-11 17:21:25 -080058 }
59
60 if (fetcher_->context().data == nullptr) {
61 return;
62 }
63
64 if (sent_) {
65 return;
66 }
67
68 // Compute the time to publish this message.
69 const monotonic_clock::time_point monotonic_delivered_time =
70 DeliveredTime(fetcher_->context());
71
72 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
73 << ": Trying to deliver message in the past...";
74
75 timer_->Setup(monotonic_delivered_time);
76 }
77
78 private:
79 // Acutally sends the message, and reschedules.
80 void Send() {
81 // Compute the time to publish this message.
82 const monotonic_clock::time_point monotonic_delivered_time =
83 DeliveredTime(fetcher_->context());
84
85 CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
86 << ": Message to be sent at the wrong time.";
87
88 // And also fill out the send times as well.
89 sender_->Send(fetcher_->context().data, fetcher_->context().size,
90 fetcher_->context().monotonic_event_time,
91 fetcher_->context().realtime_event_time,
92 fetcher_->context().queue_index);
93
94 sent_ = true;
95 Schedule();
96 }
97
98 // Converts from time on the sending node to time on the receiving node.
99 monotonic_clock::time_point DeliveredTime(const Context &context) const {
100 const distributed_clock::time_point distributed_sent_time =
101 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
102
103 return aos::monotonic_clock::epoch() +
104 (distributed_sent_time - send_node_factory_->ToDistributedClock(
105 aos::monotonic_clock::epoch())) +
106 send_node_factory_->network_delay() +
107 send_node_factory_->send_delay();
108 }
109
110 // Factories used for time conversion.
111 aos::NodeEventLoopFactory *fetch_node_factory_;
112 aos::NodeEventLoopFactory *send_node_factory_;
113
114 // Event loop which sending is scheduled on.
115 aos::EventLoop *send_event_loop_;
116 // Timer used to send.
117 aos::TimerHandler *timer_;
118 // Fetcher used to receive messages.
119 std::unique_ptr<aos::RawFetcher> fetcher_;
120 // Sender to send them back out.
121 std::unique_ptr<aos::RawSender> sender_;
122 // True if we have sent the message in the fetcher.
123 bool sent_ = false;
124};
125
126SimulatedMessageBridge::SimulatedMessageBridge(
127 SimulatedEventLoopFactory *simulated_event_loop_factory) {
128 CHECK(
129 configuration::MultiNode(simulated_event_loop_factory->configuration()));
130
131 // Pre-build up event loops for every node. They are pretty cheap anyways.
132 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuhcde938c2020-02-02 17:30:07 -0800133 auto it = event_loop_map_.insert(
134 {node,
135 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)});
136
137 CHECK(it.second);
138
139 it.first->second->SkipTimingReport();
140 it.first->second->SkipAosLog();
Austin Schuh898f4972020-01-11 17:21:25 -0800141 }
142
143 for (const Channel *channel :
144 *simulated_event_loop_factory->configuration()->channels()) {
145 if (!channel->has_destination_nodes()) {
146 continue;
147 }
148
149 // Find the sending node.
150 const Node *node =
151 configuration::GetNode(simulated_event_loop_factory->configuration(),
152 channel->source_node()->string_view());
153 auto source_event_loop = event_loop_map_.find(node);
154 CHECK(source_event_loop != event_loop_map_.end());
155
156 std::unique_ptr<DelayersVector> delayers =
157 std::make_unique<DelayersVector>();
158
159 // And then build up a RawMessageDelayer for each destination.
160 for (const Connection *connection : *channel->destination_nodes()) {
161 const Node *destination_node =
162 configuration::GetNode(simulated_event_loop_factory->configuration(),
163 connection->name()->string_view());
164 auto destination_event_loop = event_loop_map_.find(destination_node);
165 CHECK(destination_event_loop != event_loop_map_.end());
166
167 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
168 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
169 simulated_event_loop_factory->GetNodeEventLoopFactory(
170 destination_node),
171 destination_event_loop->second.get(),
172 source_event_loop->second->MakeRawFetcher(channel),
173 destination_event_loop->second->MakeRawSender(channel)));
174 }
175
176 // And register every delayer to be poked when a new message shows up.
177 source_event_loop->second->MakeRawWatcher(
178 channel,
179 [captured_delayers = delayers.get()](const Context &, const void *) {
180 for (std::unique_ptr<RawMessageDelayer> &delayer :
181 *captured_delayers) {
182 delayer->Schedule();
183 }
184 });
185 delayers_list_.emplace_back(std::move(delayers));
186 }
187}
188
189SimulatedMessageBridge::~SimulatedMessageBridge() {}
190
Austin Schuh6f3babe2020-01-26 20:34:50 -0800191void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
192 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
193 &delayers : delayers_list_) {
194 if (delayers->size() > 0) {
195 if ((*delayers)[0]->channel() == channel) {
196 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
197 CHECK(delayer->channel() == channel);
198 }
199
200 // If we clear the delayers list, nothing will be scheduled. Which is a
201 // success!
202 delayers->clear();
203 }
204 }
205 }
206}
207
Austin Schuh898f4972020-01-11 17:21:25 -0800208} // namespace message_bridge
209} // namespace aos