blob: 9ab4012948af4a8b7c4a932b8597d10612925232 [file] [log] [blame]
#include "aos/util/mcap_logger.h"
#include "absl/strings/str_replace.h"
#include "single_include/nlohmann/json.hpp"
namespace aos {
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
JsonSchemaRecursion recursion_level) {
nlohmann::json schema;
if (recursion_level == JsonSchemaRecursion::kTopLevel) {
schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
}
schema["type"] = "object";
nlohmann::json properties;
for (int index = 0; index < type.NumberFields(); ++index) {
nlohmann::json field;
const bool is_array = type.FieldIsRepeating(index);
if (type.FieldIsSequence(index)) {
// For sub-tables/structs, just recurse.
nlohmann::json subtype = JsonSchemaForFlatbuffer(
type.FieldType(index), JsonSchemaRecursion::kNested);
if (is_array) {
field["type"] = "array";
field["items"] = subtype;
} else {
field = subtype;
}
} else {
std::string elementary_type;
switch (type.FieldElementaryType(index)) {
case flatbuffers::ET_UTYPE:
case flatbuffers::ET_CHAR:
case flatbuffers::ET_UCHAR:
case flatbuffers::ET_SHORT:
case flatbuffers::ET_USHORT:
case flatbuffers::ET_INT:
case flatbuffers::ET_UINT:
case flatbuffers::ET_LONG:
case flatbuffers::ET_ULONG:
case flatbuffers::ET_FLOAT:
case flatbuffers::ET_DOUBLE:
elementary_type = "number";
break;
case flatbuffers::ET_BOOL:
elementary_type = "boolean";
break;
case flatbuffers::ET_STRING:
elementary_type = "string";
break;
case flatbuffers::ET_SEQUENCE:
if (type.FieldIsEnum(index)) {
elementary_type = "string";
} else {
LOG(FATAL) << "Should not encounter any sequence fields here.";
}
break;
}
if (is_array) {
field["type"] = "array";
field["items"]["type"] = elementary_type;
} else {
field["type"] = elementary_type;
}
}
// the nlohmann::json [] operator needs an actual string, not just a
// string_view :(.
properties[std::string(type.FieldName(index))] = field;
}
schema["properties"] = properties;
return schema;
}
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
: output_(output_path) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
CHECK(output_);
WriteMagic();
WriteHeader();
uint16_t id = 1;
for (const Channel *channel : *event_loop->configuration()->channels()) {
if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
continue;
}
WriteSchema(id, channel);
// Write out the channel entry that uses the schema (we just re-use the
// chema ID for the channel ID, since we aren't deduplicating schemas for
// channels that are of the same type).
WriteChannel(id, id, channel);
event_loop->MakeRawWatcher(
channel, [this, id, channel](const Context &context, const void *) {
WriteMessage(id, channel, context);
});
++id;
}
}
McapLogger::~McapLogger() {
WriteDataEnd();
WriteFooter();
WriteMagic();
}
void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
void McapLogger::WriteHeader() {
string_builder_.Reset();
// "profile"
AppendString(&string_builder_, "x-aos");
// "library"
AppendString(&string_builder_, "AOS MCAP converter");
WriteRecord(OpCode::kHeader, string_builder_.Result());
}
void McapLogger::WriteFooter() {
string_builder_.Reset();
// Offsets and CRC32 for summary section, which we don't populate.
AppendInt64(&string_builder_, 0);
AppendInt64(&string_builder_, 0);
AppendInt32(&string_builder_, 0);
WriteRecord(OpCode::kFooter, string_builder_.Result());
}
void McapLogger::WriteDataEnd() {
string_builder_.Reset();
// CRC32 for the data, which we are too lazy to calculate.
AppendInt32(&string_builder_, 0);
WriteRecord(OpCode::kDataEnd, string_builder_.Result());
}
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
CHECK(channel->has_schema());
std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
// Write out the schema (we don't bother deduplicating schema types):
string_builder_.Reset();
// Schema ID
AppendInt16(&string_builder_, id);
// Type name
AppendString(&string_builder_, channel->type()->string_view());
// Encoding
AppendString(&string_builder_, "jsonschema");
// Actual schema itself
AppendString(&string_builder_, schema);
WriteRecord(OpCode::kSchema, string_builder_.Result());
}
void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
const aos::Channel *channel) {
string_builder_.Reset();
// Channel ID
AppendInt16(&string_builder_, id);
// Schema ID
AppendInt16(&string_builder_, schema_id);
// Topic name
AppendString(&string_builder_,
absl::StrCat(channel->name()->string_view(), " ",
channel->type()->string_view()));
// Encoding
AppendString(&string_builder_, "json");
// Metadata (technically supposed to be a Map<string, string>)
AppendString(&string_builder_, "");
WriteRecord(OpCode::kChannel, string_builder_.Result());
}
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
const Context &context) {
CHECK_NOTNULL(context.data);
string_builder_.Reset();
// Channel ID
AppendInt16(&string_builder_, channel_id);
// Queue Index
AppendInt32(&string_builder_, context.queue_index);
// Log time, and publish time. Since we don't log a logged time, just use
// published time.
// TODO(james): If we use this for multi-node logfiles, use distributed clock.
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
CHECK(flatbuffers::Verify(*channel->schema(),
*channel->schema()->root_table(),
static_cast<const uint8_t *>(context.data),
static_cast<size_t>(context.size)))
<< ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
<< channel->type()->c_str();
aos::FlatbufferToJson(&string_builder_, channel->schema(),
static_cast<const uint8_t *>(context.data));
WriteRecord(OpCode::kMessage, string_builder_.Result());
}
void McapLogger::WriteRecord(OpCode op, std::string_view record) {
output_.put(static_cast<char>(op));
uint64_t record_length = record.size();
output_.write(reinterpret_cast<const char *>(&record_length),
sizeof(record_length));
output_ << record;
}
void McapLogger::AppendString(FastStringBuilder *builder,
std::string_view string) {
AppendInt32(builder, string.size());
builder->Append(string);
}
namespace {
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {
builder->Append(
std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
}
} // namespace
void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
AppendInt(builder, val);
}
void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
AppendInt(builder, val);
}
void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
AppendInt(builder, val);
}
} // namespace aos