blob: 57f2efd8f05f8201b9496af906eee2e08b4aaef3 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#include "aos/events/simulated_network_bridge.h"
2
3#include "aos/events/event_loop.h"
4#include "aos/events/simulated_event_loop.h"
5
6namespace aos {
7namespace message_bridge {
8
9// This class delays messages forwarded between two factories.
10//
11// The basic design is that we need to use the distributed_clock to convert
12// monotonic times from the source to the destination node. We also use a
13// fetcher to manage the queue of data, and a timer to schedule the sends.
14class RawMessageDelayer {
15 public:
16 RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
17 aos::NodeEventLoopFactory *send_node_factory,
18 aos::EventLoop *send_event_loop,
19 std::unique_ptr<aos::RawFetcher> fetcher,
20 std::unique_ptr<aos::RawSender> sender)
21 : fetch_node_factory_(fetch_node_factory),
22 send_node_factory_(send_node_factory),
23 send_event_loop_(send_event_loop),
24 fetcher_(std::move(fetcher)),
25 sender_(std::move(sender)) {
26 timer_ = send_event_loop_->AddTimer([this]() { Send(); });
27
28 Schedule();
29 }
30
31 // Kicks us to re-fetch and schedule the timer.
32 void Schedule() {
33 if (fetcher_->context().data == nullptr || sent_) {
34 sent_ = !fetcher_->FetchNext();
35 }
36
37 if (fetcher_->context().data == nullptr) {
38 return;
39 }
40
41 if (sent_) {
42 return;
43 }
44
45 // Compute the time to publish this message.
46 const monotonic_clock::time_point monotonic_delivered_time =
47 DeliveredTime(fetcher_->context());
48
49 CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
50 << ": Trying to deliver message in the past...";
51
52 timer_->Setup(monotonic_delivered_time);
53 }
54
55 private:
56 // Acutally sends the message, and reschedules.
57 void Send() {
58 // Compute the time to publish this message.
59 const monotonic_clock::time_point monotonic_delivered_time =
60 DeliveredTime(fetcher_->context());
61
62 CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
63 << ": Message to be sent at the wrong time.";
64
65 // And also fill out the send times as well.
66 sender_->Send(fetcher_->context().data, fetcher_->context().size,
67 fetcher_->context().monotonic_event_time,
68 fetcher_->context().realtime_event_time,
69 fetcher_->context().queue_index);
70
71 sent_ = true;
72 Schedule();
73 }
74
75 // Converts from time on the sending node to time on the receiving node.
76 monotonic_clock::time_point DeliveredTime(const Context &context) const {
77 const distributed_clock::time_point distributed_sent_time =
78 fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
79
80 return aos::monotonic_clock::epoch() +
81 (distributed_sent_time - send_node_factory_->ToDistributedClock(
82 aos::monotonic_clock::epoch())) +
83 send_node_factory_->network_delay() +
84 send_node_factory_->send_delay();
85 }
86
87 // Factories used for time conversion.
88 aos::NodeEventLoopFactory *fetch_node_factory_;
89 aos::NodeEventLoopFactory *send_node_factory_;
90
91 // Event loop which sending is scheduled on.
92 aos::EventLoop *send_event_loop_;
93 // Timer used to send.
94 aos::TimerHandler *timer_;
95 // Fetcher used to receive messages.
96 std::unique_ptr<aos::RawFetcher> fetcher_;
97 // Sender to send them back out.
98 std::unique_ptr<aos::RawSender> sender_;
99 // True if we have sent the message in the fetcher.
100 bool sent_ = false;
101};
102
103SimulatedMessageBridge::SimulatedMessageBridge(
104 SimulatedEventLoopFactory *simulated_event_loop_factory) {
105 CHECK(
106 configuration::MultiNode(simulated_event_loop_factory->configuration()));
107
108 // Pre-build up event loops for every node. They are pretty cheap anyways.
109 for (const Node *node : simulated_event_loop_factory->nodes()) {
110 CHECK(event_loop_map_
111 .insert({node, simulated_event_loop_factory->MakeEventLoop(
112 "message_bridge", node)})
113 .second);
114 }
115
116 for (const Channel *channel :
117 *simulated_event_loop_factory->configuration()->channels()) {
118 if (!channel->has_destination_nodes()) {
119 continue;
120 }
121
122 // Find the sending node.
123 const Node *node =
124 configuration::GetNode(simulated_event_loop_factory->configuration(),
125 channel->source_node()->string_view());
126 auto source_event_loop = event_loop_map_.find(node);
127 CHECK(source_event_loop != event_loop_map_.end());
128
129 std::unique_ptr<DelayersVector> delayers =
130 std::make_unique<DelayersVector>();
131
132 // And then build up a RawMessageDelayer for each destination.
133 for (const Connection *connection : *channel->destination_nodes()) {
134 const Node *destination_node =
135 configuration::GetNode(simulated_event_loop_factory->configuration(),
136 connection->name()->string_view());
137 auto destination_event_loop = event_loop_map_.find(destination_node);
138 CHECK(destination_event_loop != event_loop_map_.end());
139
140 delayers->emplace_back(std::make_unique<RawMessageDelayer>(
141 simulated_event_loop_factory->GetNodeEventLoopFactory(node),
142 simulated_event_loop_factory->GetNodeEventLoopFactory(
143 destination_node),
144 destination_event_loop->second.get(),
145 source_event_loop->second->MakeRawFetcher(channel),
146 destination_event_loop->second->MakeRawSender(channel)));
147 }
148
149 // And register every delayer to be poked when a new message shows up.
150 source_event_loop->second->MakeRawWatcher(
151 channel,
152 [captured_delayers = delayers.get()](const Context &, const void *) {
153 for (std::unique_ptr<RawMessageDelayer> &delayer :
154 *captured_delayers) {
155 delayer->Schedule();
156 }
157 });
158 delayers_list_.emplace_back(std::move(delayers));
159 }
160}
161
162SimulatedMessageBridge::~SimulatedMessageBridge() {}
163
164} // namespace message_bridge
165} // namespace aos