Add a PackRemoteMessage method which doesn't malloc
This lets us pack directly into the buffer to write to disk, removing
mallocs and allowing us to fix heap fragmentation by reducing memory
churn. A future patch will use it.
Change-Id: I05b312a4e4bf87f6da22f1d9a84ab71ec3ebab3d
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index ca2789f..3b82e98 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -61,6 +61,7 @@
"//aos:flatbuffers",
"//aos/containers:resizeable_buffer",
"//aos/events:event_loop",
+ "//aos/network:remote_message_fbs",
"//aos/util:file",
"@com_github_gflags_gflags//:gflags",
"@com_github_google_flatbuffers//:flatbuffers",
@@ -559,6 +560,7 @@
srcs = ["logfile_utils_test.cc"],
data = [
":logger_fbs_reflection_out",
+ "//aos/network:remote_message_fbs_reflection_out",
],
target_compatible_with = ["@platforms//os:linux"],
deps = [
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 0dc4468..36116a3 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -825,6 +825,7 @@
const auto start = event_loop_->monotonic_now();
// And now handle the special message contents channel. Copy the
// message into a FlatBufferBuilder and save it to disk.
+ //
// TODO(austin): We can be more efficient here when we start to
// care...
flatbuffers::FlatBufferBuilder fbb;
@@ -834,38 +835,20 @@
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-
- logger::MessageHeader::Builder message_header_builder(fbb);
-
// TODO(austin): This needs to check the channel_index and confirm
// that it should be logged before squirreling away the timestamp to
// disk. We don't want to log irrelevant timestamps.
- // Note: this must match the same order as MessageBridgeServer and
- // PackMessage. We want identical headers to have identical
- // on-the-wire formats to make comparing them easier.
-
// Translate from the channel index that the event loop uses to the
// channel index in the log file.
- message_header_builder.add_channel_index(
- event_loop_to_logged_channel_index_[msg->channel_index()]);
-
- message_header_builder.add_queue_index(msg->queue_index());
- message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
- message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
-
- message_header_builder.add_monotonic_remote_time(
- msg->monotonic_remote_time());
- message_header_builder.add_realtime_remote_time(
- msg->realtime_remote_time());
- message_header_builder.add_remote_queue_index(msg->remote_queue_index());
+ const int channel_index =
+ event_loop_to_logged_channel_index_[msg->channel_index()];
const aos::monotonic_clock::time_point monotonic_timestamp_time =
f.fetcher->context().monotonic_event_time;
- message_header_builder.add_monotonic_timestamp_time(
- monotonic_timestamp_time.time_since_epoch().count());
- fbb.FinishSizePrefixed(message_header_builder.Finish());
+ fbb.FinishSizePrefixed(
+ PackRemoteMessage(&fbb, msg, channel_index, monotonic_timestamp_time));
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, f);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 2eeab52..7f32f44 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -285,6 +285,128 @@
}
}
+// Do the magic dance to convert the endianness of the data and append it to the
+// buffer.
+namespace {
+
+// TODO(austin): Look at the generated code to see if building the header is
+// efficient or not.
+template <typename T>
+uint8_t *Push(uint8_t *buffer, const T data) {
+ const T endian_data = flatbuffers::EndianScalar<T>(data);
+ std::memcpy(buffer, &endian_data, sizeof(T));
+ return buffer + sizeof(T);
+}
+
+uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
+ std::memcpy(buffer, data, size);
+ return buffer + size;
+}
+
+uint8_t *Pad(uint8_t *buffer, size_t padding) {
+ std::memset(buffer, 0, padding);
+ return buffer + padding;
+}
+} // namespace
+
+flatbuffers::Offset<MessageHeader> PackRemoteMessage(
+ flatbuffers::FlatBufferBuilder *fbb,
+ const message_bridge::RemoteMessage *msg, int channel_index,
+ const aos::monotonic_clock::time_point monotonic_timestamp_time) {
+ logger::MessageHeader::Builder message_header_builder(*fbb);
+ // Note: this must match the same order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+
+ message_header_builder.add_channel_index(channel_index);
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(msg->remote_queue_index());
+
+ message_header_builder.add_monotonic_timestamp_time(
+ monotonic_timestamp_time.time_since_epoch().count());
+
+ return message_header_builder.Finish();
+}
+
+size_t PackRemoteMessageInline(
+ uint8_t *buffer, const message_bridge::RemoteMessage *msg,
+ int channel_index,
+ const aos::monotonic_clock::time_point monotonic_timestamp_time) {
+ const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
+
+ // clang-format off
+ // header:
+ // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
+ buffer = Push<flatbuffers::uoffset_t>(
+ buffer, message_size - sizeof(flatbuffers::uoffset_t));
+ // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
+ //
+ // padding:
+ // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
+ buffer = Pad(buffer, 6);
+ //
+ // vtable (aos.logger.MessageHeader):
+ // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
+ // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
+ // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
+ // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
+ // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
+ // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
+ // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
+ // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
+ // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
+ // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
+ // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
+ buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
+ //
+ // root_table (aos.logger.MessageHeader):
+ // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
+ buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
+ // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
+ buffer = Push<int64_t>(buffer,
+ monotonic_timestamp_time.time_since_epoch().count());
+ // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
+ // TODO(austin): Can we re-arrange the order to ditch the padding?
+ // (Answer is yes, but what is the impact elsewhere? It will change the
+ // binary format)
+ buffer = Pad(buffer, 4);
+ // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
+ // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
+ // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
+ buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
+ // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
+ // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
+ buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
+ // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
+ buffer = Push<uint32_t>(buffer, msg->queue_index());
+ // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
+ buffer = Push<uint32_t>(buffer, channel_index);
+ // clang-format on
+
+ return message_size;
+}
+
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type) {
@@ -472,30 +594,6 @@
LOG(FATAL);
}
-// Do the magic dance to convert the endianness of the data and append it to the
-// buffer.
-namespace {
-
-// TODO(austin): Look at the generated code to see if building the header is
-// efficient or not.
-template <typename T>
-uint8_t *Push(uint8_t *buffer, const T data) {
- const T endian_data = flatbuffers::EndianScalar<T>(data);
- std::memcpy(buffer, &endian_data, sizeof(T));
- return buffer + sizeof(T);
-}
-
-uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
- std::memcpy(buffer, data, size);
- return buffer + size;
-}
-
-uint8_t *Pad(uint8_t *buffer, size_t padding) {
- std::memset(buffer, 0, padding);
- return buffer + padding;
-}
-} // namespace
-
size_t PackMessageInline(uint8_t *buffer, const Context &context,
int channel_index, LogType log_type) {
const flatbuffers::uoffset_t message_size =
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 951efdc..16f3675 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -23,6 +23,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffers.h"
+#include "aos/network/remote_message_generated.h"
#include "flatbuffers/flatbuffers.h"
namespace aos::logger {
@@ -194,6 +195,17 @@
aos::monotonic_clock::min_time;
};
+// Repacks the provided RemoteMessage into fbb.
+flatbuffers::Offset<MessageHeader> PackRemoteMessage(
+ flatbuffers::FlatBufferBuilder *fbb,
+ const message_bridge::RemoteMessage *msg, int channel_index,
+ const aos::monotonic_clock::time_point monotonic_timestamp_time);
+
+constexpr flatbuffers::uoffset_t PackRemoteMessageSize() { return 96u; }
+size_t PackRemoteMessageInline(
+ uint8_t *data, const message_bridge::RemoteMessage *msg, int channel_index,
+ const aos::monotonic_clock::time_point monotonic_timestamp_time);
+
// Packes a message pointed to by the context into a MessageHeader.
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index b7cf90b..c8d16e0 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -25,6 +25,7 @@
namespace testing {
namespace chrono = std::chrono;
using aos::testing::ArtifactPath;
+using aos::message_bridge::RemoteMessage;
// Adapter class to make it easy to test DetachedBufferWriter without adding
// test only boilerplate to DetachedBufferWriter.
@@ -2874,6 +2875,42 @@
return context;
}
+ aos::monotonic_clock::time_point RandomMonotonic() {
+ std::uniform_int_distribution<int64_t> time_distribution(
+ 0, std::numeric_limits<int64_t>::max());
+ return aos::monotonic_clock::epoch() +
+ chrono::nanoseconds(time_distribution(random_number_generator_));
+ }
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+ RandomRemoteMessage() {
+ std::uniform_int_distribution<uint8_t> uint8_distribution(
+ std::numeric_limits<uint8_t>::min(),
+ std::numeric_limits<uint8_t>::max());
+
+ std::uniform_int_distribution<int64_t> time_distribution(
+ std::numeric_limits<int64_t>::min(),
+ std::numeric_limits<int64_t>::max());
+
+ flatbuffers::FlatBufferBuilder fbb;
+ message_bridge::RemoteMessage::Builder builder(fbb);
+ builder.add_queue_index(uint8_distribution(random_number_generator_));
+
+ builder.add_monotonic_sent_time(
+ time_distribution(random_number_generator_));
+ builder.add_realtime_sent_time(time_distribution(random_number_generator_));
+ builder.add_monotonic_remote_time(
+ time_distribution(random_number_generator_));
+ builder.add_realtime_remote_time(
+ time_distribution(random_number_generator_));
+
+ builder.add_remote_queue_index(
+ uint8_distribution(random_number_generator_));
+
+ fbb.FinishSizePrefixed(builder.Finish());
+ return fbb.Release();
+ }
+
std::vector<uint8_t> RandomData() {
std::vector<uint8_t> result;
std::uniform_int_distribution<int> length_distribution(1, 32);
@@ -2966,8 +3003,50 @@
}
}
-// TODO(austin): I need a method to cpoy the RemoteMessage without mallocing
-// too.
+// Tests that all variations of PackMessage are equivilent to the inline
+// PackMessage used to avoid allocations.
+TEST_F(InlinePackMessage, RemoteEquivilent) {
+ aos::FlatbufferVector<reflection::Schema> schema =
+ FileToFlatbuffer<reflection::Schema>(
+ ArtifactPath("aos/events/logging/logger.bfbs"));
+ std::uniform_int_distribution<uint8_t> uint8_distribution(
+ std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
+
+ for (int i = 0; i < 100; ++i) {
+ aos::SizePrefixedFlatbufferDetachedBuffer<RemoteMessage> random_msg =
+ RandomRemoteMessage();
+ const size_t channel_index = uint8_distribution(random_number_generator_);
+ const monotonic_clock::time_point monotonic_timestamp_time =
+ RandomMonotonic();
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ fbb.FinishSizePrefixed(PackRemoteMessage(
+ &fbb, &random_msg.message(), channel_index, monotonic_timestamp_time));
+
+ VLOG(1) << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(fbb.GetBufferSpan().data()),
+ fbb.GetBufferSpan().size()));
+
+ // Make sure that both the builder and inline method agree on sizes.
+ ASSERT_EQ(fbb.GetSize(), PackRemoteMessageSize());
+
+ // Initialize the buffer to something nonzer to make sure all the padding
+ // bytes are set to 0.
+ std::vector<uint8_t> repacked_message(PackRemoteMessageSize(), 67);
+
+ // And verify packing inline works as expected.
+ EXPECT_EQ(
+ repacked_message.size(),
+ PackRemoteMessageInline(repacked_message.data(), &random_msg.message(),
+ channel_index, monotonic_timestamp_time));
+ EXPECT_EQ(absl::Span<uint8_t>(repacked_message),
+ absl::Span<uint8_t>(fbb.GetBufferSpan().data(),
+ fbb.GetBufferSpan().size()))
+ << AnnotateBinaries(schema, "aos/events/logging/logger.bfbs",
+ fbb.GetBufferSpan());
+ }
+}
} // namespace testing
} // namespace logger