Enable flatbuffer encoding for foxglove websocket server
Change-Id: I7ab44a5c6e978f517008fb7dcc40893688e95e28
Signed-off-by: James Kuszmaul <jabukuszmaul@gmail.com>
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
index b4d262a..1cc3a8a 100644
--- a/aos/util/foxglove_websocket_lib.cc
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -1,17 +1,13 @@
#include "aos/util/foxglove_websocket_lib.h"
#include "aos/util/mcap_logger.h"
+#include "aos/flatbuffer_merge.h"
+#include "absl/strings/escaping.h"
#include "gflags/gflags.h"
DEFINE_uint32(sorting_buffer_ms, 100,
"Amount of time to buffer messages to sort them before sending "
"them to foxglove.");
-DEFINE_bool(fetch_pinned_channels, false,
- "Set this to allow foxglove_websocket to make fetchers on channels "
- "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
- "enum value). By default, we don't make fetchers for "
- "these channels since using up a fetcher slot on PIN'd channels "
- "can have side-effects.");
namespace {
// Period at which to poll the fetchers for all the channels.
@@ -19,21 +15,38 @@
} // namespace
namespace aos {
-FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
- uint32_t port)
- : event_loop_(event_loop), server_(port, "aos_foxglove") {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(
+ aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
+ FetchPinnedChannels fetch_pinned_channels)
+ : event_loop_(event_loop),
+ serialization_(serialization),
+ fetch_pinned_channels_(fetch_pinned_channels),
+ server_(port, "aos_foxglove") {
for (const aos::Channel *channel :
*event_loop_->configuration()->channels()) {
const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
if (aos::configuration::ChannelIsReadableOnNode(channel,
event_loop_->node()) &&
- (!is_pinned || FLAGS_fetch_pinned_channels)) {
+ (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
+ const FlatbufferDetachedBuffer<reflection::Schema> schema =
+ RecursiveCopyFlatBuffer(channel->schema());
const ChannelId id =
- server_.addChannel(foxglove::websocket::ChannelWithoutId{
- .topic = channel->name()->str() + " " + channel->type()->str(),
- .encoding = "json",
- .schemaName = channel->type()->str(),
- .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+ (serialization_ == Serialization::kJson)
+ ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
+ .topic =
+ channel->name()->str() + " " + channel->type()->str(),
+ .encoding = "json",
+ .schemaName = channel->type()->str(),
+ .schema =
+ JsonSchemaForFlatbuffer({channel->schema()}).dump()})
+ : server_.addChannel(foxglove::websocket::ChannelWithoutId{
+ .topic =
+ channel->name()->str() + " " + channel->type()->str(),
+ .encoding = "flatbuffer",
+ .schemaName = channel->type()->str(),
+ .schema = absl::Base64Escape(
+ {reinterpret_cast<const char *>(schema.span().data()),
+ schema.span().size()})});
CHECK(fetchers_.count(id) == 0);
fetchers_[id] =
FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
@@ -100,11 +113,20 @@
while (!fetcher_times.empty()) {
const ChannelId channel = fetcher_times.begin()->second;
FetcherState *fetcher = &fetchers_[channel];
- server_.sendMessage(
- channel, fetcher_times.begin()->first.time_since_epoch().count(),
- aos::FlatbufferToJson(
- fetcher->fetcher->channel()->schema(),
- static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+ switch (serialization_) {
+ case Serialization::kJson:
+ server_.sendMessage(
+ channel, fetcher_times.begin()->first.time_since_epoch().count(),
+ aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
+ static_cast<const uint8_t *>(
+ fetcher->fetcher->context().data)));
+ break;
+ case Serialization::kFlatbuffer:
+ server_.sendMessage(
+ channel, fetcher_times.begin()->first.time_since_epoch().count(),
+ {static_cast<const char *>(fetcher->fetcher->context().data),
+ fetcher->fetcher->context().size});
+ }
fetcher_times.erase(fetcher_times.begin());
fetcher->sent_current_message = true;
if (fetcher->fetcher->FetchNext()) {