Add support for Foxglove websocket protocol

Imports the foxglove websocket protocol library so that we don't
have to re-implement it. This also pulls in the websocketpp
library, which is a bit awkward since we already have seasocks
imported.

Change-Id: I1973eded827dda9d97a3dba271f8604705be3d1e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
new file mode 100644
index 0000000..b4d262a
--- /dev/null
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -0,0 +1,126 @@
+#include "aos/util/foxglove_websocket_lib.h"
+
+#include "aos/util/mcap_logger.h"
+#include "gflags/gflags.h"
+
+DEFINE_uint32(sorting_buffer_ms, 100,
+              "Amount of time to buffer messages to sort them before sending "
+              "them to foxglove.");
+DEFINE_bool(fetch_pinned_channels, false,
+            "Set this to allow foxglove_websocket to make fetchers on channels "
+            "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
+            "enum value). By default, we don't make fetchers for "
+            "these channels since using up a fetcher slot on PIN'd channels "
+            "can have side-effects.");
+
+namespace {
+// Period at which to poll the fetchers for all the channels.
+constexpr std::chrono::milliseconds kPollPeriod{50};
+}  // namespace
+
+namespace aos {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
+                                                 uint32_t port)
+    : event_loop_(event_loop), server_(port, "aos_foxglove") {
+  for (const aos::Channel *channel :
+       *event_loop_->configuration()->channels()) {
+    const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
+    if (aos::configuration::ChannelIsReadableOnNode(channel,
+                                                    event_loop_->node()) &&
+        (!is_pinned || FLAGS_fetch_pinned_channels)) {
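+      // Expose the channel to foxglove. The topic name is the AOS channel
+      // name followed by the message type, e.g. a channel "/aos" with type
+      // "aos.timing.Report" would become the topic "/aos aos.timing.Report".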
+      const ChannelId id =
+          server_.addChannel(foxglove::websocket::ChannelWithoutId{
+              .topic = channel->name()->str() + " " + channel->type()->str(),
+              .encoding = "json",
+              .schemaName = channel->type()->str(),
+              .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+      CHECK(fetchers_.count(id) == 0);
+      fetchers_[id] =
+          FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
+    }
+  }
+
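+  // Subscribing just marks a channel as active; the polling timer below is
+  // what actually fetches and sends data for active channels.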
+  server_.setSubscribeHandler([this](ChannelId channel) {
+    if (fetchers_.count(channel) == 0) {
+      return;
+    }
+    if (active_channels_.count(channel) == 0) {
+      // Catch up to the latest message on the requested channel, then
+      // subscribe to it.
+      fetchers_[channel].fetcher->Fetch();
+      active_channels_.insert(channel);
+    }
+  });
+  server_.setUnsubscribeHandler(
+      [this](ChannelId channel) { active_channels_.erase(channel); });
+  aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
+    // To run the websocket server, we just let it spin for a bit on every
+    // timer cycle. This isn't great for integration with our event loop, but
+    // it keeps us in control. Until we either (a) find a file descriptor that
+    // we can hand to epoll, or (b) rewrite the foxglove websocket server to
+    // use seasocks (which we know how to integrate), we'll make do with this.
+    // TODO(james): Tighter integration into our event loop structure.
+    server_.run_for(kPollPeriod / 2);
+
+    // Unfortunately, we can't just push out all the messages as they come in.
+    // Foxglove expects the timestamps associated with each message to be
+    // monotonically increasing; if messages are sent out of order, it clears
+    // the state of the visualization entirely, which makes viewing plots
+    // impossible. A single channel is fine on its own, but as soon as the
+    // user subscribes to multiple channels, their messages interleave. To
+    // resolve this, we specify a buffer (--sorting_buffer_ms) and only send
+    // out messages older than that, sorting everything before it goes out.
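+    // For example, with --sorting_buffer_ms=100: if channel A has messages
+    // at now-250ms and now-50ms, and channel B has one at now-150ms, we send
+    // A@-250ms and then B@-150ms this cycle (both are older than the buffer)
+    // and hold A@-50ms for a later poll, keeping the stream in order.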
+    const aos::monotonic_clock::time_point sort_until =
+        event_loop_->monotonic_now() -
+        std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
+
+    // Pair of <send_time, channel id>.
+    absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
+        fetcher_times;
+
+    // Seed fetcher_times with the oldest unsent message on each channel.
+    for (const ChannelId channel : active_channels_) {
+      FetcherState *fetcher = &fetchers_[channel];
+      if (fetcher->sent_current_message) {
+        if (fetcher->fetcher->FetchNext()) {
+          fetcher->sent_current_message = false;
+        }
+      }
+      if (!fetcher->sent_current_message) {
+        const aos::monotonic_clock::time_point send_time =
+            fetcher->fetcher->context().monotonic_event_time;
+        if (send_time <= sort_until) {
+          fetcher_times.insert(std::make_pair(send_time, channel));
+        }
+      }
+    }
+
+    // Repeatedly send out the oldest remaining message until we run out of
+    // messages older than sort_until.
+    while (!fetcher_times.empty()) {
+      const ChannelId channel = fetcher_times.begin()->second;
+      FetcherState *fetcher = &fetchers_[channel];
+      server_.sendMessage(
+          channel, fetcher_times.begin()->first.time_since_epoch().count(),
+          aos::FlatbufferToJson(
+              fetcher->fetcher->channel()->schema(),
+              static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+      fetcher_times.erase(fetcher_times.begin());
+      fetcher->sent_current_message = true;
+      if (fetcher->fetcher->FetchNext()) {
+        fetcher->sent_current_message = false;
+        const aos::monotonic_clock::time_point send_time =
+            fetcher->fetcher->context().monotonic_event_time;
+        if (send_time <= sort_until) {
+          fetcher_times.insert(std::make_pair(send_time, channel));
+        }
+      }
+    }
+  });
+
+  event_loop_->OnRun([timer, this]() {
+    timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
+  });
+}
+FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
+}  // namespace aos
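
For reference, a minimal sketch of how this server might be wired up in a
binary. This is not part of the change above; the flag names, default port,
and config path are illustrative assumptions, though aos::InitGoogle,
aos::configuration::ReadConfig, and aos::ShmEventLoop are the usual AOS
entry points.

#include "aos/configuration.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/util/foxglove_websocket_lib.h"
#include "gflags/gflags.h"

DEFINE_string(config, "aos_config.json", "Path to the AOS config to use.");
DEFINE_uint32(port, 8765, "Port to serve the foxglove websocket on.");

int main(int argc, char *argv[]) {
  aos::InitGoogle(&argc, &argv);
  // Read the config and run a ShmEventLoop on this node; the server polls
  // its fetchers off of a timer on that loop.
  const aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(FLAGS_config);
  aos::ShmEventLoop event_loop(&config.message());
  aos::FoxgloveWebsocketServer server(&event_loop, FLAGS_port);
  event_loop.Run();
}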