Add a PackMessage method which doesn't malloc
This lets us pack directly into the buffer to write to disk, removing
mallocs and allowing us to fix heap fragmentation by reducing memory
churn.
Change-Id: I116f379b6f59f7d0f0dbfa69f15aab33c9e6a6cd
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index ddfb53a..ca2789f 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -557,12 +557,19 @@
cc_test(
name = "logfile_utils_test",
srcs = ["logfile_utils_test.cc"],
+ data = [
+ ":logger_fbs_reflection_out",
+ ],
target_compatible_with = ["@platforms//os:linux"],
deps = [
":logfile_utils",
+ ":logger_fbs",
":test_message_fbs",
"//aos/testing:googletest",
+ "//aos/testing:path",
+ "//aos/testing:random_seed",
"//aos/testing:tmpdir",
+ "@com_github_google_flatbuffers//src:flatc_library",
],
)
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9768584..2eeab52 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -294,6 +294,14 @@
case LogType::kLogMessage:
case LogType::kLogMessageAndDeliveryTime:
case LogType::kLogRemoteMessage:
+ // Since the timestamps are 8 byte aligned, we are going to end up adding
+ // padding in the middle of the message to pad everything out to 8 byte
+ // alignment. That's rather wasteful. To make things efficient to mmap
+ // while reading uncompressed logs, we'd actually rather the message be
+ // aligned. So, force 8 byte alignment (enough to preserve alignment
+ // inside the nested message so that we can read it without moving it)
+ // here.
+ fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
data_offset = fbb->CreateVector(
static_cast<const uint8_t *>(context.data), context.size);
break;
@@ -305,48 +313,454 @@
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
+ // These are split out into very explicit serialization calls because the
+ // order here changes the order things are written out on the wire, and we
+ // want to control and understand it here. Changing the order can increase
+ // the amount of padding bytes in the middle.
+ //
+ // It is also easier to follow... And doesn't actually make things much bigger.
switch (log_type) {
case LogType::kLogRemoteMessage:
message_header_builder.add_queue_index(context.remote_queue_index);
+ message_header_builder.add_data(data_offset);
message_header_builder.add_monotonic_sent_time(
context.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
context.realtime_remote_time.time_since_epoch().count());
break;
- case LogType::kLogMessage:
- case LogType::kLogMessageAndDeliveryTime:
case LogType::kLogDeliveryTimeOnly:
message_header_builder.add_queue_index(context.queue_index);
message_header_builder.add_monotonic_sent_time(
context.monotonic_event_time.time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
context.realtime_event_time.time_since_epoch().count());
- break;
- }
-
- switch (log_type) {
- case LogType::kLogMessage:
- case LogType::kLogRemoteMessage:
- message_header_builder.add_data(data_offset);
- break;
-
- case LogType::kLogMessageAndDeliveryTime:
- message_header_builder.add_data(data_offset);
- [[fallthrough]];
-
- case LogType::kLogDeliveryTimeOnly:
message_header_builder.add_monotonic_remote_time(
context.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_remote_time(
context.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(context.remote_queue_index);
break;
+
+ case LogType::kLogMessage:
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_data(data_offset);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+ break;
+
+ case LogType::kLogMessageAndDeliveryTime:
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_remote_queue_index(context.remote_queue_index);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+ message_header_builder.add_monotonic_remote_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ message_header_builder.add_data(data_offset);
+ break;
}
return message_header_builder.Finish();
}
+flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
+ switch (log_type) {
+ case LogType::kLogMessage:
+ return
+ // Root table size + offset.
+ sizeof(flatbuffers::uoffset_t) * 2 +
+ // 6 padding bytes to pad the header out properly.
+ 6 +
+ // vtable header (size + size of table)
+ sizeof(flatbuffers::voffset_t) * 2 +
+ // offsets to all the fields.
+ sizeof(flatbuffers::voffset_t) * 5 +
+ // pointer to vtable
+ sizeof(flatbuffers::soffset_t) +
+ // pointer to data
+ sizeof(flatbuffers::uoffset_t) +
+ // realtime_sent_time, monotonic_sent_time
+ sizeof(int64_t) * 2 +
+ // queue_index, channel_index
+ sizeof(uint32_t) * 2;
+
+ case LogType::kLogDeliveryTimeOnly:
+ return
+ // Root table size + offset.
+ sizeof(flatbuffers::uoffset_t) * 2 +
+ // 6 padding bytes to pad the header out properly.
+ 4 +
+ // vtable header (size + size of table)
+ sizeof(flatbuffers::voffset_t) * 2 +
+ // offsets to all the fields.
+ sizeof(flatbuffers::voffset_t) * 8 +
+ // pointer to vtable
+ sizeof(flatbuffers::soffset_t) +
+ // remote_queue_index
+ sizeof(uint32_t) +
+ // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
+ // monotonic_sent_time
+ sizeof(int64_t) * 4 +
+ // queue_index, channel_index
+ sizeof(uint32_t) * 2;
+
+ case LogType::kLogMessageAndDeliveryTime:
+ return
+ // Root table size + offset.
+ sizeof(flatbuffers::uoffset_t) * 2 +
+ // 4 padding bytes to pad the header out properly.
+ 4 +
+ // vtable header (size + size of table)
+ sizeof(flatbuffers::voffset_t) * 2 +
+ // offsets to all the fields.
+ sizeof(flatbuffers::voffset_t) * 8 +
+ // pointer to vtable
+ sizeof(flatbuffers::soffset_t) +
+ // pointer to data
+ sizeof(flatbuffers::uoffset_t) +
+ // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
+ // monotonic_sent_time
+ sizeof(int64_t) * 4 +
+ // remote_queue_index, queue_index, channel_index
+ sizeof(uint32_t) * 3;
+
+ case LogType::kLogRemoteMessage:
+ return
+ // Root table size + offset.
+ sizeof(flatbuffers::uoffset_t) * 2 +
+ // 6 padding bytes to pad the header out properly.
+ 6 +
+ // vtable header (size + size of table)
+ sizeof(flatbuffers::voffset_t) * 2 +
+ // offsets to all the fields.
+ sizeof(flatbuffers::voffset_t) * 5 +
+ // pointer to vtable
+ sizeof(flatbuffers::soffset_t) +
+ // realtime_sent_time, monotonic_sent_time
+ sizeof(int64_t) * 2 +
+ // pointer to data
+ sizeof(flatbuffers::uoffset_t) +
+ // queue_index, channel_index
+ sizeof(uint32_t) * 2;
+ }
+ LOG(FATAL);
+}
+
+flatbuffers::uoffset_t PackMessageSize(LogType log_type,
+ const Context &context) {
+ static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
+ "Update size logic please.");
+ const flatbuffers::uoffset_t aligned_data_length =
+ ((context.size + 7) & 0xfffffff8u);
+ switch (log_type) {
+ case LogType::kLogDeliveryTimeOnly:
+ return PackMessageHeaderSize(log_type);
+
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ case LogType::kLogRemoteMessage:
+ return PackMessageHeaderSize(log_type) +
+ // Vector...
+ sizeof(flatbuffers::uoffset_t) + aligned_data_length;
+ }
+ LOG(FATAL);
+}
+
+// Do the magic dance to convert the endianness of the data and append it to the
+// buffer.
+namespace {
+
+// TODO(austin): Look at the generated code to see if building the header is
+// efficient or not.
+template <typename T>
+uint8_t *Push(uint8_t *buffer, const T data) {
+ const T endian_data = flatbuffers::EndianScalar<T>(data);
+ std::memcpy(buffer, &endian_data, sizeof(T));
+ return buffer + sizeof(T);
+}
+
+uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
+ std::memcpy(buffer, data, size);
+ return buffer + size;
+}
+
+uint8_t *Pad(uint8_t *buffer, size_t padding) {
+ std::memset(buffer, 0, padding);
+ return buffer + padding;
+}
+} // namespace
+
+size_t PackMessageInline(uint8_t *buffer, const Context &context,
+ int channel_index, LogType log_type) {
+ const flatbuffers::uoffset_t message_size =
+ PackMessageSize(log_type, context);
+
+ buffer = Push<flatbuffers::uoffset_t>(
+ buffer, message_size - sizeof(flatbuffers::uoffset_t));
+
+ // Pack all the data in. This is brittle but easy to change. Use the
+ // InlinePackMessage.Equivilent unit test to verify everything matches.
+ switch (log_type) {
+ case LogType::kLogMessage:
+ // clang-format off
+ // header:
+ // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
+ // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
+ //
+ // padding:
+ // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
+ buffer = Pad(buffer, 6);
+ //
+ // vtable (aos.logger.MessageHeader):
+ // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
+ // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
+ // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
+ // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
+ // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
+ // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
+ // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
+ //
+ // root_table (aos.logger.MessageHeader):
+ // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
+ // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
+ // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
+ // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
+ // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.queue_index);
+ // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
+ buffer = Push<uint32_t>(buffer, channel_index);
+ //
+ // vector (aos.logger.MessageHeader.data):
+ // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
+ // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
+ // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
+ // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
+ // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
+ // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
+ // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
+ // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
+ // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
+ // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
+ // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
+ // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
+ // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
+ // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
+ // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
+ buffer = PushBytes(buffer, context.data, context.size);
+ //
+ // padding:
+ // +0x4E | 00 00 | uint8_t[2] | .. | padding
+ buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
+ // clang-format on
+ break;
+
+ case LogType::kLogDeliveryTimeOnly:
+ // clang-format off
+ // header:
+ // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
+ // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
+ //
+ // padding:
+ // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
+ buffer = Pad(buffer, 4);
+ //
+ // vtable (aos.logger.MessageHeader):
+ // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
+ // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
+ // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
+ // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
+ // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
+ // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
+ // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
+ // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
+ // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
+ // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
+ //
+ // root_table (aos.logger.MessageHeader):
+ // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
+ // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.remote_queue_index);
+ // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
+ // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
+ // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
+ // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
+ // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.queue_index);
+ // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
+ buffer = Push<uint32_t>(buffer, channel_index);
+
+ // clang-format on
+ break;
+
+ case LogType::kLogMessageAndDeliveryTime:
+ // clang-format off
+ // header:
+ // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
+ // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
+ //
+ // padding:
+ // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
+ buffer = Pad(buffer, 4);
+ //
+ // vtable (aos.logger.MessageHeader):
+ // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
+ // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
+ // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
+ // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
+ // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
+ // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
+ // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
+ // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
+ // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
+ // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
+ //
+ // root_table (aos.logger.MessageHeader):
+ // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
+ // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
+ // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
+ // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
+ // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
+ // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
+ // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.remote_queue_index);
+ // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.queue_index);
+ // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
+ buffer = Push<uint32_t>(buffer, channel_index);
+ //
+ // vector (aos.logger.MessageHeader.data):
+ // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
+ // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
+ // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
+ // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
+ // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
+ // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
+ // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
+ // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
+ buffer = PushBytes(buffer, context.data, context.size);
+ //
+ // padding:
+ // +0x5F | 00 | uint8_t[1] | . | padding
+ buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
+ // clang-format on
+
+ break;
+
+ case LogType::kLogRemoteMessage:
+ // This is the message we need to recreate.
+ //
+ // clang-format off
+ // header:
+ // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
+ // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
+ //
+ // padding:
+ // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
+ buffer = Pad(buffer, 6);
+ //
+ // vtable (aos.logger.MessageHeader):
+ // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
+ // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
+ // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
+ // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
+ // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
+ // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
+ // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
+ //
+ // root_table (aos.logger.MessageHeader):
+ // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
+ // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
+ // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
+ // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
+ // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, context.remote_queue_index);
+ // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
+ buffer = Push<uint32_t>(buffer, channel_index);
+ //
+ // vector (aos.logger.MessageHeader.data):
+ // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
+ buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
+ // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
+ // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
+ // ...
+ // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
+ // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
+ buffer = PushBytes(buffer, context.data, context.size);
+ //
+ // padding:
+ // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
+ buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
+ // clang-format on
+ }
+
+ return message_size;
+}
+
SpanReader::SpanReader(std::string_view filename, bool quiet)
: filename_(filename) {
decoder_ = std::make_unique<DummyDecoder>(filename);
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index ca733d4..951efdc 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -199,6 +199,17 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
+// Returns the size that the packed message from PackMessage or
+// PackMessageInline will be.
+flatbuffers::uoffset_t PackMessageSize(LogType log_type,
+ const Context &context);
+
+// Packs the provided message pointed to by context into the provided buffer.
+// This is equivalent to PackMessage, but doesn't require allocating a
+// FlatBufferBuilder underneath.
+size_t PackMessageInline(uint8_t *data, const Context &contex,
+ int channel_index, LogType log_type);
+
// Class to read chunks out of a log file.
class SpanReader {
public:
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 9e4f8b9..b7cf90b 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -1,14 +1,22 @@
#include "aos/events/logging/logfile_utils.h"
#include <chrono>
+#include <random>
#include <string>
+#include "absl/strings/escaping.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/test_message_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/random_seed.h"
#include "aos/testing/tmpdir.h"
+#include "aos/util/file.h"
+#include "external/com_github_google_flatbuffers/src/annotated_binary_text_gen.h"
+#include "external/com_github_google_flatbuffers/src/binary_annotator.h"
+#include "flatbuffers/reflection_generated.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
@@ -16,6 +24,7 @@
namespace logger {
namespace testing {
namespace chrono = std::chrono;
+using aos::testing::ArtifactPath;
// Adapter class to make it easy to test DetachedBufferWriter without adding
// test only boilerplate to DetachedBufferWriter.
@@ -2831,6 +2840,135 @@
}
}
+class InlinePackMessage : public ::testing::Test {
+ protected:
+ aos::Context RandomContext() {
+ data_ = RandomData();
+ std::uniform_int_distribution<uint32_t> uint32_distribution(
+ std::numeric_limits<uint32_t>::min(),
+ std::numeric_limits<uint32_t>::max());
+
+ std::uniform_int_distribution<int64_t> time_distribution(
+ std::numeric_limits<int64_t>::min(),
+ std::numeric_limits<int64_t>::max());
+
+ aos::Context context;
+ context.monotonic_event_time =
+ aos::monotonic_clock::epoch() +
+ chrono::nanoseconds(time_distribution(random_number_generator_));
+ context.realtime_event_time =
+ aos::realtime_clock::epoch() +
+ chrono::nanoseconds(time_distribution(random_number_generator_));
+
+ context.monotonic_remote_time =
+ aos::monotonic_clock::epoch() +
+ chrono::nanoseconds(time_distribution(random_number_generator_));
+ context.realtime_remote_time =
+ aos::realtime_clock::epoch() +
+ chrono::nanoseconds(time_distribution(random_number_generator_));
+
+ context.queue_index = uint32_distribution(random_number_generator_);
+ context.remote_queue_index = uint32_distribution(random_number_generator_);
+ context.size = data_.size();
+ context.data = data_.data();
+ return context;
+ }
+
+ std::vector<uint8_t> RandomData() {
+ std::vector<uint8_t> result;
+ std::uniform_int_distribution<int> length_distribution(1, 32);
+ std::uniform_int_distribution<uint8_t> data_distribution(
+ std::numeric_limits<uint8_t>::min(),
+ std::numeric_limits<uint8_t>::max());
+
+ const size_t length = length_distribution(random_number_generator_);
+
+ result.reserve(length);
+ for (size_t i = 0; i < length; ++i) {
+ result.emplace_back(data_distribution(random_number_generator_));
+ }
+ return result;
+ }
+
+ std::mt19937 random_number_generator_{
+ std::mt19937(::aos::testing::RandomSeed())};
+
+ std::vector<uint8_t> data_;
+};
+
+// Uses the binary schema to annotate a provided flatbuffer. Returns the
+// annotated flatbuffer.
+std::string AnnotateBinaries(
+ const aos::NonSizePrefixedFlatbuffer<reflection::Schema> &schema,
+ const std::string &schema_filename,
+ flatbuffers::span<uint8_t> binary_data) {
+ flatbuffers::BinaryAnnotator binary_annotator(
+ schema.span().data(), schema.span().size(), binary_data.data(),
+ binary_data.size());
+
+ auto annotations = binary_annotator.Annotate();
+
+ flatbuffers::AnnotatedBinaryTextGenerator text_generator(
+ flatbuffers::AnnotatedBinaryTextGenerator::Options{}, annotations,
+ binary_data.data(), binary_data.size());
+
+ text_generator.Generate(aos::testing::TestTmpDir() + "/foo.bfbs",
+ schema_filename);
+
+ return aos::util::ReadFileToStringOrDie(aos::testing::TestTmpDir() +
+ "/foo.afb");
+}
+
+// Tests that all variations of PackMessage are equivalent to the inline
+// PackMessage used to avoid allocations.
+TEST_F(InlinePackMessage, Equivilent) {
+ std::uniform_int_distribution<uint32_t> uint32_distribution(
+ std::numeric_limits<uint32_t>::min(),
+ std::numeric_limits<uint32_t>::max());
+ aos::FlatbufferVector<reflection::Schema> schema =
+ FileToFlatbuffer<reflection::Schema>(
+ ArtifactPath("aos/events/logging/logger.bfbs"));
+
+ for (const LogType type :
+ {LogType::kLogMessage, LogType::kLogDeliveryTimeOnly,
+ LogType::kLogMessageAndDeliveryTime, LogType::kLogRemoteMessage}) {
+ for (int i = 0; i < 100; ++i) {
+ aos::Context context = RandomContext();
+ const uint32_t channel_index =
+ uint32_distribution(random_number_generator_);
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index, type));
+
+ VLOG(1) << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
+ fbb.GetBufferSpan().size()));
+
+ // Make sure that both the builder and inline method agree on sizes.
+ ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context))
+ << "log type " << static_cast<int>(type);
+
+ // Initialize the buffer to something nonzero to make sure all the padding
+ // bytes are set to 0.
+ std::vector<uint8_t> repacked_message(PackMessageSize(type, context), 67);
+
+ // And verify packing inline works as expected.
+ EXPECT_EQ(repacked_message.size(),
+ PackMessageInline(repacked_message.data(), context,
+ channel_index, type));
+ EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
+ absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
+ fbb.GetBufferSpan().size()))
+ << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
+ fbb.GetBufferSpan());
+ }
+ }
+}
+
+// TODO(austin): I need a method to cpoy the RemoteMessage without mallocing
+// too.
+
} // namespace testing
} // namespace logger
} // namespace aos