blob: 10ce37d03b8f9472296b5db12747f96006137b02 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
20 std::unique_ptr<aos::RawSender> sender)
21 : fetch_node_factory_(fetch_node_factory),
22 send_node_factory_(send_node_factory),
23 send_event_loop_(send_event_loop),
24 fetcher_(std::move(fetcher)),
25 sender_(std::move(sender)) {
26 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
27
28 Schedule();
29 }
30
Austin Schuh6f3babe2020-01-26 20:34:50 -080031 const Channel *channel() const { return fetcher_->channel(); }
32
Austin Schuh898f4972020-01-11 17:21:25 -080033 // Kicks us to re-fetch and schedule the timer.
34 void Schedule() {
35 if (fetcher_->context().data == nullptr || sent_) {
36 sent_ = !fetcher_->FetchNext();
37 }
38
39 if (fetcher_->context().data == nullptr) {
40 return;
41 }
42
43 if (sent_) {
44 return;
45 }
46
47 // Compute the time to publish this message.
48 const monotonic_clock::time_point monotonic_delivered_time =
49 DeliveredTime(fetcher_->context());
50
51 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
52 << ": Trying to deliver message in the past...";
53
54 timer_->Setup(monotonic_delivered_time);
55 }
56
57 private:
58 // Acutally sends the message, and reschedules.
59 void Send() {
60 // Compute the time to publish this message.
61 const monotonic_clock::time_point monotonic_delivered_time =
62 DeliveredTime(fetcher_->context());
63
64 CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
65 << ": Message to be sent at the wrong time.";
66
67 // And also fill out the send times as well.
68 sender_->Send(fetcher_->context().data, fetcher_->context().size,
69 fetcher_->context().monotonic_event_time,
70 fetcher_->context().realtime_event_time,
71 fetcher_->context().queue_index);
72
73 sent_ = true;
74 Schedule();
75 }
76
77 // Converts from time on the sending node to time on the receiving node.
78 monotonic_clock::time_point DeliveredTime(const Context &context) const {
79 const distributed_clock::time_point distributed_sent_time =
80 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
81
82 return aos::monotonic_clock::epoch() +
83 (distributed_sent_time - send_node_factory_->ToDistributedClock(
84 aos::monotonic_clock::epoch())) +
85 send_node_factory_->network_delay() +
86 send_node_factory_->send_delay();
87 }
88
89 // Factories used for time conversion.
90 aos::NodeEventLoopFactory *fetch_node_factory_;
91 aos::NodeEventLoopFactory *send_node_factory_;
92
93 // Event loop which sending is scheduled on.
94 aos::EventLoop *send_event_loop_;
95 // Timer used to send.
96 aos::TimerHandler *timer_;
97 // Fetcher used to receive messages.
98 std::unique_ptr<aos::RawFetcher> fetcher_;
99 // Sender to send them back out.
100 std::unique_ptr<aos::RawSender> sender_;
101 // True if we have sent the message in the fetcher.
102 bool sent_ = false;
103};
104
105SimulatedMessageBridge::SimulatedMessageBridge(
106 SimulatedEventLoopFactory *simulated_event_loop_factory) {
107 CHECK(
108 configuration::MultiNode(simulated_event_loop_factory->configuration()));
109
110 // Pre-build up event loops for every node. They are pretty cheap anyways.
111 for (const Node *node : simulated_event_loop_factory->nodes()) {
112 CHECK(event_loop_map_
113 .insert({node, simulated_event_loop_factory->MakeEventLoop(
114 "message_bridge", node)})
115 .second);
116 }
117
118 for (const Channel *channel :
119 *simulated_event_loop_factory->configuration()->channels()) {
120 if (!channel->has_destination_nodes()) {
121 continue;
122 }
123
124 // Find the sending node.
125 const Node *node =
126 configuration::GetNode(simulated_event_loop_factory->configuration(),
127 channel->source_node()->string_view());
128 auto source_event_loop = event_loop_map_.find(node);
129 CHECK(source_event_loop != event_loop_map_.end());
130
131 std::unique_ptr<DelayersVector> delayers =
132 std::make_unique<DelayersVector>();
133
134 // And then build up a RawMessageDelayer for each destination.
135 for (const Connection *connection : *channel->destination_nodes()) {
136 const Node *destination_node =
137 configuration::GetNode(simulated_event_loop_factory->configuration(),
138 connection->name()->string_view());
139 auto destination_event_loop = event_loop_map_.find(destination_node);
140 CHECK(destination_event_loop != event_loop_map_.end());
141
142 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
143 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
144 simulated_event_loop_factory->GetNodeEventLoopFactory(
145 destination_node),
146 destination_event_loop->second.get(),
147 source_event_loop->second->MakeRawFetcher(channel),
148 destination_event_loop->second->MakeRawSender(channel)));
149 }
150
151 // And register every delayer to be poked when a new message shows up.
152 source_event_loop->second->MakeRawWatcher(
153 channel,
154 [captured_delayers = delayers.get()](const Context &, const void *) {
155 for (std::unique_ptr<RawMessageDelayer> &delayer :
156 *captured_delayers) {
157 delayer->Schedule();
158 }
159 });
160 delayers_list_.emplace_back(std::move(delayers));
161 }
162}
163
164SimulatedMessageBridge::~SimulatedMessageBridge() {}
165
Austin Schuh6f3babe2020-01-26 20:34:50 -0800166void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
167 for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
168 &delayers : delayers_list_) {
169 if (delayers->size() > 0) {
170 if ((*delayers)[0]->channel() == channel) {
171 for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
172 CHECK(delayer->channel() == channel);
173 }
174
175 // If we clear the delayers list, nothing will be scheduled. Which is a
176 // success!
177 delayers->clear();
178 }
179 }
180 }
181}
182
Austin Schuh898f4972020-01-11 17:21:25 -0800183} // namespace message_bridge
184} // namespace aos