Update log_stats to handle reboots better
Moving a few variables/functions from log_stats::main into a new class (for
improved clarity and brevity of the main function) and then adapting the new
class to match closely how log_cat operates. This involved copy/paste'ing some
functionality from log_cat for improved handling of log files that contain
reboots.
Change-Id: Ic4b603cc1d67d5ebffe99fdee8c6663bd1531ea7
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2819a0b..c0540bf 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -313,6 +313,130 @@
aos::realtime_clock::min_time;
};
+class LogStatsApplication {
+ public:
+ LogStatsApplication(aos::EventLoop *event_loop,
+ aos::SimulatedEventLoopFactory *log_reader_factory,
+ aos::logger::LogReader *reader)
+ : event_loop_(event_loop),
+ node_(event_loop_->node()),
+ realtime_start_time_(reader->realtime_start_time(node_)),
+ channel_stats_() {
+ event_loop_->SkipTimingReport();
+ event_loop_->SkipAosLog();
+
+ // Read channel info and store in vector.
+ bool found_channel = false;
+ const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+ event_loop_->configuration()->channels();
+
+ int channel_stats_index = 0; // Iterate through the channel_stats.
+ for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+ const aos::Channel *channel = channels->Get(i);
+ if (!aos::configuration::ChannelIsReadableOnNode(channel, node_)) {
+ continue;
+ }
+
+ if (channel->name()->string_view().find(FLAGS_name) ==
+ std::string::npos) {
+ continue;
+ }
+
+ // Add a record to the stats vector.
+ channel_stats_.push_back(ChannelStats{
+ channel, node_, log_reader_factory,
+ aos::configuration::GetSchema(event_loop_->configuration(),
+ channel->type()->string_view())});
+ // Lambda to read messages, parse for information, and update stats.
+ auto watcher = [this, channel_stats_index](const aos::Context &context) {
+ this->UpdateStats(context, channel_stats_index);
+ };
+ if (FLAGS_print_repack_size_diffs) {
+ event_loop_->MakeRawWatcher(
+ channel, std::bind(watcher, ::std::placeholders::_1));
+ } else {
+ event_loop_->MakeRawNoArgWatcher(channel, watcher);
+ }
+ channel_stats_index++;
+ // TODO (Stephan): Frequency of messages per second
+ // - Sliding window
+ // - Max / Deviation
+ found_channel = true;
+ }
+ if (!found_channel) {
+ LOG(FATAL) << "Could not find any channels.";
+ }
+ }
+
+ ~LogStatsApplication() { PrintStats(); }
+
+ void PrintStats() {
+ // Print out the stats per channel and for the logfile.
+ for (size_t i = 0; i != channel_stats_.size(); i++) {
+ if (!FLAGS_excessive_size_only ||
+ (channel_stats_[i].max_message_size() * 2) <
+ static_cast<size_t>(channel_stats_[i].channel()->max_size())) {
+ if (channel_stats_[i].total_num_messages() > 0) {
+ std::cout << channel_stats_[i].channel()->name()->string_view() << " "
+ << channel_stats_[i].channel()->type()->string_view()
+ << "\n";
+
+ logfile_stats_.total_log_messages +=
+ channel_stats_[i].total_num_messages();
+ logfile_stats_.logfile_end_time =
+ std::max(logfile_stats_.logfile_end_time,
+ channel_stats_[i].channel_end_time());
+
+ if (!FLAGS_excessive_size_only) {
+ std::cout << " " << channel_stats_[i].total_num_messages()
+ << " msgs, " << channel_stats_[i].avg_messages_per_sec()
+ << "hz avg, " << channel_stats_[i].max_messages_per_sec()
+ << "hz max, " << channel_stats_[i].channel()->frequency()
+ << "hz configured max";
+ }
+ std::cout << " " << channel_stats_[i].avg_message_size()
+ << " bytes avg, "
+ << channel_stats_[i].avg_message_bandwidth()
+ << " bytes/sec avg, "
+ << channel_stats_[i].max_message_size() << " bytes max / "
+ << channel_stats_[i].channel()->max_size() << "bytes, "
+ << channel_stats_[i].Percentile() << ", "
+ << channel_stats_[i].AvgLatency();
+ std::cout << std::endl;
+ if (FLAGS_print_repack_size_diffs) {
+ std::cout << " " << channel_stats_[i].avg_packed_size_reduction()
+ << " bytes packed reduction avg, "
+ << channel_stats_[i].max_packed_size_reduction()
+ << " bytes packed reduction max";
+ std::cout << std::endl;
+ }
+ }
+ }
+ }
+ std::cout << std::setfill('-') << std::setw(80) << "-"
+ << "\nLogfile statistics:\n"
+ << "Log starts at:\t" << realtime_start_time_ << "\n"
+ << "Log ends at:\t" << logfile_stats_.logfile_end_time << "\n"
+ << "Log file size:\t" << logfile_stats_.logfile_length << "\n"
+ << "Total messages:\t" << logfile_stats_.total_log_messages
+ << "\n";
+ }
+
+ void UpdateStats(const aos::Context &context, int channel_stats_index) {
+ // Run the ChannelStats update.
+ channel_stats_[channel_stats_index].Add(context);
+ // Update the overall logfile statistics.
+ logfile_stats_.logfile_length += context.size;
+ }
+
+ private:
+ aos::EventLoop *event_loop_;
+ const aos::Node *node_;
+ aos::realtime_clock::time_point realtime_start_time_;
+ std::vector<ChannelStats> channel_stats_;
+ LogfileStats logfile_stats_;
+};
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(
"Usage: \n"
@@ -337,11 +461,8 @@
aos::logger::LogReader reader(
aos::logger::SortParts(aos::logger::FindLogs(argc, argv)));
- LogfileStats logfile_stats;
- std::vector<ChannelStats> channel_stats;
-
- aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- reader.Register(&log_reader_factory);
+ aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
+ reader.RegisterWithoutStarting(&event_loop_factory);
const aos::Node *node = nullptr;
@@ -358,114 +479,32 @@
}
}
- // Make an eventloop for retrieving stats
- std::unique_ptr<aos::EventLoop> stats_event_loop =
- log_reader_factory.MakeEventLoop("logstats", node);
- stats_event_loop->SkipTimingReport();
- stats_event_loop->SkipAosLog();
+ LogStatsApplication *log_stats_application = nullptr;
- // Read channel info and store in vector
- bool found_channel = false;
- const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
- reader.configuration()->channels();
-
- int it = 0; // iterate through the channel_stats
- for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
- const aos::Channel *channel = channels->Get(i);
- if (!aos::configuration::ChannelIsReadableOnNode(
- channel, stats_event_loop->node())) {
- continue;
- }
-
- if (channel->name()->string_view().find(FLAGS_name) == std::string::npos) {
- continue;
- }
-
- // Add a record to the stats vector.
- channel_stats.push_back(ChannelStats{
- channel, node, &log_reader_factory,
- aos::configuration::GetSchema(reader.configuration(),
- channel->type()->string_view())});
- // Lambda to read messages and parse for information
- auto watcher = [&logfile_stats, &channel_stats,
- it](const aos::Context &context) {
- channel_stats[it].Add(context);
-
- // Update the overall logfile statistics
- logfile_stats.logfile_length += context.size;
- };
- if (FLAGS_print_repack_size_diffs) {
- stats_event_loop->MakeRawWatcher(
- channel, std::bind(watcher, ::std::placeholders::_1));
- } else {
- stats_event_loop->MakeRawNoArgWatcher(channel, watcher);
- }
- it++;
- // TODO (Stephan): Frequency of messages per second
- // - Sliding window
- // - Max / Deviation
- found_channel = true;
- }
- if (!found_channel) {
- LOG(FATAL) << "Could not find any channels";
- }
+ // Start the LogStatsApplication when the logfile starts.
+ aos::NodeEventLoopFactory *node_factory =
+ event_loop_factory.GetNodeEventLoopFactory(node);
+ reader.OnStart(node, [&log_stats_application, &event_loop_factory,
+ node_factory, &reader]() {
+ log_stats_application = node_factory->AlwaysStart<LogStatsApplication>(
+ "log_stats_application", &event_loop_factory, &reader);
+ });
+ reader.OnEnd(node, [&log_stats_application, node_factory]() {
+ CHECK(log_stats_application != nullptr);
+ node_factory->Stop(log_stats_application);
+ log_stats_application = nullptr;
+ });
if (FLAGS_run_for > 0.0) {
- log_reader_factory.RunFor(
+ event_loop_factory.RunFor(
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(FLAGS_run_for)));
} else {
- log_reader_factory.Run();
+ event_loop_factory.Run();
}
std::cout << std::endl;
- // Print out the stats per channel and for the logfile
- for (size_t i = 0; i != channel_stats.size(); i++) {
- if (!FLAGS_excessive_size_only ||
- (channel_stats[i].max_message_size() * 2) <
- static_cast<size_t>(channel_stats[i].channel()->max_size())) {
- if (channel_stats[i].total_num_messages() > 0) {
- std::cout << channel_stats[i].channel()->name()->string_view() << " "
- << channel_stats[i].channel()->type()->string_view() << "\n";
-
- logfile_stats.total_log_messages +=
- channel_stats[i].total_num_messages();
- logfile_stats.logfile_end_time =
- std::max(logfile_stats.logfile_end_time,
- channel_stats[i].channel_end_time());
-
- if (!FLAGS_excessive_size_only) {
- std::cout << " " << channel_stats[i].total_num_messages()
- << " msgs, " << channel_stats[i].avg_messages_per_sec()
- << "hz avg, " << channel_stats[i].max_messages_per_sec()
- << "hz max, " << channel_stats[i].channel()->frequency()
- << "hz configured max";
- }
- std::cout << " " << channel_stats[i].avg_message_size()
- << " bytes avg, " << channel_stats[i].avg_message_bandwidth()
- << " bytes/sec avg, " << channel_stats[i].max_message_size()
- << " bytes max / " << channel_stats[i].channel()->max_size()
- << "bytes, " << channel_stats[i].Percentile() << ", "
- << channel_stats[i].AvgLatency();
- std::cout << std::endl;
- if (FLAGS_print_repack_size_diffs) {
- std::cout << " " << channel_stats[i].avg_packed_size_reduction()
- << " bytes packed reduction avg, "
- << channel_stats[i].max_packed_size_reduction()
- << " bytes packed reduction max";
- std::cout << std::endl;
- }
- }
- }
- }
- std::cout << std::setfill('-') << std::setw(80) << "-"
- << "\nLogfile statistics:\n"
- << "Log starts at:\t" << reader.realtime_start_time(node) << "\n"
- << "Log ends at:\t" << logfile_stats.logfile_end_time << "\n"
- << "Log file size:\t" << logfile_stats.logfile_length << "\n"
- << "Total messages:\t" << logfile_stats.total_log_messages << "\n";
-
// Cleanup the created processes
reader.Deregister();