blob: faa63985a1511b10857e29dda0d79dbb1f7878ce [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
2#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
3
4#include "aos/events/event_loop.h"
5#include "aos/events/simulated_event_loop.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -07006#include "aos/network/message_bridge_client_status.h"
7#include "aos/network/message_bridge_server_status.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh89c9b812021-02-20 14:42:10 -08009#include "aos/network/timestamp_channel.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
11namespace aos {
12namespace message_bridge {
13
14class RawMessageDelayer;
15
16// This class moves messages between nodes. It is implemented as a separate
17// class because it would have been even harder to manage forwarding in the
18// SimulatedEventLoopFactory.
19class SimulatedMessageBridge {
20 public:
21 // Constructs the bridge.
22 SimulatedMessageBridge(
23 SimulatedEventLoopFactory *simulated_event_loop_factory);
24 ~SimulatedMessageBridge();
25
Austin Schuh6f3babe2020-01-26 20:34:50 -080026 // Disables forwarding for this channel. This should be used very rarely only
27 // for things like the logger.
28 void DisableForwarding(const Channel *channel);
29
Austin Schuhc0b0f722020-12-12 18:36:06 -080030 void Disconnect(const Node *source, const Node *other);
31 void Connect(const Node *source, const Node *other);
32 void SetState(const Node *source, const Node *other,
33 message_bridge::State state);
34
Austin Schuh4c3b9702020-08-30 11:34:55 -070035 // Disables generating and sending the messages which message_gateway sends.
36 // The messages are the ClientStatistics, ServerStatistics and Timestamp
37 // messages.
James Kuszmaul94ca5132022-07-19 09:11:08 -070038 enum class DestroySenders { kNo, kYes };
39 void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
40 void DisableStatistics(const Node *node,
41 DestroySenders destroy_senders = DestroySenders::kNo);
Austin Schuh48205e62021-11-12 14:13:18 -080042 void EnableStatistics();
43 void EnableStatistics(const Node *node);
Austin Schuh4c3b9702020-08-30 11:34:55 -070044
Austin Schuh898f4972020-01-11 17:21:25 -080045 private:
Austin Schuh58646e22021-08-23 23:51:46 -070046 struct DelayersVector {
47 std::vector<std::unique_ptr<RawMessageDelayer>> v;
Austin Schuh4c3b9702020-08-30 11:34:55 -070048
Austin Schuh58646e22021-08-23 23:51:46 -070049 bool disable_forwarding = false;
50 };
51 struct State {
52 State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
53 const size_t num_nodes = node_factory->configuration()->nodes()->size();
54 boot_uuids_.resize(num_nodes, UUID::Zero());
55 client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
56 server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
57 }
Austin Schuh4c3b9702020-08-30 11:34:55 -070058 State(const State &state) = delete;
59
James Kuszmaul94ca5132022-07-19 09:11:08 -070060 void DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh58646e22021-08-23 23:51:46 -070061 disable_statistics_ = true;
James Kuszmaul94ca5132022-07-19 09:11:08 -070062 destroy_senders_ = destroy_senders;
Austin Schuh58646e22021-08-23 23:51:46 -070063 if (server_status) {
James Kuszmaul94ca5132022-07-19 09:11:08 -070064 server_status->DisableStatistics(destroy_senders ==
65 DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -070066 }
67 if (client_status) {
James Kuszmaul94ca5132022-07-19 09:11:08 -070068 client_status->DisableStatistics(destroy_senders ==
69 DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -070070 }
71 }
72
Austin Schuh48205e62021-11-12 14:13:18 -080073 void EnableStatistics() {
74 disable_statistics_ = false;
75 if (server_status) {
76 server_status->EnableStatistics();
77 }
78 if (client_status) {
79 client_status->EnableStatistics();
80 }
81 }
82
Austin Schuh58646e22021-08-23 23:51:46 -070083 void AddSourceDelayer(RawMessageDelayer *delayer) {
84 source_delayers_.emplace_back(delayer);
85 }
86 void AddDestinationDelayer(RawMessageDelayer *delayer) {
87 destination_delayers_.emplace_back(delayer);
88 }
89
90 void MakeEventLoop() {
James Kuszmaul890c2492022-04-06 14:59:31 -070091 // Message bridge isn't the thing that should be catching sent-too-fast,
92 // and may need to be able to forward too-fast messages replayed from old
93 // logfiles.
94 SetEventLoop(node_factory_->MakeEventLoop(
95 "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -070096 NodeEventLoopFactory::ExclusiveSenders::kNo,
97 {}}));
Austin Schuh58646e22021-08-23 23:51:46 -070098 }
99
Austin Schuh58646e22021-08-23 23:51:46 -0700100 void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
101
James Kuszmaul79b2f032023-06-02 21:02:27 -0700102 void SetSendData(std::function<void()> fn) {
Austin Schuh58646e22021-08-23 23:51:46 -0700103 CHECK(!fn_);
104 fn_ = std::move(fn);
105 if (server_status) {
106 server_status->set_send_data(fn_);
107 }
108 }
109
110 void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
111 delayer_watchers_.emplace_back(channel, v);
112 }
113
114 void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
115 boot_uuids_[node_index] = boot_uuid;
116 const Node *node =
117 node_factory_->configuration()->nodes()->Get(node_index);
118 if (server_status) {
119 ServerConnection *connection =
120 server_status->FindServerConnection(node);
121 if (connection) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800122 if (boot_uuid == UUID::Zero()) {
123 server_status->Disconnect(node_index);
124 server_status->ResetFilter(node_index);
125 } else {
126 switch (server_state_[node_index]) {
127 case message_bridge::State::DISCONNECTED:
128 server_status->Disconnect(node_index);
129 break;
130 case message_bridge::State::CONNECTED:
131 server_status->Connect(node_index, event_loop->monotonic_now());
132 break;
133 }
134 server_status->ResetFilter(node_index);
135 server_status->SetBootUUID(node_index, boot_uuid);
136 }
Austin Schuh58646e22021-08-23 23:51:46 -0700137 }
138 }
139 if (client_status) {
140 const int client_index =
141 client_status->FindClientIndex(node->name()->string_view());
Austin Schuh367a7f42021-11-23 23:04:36 -0800142 client_status->SampleReset(client_index);
143 if (boot_uuid == UUID::Zero()) {
144 client_status->Disconnect(client_index);
145 } else {
146 switch (client_state_[node_index]) {
147 case message_bridge::State::CONNECTED:
148 client_status->Connect(client_index);
149 break;
150 case message_bridge::State::DISCONNECTED:
151 client_status->Disconnect(client_index);
152 break;
153 }
Austin Schuh58646e22021-08-23 23:51:46 -0700154 }
155 }
156 }
157
158 void SetServerState(const Node *destination, message_bridge::State state) {
159 const size_t node_index = configuration::GetNodeIndex(
160 node_factory_->configuration(), destination);
161 server_state_[node_index] = state;
162 if (server_status) {
163 ServerConnection *connection =
164 server_status->FindServerConnection(destination);
165 if (connection == nullptr) return;
166
Austin Schuh367a7f42021-11-23 23:04:36 -0800167 if (state == connection->state()) {
168 return;
169 }
170 switch (state) {
171 case message_bridge::State::DISCONNECTED:
172 server_status->Disconnect(node_index);
173 break;
174 case message_bridge::State::CONNECTED:
175 server_status->Connect(node_index, event_loop->monotonic_now());
176 break;
177 }
Austin Schuh58646e22021-08-23 23:51:46 -0700178 }
179 }
180
181 void SetClientState(const Node *source, message_bridge::State state) {
182 const size_t node_index =
183 configuration::GetNodeIndex(node_factory_->configuration(), source);
184 client_state_[node_index] = state;
185 if (client_status) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800186 const int client_index =
187 client_status->FindClientIndex(source->name()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700188 ClientConnection *connection =
189 client_status->GetClientConnection(source);
190
Austin Schuh367a7f42021-11-23 23:04:36 -0800191 // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
192 // calls?
193 if (connection->state() != state) {
194 switch (state) {
195 case message_bridge::State::CONNECTED:
196 client_status->Connect(client_index);
197 break;
198 case message_bridge::State::DISCONNECTED:
199 client_status->Disconnect(client_index);
200 break;
201 }
202 }
Austin Schuh58646e22021-08-23 23:51:46 -0700203 }
204 }
205
206 std::vector<UUID> boot_uuids_;
207 std::vector<message_bridge::State> client_state_;
208 std::vector<message_bridge::State> server_state_;
209
210 std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
211
James Kuszmaul79b2f032023-06-02 21:02:27 -0700212 std::function<void()> fn_;
Austin Schuh58646e22021-08-23 23:51:46 -0700213
214 NodeEventLoopFactory *node_factory_;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700215 std::unique_ptr<aos::EventLoop> event_loop;
Austin Schuh89c9b812021-02-20 14:42:10 -0800216 ChannelTimestampSender timestamp_loggers;
Austin Schuh58646e22021-08-23 23:51:46 -0700217 std::unique_ptr<MessageBridgeServerStatus> server_status;
218 std::unique_ptr<MessageBridgeClientStatus> client_status;
219
220 // List of delayers to update whenever this node starts or stops.
221 // Source delayers (which are the ones fetching).
222 std::vector<RawMessageDelayer *> source_delayers_;
223 // Destination delayers (which are the ones sending on the receiving nodes).
224 std::vector<RawMessageDelayer *> destination_delayers_;
225
226 bool disable_statistics_ = false;
James Kuszmaul94ca5132022-07-19 09:11:08 -0700227 DestroySenders destroy_senders_ = DestroySenders::kNo;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700228 };
James Kuszmaul94ca5132022-07-19 09:11:08 -0700229
Austin Schuh898f4972020-01-11 17:21:25 -0800230 // Map of nodes to event loops. This is a member variable so that the
231 // lifetime of the event loops matches the lifetime of the bridge.
Austin Schuh4c3b9702020-08-30 11:34:55 -0700232 std::map<const Node *, State> event_loop_map_;
Austin Schuh898f4972020-01-11 17:21:25 -0800233
Austin Schuh898f4972020-01-11 17:21:25 -0800234 // List of delayers used to resend the messages.
Austin Schuh898f4972020-01-11 17:21:25 -0800235 std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
236};
237
238} // namespace message_bridge
239} // namespace aos
240
241#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_