Update log_stats to handle reboots better

Moving a few variables/functions from log_stats::main into a new class (for
improved clarity and brevity of the main function) and then adapting the new
class to match closely how log_cat operates. This involved copy/paste'ing some
functionality from log_cat for improved handling of log files that contain
reboots.

Change-Id: Ic4b603cc1d67d5ebffe99fdee8c6663bd1531ea7
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2819a0b..c0540bf 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -313,6 +313,130 @@
       aos::realtime_clock::min_time;
 };
 
+class LogStatsApplication {
+ public:
+  LogStatsApplication(aos::EventLoop *event_loop,
+                      aos::SimulatedEventLoopFactory *log_reader_factory,
+                      aos::logger::LogReader *reader)
+      : event_loop_(event_loop),
+        node_(event_loop_->node()),
+        realtime_start_time_(reader->realtime_start_time(node_)),
+        channel_stats_() {
+    event_loop_->SkipTimingReport();
+    event_loop_->SkipAosLog();
+
+    // Read channel info and store in vector.
+    bool found_channel = false;
+    const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+        event_loop_->configuration()->channels();
+
+    int channel_stats_index = 0;  // Iterate through the channel_stats.
+    for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+      const aos::Channel *channel = channels->Get(i);
+      if (!aos::configuration::ChannelIsReadableOnNode(channel, node_)) {
+        continue;
+      }
+
+      if (channel->name()->string_view().find(FLAGS_name) ==
+          std::string::npos) {
+        continue;
+      }
+
+      // Add a record to the stats vector.
+      channel_stats_.push_back(ChannelStats{
+          channel, node_, log_reader_factory,
+          aos::configuration::GetSchema(event_loop_->configuration(),
+                                        channel->type()->string_view())});
+      // Lambda to read messages, parse for information, and update stats.
+      auto watcher = [this, channel_stats_index](const aos::Context &context) {
+        this->UpdateStats(context, channel_stats_index);
+      };
+      if (FLAGS_print_repack_size_diffs) {
+        event_loop_->MakeRawWatcher(
+            channel, std::bind(watcher, ::std::placeholders::_1));
+      } else {
+        event_loop_->MakeRawNoArgWatcher(channel, watcher);
+      }
+      channel_stats_index++;
+      // TODO (Stephan): Frequency of messages per second
+      // - Sliding window
+      // - Max / Deviation
+      found_channel = true;
+    }
+    if (!found_channel) {
+      LOG(FATAL) << "Could not find any channels.";
+    }
+  }
+
+  ~LogStatsApplication() { PrintStats(); }
+
+  void PrintStats() {
+    // Print out the stats per channel and for the logfile.
+    for (size_t i = 0; i != channel_stats_.size(); i++) {
+      if (!FLAGS_excessive_size_only ||
+          (channel_stats_[i].max_message_size() * 2) <
+              static_cast<size_t>(channel_stats_[i].channel()->max_size())) {
+        if (channel_stats_[i].total_num_messages() > 0) {
+          std::cout << channel_stats_[i].channel()->name()->string_view() << " "
+                    << channel_stats_[i].channel()->type()->string_view()
+                    << "\n";
+
+          logfile_stats_.total_log_messages +=
+              channel_stats_[i].total_num_messages();
+          logfile_stats_.logfile_end_time =
+              std::max(logfile_stats_.logfile_end_time,
+                       channel_stats_[i].channel_end_time());
+
+          if (!FLAGS_excessive_size_only) {
+            std::cout << "   " << channel_stats_[i].total_num_messages()
+                      << " msgs, " << channel_stats_[i].avg_messages_per_sec()
+                      << "hz avg, " << channel_stats_[i].max_messages_per_sec()
+                      << "hz max, " << channel_stats_[i].channel()->frequency()
+                      << "hz configured max";
+          }
+          std::cout << " " << channel_stats_[i].avg_message_size()
+                    << " bytes avg, "
+                    << channel_stats_[i].avg_message_bandwidth()
+                    << " bytes/sec avg, "
+                    << channel_stats_[i].max_message_size() << " bytes max / "
+                    << channel_stats_[i].channel()->max_size() << "bytes, "
+                    << channel_stats_[i].Percentile() << ", "
+                    << channel_stats_[i].AvgLatency();
+          std::cout << std::endl;
+          if (FLAGS_print_repack_size_diffs) {
+            std::cout << "   " << channel_stats_[i].avg_packed_size_reduction()
+                      << " bytes packed reduction avg, "
+                      << channel_stats_[i].max_packed_size_reduction()
+                      << " bytes packed reduction max";
+            std::cout << std::endl;
+          }
+        }
+      }
+    }
+    std::cout << std::setfill('-') << std::setw(80) << "-"
+              << "\nLogfile statistics:\n"
+              << "Log starts at:\t" << realtime_start_time_ << "\n"
+              << "Log ends at:\t" << logfile_stats_.logfile_end_time << "\n"
+              << "Log file size:\t" << logfile_stats_.logfile_length << "\n"
+              << "Total messages:\t" << logfile_stats_.total_log_messages
+              << "\n";
+  }
+
+  void UpdateStats(const aos::Context &context, int channel_stats_index) {
+    // Run the ChannelStats update.
+    channel_stats_[channel_stats_index].Add(context);
+    // Update the overall logfile statistics.
+    logfile_stats_.logfile_length += context.size;
+  }
+
+ private:
+  aos::EventLoop *event_loop_;
+  const aos::Node *node_;
+  aos::realtime_clock::time_point realtime_start_time_;
+  std::vector<ChannelStats> channel_stats_;
+  LogfileStats logfile_stats_;
+};
+
 int main(int argc, char **argv) {
   gflags::SetUsageMessage(
       "Usage: \n"
@@ -337,11 +461,8 @@
   aos::logger::LogReader reader(
       aos::logger::SortParts(aos::logger::FindLogs(argc, argv)));
 
-  LogfileStats logfile_stats;
-  std::vector<ChannelStats> channel_stats;
-
-  aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
-  reader.Register(&log_reader_factory);
+  aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
+  reader.RegisterWithoutStarting(&event_loop_factory);
 
   const aos::Node *node = nullptr;
 
@@ -358,114 +479,32 @@
     }
   }
 
-  // Make an eventloop for retrieving stats
-  std::unique_ptr<aos::EventLoop> stats_event_loop =
-      log_reader_factory.MakeEventLoop("logstats", node);
-  stats_event_loop->SkipTimingReport();
-  stats_event_loop->SkipAosLog();
+  LogStatsApplication *log_stats_application = nullptr;
 
-  // Read channel info and store in vector
-  bool found_channel = false;
-  const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
-      reader.configuration()->channels();
-
-  int it = 0;  // iterate through the channel_stats
-  for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
-    const aos::Channel *channel = channels->Get(i);
-    if (!aos::configuration::ChannelIsReadableOnNode(
-            channel, stats_event_loop->node())) {
-      continue;
-    }
-
-    if (channel->name()->string_view().find(FLAGS_name) == std::string::npos) {
-      continue;
-    }
-
-    // Add a record to the stats vector.
-    channel_stats.push_back(ChannelStats{
-        channel, node, &log_reader_factory,
-        aos::configuration::GetSchema(reader.configuration(),
-                                      channel->type()->string_view())});
-    // Lambda to read messages and parse for information
-    auto watcher = [&logfile_stats, &channel_stats,
-                    it](const aos::Context &context) {
-      channel_stats[it].Add(context);
-
-      // Update the overall logfile statistics
-      logfile_stats.logfile_length += context.size;
-    };
-    if (FLAGS_print_repack_size_diffs) {
-      stats_event_loop->MakeRawWatcher(
-          channel, std::bind(watcher, ::std::placeholders::_1));
-    } else {
-      stats_event_loop->MakeRawNoArgWatcher(channel, watcher);
-    }
-    it++;
-    // TODO (Stephan): Frequency of messages per second
-    // - Sliding window
-    // - Max / Deviation
-    found_channel = true;
-  }
-  if (!found_channel) {
-    LOG(FATAL) << "Could not find any channels";
-  }
+  // Start the LogStatsApplication when the logfile starts.
+  aos::NodeEventLoopFactory *node_factory =
+      event_loop_factory.GetNodeEventLoopFactory(node);
+  reader.OnStart(node, [&log_stats_application, &event_loop_factory,
+                        node_factory, &reader]() {
+    log_stats_application = node_factory->AlwaysStart<LogStatsApplication>(
+        "log_stats_application", &event_loop_factory, &reader);
+  });
+  reader.OnEnd(node, [&log_stats_application, node_factory]() {
+    CHECK(log_stats_application != nullptr);
+    node_factory->Stop(log_stats_application);
+    log_stats_application = nullptr;
+  });
 
   if (FLAGS_run_for > 0.0) {
-    log_reader_factory.RunFor(
+    event_loop_factory.RunFor(
         std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::duration<double>(FLAGS_run_for)));
   } else {
-    log_reader_factory.Run();
+    event_loop_factory.Run();
   }
 
   std::cout << std::endl;
 
-  // Print out the stats per channel and for the logfile
-  for (size_t i = 0; i != channel_stats.size(); i++) {
-    if (!FLAGS_excessive_size_only ||
-        (channel_stats[i].max_message_size() * 2) <
-            static_cast<size_t>(channel_stats[i].channel()->max_size())) {
-      if (channel_stats[i].total_num_messages() > 0) {
-        std::cout << channel_stats[i].channel()->name()->string_view() << " "
-                  << channel_stats[i].channel()->type()->string_view() << "\n";
-
-        logfile_stats.total_log_messages +=
-            channel_stats[i].total_num_messages();
-        logfile_stats.logfile_end_time =
-            std::max(logfile_stats.logfile_end_time,
-                     channel_stats[i].channel_end_time());
-
-        if (!FLAGS_excessive_size_only) {
-          std::cout << "   " << channel_stats[i].total_num_messages()
-                    << " msgs, " << channel_stats[i].avg_messages_per_sec()
-                    << "hz avg, " << channel_stats[i].max_messages_per_sec()
-                    << "hz max, " << channel_stats[i].channel()->frequency()
-                    << "hz configured max";
-        }
-        std::cout << " " << channel_stats[i].avg_message_size()
-                  << " bytes avg, " << channel_stats[i].avg_message_bandwidth()
-                  << " bytes/sec avg, " << channel_stats[i].max_message_size()
-                  << " bytes max / " << channel_stats[i].channel()->max_size()
-                  << "bytes, " << channel_stats[i].Percentile() << ", "
-                  << channel_stats[i].AvgLatency();
-        std::cout << std::endl;
-        if (FLAGS_print_repack_size_diffs) {
-          std::cout << "   " << channel_stats[i].avg_packed_size_reduction()
-                    << " bytes packed reduction avg, "
-                    << channel_stats[i].max_packed_size_reduction()
-                    << " bytes packed reduction max";
-          std::cout << std::endl;
-        }
-      }
-    }
-  }
-  std::cout << std::setfill('-') << std::setw(80) << "-"
-            << "\nLogfile statistics:\n"
-            << "Log starts at:\t" << reader.realtime_start_time(node) << "\n"
-            << "Log ends at:\t" << logfile_stats.logfile_end_time << "\n"
-            << "Log file size:\t" << logfile_stats.logfile_length << "\n"
-            << "Total messages:\t" << logfile_stats.total_log_messages << "\n";
-
   // Cleanup the created processes
   reader.Deregister();