Add average latency to log_stats

Useful for seeing how network usage affects the latency of forwarded
messages. Currently used to measure the impact of log offloading on
forwarding.

Change-Id: I15d64339f6116b3d0b8473d1cbcc1fb28ea7fb5e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 4efb8df..8e3d451 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -20,7 +20,8 @@
             "Only print channels that have a set max message size that is more "
             "than double of the max message size.");
 
-// This class implements a histogram for tracking message period percentiles.
+// This class implements a histogram for tracking message period
+// percentiles.
 class Histogram {
  public:
   Histogram(size_t buckets = 1024)
@@ -109,8 +110,24 @@
 
 class ChannelStats {
  public:
-  ChannelStats(const aos::Channel *channel, const aos::Configuration *config)
-      : channel_(channel), config_(config) {}
+  ChannelStats(const aos::Channel *channel, const aos::Node *destination_node,
+               aos::SimulatedEventLoopFactory *factory)
+      : channel_(channel),
+        config_(factory->configuration()),
+        factory_(factory),
+        destination_node_(destination_node) {
+    // Forwarded channel: require a destination node and look up the source.
+    if (channel_->has_source_node() && channel_->has_destination_nodes() &&
+        channel_->destination_nodes()->size() > 0) {
+      CHECK(destination_node_)
+          << "Should have destination node for forwarded channel: "
+          << channel_->name()->string_view();
+      source_node_ = aos::configuration::GetNode(
+          config_, channel_->source_node()->string_view());
+      CHECK(source_node_) << "Node not in config: "
+                          << channel_->source_node()->string_view();
+    }
+  }
 
   // Adds a sample to the statistics.
   void Add(const aos::Context &context) {
@@ -134,6 +151,24 @@
     }
     max_messages_per_period_ = std::max(
         max_messages_per_period_, channel_storage_duration_messages_.size());
+
+    // Only count latency if this message is forwarded and the remote time was
+    // filled
+    if (source_node_ != nullptr &&
+        context.monotonic_remote_time != context.monotonic_event_time) {
+      // Convert times to distributed clock so they can be compared across nodes
+      const aos::distributed_clock::time_point remote_time =
+          factory_->GetNodeEventLoopFactory(source_node_)
+              ->ToDistributedClock(context.monotonic_remote_time);
+
+      const aos::distributed_clock::time_point event_time =
+          factory_->GetNodeEventLoopFactory(destination_node_)
+              ->ToDistributedClock(context.monotonic_event_time);
+      // Add the current latency to the sum
+      total_latency_ += event_time - remote_time;
+
+      num_messages_with_remote_++;
+    }
   }
 
   std::string Percentile() const { return histogram_.Percentile(); }
@@ -167,10 +202,29 @@
 
   const aos::Channel *channel() const { return channel_; }
 
+  // Returns "[<source>-><destination> <avg>ms latency avg]" with the average
+  // at 3 significant digits, or "" if no message carried a remote timestamp.
+  std::string AvgLatency() const {
+    if (num_messages_with_remote_ == 0) {
+      return "";
+    }
+
+    const double avg_latency =
+        std::chrono::duration<double, std::milli>(total_latency_).count() /
+        num_messages_with_remote_;
+    std::stringstream ss;
+    ss << std::setprecision(3) << '['
+       << source_node_->name()->string_view() << "->"
+       << destination_node_->name()->string_view() << " " << avg_latency
+       << "ms latency avg]";
+    return ss.str();
+  }
+
  private:
   // pointer to the channel for which stats are collected
   const aos::Channel *channel_;
   const aos::Configuration *config_;
+  aos::SimulatedEventLoopFactory *factory_;
   aos::realtime_clock::time_point channel_end_time_ =
       aos::realtime_clock::min_time;
   aos::monotonic_clock::time_point first_message_time_ =
@@ -189,7 +243,15 @@
   size_t max_message_size_ = 0;
   size_t total_message_size_ = 0;
 
+  // Number of messages whose remote timestamp was filled in
+  size_t num_messages_with_remote_ = 0;
+  // Sum of forwarding latencies over the messages counted above; starts at 0.
+  aos::distributed_clock::duration total_latency_{0};
+
   Histogram histogram_;
+
+  const aos::Node *source_node_ = nullptr;
+  const aos::Node *destination_node_;
 };
 
 struct LogfileStats {
@@ -210,8 +272,9 @@
       " - Total messages per channel/type;\n"
       " - Max message size per channel/type;\n"
       " - Frequency of messages per second;\n"
-      " - Total logfile size and number of messages.\n"
-      "Use --logfile flag to select a logfile (path/filename) and use --name "
+      " - Total logfile size and number of messages;\n"
+      " - Average latency per forwarded channel/type.\n"
+      "Pass a logfile (path/filename) and use --name "
       "flag to specify a channel to listen on.");
 
   aos::InitGoogle(&argc, &argv);
@@ -276,7 +339,7 @@
     }
 
     // Add a record to the stats vector.
-    channel_stats.push_back({channel, reader.configuration()});
+    channel_stats.push_back({channel, node, &log_reader_factory});
     // Lambda to read messages and parse for information
     stats_event_loop->MakeRawNoArgWatcher(
         channel,
@@ -326,7 +389,8 @@
                   << " bytes avg, " << channel_stats[i].avg_message_bandwidth()
                   << " bytes/sec avg, " << channel_stats[i].max_message_size()
                   << " bytes max / " << channel_stats[i].channel()->max_size()
-                  << "bytes " << channel_stats[i].Percentile();
+                  << "bytes, " << channel_stats[i].Percentile() << ", "
+                  << channel_stats[i].AvgLatency();
         std::cout << std::endl;
       }
     }