Add support for Foxglove websocket protocol
Imports the foxglove websocket protocol library so
that I don't have to re-implement it--this also drags
in the websocketpp library, which is a bit awkward since
we also already have seasocks imported.
Change-Id: I1973eded827dda9d97a3dba271f8604705be3d1e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/WORKSPACE b/WORKSPACE
index dbde661..d402c98 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -1328,3 +1328,36 @@
load("//third_party/cargo:crates.bzl", "raze_fetch_remote_crates")
raze_fetch_remote_crates()
+
+http_archive(
+ name = "com_github_zaphoyd_websocketpp",
+ build_file = "//third_party/websocketpp:websocketpp.BUILD",
+ sha256 = "6ce889d85ecdc2d8fa07408d6787e7352510750daa66b5ad44aacb47bea76755",
+ strip_prefix = "websocketpp-0.8.2",
+ url = "https://github.com/zaphoyd/websocketpp/archive/refs/tags/0.8.2.tar.gz",
+)
+
+http_archive(
+ name = "com_github_foxglove_ws-protocol",
+ build_file = "//third_party/foxglove_ws_protocol:foxglove_ws_protocol.BUILD",
+ patch_args = ["-p1"],
+ patches = ["//third_party/foxglove_ws_protocol:foxglove_ws_protocol.patch"],
+ sha256 = "3256f09a67419f6556778c443d332f1a4bf53ba0e7a464179bf838abffa366ab",
+ strip_prefix = "ws-protocol-releases-typescript-ws-protocol-examples-v0.0.6",
+ url = "https://github.com/foxglove/ws-protocol/archive/refs/tags/releases/typescript/ws-protocol-examples/v0.0.6.tar.gz",
+)
+
+http_archive(
+ name = "asio",
+ build_file_content = """
+cc_library(
+ name = "asio",
+ hdrs = glob(["include/asio/**/*.hpp", "include/asio/**/*.ipp", "include/asio.hpp"]),
+ visibility = ["//visibility:public"],
+ defines = ["ASIO_STANDALONE"],
+ includes = ["include/"],
+)""",
+ sha256 = "8976812c24a118600f6fcf071a20606630a69afe4c0abee3b0dea528e682c585",
+ strip_prefix = "asio-1.24.0",
+ url = "https://downloads.sourceforge.net/project/asio/asio/1.24.0%2520%2528Stable%2529/asio-1.24.0.tar.bz2",
+)
diff --git a/aos/util/BUILD b/aos/util/BUILD
index f8eb09f..bd1dbab 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -437,6 +437,17 @@
],
)
+cc_library(
+ name = "foxglove_websocket_lib",
+ srcs = ["foxglove_websocket_lib.cc"],
+ hdrs = ["foxglove_websocket_lib.h"],
+ deps = [
+ ":mcap_logger",
+ "//aos/events:event_loop",
+ "@com_github_foxglove_ws-protocol",
+ ],
+)
+
cc_binary(
name = "config_validator",
testonly = True,
@@ -454,3 +465,15 @@
"@com_google_googletest//:gtest",
],
)
+
+cc_binary(
+ name = "foxglove_websocket",
+ srcs = ["foxglove_websocket.cc"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":foxglove_websocket_lib",
+ "//aos:init",
+ "//aos/events:shm_event_loop",
+ "@com_github_gflags_gflags//:gflags",
+ ],
+)
diff --git a/aos/util/foxglove_websocket.cc b/aos/util/foxglove_websocket.cc
new file mode 100644
index 0000000..26092bc
--- /dev/null
+++ b/aos/util/foxglove_websocket.cc
@@ -0,0 +1,46 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/util/foxglove_websocket_lib.h"
+#include "gflags/gflags.h"
+
+DEFINE_string(config, "/app/aos_config.json", "Path to the config.");
+DEFINE_uint32(port, 8765, "Port to use for foxglove websocket server.");
+
+int main(int argc, char *argv[]) {
+ gflags::SetUsageMessage(
+ "Runs a websocket server that a foxglove instance can connect to in "
+ "order to view live data on a device.\n\n"
+ "Typical Usage: foxglove_websocket [--port 8765]\n"
+ "If the default port is not exposed directly, you can port-forward with "
+ "SSH by doing\n"
+ "$ ssh -L 8765:localhost:8765 ssh_target\n\n"
+ "When accessing this in foxglove:\n"
+ "1) Open a data source (this window may be open by default).\n"
+ "2) Select \"Open Connection\"\n"
+ "3) Select \"Foxglove WebSocket\" (do NOT select the rosbridge option)\n"
+ "4) Fill out the URL for the machine. If port forwarding, the default\n"
+ " ws://localhost:8765 should work.\n\n"
+ "Note that this does not start up a foxglove instance itself. You must "
+ "either have one locally on your laptop, or go to "
+ "https://studio.foxglove.dev, or use another application to serve the "
+ "foxglove HTML pages.\n"
+ "If you want to use the studio.foxglove.dev page to view data (which "
+ "won't send any of your data to foxglove.dev--it's just needed to load "
+ "the HTML files), you can also go directly to:\n"
+ "https://studio.foxglove.dev/?ds=foxglove-websocket&ds.url=ws://"
+ "localhost:8765\n"
+ "where localhost:8765 must be updated if you aren't port-forwarding "
+ "and/or are using a different port number. Similarly, if you are serving "
+ "the static foxglove files locally, you can update the "
+ "studio.foxglove.dev to point at your local webserver.\n");
+ aos::InitGoogle(&argc, &argv);
+
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+
+ aos::FoxgloveWebsocketServer server(&event_loop, FLAGS_port);
+
+ event_loop.Run();
+}
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
new file mode 100644
index 0000000..b4d262a
--- /dev/null
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -0,0 +1,126 @@
+#include "aos/util/foxglove_websocket_lib.h"
+
+#include "aos/util/mcap_logger.h"
+#include "gflags/gflags.h"
+
+DEFINE_uint32(sorting_buffer_ms, 100,
+ "Amount of time to buffer messages to sort them before sending "
+ "them to foxglove.");
+DEFINE_bool(fetch_pinned_channels, false,
+ "Set this to allow foxglove_websocket to make fetchers on channels "
+ "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
+ "enum value). By default, we don't make fetchers for "
+ "these channels since using up a fetcher slot on PIN'd channels "
+ "can have side-effects.");
+
+namespace {
+// Period at which to poll the fetchers for all the channels.
+constexpr std::chrono::milliseconds kPollPeriod{50};
+} // namespace
+
+namespace aos {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
+ uint32_t port)
+ : event_loop_(event_loop), server_(port, "aos_foxglove") {
+ for (const aos::Channel *channel :
+ *event_loop_->configuration()->channels()) {
+ const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
+ if (aos::configuration::ChannelIsReadableOnNode(channel,
+ event_loop_->node()) &&
+ (!is_pinned || FLAGS_fetch_pinned_channels)) {
+ const ChannelId id =
+ server_.addChannel(foxglove::websocket::ChannelWithoutId{
+ .topic = channel->name()->str() + " " + channel->type()->str(),
+ .encoding = "json",
+ .schemaName = channel->type()->str(),
+ .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+ CHECK(fetchers_.count(id) == 0);
+ fetchers_[id] =
+ FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
+ }
+ }
+
+ server_.setSubscribeHandler([this](ChannelId channel) {
+ if (fetchers_.count(channel) == 0) {
+ return;
+ }
+ if (active_channels_.count(channel) == 0) {
+ // Catch up to the latest message on the requested channel, then subscribe
+ // to it.
+ fetchers_[channel].fetcher->Fetch();
+ active_channels_.insert(channel);
+ }
+ });
+ server_.setUnsubscribeHandler(
+ [this](ChannelId channel) { active_channels_.erase(channel); });
+ aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
+ // In order to run the websocket server, we just let it spin every cycle for
+ // a bit. This isn't great for integration, but lets us stay in control and
+ // until we either have (a) a chance to locate a file descriptor to hand
+ // epoll; or (b) rewrite the foxglove websocket server to use seasocks
+ // (which we know how to integrate), we'll just function with this.
+ // TODO(james): Tighter integration into our event loop structure.
+ server_.run_for(kPollPeriod / 2);
+
+ // Unfortunately, we can't just push out all the messages as they come in.
+ // Foxglove expects that the timestamps associated with each message to be
+ // monotonic, and if you send things out of order then it will clear the
+ // state of the visualization entirely, which makes viewing plots
+ // impossible. If the user only accesses a single channel, that is fine, but
+ // as soon as they try to use multiple channels, you encounter interleaving.
+ // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
+ // out messages older than that time, sorting everything before we send it
+ // out.
+ const aos::monotonic_clock::time_point sort_until =
+ event_loop_->monotonic_now() -
+ std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
+
+ // Pair of <send_time, channel id>.
+ absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
+ fetcher_times;
+
+ // Go through and seed fetcher_times with the first message on each channel.
+ for (const ChannelId channel : active_channels_) {
+ FetcherState *fetcher = &fetchers_[channel];
+ if (fetcher->sent_current_message) {
+ if (fetcher->fetcher->FetchNext()) {
+ fetcher->sent_current_message = false;
+ }
+ }
+ if (!fetcher->sent_current_message) {
+ const aos::monotonic_clock::time_point send_time =
+ fetcher->fetcher->context().monotonic_event_time;
+ if (send_time <= sort_until) {
+ fetcher_times.insert(std::make_pair(send_time, channel));
+ }
+ }
+ }
+
+ // Send the oldest message continually until we run out of messages to send.
+ while (!fetcher_times.empty()) {
+ const ChannelId channel = fetcher_times.begin()->second;
+ FetcherState *fetcher = &fetchers_[channel];
+ server_.sendMessage(
+ channel, fetcher_times.begin()->first.time_since_epoch().count(),
+ aos::FlatbufferToJson(
+ fetcher->fetcher->channel()->schema(),
+ static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+ fetcher_times.erase(fetcher_times.begin());
+ fetcher->sent_current_message = true;
+ if (fetcher->fetcher->FetchNext()) {
+ fetcher->sent_current_message = false;
+ const aos::monotonic_clock::time_point send_time =
+ fetcher->fetcher->context().monotonic_event_time;
+ if (send_time <= sort_until) {
+ fetcher_times.insert(std::make_pair(send_time, channel));
+ }
+ }
+ }
+ });
+
+ event_loop_->OnRun([timer, this]() {
+ timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
+ });
+}
+FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
+} // namespace aos
diff --git a/aos/util/foxglove_websocket_lib.h b/aos/util/foxglove_websocket_lib.h
new file mode 100644
index 0000000..8160653
--- /dev/null
+++ b/aos/util/foxglove_websocket_lib.h
@@ -0,0 +1,43 @@
+#ifndef AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
+#define AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
+#include <map>
+#include <set>
+
+#include "aos/events/event_loop.h"
+#include "foxglove/websocket/server.hpp"
+
+namespace aos {
+// This class implements a live AOS -> Foxglove Websocket Protocol connection,
+// making use of the implementation at
+// https://github.com/foxglove/ws-protocol/tree/main/cpp to send JSON messages
+// to a foxglove studio client.
+// See foxglove_websocket.cc for some usage notes.
+class FoxgloveWebsocketServer {
+ public:
+ FoxgloveWebsocketServer(aos::EventLoop *event_loop, uint32_t port);
+ ~FoxgloveWebsocketServer();
+
+ private:
+ typedef foxglove::websocket::ChannelId ChannelId;
+
+ struct FetcherState {
+ std::unique_ptr<aos::RawFetcher> fetcher;
+ // Whether the current message in the fetcher has been sent to the client.
+ // Starts as true because the fetcher starts with no data.
+ // This is necessary because we have to send all of our messages out
+ // in order, which we can only do once we know the timestamp of each
+ // message. And we can only know the timestamp after having called Fetch()
+ // on the fetcher. Once we get around to actually sending the data, we can
+ // set this to true so that we know it is safe to call FetchNext() again.
+ bool sent_current_message = true;
+ };
+
+ aos::EventLoop *event_loop_;
+ foxglove::websocket::Server server_;
+ // A map of fetchers for every single channel that could be subscribed to.
+ std::map<ChannelId, FetcherState> fetchers_;
+ // The set of channels that we have clients actively subscribed to.
+ std::set<ChannelId> active_channels_;
+};
+} // namespace aos
+#endif // AOS_UTIL_FOXGLOVE_WEBSOCKET_LIB_H_
diff --git a/third_party/foxglove_ws_protocol/BUILD b/third_party/foxglove_ws_protocol/BUILD
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/BUILD
diff --git a/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD
new file mode 100644
index 0000000..01d9a90
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.BUILD
@@ -0,0 +1,19 @@
+# MIT License
+licenses(["notice"])
+
+cc_library(
+ name = "com_github_foxglove_ws-protocol",
+ hdrs = ["cpp/foxglove-websocket/include/foxglove/websocket/server.hpp"],
+ includes = ["cpp/foxglove-websocket/include/"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_nlohmann_json//:json",
+ "@com_github_zaphoyd_websocketpp",
+ ],
+)
+
+cc_binary(
+ name = "example_server",
+ srcs = ["cpp/examples/example_server.cpp"],
+ deps = [":com_github_foxglove_ws-protocol"],
+)
diff --git a/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch
new file mode 100644
index 0000000..39eb3da
--- /dev/null
+++ b/third_party/foxglove_ws_protocol/foxglove_ws_protocol.patch
@@ -0,0 +1,78 @@
+# Blue River-specific patches to be able to compile the websocket
+# server against boost and with our compilers/compiler flags.
+diff --git a/cpp/examples/example_server.cpp b/cpp/examples/example_server.cpp
+index c731072..e869e8f 100644
+--- a/cpp/examples/example_server.cpp
++++ b/cpp/examples/example_server.cpp
+@@ -44,7 +44,7 @@ int main() {
+ });
+
+ uint64_t i = 0;
+- std::shared_ptr<asio::steady_timer> timer;
++ std::shared_ptr<boost::asio::steady_timer> timer;
+ std::function<void()> setTimer = [&] {
+ timer = server.getEndpoint().set_timer(200, [&](std::error_code const& ec) {
+ if (ec) {
+@@ -59,7 +59,7 @@ int main() {
+
+ setTimer();
+
+- asio::signal_set signals(server.getEndpoint().get_io_service(), SIGINT);
++ boost::asio::signal_set signals(server.getEndpoint().get_io_service(), SIGINT);
+
+ signals.async_wait([&](std::error_code const& ec, int sig) {
+ if (ec) {
+diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
+index 16a5e83..8845971 100644
+--- a/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
++++ b/cpp/foxglove-websocket/include/foxglove/websocket/server.hpp
+@@ -1,6 +1,6 @@
+ #pragma once
+
+-#include <nlohmann/json.hpp>
++#include "single_include/nlohmann/json.hpp"
+ #include <websocketpp/config/asio_no_tls.hpp>
+ #include <websocketpp/server.hpp>
+
+@@ -84,6 +84,9 @@ public:
+ Server& operator=(Server&&) = delete;
+
+ void run();
++ size_t run_for(std::chrono::nanoseconds duration) {
++ return _server.get_io_service().run_for(duration);
++ }
+ void stop();
+
+ ChannelId addChannel(ChannelWithoutId channel);
+@@ -105,6 +108,10 @@ private:
+ std::unordered_map<SubscriptionId, ChannelId> subscriptions;
+ std::unordered_map<ChannelId, std::unordered_set<SubscriptionId>> subscriptionsByChannel;
+
++ ClientInfo(const std::string& name_, const ConnHandle& handle_)
++ : name(name_)
++ , handle(handle_) {}
++
+ ClientInfo(const ClientInfo&) = delete;
+ ClientInfo& operator=(const ClientInfo&) = delete;
+
+@@ -147,7 +154,7 @@ inline Server::Server(uint16_t port, std::string name)
+ _server.set_message_handler(std::bind(&Server::handleMessage, this, _1, _2));
+ _server.set_reuse_addr(true);
+ _server.set_listen_backlog(128);
+- _server.listen(_port);
++ _server.listen(websocketpp::lib::asio::ip::tcp::v4(), _port);
+ _server.start_accept();
+ }
+
+@@ -174,10 +181,7 @@ inline void Server::handleConnectionOpened(ConnHandle hdl) {
+ _server.get_alog().write(
+ websocketpp::log::alevel::app,
+ "Client " + con->get_remote_endpoint() + " connected via " + con->get_resource());
+- _clients.emplace(hdl, ClientInfo{
+- .name = con->get_remote_endpoint(),
+- .handle = hdl,
+- });
++ _clients.emplace(hdl, ClientInfo{con->get_remote_endpoint(), hdl});
+
+ con->send(json({
+ {"op", "serverInfo"},
diff --git a/third_party/websocketpp/BUILD b/third_party/websocketpp/BUILD
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/third_party/websocketpp/BUILD
diff --git a/third_party/websocketpp/websocketpp.BUILD b/third_party/websocketpp/websocketpp.BUILD
new file mode 100644
index 0000000..ecf12cd
--- /dev/null
+++ b/third_party/websocketpp/websocketpp.BUILD
@@ -0,0 +1,11 @@
+# 3 Clause BSD
+licenses(["notice"])
+
+cc_library(
+ name = "com_github_zaphoyd_websocketpp",
+ hdrs = glob(["websocketpp/**/*.hpp"]),
+ defines = ["_WEBSOCKETPP_CPP11_STL_", "ASIO_STANDALONE"],
+ includes = ["."],
+ visibility = ["//visibility:public"],
+ deps = ["@asio"],
+)
diff --git a/tools/dependency_rewrite b/tools/dependency_rewrite
index bf5d271..80e31a3 100644
--- a/tools/dependency_rewrite
+++ b/tools/dependency_rewrite
@@ -11,6 +11,7 @@
rewrite devsite.ctr-electronics.com/(.*) software.frc971.org/Build-Dependencies/devsite.ctr-electronics.com/$1
rewrite www.openssl.org/(.*) software.frc971.org/Build-Dependencies/www.openssl.org/$1
rewrite zlib.net/(.*) software.frc971.org/Build-Dependencies/zlib.net/$1
+rewrite downloads.sourceforge.net/(.*) software.frc971.org/Build-Dependencies/downloads.sourceforge.net/$1
allow crates.io
allow golang.org