Enable flatbuffer encoding for foxglove websocket server

Change-Id: I7ab44a5c6e978f517008fb7dcc40893688e95e28
Signed-off-by: James Kuszmaul <jabukuszmaul@gmail.com>
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
index b4d262a..1cc3a8a 100644
--- a/aos/util/foxglove_websocket_lib.cc
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -1,17 +1,13 @@
 #include "aos/util/foxglove_websocket_lib.h"
 
 #include "aos/util/mcap_logger.h"
+#include "aos/flatbuffer_merge.h"
+#include "absl/strings/escaping.h"
 #include "gflags/gflags.h"
 
 DEFINE_uint32(sorting_buffer_ms, 100,
               "Amount of time to buffer messages to sort them before sending "
               "them to foxglove.");
-DEFINE_bool(fetch_pinned_channels, false,
-            "Set this to allow foxglove_websocket to make fetchers on channels "
-            "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
-            "enum value). By default, we don't make fetchers for "
-            "these channels since using up a fetcher slot on PIN'd channels "
-            "can have side-effects.");
 
 namespace {
 // Period at which to poll the fetchers for all the channels.
@@ -19,21 +15,38 @@
 }  // namespace
 
 namespace aos {
-FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
-                                                 uint32_t port)
-    : event_loop_(event_loop), server_(port, "aos_foxglove") {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(
+    aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
+    FetchPinnedChannels fetch_pinned_channels)
+    : event_loop_(event_loop),
+      serialization_(serialization),
+      fetch_pinned_channels_(fetch_pinned_channels),
+      server_(port, "aos_foxglove") {
   for (const aos::Channel *channel :
        *event_loop_->configuration()->channels()) {
     const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
     if (aos::configuration::ChannelIsReadableOnNode(channel,
                                                     event_loop_->node()) &&
-        (!is_pinned || FLAGS_fetch_pinned_channels)) {
+        (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
+      const FlatbufferDetachedBuffer<reflection::Schema> schema =
+          RecursiveCopyFlatBuffer(channel->schema());
       const ChannelId id =
-          server_.addChannel(foxglove::websocket::ChannelWithoutId{
-              .topic = channel->name()->str() + " " + channel->type()->str(),
-              .encoding = "json",
-              .schemaName = channel->type()->str(),
-              .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+          (serialization_ == Serialization::kJson)
+              ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
+                    .topic =
+                        channel->name()->str() + " " + channel->type()->str(),
+                    .encoding = "json",
+                    .schemaName = channel->type()->str(),
+                    .schema =
+                        JsonSchemaForFlatbuffer({channel->schema()}).dump()})
+              : server_.addChannel(foxglove::websocket::ChannelWithoutId{
+                    .topic =
+                        channel->name()->str() + " " + channel->type()->str(),
+                    .encoding = "flatbuffer",
+                    .schemaName = channel->type()->str(),
+                    .schema = absl::Base64Escape(
+                        {reinterpret_cast<const char *>(schema.span().data()),
+                         schema.span().size()})});
       CHECK(fetchers_.count(id) == 0);
       fetchers_[id] =
           FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
@@ -100,11 +113,20 @@
     while (!fetcher_times.empty()) {
       const ChannelId channel = fetcher_times.begin()->second;
       FetcherState *fetcher = &fetchers_[channel];
-      server_.sendMessage(
-          channel, fetcher_times.begin()->first.time_since_epoch().count(),
-          aos::FlatbufferToJson(
-              fetcher->fetcher->channel()->schema(),
-              static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+      switch (serialization_) {
+        case Serialization::kJson:
+          server_.sendMessage(
+              channel, fetcher_times.begin()->first.time_since_epoch().count(),
+              aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
+                                    static_cast<const uint8_t *>(
+                                        fetcher->fetcher->context().data)));
+          break;
+        case Serialization::kFlatbuffer:
+          server_.sendMessage(
+              channel, fetcher_times.begin()->first.time_since_epoch().count(),
+              {static_cast<const char *>(fetcher->fetcher->context().data),
+               fetcher->fetcher->context().size});
+      }
       fetcher_times.erase(fetcher_times.begin());
       fetcher->sent_current_message = true;
       if (fetcher->fetcher->FetchNext()) {