blob: 4ad0630ae1cfd15f66494c705d3f938622c0d969 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#ifndef AOS_NETWORK_WEB_PROXY_H_
2#define AOS_NETWORK_WEB_PROXY_H_
Austin Schuh52e5e3a2021-04-24 22:30:02 -07003
4#include <deque>
Alex Perryb3b50792020-01-18 16:13:45 -08005#include <map>
6#include <set>
Austin Schuh52e5e3a2021-04-24 22:30:02 -07007
James Kuszmaul7ad91522020-09-01 19:15:35 -07008#include "aos/events/event_loop.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08009#include "aos/events/shm_event_loop.h"
James Kuszmaul8d928d02020-12-25 17:47:49 -080010#include "aos/mutex/mutex.h"
Alex Perry5f474f22020-02-01 12:14:24 -080011#include "aos/network/connect_generated.h"
Austin Schuh52e5e3a2021-04-24 22:30:02 -070012#include "aos/network/rawrtc.h"
Alex Perry5f474f22020-02-01 12:14:24 -080013#include "aos/network/web_proxy_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -080014#include "aos/seasocks/seasocks_logger.h"
15#include "flatbuffers/flatbuffers.h"
16#include "seasocks/Server.h"
17#include "seasocks/StringUtil.h"
18#include "seasocks/WebSocket.h"
19
Alex Perryb3b50792020-01-18 16:13:45 -080020namespace aos {
21namespace web_proxy {
22
// Forward declarations for types that are referenced in this header before
// their definitions appear.
// NOTE(review): no class named Connection is defined in the visible part of
// this header -- confirm whether that declaration is still needed.
class Connection;
class Subscriber;
class ApplicationConnection;
Alex Perryb3b50792020-01-18 16:13:45 -080026
27// Basic class that handles receiving new websocket connections. Creates a new
28// Connection to manage the rest of the negotiation and data passing. When the
29// websocket closes, it deletes the Connection.
30class WebsocketHandler : public ::seasocks::WebSocket::Handler {
31 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080032 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
33 int buffer_size);
Alex Perryb3b50792020-01-18 16:13:45 -080034 void onConnect(::seasocks::WebSocket *sock) override;
35 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
36 size_t size) override;
37 void onDisconnect(::seasocks::WebSocket *sock) override;
38
39 private:
Alex Perryb3b50792020-01-18 16:13:45 -080040 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070041 std::vector<std::unique_ptr<Subscriber>> subscribers_;
42 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080043
Austin Schuh52e5e3a2021-04-24 22:30:02 -070044 std::map<::seasocks::WebSocket *, std::unique_ptr<ApplicationConnection>>
45 connections_;
46
47 EventLoop *const event_loop_;
James Kuszmaul71a81932020-12-15 21:08:01 -080048};
49
50// Wrapper class that manages the seasocks server and WebsocketHandler.
51class WebProxy {
52 public:
53 WebProxy(aos::EventLoop *event_loop, int buffer_size);
54 WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
55 ~WebProxy();
56
57 void SetDataPath(const char *path) { server_.setStaticPath(path); }
58
59 private:
60 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
61 int buffer_size);
62
63 aos::internal::EPoll internal_epoll_;
64 aos::internal::EPoll *const epoll_;
65 ::seasocks::Server server_;
66 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080067};
68
69// Seasocks requires that sends happen on the correct thread. This class takes a
70// detached buffer to send on a specific websocket connection and sends it when
71// seasocks is ready.
72class UpdateData : public ::seasocks::Server::Runnable {
73 public:
74 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -080075 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -080076 : sock_(websocket), buffer_(std::move(buffer)) {}
77 ~UpdateData() override = default;
78 UpdateData(const UpdateData &) = delete;
79 UpdateData &operator=(const UpdateData &) = delete;
80
81 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
82
83 private:
84 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -080085 const flatbuffers::DetachedBuffer buffer_;
86};
87
88// Represents a fetcher and all the Connections that care about it.
89// Handles building the message and telling each connection to send it.
90// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -080091// Subscriber also uses an internal buffer to store past messages. This is
92// primarily meant for use in offline log replay/simulation where we want to be
93// able to store infinite buffers. In the future, we will probably want to be
94// able to specify *which* channels to store buffers for so that we aren't just
95// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -080096class Subscriber {
97 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080098 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
99 int buffer_size)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700100 : fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -0800101 channel_index_(channel_index),
102 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -0800103
104 void RunIteration();
105
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700106 void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
107 TransferMethod transfer_method);
Alex Perry5f474f22020-02-01 12:14:24 -0800108
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700109 void RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800110
111 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800112 struct ChannelInformation {
113 TransferMethod transfer_method;
114 uint32_t current_queue_index = 0;
115 size_t next_packet_number = 0;
116 };
117 struct Message {
118 uint32_t index = 0xffffffff;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700119 std::vector<std::shared_ptr<struct mbuf>> data;
James Kuszmaul71a81932020-12-15 21:08:01 -0800120 };
121
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700122 std::shared_ptr<struct mbuf> NextBuffer(ChannelInformation *channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800123 void SkipToLastMessage(ChannelInformation *channel);
124
Alex Perry5f474f22020-02-01 12:14:24 -0800125 std::unique_ptr<RawFetcher> fetcher_;
126 int channel_index_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800127 int buffer_size_;
128 std::deque<Message> message_buffer_;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700129 std::map<std::shared_ptr<ScopedDataChannel>, ChannelInformation> channels_;
Alex Perryb3b50792020-01-18 16:13:45 -0800130};
131
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700132// Class to manage a WebRTC connection to a browser.
133class ApplicationConnection {
Alex Perryb3b50792020-01-18 16:13:45 -0800134 public:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700135 ApplicationConnection(
136 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
137 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
138 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
139 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800140
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700141 ~ApplicationConnection();
Alex Perryb3b50792020-01-18 16:13:45 -0800142
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700143 // Handles a SDP sent through the negotiation channel.
144 void OnSdp(const char *sdp);
145 // Handles a ICE candidate sent through the negotiation channel.
146 void OnIce(const WebSocketIce *ice);
Alex Perryb3b50792020-01-18 16:13:45 -0800147
148 private:
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700149 void LocalCandidate(
150 struct rawrtc_peer_connection_ice_candidate *const candidate,
151 char const *const url);
Alex Perry5f474f22020-02-01 12:14:24 -0800152
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700153 // Handles a signaling channel being made.
154 void OnDataChannel(std::shared_ptr<ScopedDataChannel> channel);
155
156 // Handles data coming in on the signaling channel requesting subscription.
157 void HandleSignallingData(
158 struct mbuf *const
159 buffer, // nullable (in case partial delivery has been requested)
160 const enum rawrtc_data_channel_message_flag /*flags*/);
161
162 RawRTCConnection connection_;
163
164 ::seasocks::Server *server_;
165 ::seasocks::WebSocket *sock_;
166
167 struct ChannelState {
168 std::shared_ptr<ScopedDataChannel> data_channel;
169 bool requested = true;
170 };
171
172 std::map<int, ChannelState> channels_;
173 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
174
175 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800176
177 const EventLoop *const event_loop_;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700178
179 std::shared_ptr<ScopedDataChannel> channel_;
Alex Perryb3b50792020-01-18 16:13:45 -0800180};
181
182} // namespace web_proxy
183} // namespace aos
184
185#endif // AOS_NETWORK_WEB_PROXY_H_