Add start/end time options to log_web_proxy

This makes it so that if you only want to plot a portion of a very long
logfile you can do so.

Change-Id: I7b4841d63b5533dd42a6e21c452d78f44efc247d
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 782c2be..1cc8e17 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -15,6 +15,8 @@
 DEFINE_string(data_dir, "www", "Directory to serve data files from");
 DEFINE_string(node, "", "Directory to serve data files from");
 DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
+DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
+DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
 
 int main(int argc, char **argv) {
   aos::InitGoogle(&argc, &argv);
@@ -43,6 +45,15 @@
 
   event_loop->SkipTimingReport();
 
+  if (FLAGS_monotonic_start_time > 0) {
+    event_loop->AddTimer([&reader]() { reader.event_loop_factory()->Exit(); })
+        ->Setup(aos::monotonic_clock::time_point(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                std::chrono::duration<double>(FLAGS_monotonic_start_time))));
+
+    reader.event_loop_factory()->Run();
+  }
+
   aos::web_proxy::WebProxy web_proxy(
       event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
 
@@ -51,5 +62,12 @@
   // Keep the web proxy alive past when we finish reading the logfile.
   reader.set_exit_on_finish(false);
 
+  if (FLAGS_monotonic_end_time > 0) {
+    event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
+        ->Setup(aos::monotonic_clock::time_point(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                std::chrono::duration<double>(FLAGS_monotonic_end_time))));
+  }
+
   reader.event_loop_factory()->Run();
 }
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 7161ee3..d384f72 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -59,7 +59,7 @@
   }
   TimerHandler *const timer = event_loop_->AddTimer([this]() {
     for (auto &subscriber : subscribers_) {
-      if (subscriber) subscriber->RunIteration();
+      if (subscriber) subscriber->RunIteration(recording_);
     }
   });
 
@@ -201,54 +201,58 @@
   global_epoll = nullptr;
 }
 
-void Subscriber::RunIteration() {
-  if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
-    fetcher_->Fetch();
-    message_buffer_.clear();
-    return;
-  }
+void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }
 
-  while (fetcher_->FetchNext()) {
-    // If we aren't building up a buffer, short-circuit the FetchNext().
-    if (buffer_size_ == 0) {
+void Subscriber::RunIteration(bool fetch_new) {
+  if (fetch_new) {
+    if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
       fetcher_->Fetch();
+      message_buffer_.clear();
+      return;
     }
-    Message message;
-    message.index = fetcher_->context().queue_index;
-    VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
-            << "packets";
-    for (int packet_index = 0;
-         packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
-      // Pack directly into the mbuffer.  This is admittedly a bit painful.
-      const size_t packet_size =
-          PackedMessageSize(fetcher_->context(), packet_index);
-      struct mbuf *mbuffer = mbuf_alloc(packet_size);
 
-      {
-        // Wrap a pre-allocated builder around the mbuffer.
-        PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
-        flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
-        flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
-            &fbb, fetcher_->context(), channel_index_, packet_index);
-        fbb.Finish(message_offset);
-
-        // Now, the flatbuffer is built from the back to the front.  So any
-        // extra memory will be at the front.  Setup the end and start pointers
-        // on the mbuf.
-        mbuf_set_end(mbuffer, packet_size);
-        mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
+    while (fetcher_->FetchNext()) {
+      // If we aren't building up a buffer, short-circuit the FetchNext().
+      if (buffer_size_ == 0) {
+        fetcher_->Fetch();
       }
+      Message message;
+      message.index = fetcher_->context().queue_index;
+      VLOG(2) << "Packing a message with "
+              << GetPacketCount(fetcher_->context()) << "packets";
+      for (int packet_index = 0;
+           packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+        // Pack directly into the mbuffer.  This is admittedly a bit painful.
+        const size_t packet_size =
+            PackedMessageSize(fetcher_->context(), packet_index);
+        struct mbuf *mbuffer = mbuf_alloc(packet_size);
 
-      message.data.emplace_back(
-          std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
-    }
-    message_buffer_.push_back(std::move(message));
-    // If we aren't keeping a buffer, then we should only do one iteration of
-    // the while loop--otherwise, if additional messages arrive between the
-    // first FetchNext() and the second iteration then we can end up behaving
-    // poorly (since we do a Fetch() when buffer_size_ == 0).
-    if (buffer_size_ == 0) {
-      break;
+        {
+          // Wrap a pre-allocated builder around the mbuffer.
+          PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
+          flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
+          flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
+              &fbb, fetcher_->context(), channel_index_, packet_index);
+          fbb.Finish(message_offset);
+
+          // Now, the flatbuffer is built from the back to the front.  So any
+          // extra memory will be at the front.  Setup the end and start
+          // pointers on the mbuf.
+          mbuf_set_end(mbuffer, packet_size);
+          mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
+        }
+
+        message.data.emplace_back(
+            std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
+      }
+      message_buffer_.push_back(std::move(message));
+      // If we aren't keeping a buffer, then we should only do one iteration of
+      // the while loop--otherwise, if additional messages arrive between the
+      // first FetchNext() and the second iteration then we can end up behaving
+      // poorly (since we do a Fetch() when buffer_size_ == 0).
+      if (buffer_size_ == 0) {
+        break;
+      }
     }
   }
   for (auto &conn : channels_) {
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index baca26e..0c1d1dc 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -41,6 +41,10 @@
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
   void onDisconnect(::seasocks::WebSocket *sock) override;
+  // Stops recording data, even if the event loop continues running. This allows
+  // us to continue serving the webserver + websocket server, without having to
+  // load more actual data.
+  void StopRecording() { recording_ = false; }
 
  private:
   ::seasocks::Server *server_;
@@ -51,6 +55,8 @@
       connections_;
 
   EventLoop *const event_loop_;
+  // Whether to pay attention to new messages.
+  bool recording_ = true;
 };
 
 // Wrapper class that manages the seasocks server and WebsocketHandler.
@@ -75,6 +81,9 @@
 
   void SetDataPath(const char *path) { server_.setStaticPath(path); }
 
+  // Stops recording data. Useful for setting end times in log replay.
+  void StopRecording();
+
  private:
   WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
            StoreHistory store_history, int per_channel_buffer_size_bytes);
@@ -121,7 +130,12 @@
         store_history_(store_history == StoreHistory::kYes),
         buffer_size_(buffer_size) {}
 
-  void RunIteration();
+  // Runs a single iteration of going through and fetching new data as needed
+  // and servicing any WebRTC channels that are requesting messages.
+  // fetch_new specifies whether we should actually attempt to retrieve new data
+  // on the channel--if false, will only worry about sending existing data to
+  // any clients.
+  void RunIteration(bool fetch_new);
 
   void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                    TransferMethod transfer_method);