Enable flatbuffer encoding for foxglove websocket server
Change-Id: I7ab44a5c6e978f517008fb7dcc40893688e95e28
Signed-off-by: James Kuszmaul <jabukuszmaul@gmail.com>
diff --git a/aos/util/foxglove_websocket.cc b/aos/util/foxglove_websocket.cc
index 26092bc..6ecb600 100644
--- a/aos/util/foxglove_websocket.cc
+++ b/aos/util/foxglove_websocket.cc
@@ -5,6 +5,12 @@
DEFINE_string(config, "/app/aos_config.json", "Path to the config.");
DEFINE_uint32(port, 8765, "Port to use for foxglove websocket server.");
+DEFINE_string(mode, "flatbuffer", "json or flatbuffer serialization.");
+DEFINE_bool(fetch_pinned_channels, true,
+ "Set this to allow foxglove_websocket to make fetchers on channels "
+ "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
+ "enum value). Having this enabled will cause foxglove to consume "
+ "extra shared memory resources.");
int main(int argc, char *argv[]) {
gflags::SetUsageMessage(
@@ -40,7 +46,14 @@
aos::ShmEventLoop event_loop(&config.message());
- aos::FoxgloveWebsocketServer server(&event_loop, FLAGS_port);
+ aos::FoxgloveWebsocketServer server(
+ &event_loop, FLAGS_port,
+ FLAGS_mode == "flatbuffer"
+ ? aos::FoxgloveWebsocketServer::Serialization::kFlatbuffer
+ : aos::FoxgloveWebsocketServer::Serialization::kJson,
+ FLAGS_fetch_pinned_channels
+ ? aos::FoxgloveWebsocketServer::FetchPinnedChannels::kYes
+ : aos::FoxgloveWebsocketServer::FetchPinnedChannels::kNo);
event_loop.Run();
}
diff --git a/aos/util/foxglove_websocket_lib.cc b/aos/util/foxglove_websocket_lib.cc
index b4d262a..1cc3a8a 100644
--- a/aos/util/foxglove_websocket_lib.cc
+++ b/aos/util/foxglove_websocket_lib.cc
@@ -1,17 +1,13 @@
#include "aos/util/foxglove_websocket_lib.h"
#include "aos/util/mcap_logger.h"
+#include "aos/flatbuffer_merge.h"
+#include "absl/strings/escaping.h"
#include "gflags/gflags.h"
DEFINE_uint32(sorting_buffer_ms, 100,
"Amount of time to buffer messages to sort them before sending "
"them to foxglove.");
-DEFINE_bool(fetch_pinned_channels, false,
- "Set this to allow foxglove_websocket to make fetchers on channels "
- "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
- "enum value). By default, we don't make fetchers for "
- "these channels since using up a fetcher slot on PIN'd channels "
- "can have side-effects.");
namespace {
// Period at which to poll the fetchers for all the channels.
@@ -19,21 +15,38 @@
} // namespace
namespace aos {
-FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
- uint32_t port)
- : event_loop_(event_loop), server_(port, "aos_foxglove") {
+FoxgloveWebsocketServer::FoxgloveWebsocketServer(
+ aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
+ FetchPinnedChannels fetch_pinned_channels)
+ : event_loop_(event_loop),
+ serialization_(serialization),
+ fetch_pinned_channels_(fetch_pinned_channels),
+ server_(port, "aos_foxglove") {
for (const aos::Channel *channel :
*event_loop_->configuration()->channels()) {
const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
if (aos::configuration::ChannelIsReadableOnNode(channel,
event_loop_->node()) &&
- (!is_pinned || FLAGS_fetch_pinned_channels)) {
+ (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
+ const FlatbufferDetachedBuffer<reflection::Schema> schema =
+ RecursiveCopyFlatBuffer(channel->schema());
const ChannelId id =
- server_.addChannel(foxglove::websocket::ChannelWithoutId{
- .topic = channel->name()->str() + " " + channel->type()->str(),
- .encoding = "json",
- .schemaName = channel->type()->str(),
- .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
+ (serialization_ == Serialization::kJson)
+ ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
+ .topic =
+ channel->name()->str() + " " + channel->type()->str(),
+ .encoding = "json",
+ .schemaName = channel->type()->str(),
+ .schema =
+ JsonSchemaForFlatbuffer({channel->schema()}).dump()})
+ : server_.addChannel(foxglove::websocket::ChannelWithoutId{
+ .topic =
+ channel->name()->str() + " " + channel->type()->str(),
+ .encoding = "flatbuffer",
+ .schemaName = channel->type()->str(),
+ .schema = absl::Base64Escape(
+ {reinterpret_cast<const char *>(schema.span().data()),
+ schema.span().size()})});
CHECK(fetchers_.count(id) == 0);
fetchers_[id] =
FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
@@ -100,11 +113,20 @@
while (!fetcher_times.empty()) {
const ChannelId channel = fetcher_times.begin()->second;
FetcherState *fetcher = &fetchers_[channel];
- server_.sendMessage(
- channel, fetcher_times.begin()->first.time_since_epoch().count(),
- aos::FlatbufferToJson(
- fetcher->fetcher->channel()->schema(),
- static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
+ switch (serialization_) {
+ case Serialization::kJson:
+ server_.sendMessage(
+ channel, fetcher_times.begin()->first.time_since_epoch().count(),
+ aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
+ static_cast<const uint8_t *>(
+ fetcher->fetcher->context().data)));
+ break;
+ case Serialization::kFlatbuffer:
+ server_.sendMessage(
+ channel, fetcher_times.begin()->first.time_since_epoch().count(),
+ {static_cast<const char *>(fetcher->fetcher->context().data),
+ fetcher->fetcher->context().size});
+ }
fetcher_times.erase(fetcher_times.begin());
fetcher->sent_current_message = true;
if (fetcher->fetcher->FetchNext()) {
diff --git a/aos/util/foxglove_websocket_lib.h b/aos/util/foxglove_websocket_lib.h
index 8160653..9be2f61 100644
--- a/aos/util/foxglove_websocket_lib.h
+++ b/aos/util/foxglove_websocket_lib.h
@@ -14,7 +14,19 @@
// See foxglove_websocket.cc for some usage notes.
class FoxgloveWebsocketServer {
public:
- FoxgloveWebsocketServer(aos::EventLoop *event_loop, uint32_t port);
+ // Whether to serialize the messages into the MCAP file as JSON or
+ // flatbuffers.
+ enum class Serialization {
+ kJson,
+ kFlatbuffer,
+ };
+ enum class FetchPinnedChannels {
+ kYes,
+ kNo,
+ };
+ FoxgloveWebsocketServer(aos::EventLoop *event_loop, uint32_t port,
+ Serialization serialization,
+ FetchPinnedChannels fetch_pinned_channels);
~FoxgloveWebsocketServer();
private:
@@ -33,6 +45,8 @@
};
aos::EventLoop *event_loop_;
+ const Serialization serialization_;
+ const FetchPinnedChannels fetch_pinned_channels_;
foxglove::websocket::Server server_;
// A map of fetchers for every single channel that could be subscribed to.
std::map<ChannelId, FetcherState> fetchers_;
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 96a9b60..40e55f0 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -309,7 +309,7 @@
CHECK(channel->has_schema());
const FlatbufferDetachedBuffer<reflection::Schema> schema =
- CopyFlatBuffer(channel->schema());
+ RecursiveCopyFlatBuffer(channel->schema());
// Write out the schema (we don't bother deduplicating schema types):
string_builder_.Reset();