Expose OnRun to Rust
Change-Id: I2760992cbb98b3bf3b99c7246c4ac1557cf54a0f
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 59d27ad..b339b67 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -122,6 +122,9 @@
cc_library(
name = "event_loop_runtime_cc",
+ srcs = [
+ "event_loop_runtime.cc",
+ ],
hdrs = [
"event_loop_runtime.h",
],
diff --git a/aos/events/event_loop_runtime.cc b/aos/events/event_loop_runtime.cc
new file mode 100644
index 0000000..727d806
--- /dev/null
+++ b/aos/events/event_loop_runtime.cc
@@ -0,0 +1,11 @@
+#include "aos/events/event_loop_runtime.h"
+
+namespace aos {
+
+OnRunForRust::OnRunForRust(EventLoopRuntime *runtime) : runtime_(runtime) {
+ ++runtime->child_count_;
+}
+OnRunForRust::~OnRunForRust() { --runtime_->child_count_; }
+bool OnRunForRust::is_running() const { return runtime_->is_running(); }
+
+} // namespace aos
diff --git a/aos/events/event_loop_runtime.h b/aos/events/event_loop_runtime.h
index df76d67..faa5ad7 100644
--- a/aos/events/event_loop_runtime.h
+++ b/aos/events/event_loop_runtime.h
@@ -124,20 +124,36 @@
const std::unique_ptr<RawFetcher> fetcher_;
};
+class EventLoopRuntime;
+
+class OnRunForRust {
+ public:
+ OnRunForRust(EventLoopRuntime *runtime);
+ ~OnRunForRust();
+
+ bool is_running() const;
+
+ private:
+ EventLoopRuntime *const runtime_;
+};
+
class EventLoopRuntime {
public:
EventLoopRuntime(EventLoop *event_loop) : event_loop_(event_loop) {}
- ~EventLoopRuntime() = default;
+ ~EventLoopRuntime() {
+ CHECK_EQ(child_count_, 0)
+ << ": Some child objects were not destroyed first";
+ }
EventLoop *event_loop() { return event_loop_; }
void spawn(std::unique_ptr<ApplicationFuture> task) {
CHECK(!task_) << ": May only call spawn once";
task_ = std::move(task);
- // TODO(Brian): Do this once we've got OnRun support.
- // DoPoll();
- // TODO(Brian): Once we have OnRun support, should this move there or stay
- // here unconditionally?
+ DoPoll();
+ // Just do this unconditionally, so we don't have to keep track of each
+ // OnRun to only do it once. If Rust doesn't use OnRun, it's harmless to do
+ // an extra poll.
event_loop_->OnRun([this] { DoPoll(); });
}
@@ -146,6 +162,8 @@
}
const Node *node() const { return event_loop_->node(); }
+ bool is_running() const { return event_loop_->is_running(); }
+
// autocxx generates broken C++ code for `time_point`, see
// https://github.com/google/autocxx/issues/787.
int64_t monotonic_now() const {
@@ -175,7 +193,11 @@
return FetcherForRust(event_loop_->MakeRawFetcher(channel));
}
+ OnRunForRust MakeOnRun() { return OnRunForRust(this); }
+
private:
+ friend class OnRunForRust;
+
// Polls the top-level future once. This is what all the callbacks should do.
void DoPoll() {
if (task_) {
@@ -186,6 +208,8 @@
EventLoop *const event_loop_;
std::unique_ptr<ApplicationFuture> task_;
+
+ int child_count_ = 0;
};
} // namespace aos
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index cd3ca6d..c01d74f 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -71,6 +71,7 @@
generate!("aos::RawSender_Error")
generate!("aos::SenderForRust")
generate!("aos::FetcherForRust")
+generate!("aos::OnRunForRust")
generate!("aos::EventLoopRuntime")
subclass!("aos::ApplicationFuture", RustApplicationFuture)
@@ -218,9 +219,8 @@
/// want your task to stop, return the result of awaiting [`futures::future::pending`], which
/// will never complete. `task` will not be polled after the underlying `aos::EventLoop` exits.
///
- /// TODO(Brian): Make this paragraph true:
- /// Note that task will be polled immediately. If you want to defer work until the event loop
- /// starts running, await TODO in the task.
+ /// Note that task will be polled immediately, to give it a chance to initialize. If you want to
+ /// defer work until the event loop starts running, await [`on_run`] in the task.
///
/// # Panics
///
@@ -446,8 +446,17 @@
// TODO(Brian): Expose timers and phased loops. Should we have `sleep`-style methods for those,
// instead of / in addition to mirroring C++ with separate setup and wait?
- // TODO(Brian): Expose OnRun. That should only be called once, so coalesce and have it return
- // immediately afterwards.
+ /// Returns a Future to wait until the underlying EventLoop is running. Once this resolves, all
+ /// subsequent code will have any realtime scheduling applied. This means it can rely on
+ /// consistent timing, but it can no longer create any EventLoop child objects or do anything
+ /// else non-realtime.
+ pub fn on_run(&mut self) -> OnRun {
+ OnRun(self.0.as_mut().MakeOnRun().within_box())
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.0.is_running()
+ }
}
/// Provides async blocking access to messages on a channel. This will return every message on the
@@ -1052,6 +1061,23 @@
}
}
+/// The type returned from [`EventLoopRuntime::on_run`], see there for details.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct OnRun(Pin<Box<ffi::aos::OnRunForRust>>);
+
+impl Future for OnRun {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _: &mut std::task::Context) -> Poll<()> {
+ if self.0.is_running() {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
/// Represents a `aos::monotonic_clock::time_point` in a natural Rust way. This
/// is intended to have the same API as [`std::time::Instant`], any missing
/// functionality can be added if useful.
diff --git a/aos/events/event_loop_runtime_test.cc b/aos/events/event_loop_runtime_test.cc
index 81f3be3..47e56b8 100644
--- a/aos/events/event_loop_runtime_test.cc
+++ b/aos/events/event_loop_runtime_test.cc
@@ -42,7 +42,9 @@
->Setup(
ping_event_loop->monotonic_now() + std::chrono::milliseconds(10),
std::chrono::milliseconds(10));
+ ASSERT_EQ(starting_count, started_test_count());
factory.Run();
+ ASSERT_EQ(starting_count + 1, started_test_count());
EXPECT_EQ(2, iteration);
}
ASSERT_EQ(starting_count + 1, completed_test_count());
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
index 62c7584..310d0ec 100644
--- a/aos/events/event_loop_runtime_test_lib.rs
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -35,6 +35,10 @@
})
}
+ fn started_test_count() -> u32 {
+ GLOBAL_STATE.with(|g| g.borrow().on_run_count)
+ }
+
pub struct TestApplication<'event_loop> {
_runtime: EventLoopRuntime<'event_loop>,
raw_ping_fetcher: RawFetcher,
@@ -51,8 +55,9 @@
.get_raw_channel("/test", "aos.examples.Pong")
.expect("Should have Pong channel"),
);
+ let on_run = runtime.on_run();
runtime.spawn(async move {
- // TODO(Brian): Wait for OnRun here.
+ on_run.await;
GLOBAL_STATE.with(|g| {
let g = &mut *g.borrow_mut();
assert_eq!(g.creation_count, g.drop_count + 1);
@@ -163,8 +168,9 @@
fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
let mut ping_watcher = runtime.make_watcher::<Ping<'static>>("/test").unwrap();
let mut pong_sender = runtime.make_sender::<Pong<'static>>("/test").unwrap();
+ let on_run = runtime.on_run();
runtime.spawn(async move {
- // TODO(Brian): Wait for OnRun here.
+ on_run.await;
GLOBAL_STATE.with(|g| {
let g = &mut *g.borrow_mut();
assert_eq!(g.creation_count, g.drop_count + 1);
@@ -275,6 +281,7 @@
) -> Box<TypedTestApplication<'static>>;
fn completed_test_count() -> u32;
+ fn started_test_count() -> u32;
}
extern "Rust" {