blob: 6b03b2f8d31086ae96231b2bbc1d0bb0e5f862c8 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
20 std::unique_ptr<aos::RawSender> sender)
21 : fetch_node_factory_(fetch_node_factory),
22 send_node_factory_(send_node_factory),
23 send_event_loop_(send_event_loop),
24 fetcher_(std::move(fetcher)),
25 sender_(std::move(sender)) {
26 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
27
28 Schedule();
29 }
30
Austin Schuh6f3babe2020-01-26 20:34:50 -080031 const Channel *channel() const { return fetcher_->channel(); }
32
Austin Schuh898f4972020-01-11 17:21:25 -080033 // Kicks us to re-fetch and schedule the timer.
34 void Schedule() {
35 if (fetcher_->context().data == nullptr || sent_) {
36 sent_ = !fetcher_->FetchNext();
37 }
38
39 if (fetcher_->context().data == nullptr) {
40 return;
41 }
42
43 if (sent_) {
44 return;
45 }
46
47 // Compute the time to publish this message.
48 const monotonic_clock::time_point monotonic_delivered_time =
49 DeliveredTime(fetcher_->context());
50
51 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
52 << ": Trying to deliver message in the past...";
53
54 timer_->Setup(monotonic_delivered_time);
55 }
56
57 private:
58 // Acutally sends the message, and reschedules.
59 void Send() {
60 // Compute the time to publish this message.
61 const monotonic_clock::time_point monotonic_delivered_time =
62 DeliveredTime(fetcher_->context());
63
64 CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
65 << ": Message to be sent at the wrong time.";
66
67 // And also fill out the send times as well.
68 sender_->Send(fetcher_->context().data, fetcher_->context().size,
69 fetcher_->context().monotonic_event_time,
70 fetcher_->context().realtime_event_time,
71 fetcher_->context().queue_index);
72
73 sent_ = true;
74 Schedule();
75 }
76
77 // Converts from time on the sending node to time on the receiving node.
78 monotonic_clock::time_point DeliveredTime(const Context &context) const {
79 const distributed_clock::time_point distributed_sent_time =
80 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
81
82 return aos::monotonic_clock::epoch() +
83 (distributed_sent_time - send_node_factory_->ToDistributedClock(
84 aos::monotonic_clock::epoch())) +
85 send_node_factory_->network_delay() +
86 send_node_factory_->send_delay();
87 }
88
89 // Factories used for time conversion.
90 aos::NodeEventLoopFactory *fetch_node_factory_;
91 aos::NodeEventLoopFactory *send_node_factory_;
92
93 // Event loop which sending is scheduled on.
94 aos::EventLoop *send_event_loop_;
95 // Timer used to send.
96 aos::TimerHandler *timer_;
97 // Fetcher used to receive messages.
98 std::unique_ptr<aos::RawFetcher> fetcher_;
99 // Sender to send them back out.
100 std::unique_ptr<aos::RawSender> sender_;
101 // True if we have sent the message in the fetcher.
102 bool sent_ = false;
103};
104
105SimulatedMessageBridge::SimulatedMessageBridge(
106 SimulatedEventLoopFactory *simulated_event_loop_factory) {
107 CHECK(
108 configuration::MultiNode(simulated_event_loop_factory->configuration()));
109
110 // Pre-build up event loops for every node. They are pretty cheap anyways.
111 for (const Node *node : simulated_event_loop_factory->nodes()) {
Austin Schuhcde938c2020-02-02 17:30:07 -0800112 auto it = event_loop_map_.insert(
113 {node,
114 simulated_event_loop_factory->MakeEventLoop("message_bridge", node)});
115
116 CHECK(it.second);
117
118 it.first->second->SkipTimingReport();
119 it.first->second->SkipAosLog();
Austin Schuh898f4972020-01-11 17:21:25 -0800120 }
121
122 for (const Channel *channel :
123 *simulated_event_loop_factory->configuration()->channels()) {
124 if (!channel->has_destination_nodes()) {
125 continue;
126 }
127
128 // Find the sending node.
129 const Node *node =
130 configuration::GetNode(simulated_event_loop_factory->configuration(),
131 channel->source_node()->string_view());
132 auto source_event_loop = event_loop_map_.find(node);
133 CHECK(source_event_loop != event_loop_map_.end());
134
135 std::unique_ptr<DelayersVector> delayers =
136 std::make_unique<DelayersVector>();
137
138 // And then build up a RawMessageDelayer for each destination.
139 for (const Connection *connection : *channel->destination_nodes()) {
140 const Node *destination_node =
141 configuration::GetNode(simulated_event_loop_factory->configuration(),
142 connection->name()->string_view());
143 auto destination_event_loop = event_loop_map_.find(destination_node);
144 CHECK(destination_event_loop != event_loop_map_.end());
145
146 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
147 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
148 simulated_event_loop_factory->GetNodeEventLoopFactory(
149 destination_node),
150 destination_event_loop->second.get(),
151 source_event_loop->second->MakeRawFetcher(channel),
152 destination_event_loop->second->MakeRawSender(channel)));
153 }
154
155 // And register every delayer to be poked when a new message shows up.
156 source_event_loop->second->MakeRawWatcher(
157 channel,
158 [captured_delayers = delayers.get()](const Context &, const void *) {
159 for (std::unique_ptr<RawMessageDelayer> &delayer :
160 *captured_delayers) {
161 delayer->Schedule();
162 }
163 });
164 delayers_list_.emplace_back(std::move(delayers));
165 }
166}
167
168SimulatedMessageBridge::~SimulatedMessageBridge() {}
169
Austin Schuh6f3babe2020-01-26 20:34:50 -0800170void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
171 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
172 &delayers : delayers_list_) {
173 if (delayers->size() > 0) {
174 if ((*delayers)[0]->channel() == channel) {
175 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
176 CHECK(delayer->channel() == channel);
177 }
178
179 // If we clear the delayers list, nothing will be scheduled. Which is a
180 // success!
181 delayers->clear();
182 }
183 }
184 }
185}
186
Austin Schuh898f4972020-01-11 17:21:25 -0800187} // namespace message_bridge
188} // namespace aos