Sort wakeups and add send latency.

We don't want to rely on epoll to deliver wakeups in order.  Timers
will likely wake up before watchers because they see fewer kernel
delays, etc.  So, maintain a sorted list of events and always trigger
whichever one is due first.  This also merges wakeups nicely.
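
The shape of the non-simulated side is roughly the following sketch
(illustrative names and types only, not the actual classes touched by
this change):

  #include <chrono>
  #include <functional>
  #include <map>
  #include <utility>

  using Clock = std::chrono::steady_clock;

  // Wakeups keyed by the time they are supposed to fire.  epoll may
  // report fds in any order, so pick the earliest deadline ourselves
  // instead of trusting the order epoll hands events back.
  std::multimap<Clock::time_point, std::function<void()>> events;

  void RunDueEvents() {
    const Clock::time_point now = Clock::now();
    while (!events.empty() && events.begin()->first <= now) {
      std::function<void()> handler = std::move(events.begin()->second);
      events.erase(events.begin());
      handler();  // Fires in deadline order; coincident wakeups get
                  // merged into a single pass.
    }
  }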

This gives us the infrastructure to handle the same problem in
simulation.  There we can have perfect timers and senders with
latency, but if a timer fires while a message is still in flight, we
pull the timer's wakeup forward and handle the events in order.
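
On the simulated side the same sorted structure provides the
ordering.  A minimal model (again, illustrative names only, not the
real scheduler): time jumps straight to the next deadline, so a timer
that comes due before a delayed message delivery runs first even
though the send started earlier.  The new
SimulatedEventLoopFactory::set_send_delay() knob (exercised with 50us
in the test below) controls how far out that delivery is scheduled.

  #include <chrono>
  #include <functional>
  #include <map>
  #include <utility>

  using Clock = std::chrono::steady_clock;

  std::multimap<Clock::time_point, std::function<void()>> queue;
  Clock::time_point simulated_now;

  // Drain the queue in deadline order.  Timers are "perfect" because
  // simulated_now lands exactly on each deadline; a send with latency
  // is just another entry scheduled at send_time + send_delay.
  void RunSimulation() {
    while (!queue.empty()) {
      auto it = queue.begin();
      simulated_now = it->first;
      std::function<void()> handler = std::move(it->second);
      queue.erase(it);
      handler();
    }
  }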

Change-Id: I8675ec5221dd2603b4aa5b1a3729907a599813b4
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 16a1923..44be581 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -28,6 +29,10 @@
   // I'm not sure how much that matters.
   void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
 
+  void set_send_delay(std::chrono::nanoseconds send_delay) {
+    event_loop_factory_.set_send_delay(send_delay);
+  }
+
  private:
    SimulatedEventLoopFactory event_loop_factory_;
 };
@@ -114,5 +119,106 @@
   EXPECT_EQ(counter, 10);
 }
 
+// Tests that watchers have latency in simulation.
+TEST(SimulatedEventLoopTest, WatcherTimingReport) {
+  SimulatedEventLoopTestFactory factory;
+  factory.set_send_delay(std::chrono::microseconds(50));
+
+  FLAGS_timing_report_ms = 1000;
+  auto loop1 = factory.MakePrimary("primary");
+  loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+  auto loop2 = factory.Make("sender_loop");
+
+  auto loop3 = factory.Make("report_fetcher");
+
+  Fetcher<timing::Report> report_fetcher =
+      loop3->MakeFetcher<timing::Report>("/aos");
+  EXPECT_FALSE(report_fetcher.Fetch());
+
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+
+  // Send 10 messages in the middle of a timing report period so we get
+  // something interesting back.
+  auto test_timer = loop2->AddTimer([&sender]() {
+    for (int i = 0; i < 10; ++i) {
+      aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+      TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+      builder.add_value(200 + i);
+      ASSERT_TRUE(msg.Send(builder.Finish()));
+    }
+  });
+
+  // Quit after 1 timing report, midway through the next cycle.
+  {
+    auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
+    end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
+    end_timer->set_name("end");
+  }
+
+  loop1->OnRun([&test_timer, &loop1]() {
+    test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+  });
+
+  factory.Run();
+
+  // And, since we are here, check that the timing report makes sense.
+  // Start by looking for our event loop's timing.
+  FlatbufferDetachedBuffer<timing::Report> primary_report =
+      FlatbufferDetachedBuffer<timing::Report>::Empty();
+  while (report_fetcher.FetchNext()) {
+    LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+    if (report_fetcher->name()->string_view() == "primary") {
+      primary_report = CopyFlatBuffer(report_fetcher.get());
+    }
+  }
+
+  // Check the watcher report.
+  VLOG(1) << FlatbufferToJson(primary_report, true);
+
+  EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+  // The timing report timer and the "end" timer.
+  ASSERT_NE(primary_report.message().timers(), nullptr);
+  EXPECT_EQ(primary_report.message().timers()->size(), 2);
+
+  // No phased loops.
+  ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+  // And now confirm that the watcher received all 10 messages, and has latency.
+  ASSERT_NE(primary_report.message().watchers(), nullptr);
+  ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
+      0.00005, 1e-9);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
+      0.00005, 1e-9);
+  EXPECT_NEAR(
+      primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
+      0.00005, 1e-9);
+  EXPECT_EQ(primary_report.message()
+                .watchers()
+                ->Get(0)
+                ->wakeup_latency()
+                ->standard_deviation(),
+            0.0);
+
+  EXPECT_EQ(
+      primary_report.message().watchers()->Get(0)->handler_time()->average(),
+      0.0);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
+            0.0);
+  EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
+            0.0);
+  EXPECT_EQ(primary_report.message()
+                .watchers()
+                ->Get(0)
+                ->handler_time()
+                ->standard_deviation(),
+            0.0);
+}
+
 }  // namespace testing
 }  // namespace aos