blob: 9ab4012948af4a8b7c4a932b8597d10612925232 [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
3#include "absl/strings/str_replace.h"
4#include "single_include/nlohmann/json.hpp"
5
6namespace aos {
7nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
8 JsonSchemaRecursion recursion_level) {
9 nlohmann::json schema;
10 if (recursion_level == JsonSchemaRecursion::kTopLevel) {
11 schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
12 }
13 schema["type"] = "object";
14 nlohmann::json properties;
15 for (int index = 0; index < type.NumberFields(); ++index) {
16 nlohmann::json field;
17 const bool is_array = type.FieldIsRepeating(index);
18 if (type.FieldIsSequence(index)) {
19 // For sub-tables/structs, just recurse.
20 nlohmann::json subtype = JsonSchemaForFlatbuffer(
21 type.FieldType(index), JsonSchemaRecursion::kNested);
22 if (is_array) {
23 field["type"] = "array";
24 field["items"] = subtype;
25 } else {
26 field = subtype;
27 }
28 } else {
29 std::string elementary_type;
30 switch (type.FieldElementaryType(index)) {
31 case flatbuffers::ET_UTYPE:
32 case flatbuffers::ET_CHAR:
33 case flatbuffers::ET_UCHAR:
34 case flatbuffers::ET_SHORT:
35 case flatbuffers::ET_USHORT:
36 case flatbuffers::ET_INT:
37 case flatbuffers::ET_UINT:
38 case flatbuffers::ET_LONG:
39 case flatbuffers::ET_ULONG:
40 case flatbuffers::ET_FLOAT:
41 case flatbuffers::ET_DOUBLE:
42 elementary_type = "number";
43 break;
44 case flatbuffers::ET_BOOL:
45 elementary_type = "boolean";
46 break;
47 case flatbuffers::ET_STRING:
48 elementary_type = "string";
49 break;
50 case flatbuffers::ET_SEQUENCE:
51 if (type.FieldIsEnum(index)) {
52 elementary_type = "string";
53 } else {
54 LOG(FATAL) << "Should not encounter any sequence fields here.";
55 }
56 break;
57 }
58 if (is_array) {
59 field["type"] = "array";
60 field["items"]["type"] = elementary_type;
61 } else {
62 field["type"] = elementary_type;
63 }
64 }
65 // the nlohmann::json [] operator needs an actual string, not just a
66 // string_view :(.
67 properties[std::string(type.FieldName(index))] = field;
68 }
69 schema["properties"] = properties;
70 return schema;
71}
72
73McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
74 : output_(output_path) {
75 event_loop->SkipTimingReport();
76 event_loop->SkipAosLog();
77 CHECK(output_);
78 WriteMagic();
79 WriteHeader();
80 uint16_t id = 1;
81 for (const Channel *channel : *event_loop->configuration()->channels()) {
82 if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
83 continue;
84 }
85
86 WriteSchema(id, channel);
87
88 // Write out the channel entry that uses the schema (we just re-use the
89 // chema ID for the channel ID, since we aren't deduplicating schemas for
90 // channels that are of the same type).
91 WriteChannel(id, id, channel);
92
93 event_loop->MakeRawWatcher(
94 channel, [this, id, channel](const Context &context, const void *) {
95 WriteMessage(id, channel, context);
96 });
97 ++id;
98 }
99}
100
101McapLogger::~McapLogger() {
102 WriteDataEnd();
103 WriteFooter();
104 WriteMagic();
105}
106
107void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
108
109void McapLogger::WriteHeader() {
110 string_builder_.Reset();
111 // "profile"
112 AppendString(&string_builder_, "x-aos");
113 // "library"
114 AppendString(&string_builder_, "AOS MCAP converter");
115 WriteRecord(OpCode::kHeader, string_builder_.Result());
116}
117
118void McapLogger::WriteFooter() {
119 string_builder_.Reset();
120 // Offsets and CRC32 for summary section, which we don't populate.
121 AppendInt64(&string_builder_, 0);
122 AppendInt64(&string_builder_, 0);
123 AppendInt32(&string_builder_, 0);
124 WriteRecord(OpCode::kFooter, string_builder_.Result());
125}
126
127void McapLogger::WriteDataEnd() {
128 string_builder_.Reset();
129 // CRC32 for the data, which we are too lazy to calculate.
130 AppendInt32(&string_builder_, 0);
131 WriteRecord(OpCode::kDataEnd, string_builder_.Result());
132}
133
134void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
135 CHECK(channel->has_schema());
136 std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
137
138 // Write out the schema (we don't bother deduplicating schema types):
139 string_builder_.Reset();
140 // Schema ID
141 AppendInt16(&string_builder_, id);
142 // Type name
143 AppendString(&string_builder_, channel->type()->string_view());
144 // Encoding
145 AppendString(&string_builder_, "jsonschema");
146 // Actual schema itself
147 AppendString(&string_builder_, schema);
148 WriteRecord(OpCode::kSchema, string_builder_.Result());
149}
150
151void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
152 const aos::Channel *channel) {
153 string_builder_.Reset();
154 // Channel ID
155 AppendInt16(&string_builder_, id);
156 // Schema ID
157 AppendInt16(&string_builder_, schema_id);
158 // Topic name
159 AppendString(&string_builder_,
160 absl::StrCat(channel->name()->string_view(), " ",
161 channel->type()->string_view()));
162 // Encoding
163 AppendString(&string_builder_, "json");
164 // Metadata (technically supposed to be a Map<string, string>)
165 AppendString(&string_builder_, "");
166 WriteRecord(OpCode::kChannel, string_builder_.Result());
167}
168
169void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
170 const Context &context) {
171 CHECK_NOTNULL(context.data);
172
173 string_builder_.Reset();
174 // Channel ID
175 AppendInt16(&string_builder_, channel_id);
176 // Queue Index
177 AppendInt32(&string_builder_, context.queue_index);
178 // Log time, and publish time. Since we don't log a logged time, just use
179 // published time.
180 // TODO(james): If we use this for multi-node logfiles, use distributed clock.
181 AppendInt64(&string_builder_,
182 context.monotonic_event_time.time_since_epoch().count());
183 AppendInt64(&string_builder_,
184 context.monotonic_event_time.time_since_epoch().count());
185
186 CHECK(flatbuffers::Verify(*channel->schema(),
187 *channel->schema()->root_table(),
188 static_cast<const uint8_t *>(context.data),
189 static_cast<size_t>(context.size)))
190 << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
191 << channel->type()->c_str();
192
193 aos::FlatbufferToJson(&string_builder_, channel->schema(),
194 static_cast<const uint8_t *>(context.data));
195
196 WriteRecord(OpCode::kMessage, string_builder_.Result());
197}
198
199void McapLogger::WriteRecord(OpCode op, std::string_view record) {
200 output_.put(static_cast<char>(op));
201 uint64_t record_length = record.size();
202 output_.write(reinterpret_cast<const char *>(&record_length),
203 sizeof(record_length));
204 output_ << record;
205}
206
207void McapLogger::AppendString(FastStringBuilder *builder,
208 std::string_view string) {
209 AppendInt32(builder, string.size());
210 builder->Append(string);
211}
212
213namespace {
214template <typename T>
215static void AppendInt(FastStringBuilder *builder, T val) {
216 builder->Append(
217 std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
218}
219} // namespace
220
221void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
222 AppendInt(builder, val);
223}
224
225void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
226 AppendInt(builder, val);
227}
228
229void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
230 AppendInt(builder, val);
231}
232} // namespace aos