Add support for Foxglove websocket protocol

Imports the foxglove websocket protocol library so
that I don't have to re-implement it--this also drags
in the websocketpp library, which is a bit awkward since
we also already have seasocks imported.

Change-Id: I1973eded827dda9d97a3dba271f8604705be3d1e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/WORKSPACE b/WORKSPACE
index dbde661..d402c98 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -1328,3 +1328,36 @@
 load("//third_party/cargo:crates.bzl", "raze_fetch_remote_crates")
 
 raze_fetch_remote_crates()
+
+http_archive(
+    name = "com_github_zaphoyd_websocketpp",
+    build_file = "//third_party/websocketpp:websocketpp.BUILD",
+    sha256 = "6ce889d85ecdc2d8fa07408d6787e7352510750daa66b5ad44aacb47bea76755",
+    strip_prefix = "websocketpp-0.8.2",
+    url = "https://github.com/zaphoyd/websocketpp/archive/refs/tags/0.8.2.tar.gz",
+)
+
+http_archive(
+    name = "com_github_foxglove_ws-protocol",
+    build_file = "//third_party/foxglove_ws_protocol:foxglove_ws_protocol.BUILD",
+    patch_args = ["-p1"],
+    patches = ["//third_party/foxglove_ws_protocol:foxglove_ws_protocol.patch"],
+    sha256 = "3256f09a67419f6556778c443d332f1a4bf53ba0e7a464179bf838abffa366ab",
+    strip_prefix = "ws-protocol-releases-typescript-ws-protocol-examples-v0.0.6",
+    url = "https://github.com/foxglove/ws-protocol/archive/refs/tags/releases/typescript/ws-protocol-examples/v0.0.6.tar.gz",
+)
+
+http_archive(
+    name = "asio",
+    build_file_content = """
+cc_library(
+    name = "asio",
+    hdrs = glob(["include/asio/**/*.hpp", "include/asio/**/*.ipp", "include/asio.hpp"]),
+    visibility = ["//visibility:public"],
+    defines = ["ASIO_STANDALONE"],
+    includes = ["include/"],
+)""",
+    sha256 = "8976812c24a118600f6fcf071a20606630a69afe4c0abee3b0dea528e682c585",
+    strip_prefix = "asio-1.24.0",
+    url = "https://downloads.sourceforge.net/project/asio/asio/1.24.0%2520%2528Stable%2529/asio-1.24.0.tar.bz2",
+)
diff --git a/aos/util/BUILD b/aos/util/BUILD
index f8eb09f..bd1dbab 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -437,6 +437,17 @@
     ],
 )
 
+cc_library(
+    name = "foxglove_websocket_lib",
+    srcs = ["foxglove_websocket_lib.cc"],
+    hdrs = ["foxglove_websocket_lib.h"],
+    deps = [
+        ":mcap_logger",
+        "//aos/events:event_loop",
+        "@com_github_foxglove_ws-protocol",
+    ],
+)
+
 cc_binary(
     name = "config_validator",
     testonly = True,
@@ -454,3 +465,15 @@
         "@com_google_googletest//:gtest",
     ],
 )
+
+cc_binary(
+    name = "foxglove_websocket",
+    srcs = ["foxglove_websocket.cc"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":foxglove_websocket_lib",
+        "//aos:init",
+        "//aos/events:shm_event_loop",
+        "@com_github_gflags_gflags//:gflags",
+    ],
+)
diff --git a/aos/util/foxglove_websocket.cc b/aos/util/foxglove_websocket.cc
new file mode 100644
index 0000000..26092bc
--- /dev/null
+++ b/aos/util/foxglove_websocket.cc
@@ -0,0 +1,46 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/util/foxglove_websocket_lib.h"
+#include "gflags/gflags.h"
+
+DEFINE_string(config, "/app/aos_config.json", "Path to the config.");
+DEFINE_uint32(port, 8765, "Port to use for foxglove websocket server.");
+
+int main(int argc, char *argv[]) {
+  gflags::SetUsageMessage(
+      "Runs a websocket server that a foxglove instance can connect to in "
+      "order to view live data on a device.\n\n"
+      "Typical Usage: foxglove_websocket [--port 8765]\n"
+      "If the default port is not exposed directly, you can port-forward with "
+      "SSH by doing\n"
+      "$ ssh -L 8765:localhost:8765 ssh_target\n\n"
+      "When accessing this in foxglove:\n"
+      "1) Open a data source (this window may be open by default).\n"
+      "2) Select \"Open Connection\"\n"
+      "3) Select \"Foxglove WebSocket\" (do NOT select the rosbridge option)\n"
+      "4) Fill out the URL for the machine. If port forwarding, the default\n"
+      "   ws://localhost:8765 should work.\n\n"
+      "Note that this does not start up a foxglove instance itself. You must "
+      "either have one locally on your laptop, or go to "
+      "https://studio.foxglove.dev, or use another application to serve the "
+      "foxglove HTML pages.\n"
+      "If you want to use the studio.foxglove.dev page to view data (which "
+      "won't send any of your data to foxglove.dev--it's just needed to load "
+      "the HTML files), you can also go directly to:\n"
+      "https://studio.foxglove.dev/?ds=foxglove-websocket&ds.url=ws://"
+      "localhost:8765\n"
+      "where localhost:8765 must be updated if you aren't port-forwarding "
+      "and/or are using a different port number. Similarly, if you are serving "
+      "the static foxglove files locally, you can update the "
+      "studio.foxglove.dev to point at your local webserver.\n");
+  aos::InitGoogle(&argc, &argv);
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  aos::ShmEventLoop event_loop(&config.message());
+
+  aos::FoxgloveWebsocketServer server(&event_loop, FLAGS_port);
+
+  event_loop.Run();
+}
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
new file mode 100644
index 0000000..b4d262a
--- /dev/null
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -0,0 +1,126 @@
+#include "aos/util/foxglove_websocket_lib.h"
+
+#include "aos/util/mcap_logger.h"
+#include "gflags/gflags.h"
+
+DEFINE_uint32(sorting_buffer_ms, 100,
+              "Amount of time to buffer messages to sort them before sending "
+              "them to foxglove.");
+DEFINE_bool(fetch_pinned_channels, false,
+            "Set this to allow foxglove_websocket to make fetchers on channels "
+            "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
+            "enum value). By default, we don't make fetchers for "
+            "these channels since using up a fetcher slot on PIN'd channels "
+            "can have side-effects.");
+
+namespace {
+// Period at which to poll the fetchers for all the channels.
+constexpr std::chrono::milliseconds kPollPeriod{50};
+}  // namespace
+
+namespace aos {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
+                                                 uint32_t port)
+    : event_loop_(event_loop), server_(port, "aos_foxglove") {
+  for (const aos::Channel *channel :
+       *event_loop_->configuration()->channels()) {
+    const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
+    if (aos::configuration::ChannelIsReadableOnNode(channel,
+                                                    event_loop_->node()) &&
+        (!is_pinned || FLAGS_fetch_pinned_channels)) {
+      const ChannelId id =
+          server_.addChannel(foxglove::websocket::ChannelWithoutId{
+              .topic = channel->name()->str() + " " + channel->type()->str(),
+              .encoding = "json",
+              .schemaName = channel->type()->str(),
+              .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+      CHECK(fetchers_.count(id) == 0);
+      fetchers_[id] =
+          FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
+    }
+  }
+
+  server_.setSubscribeHandler([this](ChannelId channel) {
+    if (fetchers_.count(channel) == 0) {
+      return;
+    }
+    if (active_channels_.count(channel) == 0) {
+      // Catch up to the latest message on the requested channel, then subscribe
+      // to it.
+      fetchers_[channel].fetcher->Fetch();
+      active_channels_.insert(channel);
+    }
+  });
+  server_.setUnsubscribeHandler(
+      [this](ChannelId channel) { active_channels_.erase(channel); });
+  aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
+    // In order to run the websocket server, we just let it spin every cycle for
+    // a bit. This isn't great for integration, but lets us stay in control and
+    // until we either have (a) a chance to locate a file descriptor to hand
+    // epoll; or (b) rewrite the foxglove websocket server to use seasocks
+    // (which we know how to integrate), we'll just function with this.
+    // TODO(james): Tighter integration into our event loop structure.
+    server_.run_for(kPollPeriod / 2);
+
+    // Unfortunately, we can't just push out all the messages as they come in.
+    // Foxglove expects that the timestamps associated with each message to be
+    // monotonic, and if you send things out of order then it will clear the
+    // state of the visualization entirely, which makes viewing plots
+    // impossible. If the user only accesses a single channel, that is fine, but
+    // as soon as they try to use multiple channels, you encounter interleaving.
+    // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
+    // out messages older than that time, sorting everything before we send it
+    // out.
+    const aos::monotonic_clock::time_point sort_until =
+        event_loop_->monotonic_now() -
+        std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
+
+    // Pair of <send_time, channel id>.
+    absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
+        fetcher_times;
+
+    // Go through and seed fetcher_times with the first message on each channel.
+    for (const ChannelId channel : active_channels_) {
+      FetcherState *fetcher = &fetchers_[channel];
+      if (fetcher->sent_current_message) {
+        if (fetcher->fetcher->FetchNext()) {
+          fetcher->sent_current_message = false;
+        }
+      }
+      if (!fetcher->sent_current_message) {
+        const aos::monotonic_clock::time_point send_time =
+            fetcher->fetcher->context().monotonic_event_time;
+        if (send_time <= sort_until) {
+          fetcher_times.insert(std::make_pair(send_time, channel));
+        }
+      }
+    }
+
+    // Send the oldest message continually until we run out of messages to send.
+    while (!fetcher_times.empty()) {
+      const ChannelId channel = fetcher_times.begin()->second;
+      FetcherState *fetcher = &fetchers_[channel];
+      server_.sendMessage(
+          channel, fetcher_times.begin()->first.time_since_epoch().count(),
+          aos::FlatbufferToJson(
+              fetcher->fetcher->channel()->schema(),
+              static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+      fetcher_times.erase(fetcher_times.begin());
+      fetcher->sent_current_message = true;
+      if (fetcher->fetcher->FetchNext()) {
+        fetcher->sent_current_message = false;
+        const aos::monotonic_clock::time_point send_time =
+            fetcher->fetcher->context().monotonic_event_time;
+        if (send_time <= sort_until) {
+          fetcher_times.insert(std::make_pair(send_time, channel));
+        }
+      }
+    }
+  });
+
+  event_loop_->OnRun([timer, this]() {
+    timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
+  });
+}
+FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
+}  // namespace aos
diff --git a/aos/util/foxglove_websocket_lib.h b/aos/util/foxglove_websocket_lib.h
new file mode 100644
index 0000000..8160653
--- /dev/null
+++ b/aos/util/foxglove_websocket_lib.h
@@ -0,0 +1,43 @@
+#ifndef AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
+#define AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
+#include <map>
+#include <set>
+
+#include "aos/events/event_loop.h"
+#include "foxglove/websocket/server.hpp"
+
+namespace aos {
+// This class implements a live AOS -> Foxglove Websocket Protocol connection,
+// making use of the implementation at
+// https://github.com/foxglove/ws-protocol/tree/main/cpp to send JSON messages
+// to a foxglove studio client.
+// See foxglove_websocket.cc for some usage notes.
+class FoxgloveWebsocketServer {
+ public:
+  FoxgloveWebsocketServer(aos::EventLoop *event_loop, uint32_t port);
+  ~FoxgloveWebsocketServer();
+
+ private:
+  typedef foxglove::websocket::ChannelId ChannelId;
+
+  struct FetcherState {
+    std::unique_ptr<aos::RawFetcher> fetcher;
+    // Whether the current message in the fetcher has been sent to the client.
+    // Starts as true because the fetcher starts with no data.
+    // This is necessary because we have to send all of our messages out
+    // in order, which we can only do once we know the timestamp of each
+    // message. And we can only know the timestamp after having called Fetch()
+    // on the fetcher. Once we get around to actually sending the data, we can
+    // set this to true so that we know it is safe to call FetchNext() again.
+    bool sent_current_message = true;
+  };
+
+  aos::EventLoop *event_loop_;
+  foxglove::websocket::Server server_;
+  // A map of fetchers for every single channel that could be subscribed to.
+  std::map<ChannelId, FetcherState> fetchers_;
+  // The set of channels that we have clients actively subscribed to.
+  std::set<ChannelId> active_channels_;
+};
+}  // namespace aos
+#endif  // AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
diff --git a/third_party/foxglove_ws_protocol/BUILD b/third_party/foxglove_ws_protocol/BUILD
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/BUILD
diff --git a/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD
new file mode 100644
index 0000000..01d9a90
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD
@@ -0,0 +1,19 @@
+# MIT License
+licenses(["notice"])
+
+cc_library(
+    name = "com_github_foxglove_ws-protocol",
+    hdrs = ["cpp/foxglove-websocket/include/foxglove/websocket/server.hpp"],
+    includes = ["cpp/foxglove-websocket/include/"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_nlohmann_json//:json",
+        "@com_github_zaphoyd_websocketpp",
+    ],
+)
+
+cc_binary(
+    name = "example_server",
+    srcs = ["cpp/examples/example_server.cpp"],
+    deps = [":com_github_foxglove_ws-protocol"],
+)
diff --git a/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch
new file mode 100644
index 0000000..39eb3da
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch
@@ -0,0 +1,78 @@
+# Blue River-specific patches to be able to compile the websocket
+# server against boost and with our compilers/compiler flags.
+diff --git a/cpp/examples/example_server.cpp b/cpp/examples/example_server.cpp
+index c731072..e869e8f 100644
+--- a/cpp/examples/example_server.cpp
++++ b/cpp/examples/example_server.cpp
+@@ -44,7 +44,7 @@ int main() {
+   });
+
+   uint64_t i = 0;
+-  std::shared_ptr<asio::steady_timer> timer;
++  std::shared_ptr<boost::asio::steady_timer> timer;
+   std::function<void()> setTimer = [&] {
+     timer = server.getEndpoint().set_timer(200, [&](std::error_code const& ec) {
+       if (ec) {
+@@ -59,7 +59,7 @@ int main() {
+
+   setTimer();
+
+-  asio::signal_set signals(server.getEndpoint().get_io_service(), SIGINT);
++  boost::asio::signal_set signals(server.getEndpoint().get_io_service(), SIGINT);
+
+   signals.async_wait([&](std::error_code const& ec, int sig) {
+     if (ec) {
+diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
+index 16a5e83..8845971 100644
+--- a/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
++++ b/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
+@@ -1,6 +1,6 @@
+ #pragma once
+
+-#include <nlohmann/json.hpp>
++#include "single_include/nlohmann/json.hpp"
+ #include <websocketpp/config/asio_no_tls.hpp>
+ #include <websocketpp/server.hpp>
+
+@@ -84,6 +84,9 @@ public:
+   Server& operator=(Server&&) = delete;
+
+   void run();
++  size_t run_for(std::chrono::nanoseconds duration) {
++    return _server.get_io_service().run_for(duration);
++  }
+   void stop();
+
+   ChannelId addChannel(ChannelWithoutId channel);
+@@ -105,6 +108,10 @@ private:
+     std::unordered_map<SubscriptionId, ChannelId> subscriptions;
+     std::unordered_map<ChannelId, std::unordered_set<SubscriptionId>> subscriptionsByChannel;
+
++    ClientInfo(const std::string& name_, const ConnHandle& handle_)
++        : name(name_)
++        , handle(handle_) {}
++
+     ClientInfo(const ClientInfo&) = delete;
+     ClientInfo& operator=(const ClientInfo&) = delete;
+
+@@ -147,7 +154,7 @@ inline Server::Server(uint16_t port, std::string name)
+   _server.set_message_handler(std::bind(&Server::handleMessage, this, _1, _2));
+   _server.set_reuse_addr(true);
+   _server.set_listen_backlog(128);
+-  _server.listen(_port);
++  _server.listen(websocketpp::lib::asio::ip::tcp::v4(), _port);
+   _server.start_accept();
+ }
+
+@@ -174,10 +181,7 @@ inline void Server::handleConnectionOpened(ConnHandle hdl) {
+   _server.get_alog().write(
+     websocketpp::log::alevel::app,
+     "Client " + con->get_remote_endpoint() + " connected via " + con->get_resource());
+-  _clients.emplace(hdl, ClientInfo{
+-                          .name = con->get_remote_endpoint(),
+-                          .handle = hdl,
+-                        });
++  _clients.emplace(hdl, ClientInfo{con->get_remote_endpoint(), hdl});
+
+   con->send(json({
+                    {"op", "serverInfo"},
diff --git a/third_party/websocketpp/BUILD b/third_party/websocketpp/BUILD
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/third_party/websocketpp/BUILD
diff --git a/third_party/websocketpp/websocketpp.BUILD b/third_party/websocketpp/websocketpp.BUILD
new file mode 100644
index 0000000..ecf12cd
--- /dev/null
+++ b/third_party/websocketpp/websocketpp.BUILD
@@ -0,0 +1,11 @@
+# 3 Clause BSD
+licenses(["notice"])
+
+cc_library(
+    name = "com_github_zaphoyd_websocketpp",
+    hdrs = glob(["websocketpp/**/*.hpp"]),
+    defines = ["_WEBSOCKETPP_CPP11_STL_", "ASIO_STANDALONE"],
+    includes = ["."],
+    visibility = ["//visibility:public"],
+    deps = ["@asio"],
+)
diff --git a/tools/dependency_rewrite b/tools/dependency_rewrite
index bf5d271..80e31a3 100644
--- a/tools/dependency_rewrite
+++ b/tools/dependency_rewrite
@@ -11,6 +11,7 @@
 rewrite devsite.ctr-electronics.com/(.*) software.frc971.org/Build-Dependencies/devsite.ctr-electronics.com/$1
 rewrite www.openssl.org/(.*) software.frc971.org/Build-Dependencies/www.openssl.org/$1
 rewrite zlib.net/(.*) software.frc971.org/Build-Dependencies/zlib.net/$1
+rewrite downloads.sourceforge.net/(.*) software.frc971.org/Build-Dependencies/downloads.sourceforge.net/$1
 allow crates.io
 allow golang.org