blob: 14a73217498492b5d5154ac2528eb333742d9c45 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
2#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
3
4#include "aos/events/event_loop.h"
5#include "aos/events/simulated_event_loop.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -07006#include "aos/network/message_bridge_client_status.h"
7#include "aos/network/message_bridge_server_status.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh89c9b812021-02-20 14:42:10 -08009#include "aos/network/timestamp_channel.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080011namespace aos::message_bridge {
Austin Schuh898f4972020-01-11 17:21:25 -080012
13class RawMessageDelayer;
14
15// This class moves messages between nodes. It is implemented as a separate
16// class because it would have been even harder to manage forwarding in the
17// SimulatedEventLoopFactory.
18class SimulatedMessageBridge {
19 public:
20 // Constructs the bridge.
21 SimulatedMessageBridge(
22 SimulatedEventLoopFactory *simulated_event_loop_factory);
23 ~SimulatedMessageBridge();
24
Austin Schuh6f3babe2020-01-26 20:34:50 -080025 // Disables forwarding for this channel. This should be used very rarely only
26 // for things like the logger.
27 void DisableForwarding(const Channel *channel);
28
Austin Schuhc0b0f722020-12-12 18:36:06 -080029 void Disconnect(const Node *source, const Node *other);
30 void Connect(const Node *source, const Node *other);
31 void SetState(const Node *source, const Node *other,
32 message_bridge::State state);
33
Austin Schuh4c3b9702020-08-30 11:34:55 -070034 // Disables generating and sending the messages which message_gateway sends.
35 // The messages are the ClientStatistics, ServerStatistics and Timestamp
36 // messages.
James Kuszmaul94ca5132022-07-19 09:11:08 -070037 enum class DestroySenders { kNo, kYes };
38 void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
39 void DisableStatistics(const Node *node,
40 DestroySenders destroy_senders = DestroySenders::kNo);
Austin Schuh48205e62021-11-12 14:13:18 -080041 void EnableStatistics();
42 void EnableStatistics(const Node *node);
Austin Schuh4c3b9702020-08-30 11:34:55 -070043
Austin Schuh898f4972020-01-11 17:21:25 -080044 private:
Austin Schuh58646e22021-08-23 23:51:46 -070045 struct DelayersVector {
46 std::vector<std::unique_ptr<RawMessageDelayer>> v;
Austin Schuh4c3b9702020-08-30 11:34:55 -070047
Austin Schuh58646e22021-08-23 23:51:46 -070048 bool disable_forwarding = false;
49 };
50 struct State {
51 State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
52 const size_t num_nodes = node_factory->configuration()->nodes()->size();
53 boot_uuids_.resize(num_nodes, UUID::Zero());
54 client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
55 server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
56 }
Austin Schuh4c3b9702020-08-30 11:34:55 -070057 State(const State &state) = delete;
58
James Kuszmaul94ca5132022-07-19 09:11:08 -070059 void DisableStatistics(DestroySenders destroy_senders) {
Austin Schuh58646e22021-08-23 23:51:46 -070060 disable_statistics_ = true;
James Kuszmaul94ca5132022-07-19 09:11:08 -070061 destroy_senders_ = destroy_senders;
Austin Schuh58646e22021-08-23 23:51:46 -070062 if (server_status) {
James Kuszmaul94ca5132022-07-19 09:11:08 -070063 server_status->DisableStatistics(destroy_senders ==
64 DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -070065 }
66 if (client_status) {
James Kuszmaul94ca5132022-07-19 09:11:08 -070067 client_status->DisableStatistics(destroy_senders ==
68 DestroySenders::kYes);
Austin Schuh58646e22021-08-23 23:51:46 -070069 }
70 }
71
Austin Schuh48205e62021-11-12 14:13:18 -080072 void EnableStatistics() {
73 disable_statistics_ = false;
74 if (server_status) {
75 server_status->EnableStatistics();
76 }
77 if (client_status) {
78 client_status->EnableStatistics();
79 }
80 }
81
Austin Schuh58646e22021-08-23 23:51:46 -070082 void AddSourceDelayer(RawMessageDelayer *delayer) {
83 source_delayers_.emplace_back(delayer);
84 }
85 void AddDestinationDelayer(RawMessageDelayer *delayer) {
86 destination_delayers_.emplace_back(delayer);
87 }
88
89 void MakeEventLoop() {
James Kuszmaul890c2492022-04-06 14:59:31 -070090 // Message bridge isn't the thing that should be catching sent-too-fast,
91 // and may need to be able to forward too-fast messages replayed from old
92 // logfiles.
93 SetEventLoop(node_factory_->MakeEventLoop(
94 "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
James Kuszmaul94ca5132022-07-19 09:11:08 -070095 NodeEventLoopFactory::ExclusiveSenders::kNo,
96 {}}));
Austin Schuh58646e22021-08-23 23:51:46 -070097 }
98
Austin Schuh58646e22021-08-23 23:51:46 -070099 void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
100
James Kuszmaul79b2f032023-06-02 21:02:27 -0700101 void SetSendData(std::function<void()> fn) {
Austin Schuh58646e22021-08-23 23:51:46 -0700102 CHECK(!fn_);
103 fn_ = std::move(fn);
104 if (server_status) {
105 server_status->set_send_data(fn_);
106 }
107 }
108
109 void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
110 delayer_watchers_.emplace_back(channel, v);
111 }
112
113 void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
114 boot_uuids_[node_index] = boot_uuid;
115 const Node *node =
116 node_factory_->configuration()->nodes()->Get(node_index);
117 if (server_status) {
118 ServerConnection *connection =
119 server_status->FindServerConnection(node);
120 if (connection) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800121 if (boot_uuid == UUID::Zero()) {
122 server_status->Disconnect(node_index);
123 server_status->ResetFilter(node_index);
124 } else {
125 switch (server_state_[node_index]) {
126 case message_bridge::State::DISCONNECTED:
127 server_status->Disconnect(node_index);
128 break;
129 case message_bridge::State::CONNECTED:
130 server_status->Connect(node_index, event_loop->monotonic_now());
131 break;
132 }
133 server_status->ResetFilter(node_index);
134 server_status->SetBootUUID(node_index, boot_uuid);
135 }
Austin Schuh58646e22021-08-23 23:51:46 -0700136 }
137 }
138 if (client_status) {
139 const int client_index =
140 client_status->FindClientIndex(node->name()->string_view());
Austin Schuh367a7f42021-11-23 23:04:36 -0800141 client_status->SampleReset(client_index);
142 if (boot_uuid == UUID::Zero()) {
143 client_status->Disconnect(client_index);
144 } else {
145 switch (client_state_[node_index]) {
146 case message_bridge::State::CONNECTED:
147 client_status->Connect(client_index);
148 break;
149 case message_bridge::State::DISCONNECTED:
150 client_status->Disconnect(client_index);
151 break;
152 }
Austin Schuh58646e22021-08-23 23:51:46 -0700153 }
154 }
155 }
156
157 void SetServerState(const Node *destination, message_bridge::State state) {
158 const size_t node_index = configuration::GetNodeIndex(
159 node_factory_->configuration(), destination);
160 server_state_[node_index] = state;
161 if (server_status) {
162 ServerConnection *connection =
163 server_status->FindServerConnection(destination);
164 if (connection == nullptr) return;
165
Austin Schuh367a7f42021-11-23 23:04:36 -0800166 if (state == connection->state()) {
167 return;
168 }
169 switch (state) {
170 case message_bridge::State::DISCONNECTED:
171 server_status->Disconnect(node_index);
172 break;
173 case message_bridge::State::CONNECTED:
174 server_status->Connect(node_index, event_loop->monotonic_now());
175 break;
176 }
Austin Schuh58646e22021-08-23 23:51:46 -0700177 }
178 }
179
180 void SetClientState(const Node *source, message_bridge::State state) {
181 const size_t node_index =
182 configuration::GetNodeIndex(node_factory_->configuration(), source);
183 client_state_[node_index] = state;
184 if (client_status) {
Austin Schuh367a7f42021-11-23 23:04:36 -0800185 const int client_index =
186 client_status->FindClientIndex(source->name()->string_view());
Austin Schuh58646e22021-08-23 23:51:46 -0700187 ClientConnection *connection =
188 client_status->GetClientConnection(source);
189
Austin Schuh367a7f42021-11-23 23:04:36 -0800190 // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
191 // calls?
192 if (connection->state() != state) {
193 switch (state) {
194 case message_bridge::State::CONNECTED:
195 client_status->Connect(client_index);
196 break;
197 case message_bridge::State::DISCONNECTED:
198 client_status->Disconnect(client_index);
199 break;
200 }
201 }
Austin Schuh58646e22021-08-23 23:51:46 -0700202 }
203 }
204
205 std::vector<UUID> boot_uuids_;
206 std::vector<message_bridge::State> client_state_;
207 std::vector<message_bridge::State> server_state_;
208
209 std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
210
James Kuszmaul79b2f032023-06-02 21:02:27 -0700211 std::function<void()> fn_;
Austin Schuh58646e22021-08-23 23:51:46 -0700212
213 NodeEventLoopFactory *node_factory_;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700214 std::unique_ptr<aos::EventLoop> event_loop;
Austin Schuh89c9b812021-02-20 14:42:10 -0800215 ChannelTimestampSender timestamp_loggers;
Austin Schuh58646e22021-08-23 23:51:46 -0700216 std::unique_ptr<MessageBridgeServerStatus> server_status;
217 std::unique_ptr<MessageBridgeClientStatus> client_status;
218
219 // List of delayers to update whenever this node starts or stops.
220 // Source delayers (which are the ones fetching).
221 std::vector<RawMessageDelayer *> source_delayers_;
222 // Destination delayers (which are the ones sending on the receiving nodes).
223 std::vector<RawMessageDelayer *> destination_delayers_;
224
225 bool disable_statistics_ = false;
James Kuszmaul94ca5132022-07-19 09:11:08 -0700226 DestroySenders destroy_senders_ = DestroySenders::kNo;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700227 };
James Kuszmaul94ca5132022-07-19 09:11:08 -0700228
Austin Schuh898f4972020-01-11 17:21:25 -0800229 // Map of nodes to event loops. This is a member variable so that the
230 // lifetime of the event loops matches the lifetime of the bridge.
Austin Schuh4c3b9702020-08-30 11:34:55 -0700231 std::map<const Node *, State> event_loop_map_;
Austin Schuh898f4972020-01-11 17:21:25 -0800232
Austin Schuh898f4972020-01-11 17:21:25 -0800233 // List of delayers used to resend the messages.
Austin Schuh898f4972020-01-11 17:21:25 -0800234 std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
235};
236
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -0800237} // namespace aos::message_bridge
Austin Schuh898f4972020-01-11 17:21:25 -0800238
239#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_