Create AOS MCAP logger for testing Foxglove

Doesn't add indexing yet, so still limited in size.

https://github.com/foxglove/studio/issues/2909 tracks support for
flatbuffers in foxglove studio

References: PRO-13587
Change-Id: I7c8c15c765395ade979eb8a011cfdae65451b526
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
new file mode 100644
index 0000000..9ab4012
--- /dev/null
+++ b/aos/util/mcap_logger.cc
@@ -0,0 +1,232 @@
+#include "aos/util/mcap_logger.h"
+
+#include "absl/strings/str_replace.h"
+#include "single_include/nlohmann/json.hpp"
+
+namespace aos {
+nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
+                                       JsonSchemaRecursion recursion_level) {
+  nlohmann::json schema;
+  if (recursion_level == JsonSchemaRecursion::kTopLevel) {
+    schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
+  }
+  schema["type"] = "object";
+  nlohmann::json properties;
+  for (int index = 0; index < type.NumberFields(); ++index) {
+    nlohmann::json field;
+    const bool is_array = type.FieldIsRepeating(index);
+    if (type.FieldIsSequence(index)) {
+      // For sub-tables/structs, just recurse.
+      nlohmann::json subtype = JsonSchemaForFlatbuffer(
+          type.FieldType(index), JsonSchemaRecursion::kNested);
+      if (is_array) {
+        field["type"] = "array";
+        field["items"] = subtype;
+      } else {
+        field = subtype;
+      }
+    } else {
+      std::string elementary_type;
+      switch (type.FieldElementaryType(index)) {
+        case flatbuffers::ET_UTYPE:
+        case flatbuffers::ET_CHAR:
+        case flatbuffers::ET_UCHAR:
+        case flatbuffers::ET_SHORT:
+        case flatbuffers::ET_USHORT:
+        case flatbuffers::ET_INT:
+        case flatbuffers::ET_UINT:
+        case flatbuffers::ET_LONG:
+        case flatbuffers::ET_ULONG:
+        case flatbuffers::ET_FLOAT:
+        case flatbuffers::ET_DOUBLE:
+          elementary_type = "number";
+          break;
+        case flatbuffers::ET_BOOL:
+          elementary_type = "boolean";
+          break;
+        case flatbuffers::ET_STRING:
+          elementary_type = "string";
+          break;
+        case flatbuffers::ET_SEQUENCE:
+          if (type.FieldIsEnum(index)) {
+            elementary_type = "string";
+          } else {
+            LOG(FATAL) << "Should not encounter any sequence fields here.";
+          }
+          break;
+      }
+      if (is_array) {
+        field["type"] = "array";
+        field["items"]["type"] = elementary_type;
+      } else {
+        field["type"] = elementary_type;
+      }
+    }
+    // the nlohmann::json [] operator needs an actual string, not just a
+    // string_view :(.
+    properties[std::string(type.FieldName(index))] = field;
+  }
+  schema["properties"] = properties;
+  return schema;
+}
+
+McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
+    : output_(output_path) {
+  event_loop->SkipTimingReport();
+  event_loop->SkipAosLog();
+  CHECK(output_);
+  WriteMagic();
+  WriteHeader();
+  uint16_t id = 1;
+  for (const Channel *channel : *event_loop->configuration()->channels()) {
+    if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
+      continue;
+    }
+
+    WriteSchema(id, channel);
+
+    // Write out the channel entry that uses the schema (we just re-use the
+    // chema ID for the channel ID, since we aren't deduplicating schemas for
+    // channels that are of the same type).
+    WriteChannel(id, id, channel);
+
+    event_loop->MakeRawWatcher(
+        channel, [this, id, channel](const Context &context, const void *) {
+          WriteMessage(id, channel, context);
+        });
+    ++id;
+  }
+}
+
+McapLogger::~McapLogger() {
+  WriteDataEnd();
+  WriteFooter();
+  WriteMagic();
+}
+
+void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
+
+void McapLogger::WriteHeader() {
+  string_builder_.Reset();
+  // "profile"
+  AppendString(&string_builder_, "x-aos");
+  // "library"
+  AppendString(&string_builder_, "AOS MCAP converter");
+  WriteRecord(OpCode::kHeader, string_builder_.Result());
+}
+
+void McapLogger::WriteFooter() {
+  string_builder_.Reset();
+  // Offsets and CRC32 for summary section, which we don't populate.
+  AppendInt64(&string_builder_, 0);
+  AppendInt64(&string_builder_, 0);
+  AppendInt32(&string_builder_, 0);
+  WriteRecord(OpCode::kFooter, string_builder_.Result());
+}
+
+void McapLogger::WriteDataEnd() {
+  string_builder_.Reset();
+  // CRC32 for the data, which we are too lazy to calculate.
+  AppendInt32(&string_builder_, 0);
+  WriteRecord(OpCode::kDataEnd, string_builder_.Result());
+}
+
+void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
+  CHECK(channel->has_schema());
+  std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
+
+  // Write out the schema (we don't bother deduplicating schema types):
+  string_builder_.Reset();
+  // Schema ID
+  AppendInt16(&string_builder_, id);
+  // Type name
+  AppendString(&string_builder_, channel->type()->string_view());
+  // Encoding
+  AppendString(&string_builder_, "jsonschema");
+  // Actual schema itself
+  AppendString(&string_builder_, schema);
+  WriteRecord(OpCode::kSchema, string_builder_.Result());
+}
+
+void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
+                              const aos::Channel *channel) {
+  string_builder_.Reset();
+  // Channel ID
+  AppendInt16(&string_builder_, id);
+  // Schema ID
+  AppendInt16(&string_builder_, schema_id);
+  // Topic name
+  AppendString(&string_builder_,
+               absl::StrCat(channel->name()->string_view(), " ",
+                            channel->type()->string_view()));
+  // Encoding
+  AppendString(&string_builder_, "json");
+  // Metadata (technically supposed to be a Map<string, string>)
+  AppendString(&string_builder_, "");
+  WriteRecord(OpCode::kChannel, string_builder_.Result());
+}
+
+void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
+                              const Context &context) {
+  CHECK_NOTNULL(context.data);
+
+  string_builder_.Reset();
+  // Channel ID
+  AppendInt16(&string_builder_, channel_id);
+  // Queue Index
+  AppendInt32(&string_builder_, context.queue_index);
+  // Log time, and publish time. Since we don't log a logged time, just use
+  // published time.
+  // TODO(james): If we use this for multi-node logfiles, use distributed clock.
+  AppendInt64(&string_builder_,
+              context.monotonic_event_time.time_since_epoch().count());
+  AppendInt64(&string_builder_,
+              context.monotonic_event_time.time_since_epoch().count());
+
+  CHECK(flatbuffers::Verify(*channel->schema(),
+                            *channel->schema()->root_table(),
+                            static_cast<const uint8_t *>(context.data),
+                            static_cast<size_t>(context.size)))
+      << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
+      << channel->type()->c_str();
+
+  aos::FlatbufferToJson(&string_builder_, channel->schema(),
+                        static_cast<const uint8_t *>(context.data));
+
+  WriteRecord(OpCode::kMessage, string_builder_.Result());
+}
+
+void McapLogger::WriteRecord(OpCode op, std::string_view record) {
+  output_.put(static_cast<char>(op));
+  uint64_t record_length = record.size();
+  output_.write(reinterpret_cast<const char *>(&record_length),
+                sizeof(record_length));
+  output_ << record;
+}
+
+void McapLogger::AppendString(FastStringBuilder *builder,
+                              std::string_view string) {
+  AppendInt32(builder, string.size());
+  builder->Append(string);
+}
+
+namespace {
+template <typename T>
+static void AppendInt(FastStringBuilder *builder, T val) {
+  builder->Append(
+      std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
+}
+}  // namespace
+
+void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
+  AppendInt(builder, val);
+}
+
+void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
+  AppendInt(builder, val);
+}
+
+void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
+  AppendInt(builder, val);
+}
+}  // namespace aos