Provide lossless version of live plotter

Reuse the existing buffers that exist for offline log replay to maintain
only the buffers we need during realtime replay.

Change-Id: Ic2b43bb2c761ebf564d77ee3ce00cd84804a7a1a
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 5c099a2..945d034 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -42,7 +42,8 @@
 
   event_loop->SkipTimingReport();
 
-  aos::web_proxy::WebProxy web_proxy(event_loop.get(), FLAGS_buffer_size);
+  aos::web_proxy::WebProxy web_proxy(
+      event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
 
   web_proxy.SetDataPath(FLAGS_data_dir.c_str());
 
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index fdd8f1e..6d4f23f 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -28,7 +28,9 @@
 namespace aos {
 namespace web_proxy {
 WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
-                                   aos::EventLoop *event_loop, int buffer_size)
+                                   aos::EventLoop *event_loop,
+                                   StoreHistory store_history,
+                                   int per_channel_buffer_size_bytes)
     : server_(server),
       config_(aos::CopyFlatBuffer(event_loop->configuration())),
       event_loop_(event_loop) {
@@ -47,7 +49,10 @@
     if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
       auto fetcher = event_loop_->MakeRawFetcher(channel);
       subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
-          std::move(fetcher), i, buffer_size));
+          std::move(fetcher), i, store_history,
+          per_channel_buffer_size_bytes < 0
+              ? -1
+              : per_channel_buffer_size_bytes / channel->max_size()));
     } else {
       subscribers_.emplace_back(nullptr);
     }
@@ -126,19 +131,24 @@
   global_epoll->DeleteFd(fd);
 }
 
-WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
-    : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
+WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
+                   int per_channel_buffer_size_bytes)
+    : WebProxy(event_loop, &internal_epoll_, store_history,
+               per_channel_buffer_size_bytes) {}
 
-WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
-    : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
+WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
+                   int per_channel_buffer_size_bytes)
+    : WebProxy(event_loop, event_loop->epoll(), store_history,
+               per_channel_buffer_size_bytes) {}
 
 WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
-                   int buffer_size)
+                   StoreHistory store_history,
+                   int per_channel_buffer_size_bytes)
     : epoll_(epoll),
       server_(std::make_shared<aos::seasocks::SeasocksLogger>(
           ::seasocks::Logger::Level::Info)),
-      websocket_handler_(
-          new WebsocketHandler(&server_, event_loop, buffer_size)) {
+      websocket_handler_(new WebsocketHandler(
+          &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
   CHECK(!global_epoll);
   global_epoll = epoll;
 
@@ -192,10 +202,13 @@
 }
 
 void Subscriber::RunIteration() {
-  if (channels_.empty() && buffer_size_ == 0) {
+  if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
+    fetcher_->Fetch();
+    message_buffer_.clear();
     return;
   }
 
+
   while (fetcher_->FetchNext()) {
     // If we aren't building up a buffer, short-circuit the FetchNext().
     if (buffer_size_ == 0) {
@@ -271,12 +284,6 @@
   ChannelInformation info;
   info.transfer_method = transfer_method;
 
-  // If we aren't keeping a buffer and there are no existing listeners, call
-  // Fetch() to avoid falling behind on future calls to FetchNext().
-  if (channels_.empty() && buffer_size_ == 0) {
-    fetcher_->Fetch();
-  }
-
   channels_.emplace_back(std::make_pair(data_channel, info));
 
   data_channel->set_on_message(
diff --git a/aos/network/web_proxy.fbs b/aos/network/web_proxy.fbs
index 6c85acb..f1d6645 100644
--- a/aos/network/web_proxy.fbs
+++ b/aos/network/web_proxy.fbs
@@ -64,7 +64,7 @@
 
 enum TransferMethod : byte {
   SUBSAMPLE,
-  EVERYTHING_WITH_HISTORY,
+  LOSSLESS,
 }
 
 table ChannelRequest {
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0815ebf..baca26e 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -24,13 +24,19 @@
 class Subscriber;
 class ApplicationConnection;
 
+enum class StoreHistory {
+  kNo,
+  kYes,
+};
+
 // Basic class that handles receiving new websocket connections. Creates a new
 // Connection to manage the rest of the negotiation and data passing. When the
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
   WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
-                   int buffer_size);
+                   StoreHistory store_history,
+                   int per_channel_buffer_size_bytes);
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
@@ -50,15 +56,28 @@
 // Wrapper class that manages the seasocks server and WebsocketHandler.
 class WebProxy {
  public:
-  WebProxy(aos::EventLoop *event_loop, int buffer_size);
-  WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
+  // Constructs a WebProxy object for interacting with a webpage. store_history
+  // and per_channel_buffer_size_bytes specify how we manage delivering LOSSLESS
+  // messages to the client:
+  // * store_history specifies whether we should always buffer up data for all
+  //   channels--even for messages that are played prior to the client
+  //   connecting. This is mostly useful for log replay where the client
+  //   will typically connect after the logfile has been fully loaded/replayed.
+  // * per_channel_buffer_size_bytes is the maximum amount of data to buffer
+  //   up per channel (-1 will indicate infinite data, which is used during log
+  //   replay). This is divided by the max_size per channel to determine
+  //   how many messages to queue up.
+  WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
+           int per_channel_buffer_size_bytes);
+  WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
+           int per_channel_buffer_size_bytes);
   ~WebProxy();
 
   void SetDataPath(const char *path) { server_.setStaticPath(path); }
 
  private:
   WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
-           int buffer_size);
+           StoreHistory store_history, int per_channel_buffer_size_bytes);
 
   aos::internal::EPoll internal_epoll_;
   aos::internal::EPoll *const epoll_;
@@ -96,9 +115,10 @@
 class Subscriber {
  public:
   Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
-             int buffer_size)
+             StoreHistory store_history, int buffer_size)
       : fetcher_(std::move(fetcher)),
         channel_index_(channel_index),
+        store_history_(store_history == StoreHistory::kYes),
         buffer_size_(buffer_size) {}
 
   void RunIteration();
@@ -133,6 +153,10 @@
 
   std::unique_ptr<RawFetcher> fetcher_;
   int channel_index_;
+  // If set, will always build up a buffer of the most recent buffer_size_
+  // messages. If store_history_ is *not* set we will only buffer up messages
+  // while there is an active listener.
+  bool store_history_;
   int buffer_size_;
   std::deque<Message> message_buffer_;
   // The ScopedDataChannel that we use for actually sending data over WebRTC
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 06fe942..f3ad926 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -6,7 +6,9 @@
 
 DEFINE_string(config, "./config.json", "File path of aos configuration");
 DEFINE_string(data_dir, "www", "Directory to serve data files from");
-DEFINE_int32(buffer_size, 0, "-1 if infinite, in # of messages / channel.");
+DEFINE_int32(buffer_size, 1000000,
+             "-1 if infinite, in bytes / channel. If there are no active "
+             "connections, will not store anything.");
 
 int main(int argc, char **argv) {
   aos::InitGoogle(&argc, &argv);
@@ -16,7 +18,8 @@
 
   aos::ShmEventLoop event_loop(&config.message());
 
-  aos::web_proxy::WebProxy web_proxy(&event_loop, FLAGS_buffer_size);
+  aos::web_proxy::WebProxy web_proxy(
+      &event_loop, aos::web_proxy::StoreHistory::kNo, FLAGS_buffer_size);
   web_proxy.SetDataPath(FLAGS_data_dir.c_str());
 
   event_loop.Run();
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 5461899..5415400 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -117,7 +117,7 @@
       name: string, type: string,
       handler: (data: Uint8Array, sentTime: number) => void): void {
     this.addHandlerImpl(
-        name, type, TransferMethod.EVERYTHING_WITH_HISTORY, handler);
+        name, type, TransferMethod.LOSSLESS, handler);
   }
 
   /**
@@ -137,7 +137,7 @@
     if (!this.handlerFuncs.has(channel.key())) {
       this.handlerFuncs.set(channel.key(), []);
     } else {
-      if (method == TransferMethod.EVERYTHING_WITH_HISTORY) {
+      if (method == TransferMethod.LOSSLESS) {
         console.warn(
             'Behavior of multiple reliable handlers is currently poorly ' +
             'defined and may not actually deliver all of the messages.');
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 0eaf719..cd52b1c 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,7 @@
       event_loop_factory_(&config_.message()),
       event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
       plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
-      web_proxy_(event_loop_.get(), -1),
+      web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
       builder_(plot_sender_.MakeBuilder()) {
   web_proxy_.SetDataPath(kDataPath);
   event_loop_->SkipTimingReport();
diff --git a/frc971/analysis/live_web_plotter_demo.sh b/frc971/analysis/live_web_plotter_demo.sh
index 4c4c9c4..45f3b92 100755
--- a/frc971/analysis/live_web_plotter_demo.sh
+++ b/frc971/analysis/live_web_plotter_demo.sh
@@ -1 +1 @@
-./aos/network/web_proxy_main --config=aos/network/www/test_config.json --data_dir=frc971/analysis
+./aos/network/web_proxy_main --config=aos/network/www/test_config.json --data_dir=frc971/analysis "$@"