// blob: baca26e60b2ddcbde23307b9af7e5cb786905946
#ifndef AOS_NETWORK_WEB_PROXY_H_
#define AOS_NETWORK_WEB_PROXY_H_

#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "aos/events/event_loop.h"
#include "aos/events/shm_event_loop.h"
#include "aos/mutex/mutex.h"
#include "aos/network/connect_generated.h"
#include "aos/network/rawrtc.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "flatbuffers/flatbuffers.h"
#include "seasocks/Server.h"
#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"

namespace aos {
namespace web_proxy {

// Forward declarations. Subscriber and ApplicationConnection are defined
// later in this header; Connection is not defined in this header.
class Connection;
class Subscriber;
class ApplicationConnection;

// Selects whether message history is buffered even while no client is
// listening.
enum class StoreHistory {
  // Only buffer up messages while there is an active listener.
  kNo,
  // Always buffer up data for all channels, even for messages that are played
  // prior to a client connecting (mostly useful for log replay).
  kYes,
};

Alex Perryb3b50792020-01-18 16:13:45 -080032// Basic class that handles receiving new websocket connections. Creates a new
33// Connection to manage the rest of the negotiation and data passing. When the
34// websocket closes, it deletes the Connection.
35class WebsocketHandler : public ::seasocks::WebSocket::Handler {
36 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080037 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
James Kuszmaul1a29c082022-02-03 14:02:47 -080038 StoreHistory store_history,
39 int per_channel_buffer_size_bytes);
Alex Perryb3b50792020-01-18 16:13:45 -080040 void onConnect(::seasocks::WebSocket *sock) override;
41 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
42 size_t size) override;
43 void onDisconnect(::seasocks::WebSocket *sock) override;
44
45 private:
Alex Perryb3b50792020-01-18 16:13:45 -080046 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070047 std::vector<std::unique_ptr<Subscriber>> subscribers_;
48 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080049
Austin Schuh52e5e3a2021-04-24 22:30:02 -070050 std::map<::seasocks::WebSocket *, std::unique_ptr<ApplicationConnection>>
51 connections_;
52
53 EventLoop *const event_loop_;
James Kuszmaul71a81932020-12-15 21:08:01 -080054};
55
56// Wrapper class that manages the seasocks server and WebsocketHandler.
57class WebProxy {
58 public:
James Kuszmaul1a29c082022-02-03 14:02:47 -080059 // Constructs a WebProxy object for interacting with a webpage. store_history
60 // and per_channel_buffer_size_bytes specify how we manage delivering LOSSLESS
61 // messages to the client:
62 // * store_history specifies whether we should always buffer up data for all
63 // channels--even for messages that are played prior to the client
64 // connecting. This is mostly useful for log replay where the client
65 // will typically connect after the logfile has been fully loaded/replayed.
66 // * per_channel_buffer_size_bytes is the maximum amount of data to buffer
67 // up per channel (-1 will indicate infinite data, which is used during log
68 // replay). This is divided by the max_size per channel to determine
69 // how many messages to queue up.
70 WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
71 int per_channel_buffer_size_bytes);
72 WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
73 int per_channel_buffer_size_bytes);
James Kuszmaul71a81932020-12-15 21:08:01 -080074 ~WebProxy();
75
76 void SetDataPath(const char *path) { server_.setStaticPath(path); }
77
78 private:
79 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
James Kuszmaul1a29c082022-02-03 14:02:47 -080080 StoreHistory store_history, int per_channel_buffer_size_bytes);
James Kuszmaul71a81932020-12-15 21:08:01 -080081
82 aos::internal::EPoll internal_epoll_;
83 aos::internal::EPoll *const epoll_;
84 ::seasocks::Server server_;
85 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080086};
87
88// Seasocks requires that sends happen on the correct thread. This class takes a
89// detached buffer to send on a specific websocket connection and sends it when
90// seasocks is ready.
91class UpdateData : public ::seasocks::Server::Runnable {
92 public:
93 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -080094 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -080095 : sock_(websocket), buffer_(std::move(buffer)) {}
96 ~UpdateData() override = default;
97 UpdateData(const UpdateData &) = delete;
98 UpdateData &operator=(const UpdateData &) = delete;
99
100 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
101
102 private:
103 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -0800104 const flatbuffers::DetachedBuffer buffer_;
105};
106
107// Represents a fetcher and all the Connections that care about it.
108// Handles building the message and telling each connection to send it.
109// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -0800110// Subscriber also uses an internal buffer to store past messages. This is
111// primarily meant for use in offline log replay/simulation where we want to be
112// able to store infinite buffers. In the future, we will probably want to be
113// able to specify *which* channels to store buffers for so that we aren't just
114// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -0800115class Subscriber {
116 public:
James Kuszmaul71a81932020-12-15 21:08:01 -0800117 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
James Kuszmaul1a29c082022-02-03 14:02:47 -0800118 StoreHistory store_history, int buffer_size)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700119 : fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -0800120 channel_index_(channel_index),
James Kuszmaul1a29c082022-02-03 14:02:47 -0800121 store_history_(store_history == StoreHistory::kYes),
James Kuszmaul71a81932020-12-15 21:08:01 -0800122 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -0800123
124 void RunIteration();
125
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700126 void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
127 TransferMethod transfer_method);
Alex Perry5f474f22020-02-01 12:14:24 -0800128
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700129 void RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800130
131 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800132 struct ChannelInformation {
133 TransferMethod transfer_method;
James Kuszmaula5822682021-12-23 18:39:28 -0800134 // Queue index (same as the queue index within the AOS channel) of the
135 // message that we are currently sending or, if we are between messages,
136 // the next message we will send.
James Kuszmaul71a81932020-12-15 21:08:01 -0800137 uint32_t current_queue_index = 0;
James Kuszmaula5822682021-12-23 18:39:28 -0800138 // Index of the next packet to send within current_queue_index (large
139 // messages are broken into multiple packets, as we have encountered
140 // issues with how some WebRTC implementations handle large packets).
James Kuszmaul71a81932020-12-15 21:08:01 -0800141 size_t next_packet_number = 0;
James Kuszmaula5822682021-12-23 18:39:28 -0800142 // The last queue/packet index reported by the client.
143 uint32_t reported_queue_index = 0;
144 size_t reported_packet_index = 0;
James Kuszmaul71a81932020-12-15 21:08:01 -0800145 };
146 struct Message {
147 uint32_t index = 0xffffffff;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700148 std::vector<std::shared_ptr<struct mbuf>> data;
James Kuszmaul71a81932020-12-15 21:08:01 -0800149 };
150
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700151 std::shared_ptr<struct mbuf> NextBuffer(ChannelInformation *channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800152 void SkipToLastMessage(ChannelInformation *channel);
153
Alex Perry5f474f22020-02-01 12:14:24 -0800154 std::unique_ptr<RawFetcher> fetcher_;
155 int channel_index_;
James Kuszmaul1a29c082022-02-03 14:02:47 -0800156 // If set, will always build up a buffer of the most recent buffer_size_
157 // messages. If store_history_ is *not* set we will only buffer up messages
158 // while there is an active listener.
159 bool store_history_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800160 int buffer_size_;
161 std::deque<Message> message_buffer_;
James Kuszmaula5822682021-12-23 18:39:28 -0800162 // The ScopedDataChannel that we use for actually sending data over WebRTC
163 // is stored using a weak_ptr because:
164 // (a) There are some dangers of accidentally creating circular dependencies
165 // that prevent a ScopedDataChannel from ever being destroyed.
166 // (b) The inter-dependencies involved are complicated enough that we want
167 // to be able to check whether someone has destroyed the ScopedDataChannel
168 // before using it (if it has been destroyed and the Subscriber still
169 // wants to use it, that is a bug, but checking for bugs is useful).
170 // This particular location *may* be able to get away with a shared_ptr, but
171 // because the ScopedDataChannel effectively destroys itself (see
172 // ScopedDataChannel::StaticDataChannelCloseHandler) while also potentially
173 // holding references to other objects (e.g., through the various handlers
174 // that can be registered), creating unnecessary shared_ptr's is dubious.
175 std::vector<std::pair<std::weak_ptr<ScopedDataChannel>, ChannelInformation>>
176 channels_;
Alex Perryb3b50792020-01-18 16:13:45 -0800177};
178
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700179// Class to manage a WebRTC connection to a browser.
180class ApplicationConnection {
Alex Perryb3b50792020-01-18 16:13:45 -0800181 public:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700182 ApplicationConnection(
183 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
184 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
185 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
186 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800187
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700188 ~ApplicationConnection();
Alex Perryb3b50792020-01-18 16:13:45 -0800189
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700190 // Handles a SDP sent through the negotiation channel.
191 void OnSdp(const char *sdp);
192 // Handles a ICE candidate sent through the negotiation channel.
193 void OnIce(const WebSocketIce *ice);
Alex Perryb3b50792020-01-18 16:13:45 -0800194
195 private:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700196 void LocalCandidate(
197 struct rawrtc_peer_connection_ice_candidate *const candidate,
198 char const *const url);
Alex Perry5f474f22020-02-01 12:14:24 -0800199
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700200 // Handles a signaling channel being made.
201 void OnDataChannel(std::shared_ptr<ScopedDataChannel> channel);
202
203 // Handles data coming in on the signaling channel requesting subscription.
204 void HandleSignallingData(
205 struct mbuf *const
206 buffer, // nullable (in case partial delivery has been requested)
207 const enum rawrtc_data_channel_message_flag /*flags*/);
208
209 RawRTCConnection connection_;
210
211 ::seasocks::Server *server_;
212 ::seasocks::WebSocket *sock_;
213
214 struct ChannelState {
215 std::shared_ptr<ScopedDataChannel> data_channel;
216 bool requested = true;
217 };
218
219 std::map<int, ChannelState> channels_;
220 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
221
222 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800223
224 const EventLoop *const event_loop_;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700225
226 std::shared_ptr<ScopedDataChannel> channel_;
Alex Perryb3b50792020-01-18 16:13:45 -0800227};
228
}  // namespace web_proxy
}  // namespace aos

#endif  // AOS_NETWORK_WEB_PROXY_H_