Add queue buffers & simulation support to web proxy

This makes a couple of major changes:
-Directly uses EPoll class for managing Seasocks events.
-Adds buffers of queues to web proxy Subscribers so that we can
 transfer data losslessly in log replay.
-Modifies the flatbuffer used for the RTC communications so that
 the webpage can specify whether it wants every message or subsampled
 messages.
-Adds an option to LogReader to let us run past the end of the logfile.

Note that these changes do mean that, for log replay, the web proxy will
load the *entire* logfile into memory. Future changes can optimize this
to, e.g., only load the required channels into memory.

Change-Id: I74e7608c30baa8b36e05c4ab50e12a54bf75aa4c
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 749e8fc..85cfd6f 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -3,6 +3,7 @@
 #include <map>
 #include <set>
 #include "aos/events/event_loop.h"
+#include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
 #include "aos/network/web_proxy_generated.h"
 #include "aos/seasocks/seasocks_logger.h"
@@ -24,7 +25,8 @@
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
-  WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop);
+  WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
+                   int buffer_size);
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
@@ -35,6 +37,27 @@
   ::seasocks::Server *server_;
   std::vector<std::unique_ptr<Subscriber>> subscribers_;
   const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+  const EventLoop *const event_loop_;
+};
+
+// Wrapper class that manages the seasocks server and WebsocketHandler.
+class WebProxy {
+ public:
+  WebProxy(aos::EventLoop *event_loop, int buffer_size);
+  WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
+  ~WebProxy();
+
+  void SetDataPath(const char *path) { server_.setStaticPath(path); }
+
+ private:
+  WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+           int buffer_size);
+
+  aos::internal::EPoll internal_epoll_;
+  aos::internal::EPoll *const epoll_;
+  ::seasocks::Server server_;
+  std::shared_ptr<WebsocketHandler> websocket_handler_;
 };
 
 // Seasocks requires that sends happen on the correct thread. This class takes a
@@ -59,25 +82,27 @@
 // Represents a fetcher and all the Connections that care about it.
 // Handles building the message and telling each connection to send it.
 // indexed by location of the channel it handles in the config.
-// TODO(james): Make it so that Subscriber can optionally maintain an infinite
-// backlog of messages.
+// Subscriber also uses an internal buffer to store past messages. This is
+// primarily meant for use in offline log replay/simulation where we want to be
+// able to store infinite buffers. In the future, we will probably want to be
+// able to specify *which* channels to store buffers for so that we aren't just
+// loading the entire logfile into memory.
 class Subscriber {
  public:
-  Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
+  Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
+             int buffer_size)
       : fbb_(1024),
         fetcher_(std::move(fetcher)),
-        channel_index_(channel_index) {}
+        channel_index_(channel_index),
+        buffer_size_(buffer_size) {}
 
   void RunIteration();
 
-  void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
-    channels_.insert(channel);
-  }
+  void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
+                   TransferMethod transfer_method);
 
   void RemoveListener(
-      rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
-    channels_.erase(channel);
-  }
+      rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
 
   // Check if the Channel passed matches the channel this fetchs.
   bool Compare(const Channel *channel) const;
@@ -85,10 +110,26 @@
   int index() const { return channel_index_; }
 
  private:
+  struct ChannelInformation {
+    TransferMethod transfer_method;
+    uint32_t current_queue_index = 0;
+    size_t next_packet_number = 0;
+  };
+  struct Message {
+    uint32_t index = 0xffffffff;
+    std::vector<webrtc::DataBuffer> data;
+  };
+
+  const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
+  void SkipToLastMessage(ChannelInformation *channel);
+
   flatbuffers::FlatBufferBuilder fbb_;
   std::unique_ptr<RawFetcher> fetcher_;
   int channel_index_;
-  std::set<rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+  int buffer_size_;
+  std::deque<Message> message_buffer_;
+  std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
+      channels_;
 };
 
 // Represents a single connection to a browser for the entire lifetime of the
@@ -99,7 +140,8 @@
  public:
   Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
              const std::vector<std::unique_ptr<Subscriber>> &subscribers,
-             const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+             const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+             const EventLoop *event_loop);
 
   ~Connection() {
     // DataChannel may call OnStateChange after this is destroyed, so make sure
@@ -154,6 +196,8 @@
 
   rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
   rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
+
+  const EventLoop *const event_loop_;
 };
 
 }  // namespace web_proxy