Add SetRuntimeRealtimePriority

This lets us move the concept of priority into the event loop rather
than trying to manage it elsewhere.  Also add tests to confirm I got
them all.

Change-Id: Iba2651778e8d1eb88dedf71d0efdf05dfc999f2d
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 1850a6c..21e9677 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -29,6 +29,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":event-loop",
+        "//aos:init",
         "//aos:queues",
         "//aos/logging",
     ],
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index bedf9a5..79651ef 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -129,11 +129,14 @@
   // Use this to run code once the thread goes into "real-time-mode",
   virtual void OnRun(std::function<void()>) = 0;
 
-  // TODO(austin): Sort out how to switch to realtime on run.
-  // virtual void RunRealtime() = 0;
+  // TODO(austin): OnExit
 
   // Stops receiving events
   virtual void Exit() = 0;
+
+  // Sets the scheduler priority to run the event loop at.  This may not be
+  // called after we go into "real-time-mode".
+  virtual void SetRuntimeRealtimePriority(int priority) = 0;
 };
 
 }  // namespace aos
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 371cb78..5276aec 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -310,6 +310,42 @@
   EXPECT_THAT(values, ::testing::ElementsAreArray({201, 202, 204}));
 }
 
+
+// Tests that FetchNext behaves correctly when we get two messages in the queue
+// but don't consume the first until after the second has been sent.
+TEST_P(AbstractEventLoopTest, FetchNextTest) {
+
+  auto send_loop = Make();
+  auto fetch_loop = Make();
+  auto sender = send_loop->MakeSender<TestMessage>("/test");
+  Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 100;
+    ASSERT_TRUE(msg.Send());
+  }
+
+  {
+    auto msg = sender.MakeMessage();
+    msg->msg_value = 200;
+    ASSERT_TRUE(msg.Send());
+  }
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(100, fetcher->msg_value);
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher->msg_value);
+
+  // When we run off the end of the queue, expect to still have the old message:
+  ASSERT_FALSE(fetcher.FetchNext());
+  ASSERT_NE(nullptr, fetcher.get());
+  EXPECT_EQ(200, fetcher->msg_value);
+}
+
 // Verify that making a fetcher and watcher for "/test" succeeds.
 TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
   auto loop = Make();
@@ -332,6 +368,17 @@
                "/test");
 }
 
+// Verify that SetRuntimeRealtimePriority fails while running.
+TEST_P(AbstractEventLoopTest, SetRuntimeRealtimePriority) {
+  auto loop = MakePrimary();
+  // Confirm that runtime priority calls work when not realtime.
+  loop->SetRuntimeRealtimePriority(5);
+
+  loop->OnRun([&]() { loop->SetRuntimeRealtimePriority(5); });
+
+  EXPECT_DEATH(Run(), "realtime");
+}
+
 // Verify that registering a watcher and a sender for "/test" fails.
 TEST_P(AbstractEventLoopTest, WatcherAndSender) {
   auto loop = Make();
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 11d9312..ee71b4c 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -6,6 +6,7 @@
 #include <chrono>
 #include <stdexcept>
 
+#include "aos/init.h"
 #include "aos/logging/logging.h"
 #include "aos/queue.h"
 
@@ -122,9 +123,13 @@
   }
 
   void Run() {
+    thread_state_->MaybeSetCurrentThreadRealtimePriority();
     thread_state_->WaitForStart();
 
-    if (!thread_state_->is_running()) return;
+    if (!thread_state_->is_running()) {
+      ::aos::UnsetCurrentThreadRealtimePriority();
+      return;
+    }
 
     const void *msg = nullptr;
     while (true) {
@@ -143,6 +148,7 @@
     }
 
     queue_->FreeMessage(msg);
+    ::aos::UnsetCurrentThreadRealtimePriority();
   }
 
  private:
@@ -180,6 +186,7 @@
   }
 
   void Run() {
+    thread_state_->MaybeSetCurrentThreadRealtimePriority();
     thread_state_->WaitForStart();
 
     while (true) {
@@ -196,6 +203,7 @@
         if (!thread_state_->is_running()) break;
       }
     }
+    ::aos::UnsetCurrentThreadRealtimePriority();
   }
 
  private:
@@ -256,11 +264,13 @@
 }
 
 void ShmEventLoop::Run() {
+  thread_state_->MaybeSetCurrentThreadRealtimePriority();
   set_is_running(true);
   for (const auto &run : on_run_) run();
   // TODO(austin): epoll event loop in main thread (if needed), and async safe
   // quit handler.
   thread_state_->Run();
+  ::aos::UnsetCurrentThreadRealtimePriority();
 }
 
 void ShmEventLoop::ThreadState::Run() {
@@ -275,6 +285,12 @@
   }
 }
 
+void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
+  if (priority_ != -1) {
+    ::aos::SetCurrentThreadRealtimePriority(priority_);
+  }
+}
+
 void ShmEventLoop::ThreadState::WaitForStart() {
   MutexLocker locker(&mutex_);
   while (!(loop_running_ || loop_finished_)) {
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index e8ff267..3d231ba 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -45,6 +45,13 @@
   void Run() override;
   void Exit() override;
 
+  void SetRuntimeRealtimePriority(int priority) override {
+    if (is_running()) {
+      ::aos::Die("Cannot set realtime priority while running.");
+    }
+    thread_state_->priority_ = priority;
+  }
+
  private:
   friend class internal::WatcherThreadState;
   friend class internal::TimerHandlerState;
@@ -62,6 +69,8 @@
 
     void Exit();
 
+    void MaybeSetCurrentThreadRealtimePriority();
+
    private:
     friend class internal::WatcherThreadState;
     friend class internal::TimerHandlerState;
@@ -74,6 +83,7 @@
     // Used to notify watchers that the loop is done.
     std::atomic<bool> loop_running_{false};
     bool loop_finished_ = false;
+    int priority_ = -1;
   };
 
   // Exclude multiple of the same type for path.
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm-event-loop_test.cc
index 0878e12..efa8545 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm-event-loop_test.cc
@@ -7,6 +7,7 @@
 namespace aos {
 namespace testing {
 namespace {
+namespace chrono = ::std::chrono;
 
 class ShmEventLoopTestFactory : public EventLoopTestFactory {
  public:
@@ -46,42 +47,55 @@
 
 }  // namespace
 
-// Tests that FetchNext behaves correctly when we get two messages in the queue
-// but don't consume the first until after the second has been sent.
-// This cannot be abstracted to AbstractEventLoopTest because not all
-// event loops currently support FetchNext().
-TEST(ShmEventLoopTest, FetchNextTest) {
-  ::aos::testing::TestSharedMemory my_shm;
-
-  ShmEventLoop send_loop;
-  ShmEventLoop fetch_loop;
-  auto sender = send_loop.MakeSender<TestMessage>("/test");
-  Fetcher<TestMessage> fetcher = fetch_loop.MakeFetcher<TestMessage>("/test");
-
-  {
-    auto msg = sender.MakeMessage();
-    msg->msg_value = 100;
-    ASSERT_TRUE(msg.Send());
+bool IsRealtime() {
+  int scheduler;
+  if ((scheduler = sched_getscheduler(0)) == -1) {
+    PLOG(FATAL, "sched_getscheduler(0) failed\n");
   }
+  LOG(INFO, "scheduler is %d\n", scheduler);
+  return scheduler == SCHED_FIFO || scheduler == SCHED_RR;
+}
 
-  {
+// Tests that every handler type is realtime and runs.  There are threads
+// involved and it's easy to miss one.
+TEST(ShmEventLoopTest, AllHandlersAreRealtime) {
+  ShmEventLoopTestFactory factory;
+  auto loop = factory.MakePrimary();
+  auto loop2 = factory.Make();
+
+  loop->SetRuntimeRealtimePriority(1);
+
+  auto sender = loop2->MakeSender<TestMessage>("/test");
+
+  bool did_onrun = false;
+  bool did_timer = false;
+  bool did_watcher = false;
+
+  auto timer = loop->AddTimer([&did_timer, &loop]() {
+    EXPECT_TRUE(IsRealtime());
+    did_timer = true;
+    loop->Exit();
+  });
+
+  loop->MakeWatcher("/test", [&did_watcher](const TestMessage &) {
+    EXPECT_TRUE(IsRealtime());
+    did_watcher = true;
+  });
+
+  loop->OnRun([&loop, &did_onrun, &sender, timer]() {
+    EXPECT_TRUE(IsRealtime());
+    did_onrun = true;
+    timer->Setup(loop->monotonic_now() + chrono::milliseconds(100));
     auto msg = sender.MakeMessage();
     msg->msg_value = 200;
-    ASSERT_TRUE(msg.Send());
-  }
+    msg.Send();
+  });
 
-  ASSERT_TRUE(fetcher.FetchNext());
-  ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(100, fetcher->msg_value);
+  factory.Run();
 
-  ASSERT_TRUE(fetcher.FetchNext());
-  ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(200, fetcher->msg_value);
-
-  // When we run off the end of the queue, expect to still have the old message:
-  ASSERT_FALSE(fetcher.FetchNext());
-  ASSERT_NE(nullptr, fetcher.get());
-  EXPECT_EQ(200, fetcher->msg_value);
+  EXPECT_TRUE(did_onrun);
+  EXPECT_TRUE(did_timer);
+  EXPECT_TRUE(did_watcher);
 }
 }  // namespace testing
 }  // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index d5a0f67..3836283 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -13,7 +13,9 @@
 class SimulatedSender : public RawSender {
  public:
   SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
-      : queue_(queue), event_loop_(event_loop) {}
+      : queue_(queue), event_loop_(event_loop) {
+    testing::EnableTestLogging();
+  }
   ~SimulatedSender() {}
 
   aos::Message *GetMessage() override {
@@ -186,6 +188,12 @@
 
   void Take(const ::std::string &path);
 
+  void SetRuntimeRealtimePriority(int /*priority*/) override {
+    if (is_running()) {
+      ::aos::Die("Cannot set realtime priority while running.");
+    }
+  }
+
  private:
   EventScheduler *scheduler_;
   ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>