blob: c901e7e6c38acff73a1d7987b0dcc9c28c05f1c0 [file] [log] [blame]
Austin Schuh898f4972020-01-11 17:21:25 -08001#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
2#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
3
4#include "aos/events/event_loop.h"
5#include "aos/events/simulated_event_loop.h"
Austin Schuh4c3b9702020-08-30 11:34:55 -07006#include "aos/network/message_bridge_client_status.h"
7#include "aos/network/message_bridge_server_status.h"
Austin Schuh0de30f32020-12-06 12:44:28 -08008#include "aos/network/remote_message_generated.h"
Austin Schuh89c9b812021-02-20 14:42:10 -08009#include "aos/network/timestamp_channel.h"
Austin Schuh898f4972020-01-11 17:21:25 -080010
11namespace aos {
12namespace message_bridge {
13
14class RawMessageDelayer;
15
16// This class moves messages between nodes. It is implemented as a separate
17// class because it would have been even harder to manage forwarding in the
18// SimulatedEventLoopFactory.
19class SimulatedMessageBridge {
20 public:
21 // Constructs the bridge.
22 SimulatedMessageBridge(
23 SimulatedEventLoopFactory *simulated_event_loop_factory);
24 ~SimulatedMessageBridge();
25
Austin Schuh6f3babe2020-01-26 20:34:50 -080026 // Disables forwarding for this channel. This should be used very rarely only
27 // for things like the logger.
28 void DisableForwarding(const Channel *channel);
29
Austin Schuhc0b0f722020-12-12 18:36:06 -080030 void Disconnect(const Node *source, const Node *other);
31 void Connect(const Node *source, const Node *other);
32 void SetState(const Node *source, const Node *other,
33 message_bridge::State state);
34
Austin Schuh4c3b9702020-08-30 11:34:55 -070035 // Disables generating and sending the messages which message_gateway sends.
36 // The messages are the ClientStatistics, ServerStatistics and Timestamp
37 // messages.
38 void DisableStatistics();
39
Austin Schuh898f4972020-01-11 17:21:25 -080040 private:
Austin Schuh58646e22021-08-23 23:51:46 -070041 struct DelayersVector {
42 std::vector<std::unique_ptr<RawMessageDelayer>> v;
Austin Schuh4c3b9702020-08-30 11:34:55 -070043
Austin Schuh58646e22021-08-23 23:51:46 -070044 bool disable_forwarding = false;
45 };
46 struct State {
47 State(NodeEventLoopFactory *node_factory) : node_factory_(node_factory) {
48 const size_t num_nodes = node_factory->configuration()->nodes()->size();
49 boot_uuids_.resize(num_nodes, UUID::Zero());
50 client_state_.resize(num_nodes, message_bridge::State::CONNECTED);
51 server_state_.resize(num_nodes, message_bridge::State::CONNECTED);
52 }
Austin Schuh4c3b9702020-08-30 11:34:55 -070053 State(const State &state) = delete;
54
Austin Schuh58646e22021-08-23 23:51:46 -070055 void DisableStatistics() {
56 disable_statistics_ = true;
57 if (server_status) {
58 server_status->DisableStatistics();
59 }
60 if (client_status) {
61 client_status->DisableStatistics();
62 }
63 }
64
65 void AddSourceDelayer(RawMessageDelayer *delayer) {
66 source_delayers_.emplace_back(delayer);
67 }
68 void AddDestinationDelayer(RawMessageDelayer *delayer) {
69 destination_delayers_.emplace_back(delayer);
70 }
71
72 void MakeEventLoop() {
73 SetEventLoop(node_factory_->MakeEventLoop("message_bridge"));
74 }
75
76 void ClearEventLoop() { SetEventLoop(nullptr); }
77 void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
78
79 void SetSendData(std::function<void(const Context &)> fn) {
80 CHECK(!fn_);
81 fn_ = std::move(fn);
82 if (server_status) {
83 server_status->set_send_data(fn_);
84 }
85 }
86
87 void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
88 delayer_watchers_.emplace_back(channel, v);
89 }
90
91 void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
92 boot_uuids_[node_index] = boot_uuid;
93 const Node *node =
94 node_factory_->configuration()->nodes()->Get(node_index);
95 if (server_status) {
96 ServerConnection *connection =
97 server_status->FindServerConnection(node);
98 if (connection) {
99 connection->mutate_state(server_state_[node_index]);
100 server_status->ResetFilter(node_index);
101 server_status->SetBootUUID(node_index, boot_uuid);
102 }
103 }
104 if (client_status) {
105 const int client_index =
106 client_status->FindClientIndex(node->name()->string_view());
107 ClientConnection *client_connection =
108 client_status->GetClientConnection(client_index);
109 if (client_connection) {
110 client_status->SampleReset(client_index);
111 client_connection->mutate_state(client_state_[node_index]);
112 }
113 }
114 }
115
116 void SetServerState(const Node *destination, message_bridge::State state) {
117 const size_t node_index = configuration::GetNodeIndex(
118 node_factory_->configuration(), destination);
119 server_state_[node_index] = state;
120 if (server_status) {
121 ServerConnection *connection =
122 server_status->FindServerConnection(destination);
123 if (connection == nullptr) return;
124
125 connection->mutate_state(state);
126 }
127 }
128
129 void SetClientState(const Node *source, message_bridge::State state) {
130 const size_t node_index =
131 configuration::GetNodeIndex(node_factory_->configuration(), source);
132 client_state_[node_index] = state;
133 if (client_status) {
134 ClientConnection *connection =
135 client_status->GetClientConnection(source);
136
137 if (connection == nullptr) return;
138
139 connection->mutate_state(state);
140 }
141 }
142
143 std::vector<UUID> boot_uuids_;
144 std::vector<message_bridge::State> client_state_;
145 std::vector<message_bridge::State> server_state_;
146
147 std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
148
149 std::function<void(const Context &)> fn_;
150
151 NodeEventLoopFactory *node_factory_;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700152 std::unique_ptr<aos::EventLoop> event_loop;
Austin Schuh89c9b812021-02-20 14:42:10 -0800153 ChannelTimestampSender timestamp_loggers;
Austin Schuh58646e22021-08-23 23:51:46 -0700154 std::unique_ptr<MessageBridgeServerStatus> server_status;
155 std::unique_ptr<MessageBridgeClientStatus> client_status;
156
157 // List of delayers to update whenever this node starts or stops.
158 // Source delayers (which are the ones fetching).
159 std::vector<RawMessageDelayer *> source_delayers_;
160 // Destination delayers (which are the ones sending on the receiving nodes).
161 std::vector<RawMessageDelayer *> destination_delayers_;
162
163 bool disable_statistics_ = false;
Austin Schuh4c3b9702020-08-30 11:34:55 -0700164 };
Austin Schuh898f4972020-01-11 17:21:25 -0800165 // Map of nodes to event loops. This is a member variable so that the
166 // lifetime of the event loops matches the lifetime of the bridge.
Austin Schuh4c3b9702020-08-30 11:34:55 -0700167 std::map<const Node *, State> event_loop_map_;
Austin Schuh898f4972020-01-11 17:21:25 -0800168
Austin Schuh898f4972020-01-11 17:21:25 -0800169 // List of delayers used to resend the messages.
Austin Schuh898f4972020-01-11 17:21:25 -0800170 std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
171};
172
173} // namespace message_bridge
174} // namespace aos
175
176#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_