blob: ab524debcbfcf980a570e51e7c988759a72a8383 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#ifndef AOS_NETWORK_WEB_PROXY_H_
2#define AOS_NETWORK_WEB_PROXY_H_
3#include <map>
4#include <set>
James Kuszmaul7ad91522020-09-01 19:15:35 -07005#include "aos/events/event_loop.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08006#include "aos/events/shm_event_loop.h"
James Kuszmaul8d928d02020-12-25 17:47:49 -08007#include "aos/mutex/mutex.h"
Alex Perry5f474f22020-02-01 12:14:24 -08008#include "aos/network/connect_generated.h"
9#include "aos/network/web_proxy_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010#include "aos/seasocks/seasocks_logger.h"
11#include "flatbuffers/flatbuffers.h"
12#include "seasocks/Server.h"
13#include "seasocks/StringUtil.h"
14#include "seasocks/WebSocket.h"
15
16#include "api/peer_connection_interface.h"
17
18namespace aos {
19namespace web_proxy {
20
21class Connection;
Alex Perry5f474f22020-02-01 12:14:24 -080022class Subscriber;
Alex Perryb3b50792020-01-18 16:13:45 -080023
24// Basic class that handles receiving new websocket connections. Creates a new
25// Connection to manage the rest of the negotiation and data passing. When the
26// websocket closes, it deletes the Connection.
27class WebsocketHandler : public ::seasocks::WebSocket::Handler {
28 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080029 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
30 int buffer_size);
Alex Perryb3b50792020-01-18 16:13:45 -080031 void onConnect(::seasocks::WebSocket *sock) override;
32 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
33 size_t size) override;
34 void onDisconnect(::seasocks::WebSocket *sock) override;
35
36 private:
37 std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
38 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070039 std::vector<std::unique_ptr<Subscriber>> subscribers_;
40 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080041
42 const EventLoop *const event_loop_;
43};
44
45// Wrapper class that manages the seasocks server and WebsocketHandler.
46class WebProxy {
47 public:
48 WebProxy(aos::EventLoop *event_loop, int buffer_size);
49 WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
50 ~WebProxy();
51
52 void SetDataPath(const char *path) { server_.setStaticPath(path); }
53
54 private:
55 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
56 int buffer_size);
57
58 aos::internal::EPoll internal_epoll_;
59 aos::internal::EPoll *const epoll_;
60 ::seasocks::Server server_;
61 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080062};
63
64// Seasocks requires that sends happen on the correct thread. This class takes a
65// detached buffer to send on a specific websocket connection and sends it when
66// seasocks is ready.
67class UpdateData : public ::seasocks::Server::Runnable {
68 public:
69 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -080070 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -080071 : sock_(websocket), buffer_(std::move(buffer)) {}
72 ~UpdateData() override = default;
73 UpdateData(const UpdateData &) = delete;
74 UpdateData &operator=(const UpdateData &) = delete;
75
76 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
77
78 private:
79 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -080080 const flatbuffers::DetachedBuffer buffer_;
81};
82
83// Represents a fetcher and all the Connections that care about it.
84// Handles building the message and telling each connection to send it.
85// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -080086// Subscriber also uses an internal buffer to store past messages. This is
87// primarily meant for use in offline log replay/simulation where we want to be
88// able to store infinite buffers. In the future, we will probably want to be
89// able to specify *which* channels to store buffers for so that we aren't just
90// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -080091class Subscriber {
92 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080093 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
94 int buffer_size)
Alex Perry5f474f22020-02-01 12:14:24 -080095 : fbb_(1024),
96 fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -080097 channel_index_(channel_index),
98 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -080099
100 void RunIteration();
101
James Kuszmaul71a81932020-12-15 21:08:01 -0800102 void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
James Kuszmaul8d928d02020-12-25 17:47:49 -0800103 TransferMethod transfer_method,
104 rtc::Thread *signaling_thread);
Alex Perry5f474f22020-02-01 12:14:24 -0800105
106 void RemoveListener(
James Kuszmaul71a81932020-12-15 21:08:01 -0800107 rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800108
109 // Check if the Channel passed matches the channel this fetchs.
110 bool Compare(const Channel *channel) const;
111
112 int index() const { return channel_index_; }
113
114 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800115 struct ChannelInformation {
116 TransferMethod transfer_method;
117 uint32_t current_queue_index = 0;
118 size_t next_packet_number = 0;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800119 // Thread to use for making calls to the DataChannelInterface.
120 rtc::Thread *signaling_thread;
James Kuszmaul71a81932020-12-15 21:08:01 -0800121 };
122 struct Message {
123 uint32_t index = 0xffffffff;
124 std::vector<webrtc::DataBuffer> data;
125 };
126
127 const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
128 void SkipToLastMessage(ChannelInformation *channel);
129
Alex Perry5f474f22020-02-01 12:14:24 -0800130 flatbuffers::FlatBufferBuilder fbb_;
131 std::unique_ptr<RawFetcher> fetcher_;
132 int channel_index_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800133 int buffer_size_;
134 std::deque<Message> message_buffer_;
135 std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
136 channels_;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800137 // In order to enable the Connection class to add/remove listeners
138 // asyncrhonously, queue up all the newly added listeners in pending_*
139 // members. Access to these members is controlled by mutex_.
140 std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
141 pending_channels_;
142 std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>
143 pending_removal_;
144
145 aos::Mutex mutex_;
Alex Perryb3b50792020-01-18 16:13:45 -0800146};
147
148// Represents a single connection to a browser for the entire lifetime of the
149// connection.
150class Connection : public webrtc::PeerConnectionObserver,
151 public webrtc::CreateSessionDescriptionObserver,
152 public webrtc::DataChannelObserver {
153 public:
Alex Perry5f474f22020-02-01 12:14:24 -0800154 Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
155 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800156 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
157 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800158
159 ~Connection() {
160 // DataChannel may call OnStateChange after this is destroyed, so make sure
161 // it doesn't.
James Kuszmaul1ec74432020-07-30 20:26:45 -0700162 if (data_channel_) {
163 data_channel_->UnregisterObserver();
164 }
Alex Perry5f474f22020-02-01 12:14:24 -0800165 }
Alex Perryb3b50792020-01-18 16:13:45 -0800166
167 void HandleWebSocketData(const uint8_t *data, size_t size);
168
Alex Perry5f474f22020-02-01 12:14:24 -0800169 void Send(const flatbuffers::DetachedBuffer &buffer) const;
170
Alex Perryb3b50792020-01-18 16:13:45 -0800171 // PeerConnectionObserver implementation
172 void OnSignalingChange(
173 webrtc::PeerConnectionInterface::SignalingState) override {}
174 void OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
175 void OnRemoveStream(
176 rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
177 void OnDataChannel(
178 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
179 void OnRenegotiationNeeded() override {}
180 void OnIceConnectionChange(
181 webrtc::PeerConnectionInterface::IceConnectionState state) override {}
182 void OnIceGatheringChange(
183 webrtc::PeerConnectionInterface::IceGatheringState) override {}
184 void OnIceCandidate(const webrtc::IceCandidateInterface *candidate) override;
185 void OnIceConnectionReceivingChange(bool) override {}
186
187 // CreateSessionDescriptionObserver implementation
188 void OnSuccess(webrtc::SessionDescriptionInterface *desc) override;
189 void OnFailure(webrtc::RTCError error) override {}
190 // CreateSessionDescriptionObserver is a refcounted object
191 void AddRef() const override {}
192 // We handle ownership with a unique_ptr so don't worry about actually
193 // refcounting. We will delete when we are done.
194 rtc::RefCountReleaseStatus Release() const override {
195 return rtc::RefCountReleaseStatus::kOtherRefsRemained;
196 }
197
198 // DataChannelObserver implementation
Alex Perry5f474f22020-02-01 12:14:24 -0800199 void OnStateChange() override;
Alex Perryb3b50792020-01-18 16:13:45 -0800200 void OnMessage(const webrtc::DataBuffer &buffer) override;
201 void OnBufferedAmountChange(uint64_t sent_data_size) override {}
202
203 private:
204 ::seasocks::WebSocket *sock_;
205 ::seasocks::Server *server_;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800206 // The signaling thread is the thread on which most/all of the work we do with
207 // WebRTC will happen--it is both where the handlers we register should be
208 // called and where we should be calling Send() from.
209 rtc::Thread *signaling_thread_;
Alex Perry5f474f22020-02-01 12:14:24 -0800210 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
James Kuszmaul1ec74432020-07-30 20:26:45 -0700211 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
Alex Perry5f474f22020-02-01 12:14:24 -0800212 std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
213
Alex Perryb3b50792020-01-18 16:13:45 -0800214 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
215 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800216
217 const EventLoop *const event_loop_;
Alex Perryb3b50792020-01-18 16:13:45 -0800218};
219
220} // namespace web_proxy
221} // namespace aos
222
223#endif // AOS_NETWORK_WEB_PROXY_H_