aos: Let log_stats find inefficiently packed flatbuffer messages
James Kuszmaul found that some messages are inefficiently packed, which
was needlessly increasing the logger's CPU usage.
To help find other channels with inefficiently packed messages, this
patch adds a new `--print_repack_size_diffs` flag. For each channel,
the flag prints the number of bytes saved when its messages are
naively repacked. The repacking method is currently _very_ naive (each
message is converted to JSON and back) and therefore slow, so only
enable the flag when you need it; analyzing a whole log this way takes
a long time. I recommend combining it with the `--run_for` flag.
Change-Id: I0e18acccddb3d40af138559b40d356f22960b149
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index eef0753..2819a0b 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -26,6 +26,12 @@
"If set to a positive value, only process the log for this many seconds. "
"Otherwise, process the log until the end of the log.");
+DEFINE_bool(
+ print_repack_size_diffs, false,
+ "Analyze how many bytes could be saved in each message when converted to "
+ "JSON and back. This can be helpful to identify code that is generating "
+ "inefficiently packed flatbuffer messages.");
+
// This class implements a histogram for tracking message period
// percentiles.
class Histogram {
@@ -117,11 +123,14 @@
class ChannelStats {
public:
ChannelStats(const aos::Channel *channel, const aos::Node *destination_node,
- aos::SimulatedEventLoopFactory *factory)
+ aos::SimulatedEventLoopFactory *factory,
+ const reflection::Schema *schema)
: channel_(channel),
config_(factory->configuration()),
factory_(factory),
- destination_node_(destination_node) {
+ schema_(CHECK_NOTNULL(schema)),
+ destination_node_(destination_node),
+ flatbuffer_type_(schema) {
// Multi-node channel
if (channel_->has_source_node() && channel_->has_destination_nodes() &&
channel_->destination_nodes()->size() > 0) {
@@ -137,6 +146,22 @@
// Adds a sample to the statistics.
void Add(const aos::Context &context) {
+ if (context.size > 0 && context.data != nullptr) {
+ // Perform a very naive repacking of the message. Ideally, we'd use
+ // something like RecursiveCopyFlatBuffer that works with schemas.
+ // For now, this is good enough.
+ const std::string json = aos::FlatbufferToJson(
+ schema_, static_cast<const uint8_t *>(context.data));
+ flatbuffers::DetachedBuffer buffer =
+ aos::JsonToFlatbuffer(json, flatbuffer_type_);
+
+ const ssize_t packed_size_reduction = static_cast<ssize_t>(context.size) -
+ static_cast<ssize_t>(buffer.size());
+ max_packed_size_reduction_ =
+ std::max(max_packed_size_reduction_, packed_size_reduction);
+ total_packed_size_reduction_ += packed_size_reduction;
+ }
+
max_message_size_ = std::max(max_message_size_, context.size);
total_message_size_ += context.size;
total_num_messages_++;
@@ -187,6 +212,10 @@
size_t max_message_size() const { return max_message_size_; }
size_t total_num_messages() const { return total_num_messages_; }
+ ssize_t max_packed_size_reduction() const {
+ return max_packed_size_reduction_;
+ }
+
double avg_messages_per_sec() const {
return total_num_messages_ / SecondsActive();
}
@@ -205,6 +234,10 @@
return total_message_size_ / SecondsActive();
}
+ ssize_t avg_packed_size_reduction() const {  // Mean bytes saved per message.
+   return total_num_messages_ == 0 ? 0 : total_packed_size_reduction_ / static_cast<ssize_t>(total_num_messages_);  // signed divide; 0 if no messages
+ }
+
aos::realtime_clock::time_point channel_end_time() const {
return channel_end_time_;
}
@@ -234,6 +267,7 @@
const aos::Channel *channel_;
const aos::Configuration *config_;
aos::SimulatedEventLoopFactory *factory_;
+ const reflection::Schema *const schema_;
aos::realtime_clock::time_point channel_end_time_ =
aos::realtime_clock::min_time;
aos::monotonic_clock::time_point first_message_time_ =
@@ -252,6 +286,12 @@
size_t max_message_size_ = 0;
size_t total_message_size_ = 0;
+ // The size reduction (in bytes) after a naive repacking. A negative number
+ // indicates that the repacking generated a _bigger_ message than the
+ // original message.
+ ssize_t max_packed_size_reduction_ = 0;
+ ssize_t total_packed_size_reduction_ = 0;
+
// Count of messages which had remote timestamps
size_t num_messages_with_remote_ = 0;
// Sum of latencies in all messages sent on this channel if multinode
@@ -261,6 +301,8 @@
const aos::Node *source_node_ = nullptr;
const aos::Node *destination_node_;
+
+ const aos::FlatbufferType flatbuffer_type_;
};
struct LogfileStats {
@@ -340,16 +382,24 @@
}
// Add a record to the stats vector.
- channel_stats.push_back({channel, node, &log_reader_factory});
+ channel_stats.push_back(ChannelStats{
+ channel, node, &log_reader_factory,
+ aos::configuration::GetSchema(reader.configuration(),
+ channel->type()->string_view())});
// Lambda to read messages and parse for information
- stats_event_loop->MakeRawNoArgWatcher(
- channel,
- [&logfile_stats, &channel_stats, it](const aos::Context &context) {
- channel_stats[it].Add(context);
+ auto watcher = [&logfile_stats, &channel_stats,
+ it](const aos::Context &context) {
+ channel_stats[it].Add(context);
- // Update the overall logfile statistics
- logfile_stats.logfile_length += context.size;
- });
+ // Update the overall logfile statistics
+ logfile_stats.logfile_length += context.size;
+ };
+ if (FLAGS_print_repack_size_diffs) {
+ stats_event_loop->MakeRawWatcher(
+ channel, std::bind(watcher, ::std::placeholders::_1));
+ } else {
+ stats_event_loop->MakeRawNoArgWatcher(channel, watcher);
+ }
it++;
// TODO (Stephan): Frequency of messages per second
// - Sliding window
@@ -399,6 +449,13 @@
<< "bytes, " << channel_stats[i].Percentile() << ", "
<< channel_stats[i].AvgLatency();
std::cout << std::endl;
+ if (FLAGS_print_repack_size_diffs) {
+ std::cout << " " << channel_stats[i].avg_packed_size_reduction()
+ << " bytes packed reduction avg, "
+ << channel_stats[i].max_packed_size_reduction()
+ << " bytes packed reduction max";
+ std::cout << std::endl;
+ }
}
}
}