Implement flatbuffer mode in log_to_mcap

This makes log_to_mcap output a flatbuffer-based MCAP format (when
requested) instead of a JSON format. This significantly speeds up
conversion and reduces disk usage, and should at least put us in the
ballpark for working with images in Foxglove (although we still need
to do some extra work to either use one of their pre-defined image
formats or support our existing image formats natively in Studio).
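
To try it out, an invocation along these lines should work (the binary
and log paths here are illustrative):

  log_to_mcap --node some_node --mode flatbuffer \
      --output_path /tmp/log.mcap /path/to/log/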

Change-Id: Id1778c0b5b9ced90517ab34269511d78cd0c7e58
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index 49aca69..5330c60 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -5,6 +5,7 @@
 
 DEFINE_string(node, "", "Node to replay from the perspective of.");
 DEFINE_string(output_path, "/tmp/log.mcap", "Log to output.");
+DEFINE_string(mode, "json", "json or flatbuffer serialization.");
 
 // Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
 // out, run:
@@ -30,6 +31,9 @@
   std::unique_ptr<aos::EventLoop> mcap_event_loop =
       reader.event_loop_factory()->MakeEventLoop("mcap", node);
   CHECK(!FLAGS_output_path.empty());
-  aos::McapLogger relogger(mcap_event_loop.get(), FLAGS_output_path);
+  aos::McapLogger relogger(mcap_event_loop.get(), FLAGS_output_path,
+                           FLAGS_mode == "flatbuffer"
+                               ? aos::McapLogger::Serialization::kFlatbuffer
+                               : aos::McapLogger::Serialization::kJson);
   reader.event_loop_factory()->Run();
 }
diff --git a/aos/util/log_to_mcap_test.py b/aos/util/log_to_mcap_test.py
index 86c3d1d..72c9078 100644
--- a/aos/util/log_to_mcap_test.py
+++ b/aos/util/log_to_mcap_test.py
@@ -22,7 +22,7 @@
         subprocess.run([args.generate_log, "--output_folder", log_name]).check_returncode()
         # Run with a really small chunk size, to force a multi-chunk file.
         subprocess.run(
-            [args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size", "1000",
+            [args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size", "1000", "--mode", "json",
              log_name]).check_returncode()
         # MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
         # give it an arbitrary config location (it seems to be fine with a non-existent config).
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index f4056ea..561a02b 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -1,10 +1,17 @@
 #include "aos/util/mcap_logger.h"
 
 #include "absl/strings/str_replace.h"
+#include "aos/flatbuffer_merge.h"
 #include "single_include/nlohmann/json.hpp"
 
-DEFINE_uint64(mcap_chunk_size, 10000000,
+DEFINE_uint64(mcap_chunk_size, 10'000'000,
               "Size, in bytes, of individual MCAP chunks");
+DEFINE_bool(fetch, false,
+            "Whether to fetch the most recent message on each channel at the "
+            "start of the logfile. Turn this on if you need access to, e.g., "
+            "one-time messages sent before the start of the logfile. Leave it "
+            "off if you don't want messages whose timestamps may be "
+            "arbitrarily far before any other interesting messages.");
 
 namespace aos {
 
@@ -74,8 +81,11 @@
   return schema;
 }
 
-McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
-    : event_loop_(event_loop), output_(output_path) {
+McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
+                       Serialization serialization)
+    : event_loop_(event_loop),
+      output_(output_path),
+      serialization_(serialization) {
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
   CHECK(output_);
@@ -119,13 +129,34 @@
   // summary and summary offset sections.
   WriteFooter(summary_offset, summary_offset_offset);
   WriteMagic();
+
+  // TODO(james): Add compression. With flatbuffer messages that contain large
+  // numbers of zeros (e.g., large grids or thresholded images) this can result
+  // in massive savings.
+  if (VLOG_IS_ON(2)) {
+    // For debugging, print out how much space each channel is taking in the
+    // overall log.
+    LOG(INFO) << "Total message bytes: " << total_message_bytes_;
+    std::vector<std::pair<size_t, const Channel *>> channel_bytes;
+    for (const auto &pair : total_channel_bytes_) {
+      channel_bytes.push_back(std::make_pair(pair.second, pair.first));
+    }
+    std::sort(channel_bytes.begin(), channel_bytes.end());
+    for (const auto &pair : channel_bytes) {
+      LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
+                << static_cast<float>(pair.first) * 1e-6 << " MB ("
+                << static_cast<float>(pair.first) / total_message_bytes_
+                << " of total).";
+    }
+  }
 }
 
 std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
     RegisterHandlers register_handlers) {
-  uint16_t id = 1;
+  uint16_t id = 0;
   std::map<uint16_t, const Channel *> channels;
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
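+    // IDs are assigned from each channel's index in the configuration;
+    // channels that are not readable on this node are skipped below, leaving
+    // gaps in the ID space.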
+    ++id;
     if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
       continue;
     }
@@ -141,8 +172,13 @@
               WriteChunk();
             }
           });
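+      // If --fetch is set, write out the most recent message on each channel
+      // at startup so that one-time messages sent before the start of the
+      // log are included.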
+      fetchers_[id] = event_loop_->MakeRawFetcher(channel);
+      event_loop_->OnRun([this, id, channel]() {
+        if (FLAGS_fetch && fetchers_[id]->Fetch()) {
+          WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+        }
+      });
     }
-    ++id;
   }
 
   std::vector<SummaryOffset> offsets;
@@ -200,7 +236,9 @@
 
 void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
   CHECK(channel->has_schema());
-  std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
+
+  const FlatbufferDetachedBuffer<reflection::Schema> schema =
+      CopyFlatBuffer(channel->schema());
 
   // Write out the schema (we don't bother deduplicating schema types):
   string_builder_.Reset();
@@ -208,10 +246,23 @@
   AppendInt16(&string_builder_, id);
   // Type name
   AppendString(&string_builder_, channel->type()->string_view());
-  // Encoding
-  AppendString(&string_builder_, "jsonschema");
-  // Actual schema itself
-  AppendString(&string_builder_, schema);
+  switch (serialization_) {
+    case Serialization::kJson:
+      // Encoding
+      AppendString(&string_builder_, "jsonschema");
+      // Actual schema itself
+      AppendString(&string_builder_,
+                   JsonSchemaForFlatbuffer({channel->schema()}).dump());
+      break;
+    case Serialization::kFlatbuffer:
+      // Encoding
+      AppendString(&string_builder_, "flatbuffer");
+      // Actual schema itself
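+      // (the serialized reflection.Schema flatbuffer for this channel's type,
+      // copied from the log's configuration above).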
+      AppendString(&string_builder_,
+                   {reinterpret_cast<const char *>(schema.span().data()),
+                    schema.span().size()});
+      break;
+  }
   WriteRecord(OpCode::kSchema, string_builder_.Result());
 }
 
@@ -227,7 +278,15 @@
                absl::StrCat(channel->name()->string_view(), " ",
                             channel->type()->string_view()));
   // Encoding
-  AppendString(&string_builder_, "json");
+  switch (serialization_) {
+    case Serialization::kJson:
+      AppendString(&string_builder_, "json");
+      break;
+    case Serialization::kFlatbuffer:
+      AppendString(&string_builder_, "flatbuffer");
+      break;
+  }
+
   // Metadata (technically supposed to be a Map<string, string>)
   AppendString(&string_builder_, "");
   WriteRecord(OpCode::kChannel, string_builder_.Result());
@@ -241,9 +300,15 @@
 
   if (!earliest_message_.has_value()) {
     earliest_message_ = context.monotonic_event_time;
+  } else {
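+    // Messages are not necessarily written in timestamp order (e.g., startup
+    // fetches happen in channel order), so track the actual minimum.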
+    earliest_message_ =
+        std::min(context.monotonic_event_time, earliest_message_.value());
   }
   if (!earliest_chunk_message_.has_value()) {
     earliest_chunk_message_ = context.monotonic_event_time;
+  } else {
+    earliest_chunk_message_ =
+        std::min(context.monotonic_event_time, earliest_chunk_message_.value());
   }
   latest_message_ = context.monotonic_event_time;
 
@@ -257,6 +322,8 @@
   // TODO(james): If we use this for multi-node logfiles, use distributed clock.
   AppendInt64(&string_builder_,
               context.monotonic_event_time.time_since_epoch().count());
+  // Note: Foxglove Studio doesn't appear to actually support using publish time
+  // right now.
   AppendInt64(&string_builder_,
               context.monotonic_event_time.time_since_epoch().count());
 
@@ -267,8 +334,18 @@
       << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
       << channel->type()->c_str();
 
-  aos::FlatbufferToJson(&string_builder_, channel->schema(),
-                        static_cast<const uint8_t *>(context.data));
+  switch (serialization_) {
+    case Serialization::kJson:
+      aos::FlatbufferToJson(&string_builder_, channel->schema(),
+                            static_cast<const uint8_t *>(context.data));
+      break;
+    case Serialization::kFlatbuffer:
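+      // Append the raw flatbuffer bytes; readers decode them using the
+      // reflection.Schema written out by WriteSchema().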
+      string_builder_.Append(
+          {static_cast<const char *>(context.data), context.size});
+      break;
+  }
+  total_message_bytes_ += context.size;
+  total_channel_bytes_[channel] += context.size;
 
   message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
       context.monotonic_event_time.time_since_epoch().count(),
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index dcacb68..5ae6413 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -30,7 +30,14 @@
 // available, to be able to support Foxglove fully.
 class McapLogger {
  public:
-  McapLogger(EventLoop *event_loop, const std::string &output_path);
+  // Whether to serialize the messages into the MCAP file as JSON or
+  // flatbuffers.
+  enum class Serialization {
+    kJson,
+    kFlatbuffer,
+  };
+  McapLogger(EventLoop *event_loop, const std::string &output_path,
+             Serialization serialization);
   ~McapLogger();
 
  private:
@@ -122,6 +129,9 @@
 
   aos::EventLoop *event_loop_;
   std::ofstream output_;
+  const Serialization serialization_;
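+  // Running totals of serialized message bytes, used for the VLOG(2)
+  // statistics printed on destruction.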
+  size_t total_message_bytes_ = 0;
+  std::map<const Channel *, size_t> total_channel_bytes_;
   // Buffer containing serialized message data for the currently-being-built
   // chunk.
   std::stringstream current_chunk_;
@@ -136,6 +146,7 @@
       aos::monotonic_clock::min_time;
   // Count of all messages on each channel, indexed by channel ID.
   std::map<uint16_t, uint64_t> message_counts_;
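+  // Fetchers for each channel, indexed by channel ID; used to fetch the most
+  // recent message on each channel at startup when --fetch is set.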
+  std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
   // MessageIndex's for each message. The std::map is indexed by channel ID. The
   // vector is then a series of pairs of (timestamp, offset from start of
   // current_chunk_).