blob: f5b3eae17e568e9787513994976c8c4f99d1d126 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
2#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
3
4#include "aos/events/event_loop.h"
5#include "aos/events/simulated_event_loop.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -07006#include "aos/network/message_bridge_client_status.h"
7#include "aos/network/message_bridge_server_status.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh89c9b812021-02-20 14:42:10 -08009#include "aos/network/timestamp_channel.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
11namespace aos {
12namespace message_bridge {
13
// Forward declaration.  This header only stores pointers to RawMessageDelayer
// (raw and std::unique_ptr), so the full definition is not needed here.
class RawMessageDelayer;
15
16// This class moves messages between nodes. It is implemented as a separate
17// class because it would have been even harder to manage forwarding in the
18// SimulatedEventLoopFactory.
19class SimulatedMessageBridge {
20 public:
21 // Constructs the bridge.
22 SimulatedMessageBridge(
23 SimulatedEventLoopFactory *simulated_event_loop_factory);
24 ~SimulatedMessageBridge();
25
Austin Schuh6f3babe2020-01-26 20:34:50 -080026 // Disables forwarding for this channel. This should be used very rarely only
27 // for things like the logger.
28 void DisableForwarding(const Channel *channel);
29
Austin Schuhc0b0f722020-12-12 18:36:06 -080030 void Disconnect(const Node *source, const Node *other);
31 void Connect(const Node *source, const Node *other);
32 void SetState(const Node *source, const Node *other,
33 message_bridge::State state);
34
Austin Schuh4c3b9702020-08-30 11:34:55 -070035 // Disables generating and sending the messages which message_gateway sends.
36 // The messages are the ClientStatistics, ServerStatistics and Timestamp
37 // messages.
38 void DisableStatistics();
Austin Schuh48205e62021-11-12 14:13:18 -080039 void DisableStatistics(const Node *node);
40 void EnableStatistics();
41 void EnableStatistics(const Node *node);
Austin Schuh4c3b9702020-08-30 11:34:55 -070042
Austin Schuh898f4972020-01-11 17:21:25 -080043 private:
Austin Schuh58646e22021-08-23 23:51:46 -070044 struct DelayersVector {
45 std::vector<std::unique_ptr<RawMessageDelayer>> v;
Austin Schuh4c3b9702020-08-30 11:34:55 -070046
Austin Schuh58646e22021-08-23 23:51:46 -070047 bool disable_forwarding = false;
48 };
49 struct State {
50 State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
51 const size_t num_nodes = node_factory->configuration()->nodes()->size();
52 boot_uuids_.resize(num_nodes, UUID::Zero());
53 client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
54 server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
55 }
Austin Schuh4c3b9702020-08-30 11:34:55 -070056 State(const State &state) = delete;
57
Austin Schuh58646e22021-08-23 23:51:46 -070058 void DisableStatistics() {
59 disable_statistics_ = true;
60 if (server_status) {
61 server_status->DisableStatistics();
62 }
63 if (client_status) {
64 client_status->DisableStatistics();
65 }
66 }
67
Austin Schuh48205e62021-11-12 14:13:18 -080068 void EnableStatistics() {
69 disable_statistics_ = false;
70 if (server_status) {
71 server_status->EnableStatistics();
72 }
73 if (client_status) {
74 client_status->EnableStatistics();
75 }
76 }
77
Austin Schuh58646e22021-08-23 23:51:46 -070078 void AddSourceDelayer(RawMessageDelayer *delayer) {
79 source_delayers_.emplace_back(delayer);
80 }
81 void AddDestinationDelayer(RawMessageDelayer *delayer) {
82 destination_delayers_.emplace_back(delayer);
83 }
84
85 void MakeEventLoop() {
James Kuszmaul890c2492022-04-06 14:59:31 -070086 // Message bridge isn't the thing that should be catching sent-too-fast,
87 // and may need to be able to forward too-fast messages replayed from old
88 // logfiles.
89 SetEventLoop(node_factory_->MakeEventLoop(
90 "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
91 NodeEventLoopFactory::ExclusiveSenders::kNo}));
Austin Schuh58646e22021-08-23 23:51:46 -070092 }
93
Austin Schuh58646e22021-08-23 23:51:46 -070094 void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
95
96 void SetSendData(std::function<void(const Context &)> fn) {
97 CHECK(!fn_);
98 fn_ = std::move(fn);
99 if (server_status) {
100 server_status->set_send_data(fn_);
101 }
102 }
103
104 void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
105 delayer_watchers_.emplace_back(channel, v);
106 }
107
108 void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
109 boot_uuids_[node_index] = boot_uuid;
110 const Node *node =
111 node_factory_->configuration()->nodes()->Get(node_index);
112 if (server_status) {
113 ServerConnection *connection =
114 server_status->FindServerConnection(node);
115 if (connection) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800116 if (boot_uuid == UUID::Zero()) {
117 server_status->Disconnect(node_index);
118 server_status->ResetFilter(node_index);
119 } else {
120 switch (server_state_[node_index]) {
121 case message_bridge::State::DISCONNECTED:
122 server_status->Disconnect(node_index);
123 break;
124 case message_bridge::State::CONNECTED:
125 server_status->Connect(node_index, event_loop->monotonic_now());
126 break;
127 }
128 server_status->ResetFilter(node_index);
129 server_status->SetBootUUID(node_index, boot_uuid);
130 }
Austin Schuh58646e22021-08-23 23:51:46 -0700131 }
132 }
133 if (client_status) {
134 const int client_index =
135 client_status->FindClientIndex(node->name()->string_view());
Austin Schuh367a7f42021-11-23 23:04:36 -0800136 client_status->SampleReset(client_index);
137 if (boot_uuid == UUID::Zero()) {
138 client_status->Disconnect(client_index);
139 } else {
140 switch (client_state_[node_index]) {
141 case message_bridge::State::CONNECTED:
142 client_status->Connect(client_index);
143 break;
144 case message_bridge::State::DISCONNECTED:
145 client_status->Disconnect(client_index);
146 break;
147 }
Austin Schuh58646e22021-08-23 23:51:46 -0700148 }
149 }
150 }
151
152 void SetServerState(const Node *destination, message_bridge::State state) {
153 const size_t node_index = configuration::GetNodeIndex(
154 node_factory_->configuration(), destination);
155 server_state_[node_index] = state;
156 if (server_status) {
157 ServerConnection *connection =
158 server_status->FindServerConnection(destination);
159 if (connection == nullptr) return;
160
Austin Schuh367a7f42021-11-23 23:04:36 -0800161 if (state == connection->state()) {
162 return;
163 }
164 switch (state) {
165 case message_bridge::State::DISCONNECTED:
166 server_status->Disconnect(node_index);
167 break;
168 case message_bridge::State::CONNECTED:
169 server_status->Connect(node_index, event_loop->monotonic_now());
170 break;
171 }
Austin Schuh58646e22021-08-23 23:51:46 -0700172 }
173 }
174
175 void SetClientState(const Node *source, message_bridge::State state) {
176 const size_t node_index =
177 configuration::GetNodeIndex(node_factory_->configuration(), source);
178 client_state_[node_index] = state;
179 if (client_status) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800180 const int client_index =
181 client_status->FindClientIndex(source->name()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700182 ClientConnection *connection =
183 client_status->GetClientConnection(source);
184
Austin Schuh367a7f42021-11-23 23:04:36 -0800185 // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
186 // calls?
187 if (connection->state() != state) {
188 switch (state) {
189 case message_bridge::State::CONNECTED:
190 client_status->Connect(client_index);
191 break;
192 case message_bridge::State::DISCONNECTED:
193 client_status->Disconnect(client_index);
194 break;
195 }
196 }
Austin Schuh58646e22021-08-23 23:51:46 -0700197 }
198 }
199
200 std::vector<UUID> boot_uuids_;
201 std::vector<message_bridge::State> client_state_;
202 std::vector<message_bridge::State> server_state_;
203
204 std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
205
206 std::function<void(const Context &)> fn_;
207
208 NodeEventLoopFactory *node_factory_;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700209 std::unique_ptr<aos::EventLoop> event_loop;
Austin Schuh89c9b812021-02-20 14:42:10 -0800210 ChannelTimestampSender timestamp_loggers;
Austin Schuh58646e22021-08-23 23:51:46 -0700211 std::unique_ptr<MessageBridgeServerStatus> server_status;
212 std::unique_ptr<MessageBridgeClientStatus> client_status;
213
214 // List of delayers to update whenever this node starts or stops.
215 // Source delayers (which are the ones fetching).
216 std::vector<RawMessageDelayer *> source_delayers_;
217 // Destination delayers (which are the ones sending on the receiving nodes).
218 std::vector<RawMessageDelayer *> destination_delayers_;
219
220 bool disable_statistics_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700221 };
Austin Schuh898f4972020-01-11 17:21:25 -0800222 // Map of nodes to event loops. This is a member variable so that the
223 // lifetime of the event loops matches the lifetime of the bridge.
Austin Schuh4c3b9702020-08-30 11:34:55 -0700224 std::map<const Node *, State> event_loop_map_;
Austin Schuh898f4972020-01-11 17:21:25 -0800225
Austin Schuh898f4972020-01-11 17:21:25 -0800226 // List of delayers used to resend the messages.
Austin Schuh898f4972020-01-11 17:21:25 -0800227 std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
228};
229
230} // namespace message_bridge
231} // namespace aos
232
233#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_