blob: e7bf4e072e0fbe0044dc2a8e8ad098534fa45f6f [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#ifndef AOS_NETWORK_WEB_PROXY_H_
2#define AOS_NETWORK_WEB_PROXY_H_
3#include <map>
4#include <set>
James Kuszmaul7ad91522020-09-01 19:15:35 -07005#include "aos/events/event_loop.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08006#include "aos/events/shm_event_loop.h"
James Kuszmaul8d928d02020-12-25 17:47:49 -08007#include "aos/mutex/mutex.h"
Alex Perry5f474f22020-02-01 12:14:24 -08008#include "aos/network/connect_generated.h"
9#include "aos/network/web_proxy_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010#include "aos/seasocks/seasocks_logger.h"
11#include "flatbuffers/flatbuffers.h"
12#include "seasocks/Server.h"
13#include "seasocks/StringUtil.h"
14#include "seasocks/WebSocket.h"
15
16#include "api/peer_connection_interface.h"
17
18namespace aos {
19namespace web_proxy {
20
21class Connection;
Alex Perry5f474f22020-02-01 12:14:24 -080022class Subscriber;
Alex Perryb3b50792020-01-18 16:13:45 -080023
24// Basic class that handles receiving new websocket connections. Creates a new
25// Connection to manage the rest of the negotiation and data passing. When the
26// websocket closes, it deletes the Connection.
27class WebsocketHandler : public ::seasocks::WebSocket::Handler {
28 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080029 WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
30 int buffer_size);
Alex Perryb3b50792020-01-18 16:13:45 -080031 void onConnect(::seasocks::WebSocket *sock) override;
32 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
33 size_t size) override;
34 void onDisconnect(::seasocks::WebSocket *sock) override;
35
36 private:
37 std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
38 ::seasocks::Server *server_;
James Kuszmaul7ad91522020-09-01 19:15:35 -070039 std::vector<std::unique_ptr<Subscriber>> subscribers_;
40 const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
James Kuszmaul71a81932020-12-15 21:08:01 -080041
42 const EventLoop *const event_loop_;
43};
44
45// Wrapper class that manages the seasocks server and WebsocketHandler.
46class WebProxy {
47 public:
48 WebProxy(aos::EventLoop *event_loop, int buffer_size);
49 WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
50 ~WebProxy();
51
52 void SetDataPath(const char *path) { server_.setStaticPath(path); }
53
54 private:
55 WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
56 int buffer_size);
57
58 aos::internal::EPoll internal_epoll_;
59 aos::internal::EPoll *const epoll_;
60 ::seasocks::Server server_;
61 std::shared_ptr<WebsocketHandler> websocket_handler_;
Alex Perryb3b50792020-01-18 16:13:45 -080062};
63
64// Seasocks requires that sends happen on the correct thread. This class takes a
65// detached buffer to send on a specific websocket connection and sends it when
66// seasocks is ready.
67class UpdateData : public ::seasocks::Server::Runnable {
68 public:
69 UpdateData(::seasocks::WebSocket *websocket,
Alex Perry5f474f22020-02-01 12:14:24 -080070 flatbuffers::DetachedBuffer &&buffer)
Alex Perryb3b50792020-01-18 16:13:45 -080071 : sock_(websocket), buffer_(std::move(buffer)) {}
72 ~UpdateData() override = default;
73 UpdateData(const UpdateData &) = delete;
74 UpdateData &operator=(const UpdateData &) = delete;
75
76 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
77
78 private:
79 ::seasocks::WebSocket *sock_;
Alex Perry5f474f22020-02-01 12:14:24 -080080 const flatbuffers::DetachedBuffer buffer_;
81};
82
83// Represents a fetcher and all the Connections that care about it.
84// Handles building the message and telling each connection to send it.
85// indexed by location of the channel it handles in the config.
James Kuszmaul71a81932020-12-15 21:08:01 -080086// Subscriber also uses an internal buffer to store past messages. This is
87// primarily meant for use in offline log replay/simulation where we want to be
88// able to store infinite buffers. In the future, we will probably want to be
89// able to specify *which* channels to store buffers for so that we aren't just
90// loading the entire logfile into memory.
Alex Perry5f474f22020-02-01 12:14:24 -080091class Subscriber {
92 public:
James Kuszmaul71a81932020-12-15 21:08:01 -080093 Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
94 int buffer_size)
Alex Perry5f474f22020-02-01 12:14:24 -080095 : fbb_(1024),
96 fetcher_(std::move(fetcher)),
James Kuszmaul71a81932020-12-15 21:08:01 -080097 channel_index_(channel_index),
98 buffer_size_(buffer_size) {}
Alex Perry5f474f22020-02-01 12:14:24 -080099
100 void RunIteration();
101
James Kuszmaul71a81932020-12-15 21:08:01 -0800102 void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
James Kuszmaul8d928d02020-12-25 17:47:49 -0800103 TransferMethod transfer_method,
104 rtc::Thread *signaling_thread);
Alex Perry5f474f22020-02-01 12:14:24 -0800105
106 void RemoveListener(
James Kuszmaul71a81932020-12-15 21:08:01 -0800107 rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
Alex Perry5f474f22020-02-01 12:14:24 -0800108
109 // Check if the Channel passed matches the channel this fetchs.
110 bool Compare(const Channel *channel) const;
111
112 int index() const { return channel_index_; }
113
114 private:
James Kuszmaul71a81932020-12-15 21:08:01 -0800115 struct ChannelInformation {
116 TransferMethod transfer_method;
117 uint32_t current_queue_index = 0;
118 size_t next_packet_number = 0;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800119 // Thread to use for making calls to the DataChannelInterface.
120 rtc::Thread *signaling_thread;
James Kuszmaul71a81932020-12-15 21:08:01 -0800121 };
122 struct Message {
123 uint32_t index = 0xffffffff;
124 std::vector<webrtc::DataBuffer> data;
125 };
126
127 const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
128 void SkipToLastMessage(ChannelInformation *channel);
129
Alex Perry5f474f22020-02-01 12:14:24 -0800130 flatbuffers::FlatBufferBuilder fbb_;
131 std::unique_ptr<RawFetcher> fetcher_;
132 int channel_index_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800133 int buffer_size_;
134 std::deque<Message> message_buffer_;
135 std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
136 channels_;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800137 // In order to enable the Connection class to add/remove listeners
138 // asyncrhonously, queue up all the newly added listeners in pending_*
139 // members. Access to these members is controlled by mutex_.
140 std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
141 pending_channels_;
142 std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>
143 pending_removal_;
144
145 aos::Mutex mutex_;
Alex Perryb3b50792020-01-18 16:13:45 -0800146};
147
148// Represents a single connection to a browser for the entire lifetime of the
149// connection.
150class Connection : public webrtc::PeerConnectionObserver,
151 public webrtc::CreateSessionDescriptionObserver,
152 public webrtc::DataChannelObserver {
153 public:
Alex Perry5f474f22020-02-01 12:14:24 -0800154 Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
155 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800156 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
157 const EventLoop *event_loop);
Alex Perry5f474f22020-02-01 12:14:24 -0800158
159 ~Connection() {
160 // DataChannel may call OnStateChange after this is destroyed, so make sure
161 // it doesn't.
James Kuszmaul1ec74432020-07-30 20:26:45 -0700162 if (data_channel_) {
163 data_channel_->UnregisterObserver();
164 }
Alex Perry5f474f22020-02-01 12:14:24 -0800165 }
Alex Perryb3b50792020-01-18 16:13:45 -0800166
167 void HandleWebSocketData(const uint8_t *data, size_t size);
168
Alex Perry5f474f22020-02-01 12:14:24 -0800169 void Send(const flatbuffers::DetachedBuffer &buffer) const;
170
Alex Perryb3b50792020-01-18 16:13:45 -0800171 // PeerConnectionObserver implementation
172 void OnSignalingChange(
173 webrtc::PeerConnectionInterface::SignalingState) override {}
174 void OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
175 void OnRemoveStream(
176 rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
177 void OnDataChannel(
178 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
179 void OnRenegotiationNeeded() override {}
180 void OnIceConnectionChange(
James Kuszmaul54424d02020-12-26 18:09:20 -0800181 webrtc::PeerConnectionInterface::IceConnectionState) override {}
Alex Perryb3b50792020-01-18 16:13:45 -0800182 void OnIceGatheringChange(
183 webrtc::PeerConnectionInterface::IceGatheringState) override {}
184 void OnIceCandidate(const webrtc::IceCandidateInterface *candidate) override;
James Kuszmaul54424d02020-12-26 18:09:20 -0800185 void OnIceCandidateError(const std::string &host_candidate,
186 const std::string &url, int error_code,
187 const std::string &error_text) override {
188 LOG(ERROR) << "ICE Candidate Error on " << host_candidate << " for " << url
189 << " with error " << error_code << ": " << error_text;
190 }
Alex Perryb3b50792020-01-18 16:13:45 -0800191 void OnIceConnectionReceivingChange(bool) override {}
192
193 // CreateSessionDescriptionObserver implementation
194 void OnSuccess(webrtc::SessionDescriptionInterface *desc) override;
James Kuszmaul48671362020-12-24 13:54:16 -0800195 void OnFailure(webrtc::RTCError /*error*/) override {}
Alex Perryb3b50792020-01-18 16:13:45 -0800196 // CreateSessionDescriptionObserver is a refcounted object
197 void AddRef() const override {}
198 // We handle ownership with a unique_ptr so don't worry about actually
199 // refcounting. We will delete when we are done.
200 rtc::RefCountReleaseStatus Release() const override {
201 return rtc::RefCountReleaseStatus::kOtherRefsRemained;
202 }
203
204 // DataChannelObserver implementation
Alex Perry5f474f22020-02-01 12:14:24 -0800205 void OnStateChange() override;
Alex Perryb3b50792020-01-18 16:13:45 -0800206 void OnMessage(const webrtc::DataBuffer &buffer) override;
James Kuszmaul48671362020-12-24 13:54:16 -0800207 void OnBufferedAmountChange(uint64_t /*sent_data_size*/) override {}
Alex Perryb3b50792020-01-18 16:13:45 -0800208
209 private:
210 ::seasocks::WebSocket *sock_;
211 ::seasocks::Server *server_;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800212 // The signaling thread is the thread on which most/all of the work we do with
213 // WebRTC will happen--it is both where the handlers we register should be
214 // called and where we should be calling Send() from.
215 rtc::Thread *signaling_thread_;
Alex Perry5f474f22020-02-01 12:14:24 -0800216 const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
James Kuszmaul1ec74432020-07-30 20:26:45 -0700217 const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
Alex Perry5f474f22020-02-01 12:14:24 -0800218 std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
219
Alex Perryb3b50792020-01-18 16:13:45 -0800220 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
221 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
James Kuszmaul71a81932020-12-15 21:08:01 -0800222
223 const EventLoop *const event_loop_;
Alex Perryb3b50792020-01-18 16:13:45 -0800224};
225
226} // namespace web_proxy
227} // namespace aos
228
229#endif // AOS_NETWORK_WEB_PROXY_H_