Add timing reports for both simulation and shm

This involves a major refactor of how event loops are put together.
Lots of functionality for timing reports is in the base event loop
class (though hidden as much as possible to avoid cluttering the
interface).  More functionality is now in the base class, with simpler
and more consistent subclasses.

Change-Id: I163ffb8d1b0e29a0231b54e205c2d7c5241d662b
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8defcb5..834ca16 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -57,6 +57,13 @@
   auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
 
   EXPECT_FALSE(fetcher.Fetch());
+  EXPECT_EQ(fetcher.get(), nullptr);
+
+  EXPECT_EQ(fetcher.context().monotonic_sent_time, monotonic_clock::min_time);
+  EXPECT_EQ(fetcher.context().realtime_sent_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+  EXPECT_EQ(fetcher.context().size, 0u);
+  EXPECT_EQ(fetcher.context().data, nullptr);
 
   aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
   TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -66,6 +73,20 @@
   EXPECT_TRUE(fetcher.Fetch());
   ASSERT_FALSE(fetcher.get() == nullptr);
   EXPECT_EQ(fetcher.get()->value(), 200);
+
+  const chrono::milliseconds kEpsilon(100);
+
+  EXPECT_GE(fetcher.context().monotonic_sent_time,
+            loop2->monotonic_now() - kEpsilon);
+  EXPECT_LE(fetcher.context().monotonic_sent_time,
+            loop2->monotonic_now() + kEpsilon);
+  EXPECT_GE(fetcher.context().realtime_sent_time,
+            loop2->realtime_now() - kEpsilon);
+  EXPECT_LE(fetcher.context().realtime_sent_time,
+            loop2->realtime_now() + kEpsilon);
+  EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+  EXPECT_EQ(fetcher.context().size, 20u);
+  EXPECT_NE(fetcher.context().data, nullptr);
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -474,21 +495,36 @@
 
 // Verify that timer intervals and duration function properly.
 TEST_P(AbstractEventLoopTest, TimerIntervalAndDuration) {
+  // Force a slower rate so we are guaranteed to have reports for our timer.
+  FLAGS_timing_report_ms = 2000;
+
   const int kCount = 5;
 
   auto loop = MakePrimary();
+  auto loop2 = Make();
+
   ::std::vector<::aos::monotonic_clock::time_point> times;
   ::std::vector<::aos::monotonic_clock::time_point> expected_times;
 
+  Fetcher<timing::Report> report_fetcher =
+      loop2->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
   auto test_timer = loop->AddTimer([this, &times, &expected_times, &loop]() {
     times.push_back(loop->monotonic_now());
+    EXPECT_EQ(loop->context().realtime_sent_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
+    EXPECT_EQ(loop->context().size, 0u);
+    EXPECT_EQ(loop->context().data, nullptr);
+
     expected_times.push_back(loop->context().monotonic_sent_time);
     if (times.size() == kCount) {
       this->Exit();
     }
   });
+  test_timer->set_name("Test loop");
 
-  monotonic_clock::time_point start_time = loop->monotonic_now();
+  const monotonic_clock::time_point start_time = loop->monotonic_now();
   // TODO(austin): This should be an error...  Should be done in OnRun only.
   test_timer->Setup(start_time + chrono::seconds(1), chrono::seconds(1));
 
@@ -540,6 +576,39 @@
 
   EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon);
   EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon);
+
+  // And, since we are here, check that the timing report makes sense.
+  // Start by looking for our event loop's timing.
+  FlatbufferDetachedBuffer<timing::Report> report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    if (report_fetcher->name()->string_view() == "primary") {
+      report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  // Confirm that we have the right number of reports, and the contents are
+  // sane.
+  VLOG(1) << FlatbufferToJson(report, true);
+
+  EXPECT_EQ(report.message().name()->string_view(), "primary");
+
+  ASSERT_NE(report.message().senders(), nullptr);
+  EXPECT_EQ(report.message().senders()->size(), 1);
+
+  ASSERT_NE(report.message().timers(), nullptr);
+  EXPECT_EQ(report.message().timers()->size(), 2);
+
+  EXPECT_EQ(report.message().timers()->Get(0)->name()->string_view(),
+            "Test loop");
+  EXPECT_GE(report.message().timers()->Get(0)->count(), 1);
+
+  EXPECT_EQ(report.message().timers()->Get(1)->name()->string_view(),
+            "timing_reports");
+  EXPECT_EQ(report.message().timers()->Get(1)->count(), 1);
+
+  // Make sure there are no phased loop reports.
+  ASSERT_EQ(report.message().phased_loops(), nullptr);
 }
 
 // Verify that we can change a timer's parameters during execution.
@@ -591,7 +660,7 @@
 TEST_P(AbstractEventLoopDeathTest, InvalidChannel) {
   auto loop = MakePrimary();
 
-  const Channel *channel = loop->configuration()->channels()->Get(0);
+  const Channel *channel = loop->configuration()->channels()->Get(1);
 
   FlatbufferDetachedBuffer<Channel> channel_copy = CopyFlatBuffer(channel);
 
@@ -675,26 +744,45 @@
 // Tests that a couple phased loops run in a row result in the correct offset
 // and period.
 TEST_P(AbstractEventLoopTest, PhasedLoopTest) {
+  // Force a slower rate so we are guaranteed to have reports for our phased
+  // loop.
+  FLAGS_timing_report_ms = 2000;
+
   const chrono::milliseconds kOffset = chrono::milliseconds(400);
   const int kCount = 5;
 
   auto loop1 = MakePrimary();
+  auto loop2 = Make();
+
+  Fetcher<timing::Report> report_fetcher =
+      loop2->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
 
   // Collect up a couple of samples.
   ::std::vector<::aos::monotonic_clock::time_point> times;
   ::std::vector<::aos::monotonic_clock::time_point> expected_times;
 
   // Run kCount iterations.
-  loop1->AddPhasedLoop(
-      [&times, &expected_times, &loop1, this](int count) {
-        EXPECT_EQ(count, 1);
-        times.push_back(loop1->monotonic_now());
-        expected_times.push_back(loop1->context().monotonic_sent_time);
-        if (times.size() == kCount) {
-          this->Exit();
-        }
-      },
-      chrono::seconds(1), kOffset);
+  loop1
+      ->AddPhasedLoop(
+          [&times, &expected_times, &loop1, this](int count) {
+            EXPECT_EQ(count, 1);
+            times.push_back(loop1->monotonic_now());
+            expected_times.push_back(loop1->context().monotonic_sent_time);
+
+            EXPECT_EQ(loop1->context().realtime_sent_time,
+                      realtime_clock::min_time);
+            EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
+            EXPECT_EQ(loop1->context().size, 0u);
+            EXPECT_EQ(loop1->context().data, nullptr);
+
+            if (times.size() == kCount) {
+              LOG(INFO) << "Exiting";
+              this->Exit();
+            }
+          },
+          chrono::seconds(1), kOffset)
+      ->set_name("Test loop");
 
   // Add a delay to make sure that delay during startup doesn't result in a
   // "missed cycle".
@@ -749,6 +837,266 @@
 
   EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon);
   EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon);
+
+  // And, since we are here, check that the timing report makes sense.
+  // Start by looking for our event loop's timing.
+  FlatbufferDetachedBuffer<timing::Report> report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    if (report_fetcher->name()->string_view() == "primary") {
+      report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  VLOG(1) << FlatbufferToJson(report, true);
+
+  EXPECT_EQ(report.message().name()->string_view(), "primary");
+
+  ASSERT_NE(report.message().senders(), nullptr);
+  EXPECT_EQ(report.message().senders()->size(), 1);
+
+  ASSERT_NE(report.message().timers(), nullptr);
+  EXPECT_EQ(report.message().timers()->size(), 1);
+
+  // Make sure there is a single phased loop report with our report in it.
+  ASSERT_NE(report.message().phased_loops(), nullptr);
+  ASSERT_EQ(report.message().phased_loops()->size(), 1);
+  EXPECT_EQ(report.message().phased_loops()->Get(0)->name()->string_view(),
+            "Test loop");
+  EXPECT_GE(report.message().phased_loops()->Get(0)->count(), 1);
+}
+
+// Tests that senders count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, SenderTimingReport) {
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = MakePrimary();
+
+  auto loop2 = Make("watcher_loop");
+  loop2->MakeWatcher("/test", [](const TestMessage &) {});
+
+  auto loop3 = Make();
+
+  Fetcher<timing::Report> report_fetcher =
+      loop3->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  auto sender = loop1->MakeSender<TestMessage>("/test");
+
+  // Add a timer which sends 10 messages on /test when it fires.
+  auto test_timer = loop1->AddTimer([&sender]() {
+    for (int i = 0; i < 10; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200 + i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  // Quit after 1 timing report, mid way through the next cycle.
+  EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+  });
+
+  Run();
+
+  // Now check that the timing report makes sense.
+  // Start by keeping the most recent report whose name is "primary".
+  FlatbufferDetachedBuffer<timing::Report> primary_report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+    if (report_fetcher->name()->string_view() == "primary") {
+      primary_report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  LOG(INFO) << FlatbufferToJson(primary_report, true);
+
+  EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+  ASSERT_NE(primary_report.message().senders(), nullptr);
+  EXPECT_EQ(primary_report.message().senders()->size(), 2);
+
+  // Confirm that the /test sender counted all 10 messages.
+  EXPECT_EQ(
+      loop1->configuration()
+          ->channels()
+          ->Get(primary_report.message().senders()->Get(0)->channel_index())
+          ->name()
+          ->string_view(),
+      "/test");
+  EXPECT_EQ(primary_report.message().senders()->Get(0)->count(), 10);
+
+  // Confirm that the timing report sender (on /aos) looks sane.
+  EXPECT_EQ(
+      loop1->configuration()
+          ->channels()
+          ->Get(primary_report.message().senders()->Get(1)->channel_index())
+          ->name()
+          ->string_view(),
+      "/aos");
+  EXPECT_EQ(primary_report.message().senders()->Get(1)->count(), 1);
+
+  ASSERT_NE(primary_report.message().timers(), nullptr);
+  EXPECT_EQ(primary_report.message().timers()->size(), 3);
+
+  // Make sure there are no phased loops or watchers.
+  ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+  ASSERT_EQ(primary_report.message().watchers(), nullptr);
+}
+
+// Tests that watchers count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, WatcherTimingReport) {
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = MakePrimary();
+  loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+  auto loop2 = Make("sender_loop");
+
+  auto loop3 = Make();
+
+  Fetcher<timing::Report> report_fetcher =
+      loop3->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+
+  // Add a timer which sends 10 messages on /test when it fires.
+  auto test_timer = loop1->AddTimer([&sender]() {
+    for (int i = 0; i < 10; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200 + i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  // Quit after 1 timing report, mid way through the next cycle.
+  EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+  });
+
+  Run();
+
+  // Now check that the timing report makes sense.
+  // Start by keeping the most recent report whose name is "primary".
+  FlatbufferDetachedBuffer<timing::Report> primary_report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+    if (report_fetcher->name()->string_view() == "primary") {
+      primary_report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  // Check the watcher report.
+  VLOG(1) << FlatbufferToJson(primary_report, true);
+
+  EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+  // test_timer, the timing report timer, and presumably EndEventLoop's timer.
+  ASSERT_NE(primary_report.message().timers(), nullptr);
+  EXPECT_EQ(primary_report.message().timers()->size(), 3);
+
+  // No phased loops
+  ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+  ASSERT_NE(primary_report.message().watchers(), nullptr);
+  ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+}
+
+// Tests that fetchers count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, FetcherTimingReport) {
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = MakePrimary();
+  auto loop2 = Make("sender_loop");
+
+  auto loop3 = Make();
+
+  Fetcher<timing::Report> report_fetcher =
+      loop3->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+  auto fetcher1 = loop1->MakeFetcher<TestMessage>("/test");
+  auto fetcher2 = loop1->MakeFetcher<TestMessage>("/test");
+  fetcher1.Fetch();
+  fetcher2.Fetch();
+
+  // Add a timer which sends 10 messages on /test when it fires.
+  auto test_timer = loop1->AddTimer([&sender]() {
+    for (int i = 0; i < 10; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200 + i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  auto test_timer2 = loop1->AddTimer([&fetcher1, &fetcher2]() {
+    fetcher1.Fetch();
+    while (fetcher2.FetchNext()) {
+    }
+  });
+
+  // Quit after 1 timing report, mid way through the next cycle.
+  EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+  loop1->OnRun([test_timer, test_timer2, &loop1]() {
+    test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1400));
+    test_timer2->Setup(loop1->monotonic_now() + chrono::milliseconds(1600));
+  });
+
+  Run();
+
+  // Now check that the timing report makes sense.
+  // Start by keeping the most recent report whose name is "primary".
+  FlatbufferDetachedBuffer<timing::Report> primary_report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    if (report_fetcher->name()->string_view() == "primary") {
+      primary_report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  VLOG(1) << FlatbufferToJson(primary_report, true);
+
+  EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+  ASSERT_NE(primary_report.message().senders(), nullptr);
+  EXPECT_EQ(primary_report.message().senders()->size(), 1);
+
+  ASSERT_NE(primary_report.message().timers(), nullptr);
+  EXPECT_EQ(primary_report.message().timers()->size(), 4);
+
+  // Make sure there are no phased loops or watchers.
+  ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+  ASSERT_EQ(primary_report.message().watchers(), nullptr);
+
+  // Now look at the fetchers.
+  ASSERT_NE(primary_report.message().fetchers(), nullptr);
+  ASSERT_EQ(primary_report.message().fetchers()->size(), 2);
+
+  EXPECT_EQ(primary_report.message().fetchers()->Get(0)->count(), 1);
+  EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->average(),
+            0.1);
+  EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->min(),
+            0.1);
+  EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->max(),
+            0.1);
+  EXPECT_EQ(primary_report.message()
+                .fetchers()
+                ->Get(0)
+                ->latency()
+                ->standard_deviation(),
+            0.0);
+
+  EXPECT_EQ(primary_report.message().fetchers()->Get(1)->count(), 10);
 }
 
 }  // namespace testing