blob: dec307835ebdf1092ae6555c41935f3841e34ad2 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#ifndef AOS_NETWORK_WEB_PROXY_H_
2#define AOS_NETWORK_WEB_PROXY_H_
Austin Schuh52e5e3a2021-04-24 22:30:02 -07003
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <utility>
#include <vector>

#include "flatbuffers/flatbuffers.h"

#include "aos/events/event_loop.h"
#include "aos/events/shm_event_loop.h"
#include "aos/mutex/mutex.h"
#include "aos/network/connect_generated.h"
#include "aos/network/rawrtc.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "seasocks/Server.h"
#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"
21
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -080022namespace aos::web_proxy {
Alex Perryb3b50792020-01-18 16:13:45 -080023
// Forward declarations for types referenced before their definitions below.
// NOTE(review): Connection is not referenced anywhere else in this header;
// confirm whether any includer still relies on this declaration before
// removing it.
class Connection;
class Subscriber;
class ApplicationConnection;
Alex Perryb3b50792020-01-18 16:13:45 -080027
// Selects whether message history should be buffered for every channel
// (kYes) or not (kNo). Passed to the WebProxy / WebsocketHandler / Subscriber
// constructors.
enum class StoreHistory {
  kNo,
  kYes,
};
32
Alex Perryb3b50792020-01-18 16:13:45 -080033// Basic class that handles receiving new websocket connections. Creates a new
34// Connection to manage the rest of the negotiation and data passing. When the
35// websocket closes, it deletes the Connection.
36class WebsocketHandler : public ::seasocks::WebSocket::Handler {
37 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080038 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
James Kuszmaul1a29c082022-02-03 14:02:47 -080039 StoreHistory store_history,
40 int per_channel_buffer_size_bytes);
Alex Perryb3b50792020-01-18 16:13:45 -080041 void onConnect(::seasocks::WebSocket *sock) override;
42 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
43 size_t size) override;
44 void onDisconnect(::seasocks::WebSocket *sock) override;
James Kuszmaul147b4c12022-07-13 20:35:27 -070045 // Stops recording data, even if the event loop continues running. This allows
46 // us to continue serving the webserver + websocket server, without having to
47 // load more actual data.
48 void StopRecording() { recording_ = false; }
Alex Perryb3b50792020-01-18 16:13:45 -080049
50 private:
Alex Perryb3b50792020-01-18 16:13:45 -080051 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070052 std::vector<std::unique_ptr<Subscriber>> subscribers_;
53 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080054
Austin Schuh52e5e3a2021-04-24 22:30:02 -070055 std::map<::seasocks::WebSocket *, std::unique_ptr<ApplicationConnection>>
56 connections_;
57
58 EventLoop *const event_loop_;
James Kuszmaul147b4c12022-07-13 20:35:27 -070059 // Whether to pay attention to new messages.
60 bool recording_ = true;
James Kuszmaul71a81932020-12-15 21:08:01 -080061};
62
63// Wrapper class that manages the seasocks server and WebsocketHandler.
64class WebProxy {
65 public:
James Kuszmaul1a29c082022-02-03 14:02:47 -080066 // Constructs a WebProxy object for interacting with a webpage. store_history
67 // and per_channel_buffer_size_bytes specify how we manage delivering LOSSLESS
68 // messages to the client:
69 // * store_history specifies whether we should always buffer up data for all
70 // channels--even for messages that are played prior to the client
71 // connecting. This is mostly useful for log replay where the client
72 // will typically connect after the logfile has been fully loaded/replayed.
73 // * per_channel_buffer_size_bytes is the maximum amount of data to buffer
74 // up per channel (-1 will indicate infinite data, which is used during log
75 // replay). This is divided by the max_size per channel to determine
76 // how many messages to queue up.
77 WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
78 int per_channel_buffer_size_bytes);
79 WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
80 int per_channel_buffer_size_bytes);
James Kuszmaulb67409b2022-06-20 16:25:03 -070081 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
82 StoreHistory store_history, int per_channel_buffer_size_bytes);
James Kuszmaul71a81932020-12-15 21:08:01 -080083 ~WebProxy();
84
85 void SetDataPath(const char *path) { server_.setStaticPath(path); }
86
James Kuszmaul147b4c12022-07-13 20:35:27 -070087 // Stops recording data. Useful for setting end times in log replay.
88 void StopRecording();
89
James Kuszmaul71a81932020-12-15 21:08:01 -080090 private:
James Kuszmaul71a81932020-12-15 21:08:01 -080091 aos::internal::EPoll internal_epoll_;
92 aos::internal::EPoll *const epoll_;
93 ::seasocks::Server server_;
94 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080095};
96
97// Seasocks requires that sends happen on the correct thread. This class takes a
98// detached buffer to send on a specific websocket connection and sends it when
99// seasocks is ready.
100class UpdateData : public ::seasocks::Server::Runnable {
101 public:
102 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -0800103 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -0800104 : sock_(websocket), buffer_(std::move(buffer)) {}
105 ~UpdateData() override = default;
106 UpdateData(const UpdateData &) = delete;
107 UpdateData &operator=(const UpdateData &) = delete;
108
109 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
110
111 private:
112 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -0800113 const flatbuffers::DetachedBuffer buffer_;
114};
115
116// Represents a fetcher and all the Connections that care about it.
117// Handles building the message and telling each connection to send it.
118// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -0800119// Subscriber also uses an internal buffer to store past messages. This is
120// primarily meant for use in offline log replay/simulation where we want to be
121// able to store infinite buffers. In the future, we will probably want to be
122// able to specify *which* channels to store buffers for so that we aren't just
123// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -0800124class Subscriber {
125 public:
James Kuszmaul71a81932020-12-15 21:08:01 -0800126 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
James Kuszmaul1a29c082022-02-03 14:02:47 -0800127 StoreHistory store_history, int buffer_size)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700128 : fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -0800129 channel_index_(channel_index),
James Kuszmaul1a29c082022-02-03 14:02:47 -0800130 store_history_(store_history == StoreHistory::kYes),
James Kuszmaul71a81932020-12-15 21:08:01 -0800131 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -0800132
James Kuszmaul147b4c12022-07-13 20:35:27 -0700133 // Runs a single iteration of going through and fetching new data as needed
134 // and servicing any WebRTC channels that are requesting messages.
135 // fetch_new specifies whether we should actually attempt to retrieve new data
136 // on the channel--if false, will only worry about sending existing data to
137 // any clients.
138 void RunIteration(bool fetch_new);
Alex Perry5f474f22020-02-01 12:14:24 -0800139
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700140 void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
141 TransferMethod transfer_method);
Alex Perry5f474f22020-02-01 12:14:24 -0800142
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700143 void RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800144
145 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800146 struct ChannelInformation {
147 TransferMethod transfer_method;
James Kuszmaula5822682021-12-23 18:39:28 -0800148 // Queue index (same as the queue index within the AOS channel) of the
149 // message that we are currently sending or, if we are between messages,
150 // the next message we will send.
James Kuszmaul71a81932020-12-15 21:08:01 -0800151 uint32_t current_queue_index = 0;
James Kuszmaula5822682021-12-23 18:39:28 -0800152 // Index of the next packet to send within current_queue_index (large
153 // messages are broken into multiple packets, as we have encountered
154 // issues with how some WebRTC implementations handle large packets).
James Kuszmaul71a81932020-12-15 21:08:01 -0800155 size_t next_packet_number = 0;
James Kuszmaula5822682021-12-23 18:39:28 -0800156 // The last queue/packet index reported by the client.
157 uint32_t reported_queue_index = 0;
158 size_t reported_packet_index = 0;
James Kuszmaule524ed02023-12-09 13:21:03 -0800159 std::optional<aos::monotonic_clock::time_point> last_report = std::nullopt;
James Kuszmaul71a81932020-12-15 21:08:01 -0800160 };
161 struct Message {
162 uint32_t index = 0xffffffff;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700163 std::vector<std::shared_ptr<struct mbuf>> data;
James Kuszmaul71a81932020-12-15 21:08:01 -0800164 };
165
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700166 std::shared_ptr<struct mbuf> NextBuffer(ChannelInformation *channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800167 void SkipToLastMessage(ChannelInformation *channel);
168
Alex Perry5f474f22020-02-01 12:14:24 -0800169 std::unique_ptr<RawFetcher> fetcher_;
170 int channel_index_;
James Kuszmaul1a29c082022-02-03 14:02:47 -0800171 // If set, will always build up a buffer of the most recent buffer_size_
172 // messages. If store_history_ is *not* set we will only buffer up messages
173 // while there is an active listener.
174 bool store_history_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800175 int buffer_size_;
176 std::deque<Message> message_buffer_;
James Kuszmaula5822682021-12-23 18:39:28 -0800177 // The ScopedDataChannel that we use for actually sending data over WebRTC
178 // is stored using a weak_ptr because:
179 // (a) There are some dangers of accidentally creating circular dependencies
180 // that prevent a ScopedDataChannel from ever being destroyed.
181 // (b) The inter-dependencies involved are complicated enough that we want
182 // to be able to check whether someone has destroyed the ScopedDataChannel
183 // before using it (if it has been destroyed and the Subscriber still
184 // wants to use it, that is a bug, but checking for bugs is useful).
185 // This particular location *may* be able to get away with a shared_ptr, but
186 // because the ScopedDataChannel effectively destroys itself (see
187 // ScopedDataChannel::StaticDataChannelCloseHandler) while also potentially
188 // holding references to other objects (e.g., through the various handlers
189 // that can be registered), creating unnecessary shared_ptr's is dubious.
190 std::vector<std::pair<std::weak_ptr<ScopedDataChannel>, ChannelInformation>>
191 channels_;
Alex Perryb3b50792020-01-18 16:13:45 -0800192};
193
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700194// Class to manage a WebRTC connection to a browser.
195class ApplicationConnection {
Alex Perryb3b50792020-01-18 16:13:45 -0800196 public:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700197 ApplicationConnection(
198 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
199 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
200 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
201 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800202
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700203 ~ApplicationConnection();
Alex Perryb3b50792020-01-18 16:13:45 -0800204
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700205 // Handles a SDP sent through the negotiation channel.
206 void OnSdp(const char *sdp);
207 // Handles a ICE candidate sent through the negotiation channel.
208 void OnIce(const WebSocketIce *ice);
Alex Perryb3b50792020-01-18 16:13:45 -0800209
210 private:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700211 void LocalCandidate(
212 struct rawrtc_peer_connection_ice_candidate *const candidate,
213 char const *const url);
Alex Perry5f474f22020-02-01 12:14:24 -0800214
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700215 // Handles a signaling channel being made.
216 void OnDataChannel(std::shared_ptr<ScopedDataChannel> channel);
217
218 // Handles data coming in on the signaling channel requesting subscription.
219 void HandleSignallingData(
220 struct mbuf *const
221 buffer, // nullable (in case partial delivery has been requested)
222 const enum rawrtc_data_channel_message_flag /*flags*/);
223
224 RawRTCConnection connection_;
225
226 ::seasocks::Server *server_;
227 ::seasocks::WebSocket *sock_;
228
229 struct ChannelState {
230 std::shared_ptr<ScopedDataChannel> data_channel;
231 bool requested = true;
232 };
233
James Kuszmaul9776b392023-01-14 14:08:08 -0800234 std::map<size_t, ChannelState> channels_;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700235 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
236
237 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800238
239 const EventLoop *const event_loop_;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700240
241 std::shared_ptr<ScopedDataChannel> channel_;
Alex Perryb3b50792020-01-18 16:13:45 -0800242};
243
Stephan Pleinesd99b1ee2024-02-02 20:56:44 -0800244} // namespace aos::web_proxy
Alex Perryb3b50792020-01-18 16:13:45 -0800245
246#endif // AOS_NETWORK_WEB_PROXY_H_