Expose OnRun to Rust

Change-Id: I2760992cbb98b3bf3b99c7246c4ac1557cf54a0f
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 59d27ad..b339b67 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -122,6 +122,9 @@
 
 cc_library(
     name = "event_loop_runtime_cc",
+    srcs = [
+        "event_loop_runtime.cc",
+    ],
     hdrs = [
         "event_loop_runtime.h",
     ],
diff --git a/aos/events/event_loop_runtime.cc b/aos/events/event_loop_runtime.cc
new file mode 100644
index 0000000..727d806
--- /dev/null
+++ b/aos/events/event_loop_runtime.cc
@@ -0,0 +1,11 @@
+#include "aos/events/event_loop_runtime.h"
+
+namespace aos {
+
+OnRunForRust::OnRunForRust(EventLoopRuntime *runtime) : runtime_(runtime) {
+  ++runtime->child_count_;
+}
+OnRunForRust::~OnRunForRust() { --runtime_->child_count_; }
+bool OnRunForRust::is_running() const { return runtime_->is_running(); }
+
+}  // namespace aos
diff --git a/aos/events/event_loop_runtime.h b/aos/events/event_loop_runtime.h
index df76d67..faa5ad7 100644
--- a/aos/events/event_loop_runtime.h
+++ b/aos/events/event_loop_runtime.h
@@ -124,20 +124,36 @@
   const std::unique_ptr<RawFetcher> fetcher_;
 };
 
+class EventLoopRuntime;
+
+class OnRunForRust {
+ public:
+  OnRunForRust(EventLoopRuntime *runtime);
+  ~OnRunForRust();
+
+  bool is_running() const;
+
+ private:
+  EventLoopRuntime *const runtime_;
+};
+
 class EventLoopRuntime {
  public:
   EventLoopRuntime(EventLoop *event_loop) : event_loop_(event_loop) {}
-  ~EventLoopRuntime() = default;
+  ~EventLoopRuntime() {
+    CHECK_EQ(child_count_, 0)
+        << ": Some child objects were not destroyed first";
+  }
 
   EventLoop *event_loop() { return event_loop_; }
 
   void spawn(std::unique_ptr<ApplicationFuture> task) {
     CHECK(!task_) << ": May only call spawn once";
     task_ = std::move(task);
-    // TODO(Brian): Do this once we've got OnRun support.
-    // DoPoll();
-    // TODO(Brian): Once we have OnRun support, should this move there or stay
-    // here unconditionally?
+    DoPoll();
+    // Just do this unconditionally, so we don't have to keep track of each
+    // OnRun to only do it once. If Rust doesn't use OnRun, it's harmless to do
+    // an extra poll.
     event_loop_->OnRun([this] { DoPoll(); });
   }
 
@@ -146,6 +162,8 @@
   }
   const Node *node() const { return event_loop_->node(); }
 
+  bool is_running() const { return event_loop_->is_running(); }
+
   // autocxx generates broken C++ code for `time_point`, see
   // https://github.com/google/autocxx/issues/787.
   int64_t monotonic_now() const {
@@ -175,7 +193,11 @@
     return FetcherForRust(event_loop_->MakeRawFetcher(channel));
   }
 
+  OnRunForRust MakeOnRun() { return OnRunForRust(this); }
+
  private:
+  friend class OnRunForRust;
+
   // Polls the top-level future once. This is what all the callbacks should do.
   void DoPoll() {
     if (task_) {
@@ -186,6 +208,8 @@
   EventLoop *const event_loop_;
 
   std::unique_ptr<ApplicationFuture> task_;
+
+  int child_count_ = 0;
 };
 
 }  // namespace aos
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index cd3ca6d..c01d74f 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -71,6 +71,7 @@
 generate!("aos::RawSender_Error")
 generate!("aos::SenderForRust")
 generate!("aos::FetcherForRust")
+generate!("aos::OnRunForRust")
 generate!("aos::EventLoopRuntime")
 
 subclass!("aos::ApplicationFuture", RustApplicationFuture)
@@ -218,9 +219,8 @@
     /// want your task to stop, return the result of awaiting [`futures::future::pending`], which
     /// will never complete. `task` will not be polled after the underlying `aos::EventLoop` exits.
     ///
-    /// TODO(Brian): Make this paragraph true:
-    /// Note that task will be polled immediately. If you want to defer work until the event loop
-    /// starts running, await TODO in the task.
+    /// Note that task will be polled immediately, to give it a chance to initialize. If you want to
+    /// defer work until the event loop starts running, await [`on_run`] in the task.
     ///
     /// # Panics
     ///
@@ -446,8 +446,17 @@
     // TODO(Brian): Expose timers and phased loops. Should we have `sleep`-style methods for those,
     // instead of / in addition to mirroring C++ with separate setup and wait?
 
-    // TODO(Brian): Expose OnRun. That should only be called once, so coalesce and have it return
-    // immediately afterwards.
+    /// Returns a Future to wait until the underlying EventLoop is running. Once this resolves, all
+    /// subsequent code will have any realtime scheduling applied. This means it can rely on
+    /// consistent timing, but it can no longer create any EventLoop child objects or do anything
+    /// else non-realtime.
+    pub fn on_run(&mut self) -> OnRun {
+        OnRun(self.0.as_mut().MakeOnRun().within_box())
+    }
+
+    pub fn is_running(&self) -> bool {
+        self.0.is_running()
+    }
 }
 
 /// Provides async blocking access to messages on a channel. This will return every message on the
@@ -1052,6 +1061,23 @@
     }
 }
 
+/// The type returned from [`EventLoopRuntime::on_run`], see there for details.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct OnRun(Pin<Box<ffi::aos::OnRunForRust>>);
+
+impl Future for OnRun {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _: &mut std::task::Context) -> Poll<()> {
+        if self.0.is_running() {
+            Poll::Ready(())
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
 /// Represents a `aos::monotonic_clock::time_point` in a natural Rust way. This
 /// is intended to have the same API as [`std::time::Instant`], any missing
 /// functionality can be added if useful.
diff --git a/aos/events/event_loop_runtime_test.cc b/aos/events/event_loop_runtime_test.cc
index 81f3be3..47e56b8 100644
--- a/aos/events/event_loop_runtime_test.cc
+++ b/aos/events/event_loop_runtime_test.cc
@@ -42,7 +42,9 @@
         ->Setup(
             ping_event_loop->monotonic_now() + std::chrono::milliseconds(10),
             std::chrono::milliseconds(10));
+    ASSERT_EQ(starting_count, started_test_count());
     factory.Run();
+    ASSERT_EQ(starting_count + 1, started_test_count());
     EXPECT_EQ(2, iteration);
   }
   ASSERT_EQ(starting_count + 1, completed_test_count());
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
index 62c7584..310d0ec 100644
--- a/aos/events/event_loop_runtime_test_lib.rs
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -35,6 +35,10 @@
         })
     }
 
+    fn started_test_count() -> u32 {
+        GLOBAL_STATE.with(|g| g.borrow().on_run_count)
+    }
+
     pub struct TestApplication<'event_loop> {
         _runtime: EventLoopRuntime<'event_loop>,
         raw_ping_fetcher: RawFetcher,
@@ -51,8 +55,9 @@
                     .get_raw_channel("/test", "aos.examples.Pong")
                     .expect("Should have Pong channel"),
             );
+            let on_run = runtime.on_run();
             runtime.spawn(async move {
-                // TODO(Brian): Wait for OnRun here.
+                on_run.await;
                 GLOBAL_STATE.with(|g| {
                     let g = &mut *g.borrow_mut();
                     assert_eq!(g.creation_count, g.drop_count + 1);
@@ -163,8 +168,9 @@
         fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
             let mut ping_watcher = runtime.make_watcher::<Ping<'static>>("/test").unwrap();
             let mut pong_sender = runtime.make_sender::<Pong<'static>>("/test").unwrap();
+            let on_run = runtime.on_run();
             runtime.spawn(async move {
-                // TODO(Brian): Wait for OnRun here.
+                on_run.await;
                 GLOBAL_STATE.with(|g| {
                     let g = &mut *g.borrow_mut();
                     assert_eq!(g.creation_count, g.drop_count + 1);
@@ -275,6 +281,7 @@
             ) -> Box<TypedTestApplication<'static>>;
 
             fn completed_test_count() -> u32;
+            fn started_test_count() -> u32;
         }
 
         extern "Rust" {