blob: 85cfd6fb17ba56fcdf26b35a525c8e3b9725bfac [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#ifndef AOS_NETWORK_WEB_PROXY_H_
2#define AOS_NETWORK_WEB_PROXY_H_
3#include <map>
4#include <set>
James Kuszmaul7ad91522020-09-01 19:15:35 -07005#include "aos/events/event_loop.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08006#include "aos/events/shm_event_loop.h"
Alex Perry5f474f22020-02-01 12:14:24 -08007#include "aos/network/connect_generated.h"
8#include "aos/network/web_proxy_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08009#include "aos/seasocks/seasocks_logger.h"
10#include "flatbuffers/flatbuffers.h"
11#include "seasocks/Server.h"
12#include "seasocks/StringUtil.h"
13#include "seasocks/WebSocket.h"
14
15#include "api/peer_connection_interface.h"
16
17namespace aos {
18namespace web_proxy {
19
20class Connection;
Alex Perry5f474f22020-02-01 12:14:24 -080021class Subscriber;
Alex Perryb3b50792020-01-18 16:13:45 -080022
23// Basic class that handles receiving new websocket connections. Creates a new
24// Connection to manage the rest of the negotiation and data passing. When the
25// websocket closes, it deletes the Connection.
26class WebsocketHandler : public ::seasocks::WebSocket::Handler {
27 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080028 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
29 int buffer_size);
Alex Perryb3b50792020-01-18 16:13:45 -080030 void onConnect(::seasocks::WebSocket *sock) override;
31 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
32 size_t size) override;
33 void onDisconnect(::seasocks::WebSocket *sock) override;
34
35 private:
36 std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
37 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070038 std::vector<std::unique_ptr<Subscriber>> subscribers_;
39 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080040
41 const EventLoop *const event_loop_;
42};
43
44// Wrapper class that manages the seasocks server and WebsocketHandler.
45class WebProxy {
46 public:
47 WebProxy(aos::EventLoop *event_loop, int buffer_size);
48 WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
49 ~WebProxy();
50
51 void SetDataPath(const char *path) { server_.setStaticPath(path); }
52
53 private:
54 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
55 int buffer_size);
56
57 aos::internal::EPoll internal_epoll_;
58 aos::internal::EPoll *const epoll_;
59 ::seasocks::Server server_;
60 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080061};
62
63// Seasocks requires that sends happen on the correct thread. This class takes a
64// detached buffer to send on a specific websocket connection and sends it when
65// seasocks is ready.
66class UpdateData : public ::seasocks::Server::Runnable {
67 public:
68 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -080069 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -080070 : sock_(websocket), buffer_(std::move(buffer)) {}
71 ~UpdateData() override = default;
72 UpdateData(const UpdateData &) = delete;
73 UpdateData &operator=(const UpdateData &) = delete;
74
75 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
76
77 private:
78 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -080079 const flatbuffers::DetachedBuffer buffer_;
80};
81
82// Represents a fetcher and all the Connections that care about it.
83// Handles building the message and telling each connection to send it.
84// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -080085// Subscriber also uses an internal buffer to store past messages. This is
86// primarily meant for use in offline log replay/simulation where we want to be
87// able to store infinite buffers. In the future, we will probably want to be
88// able to specify *which* channels to store buffers for so that we aren't just
89// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -080090class Subscriber {
91 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080092 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
93 int buffer_size)
Alex Perry5f474f22020-02-01 12:14:24 -080094 : fbb_(1024),
95 fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -080096 channel_index_(channel_index),
97 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -080098
99 void RunIteration();
100
James Kuszmaul71a81932020-12-15 21:08:01 -0800101 void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
102 TransferMethod transfer_method);
Alex Perry5f474f22020-02-01 12:14:24 -0800103
104 void RemoveListener(
James Kuszmaul71a81932020-12-15 21:08:01 -0800105 rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800106
107 // Check if the Channel passed matches the channel this fetchs.
108 bool Compare(const Channel *channel) const;
109
110 int index() const { return channel_index_; }
111
112 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800113 struct ChannelInformation {
114 TransferMethod transfer_method;
115 uint32_t current_queue_index = 0;
116 size_t next_packet_number = 0;
117 };
118 struct Message {
119 uint32_t index = 0xffffffff;
120 std::vector<webrtc::DataBuffer> data;
121 };
122
123 const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
124 void SkipToLastMessage(ChannelInformation *channel);
125
Alex Perry5f474f22020-02-01 12:14:24 -0800126 flatbuffers::FlatBufferBuilder fbb_;
127 std::unique_ptr<RawFetcher> fetcher_;
128 int channel_index_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800129 int buffer_size_;
130 std::deque<Message> message_buffer_;
131 std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
132 channels_;
Alex Perryb3b50792020-01-18 16:13:45 -0800133};
134
135// Represents a single connection to a browser for the entire lifetime of the
136// connection.
137class Connection : public webrtc::PeerConnectionObserver,
138 public webrtc::CreateSessionDescriptionObserver,
139 public webrtc::DataChannelObserver {
140 public:
Alex Perry5f474f22020-02-01 12:14:24 -0800141 Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
142 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800143 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
144 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800145
146 ~Connection() {
147 // DataChannel may call OnStateChange after this is destroyed, so make sure
148 // it doesn't.
James Kuszmaul1ec74432020-07-30 20:26:45 -0700149 if (data_channel_) {
150 data_channel_->UnregisterObserver();
151 }
Alex Perry5f474f22020-02-01 12:14:24 -0800152 }
Alex Perryb3b50792020-01-18 16:13:45 -0800153
154 void HandleWebSocketData(const uint8_t *data, size_t size);
155
Alex Perry5f474f22020-02-01 12:14:24 -0800156 void Send(const flatbuffers::DetachedBuffer &buffer) const;
157
Alex Perryb3b50792020-01-18 16:13:45 -0800158 // PeerConnectionObserver implementation
159 void OnSignalingChange(
160 webrtc::PeerConnectionInterface::SignalingState) override {}
161 void OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
162 void OnRemoveStream(
163 rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
164 void OnDataChannel(
165 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
166 void OnRenegotiationNeeded() override {}
167 void OnIceConnectionChange(
168 webrtc::PeerConnectionInterface::IceConnectionState state) override {}
169 void OnIceGatheringChange(
170 webrtc::PeerConnectionInterface::IceGatheringState) override {}
171 void OnIceCandidate(const webrtc::IceCandidateInterface *candidate) override;
172 void OnIceConnectionReceivingChange(bool) override {}
173
174 // CreateSessionDescriptionObserver implementation
175 void OnSuccess(webrtc::SessionDescriptionInterface *desc) override;
176 void OnFailure(webrtc::RTCError error) override {}
177 // CreateSessionDescriptionObserver is a refcounted object
178 void AddRef() const override {}
179 // We handle ownership with a unique_ptr so don't worry about actually
180 // refcounting. We will delete when we are done.
181 rtc::RefCountReleaseStatus Release() const override {
182 return rtc::RefCountReleaseStatus::kOtherRefsRemained;
183 }
184
185 // DataChannelObserver implementation
Alex Perry5f474f22020-02-01 12:14:24 -0800186 void OnStateChange() override;
Alex Perryb3b50792020-01-18 16:13:45 -0800187 void OnMessage(const webrtc::DataBuffer &buffer) override;
188 void OnBufferedAmountChange(uint64_t sent_data_size) override {}
189
190 private:
191 ::seasocks::WebSocket *sock_;
192 ::seasocks::Server *server_;
Alex Perry5f474f22020-02-01 12:14:24 -0800193 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
James Kuszmaul1ec74432020-07-30 20:26:45 -0700194 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
Alex Perry5f474f22020-02-01 12:14:24 -0800195 std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
196
Alex Perryb3b50792020-01-18 16:13:45 -0800197 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
198 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800199
200 const EventLoop *const event_loop_;
Alex Perryb3b50792020-01-18 16:13:45 -0800201};
202
203} // namespace web_proxy
204} // namespace aos
205
206#endif // AOS_NETWORK_WEB_PROXY_H_