Merge changes Ida4d8c5a,I2fd90778
* changes:
Add a Rust ping/pong example
Add timers for Rust event loops
diff --git a/aos/events/BUILD b/aos/events/BUILD
index eb77036..d78df3a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -3,7 +3,7 @@
load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
load("//aos:config.bzl", "aos_config")
load("//tools/build_rules:autocxx.bzl", "autocxx_library")
-load("@rules_rust//rust:defs.bzl", "rust_doc", "rust_doc_test", "rust_test")
+load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_doc", "rust_doc_test", "rust_test")
package(default_visibility = ["//visibility:public"])
@@ -184,7 +184,6 @@
rust_doc(
name = "event_loop_runtime_doc",
crate = ":event_loop_runtime",
- target_compatible_with = ["@platforms//cpu:x86_64"],
)
rust_doc_test(
@@ -265,6 +264,60 @@
],
)
+rust_binary(
+ name = "ping_rs",
+ srcs = [
+ "ping.rs",
+ ],
+ data = [
+ ":pingpong_config",
+ ],
+ rustc_flags = ["-Crelocation-model=static"],
+ target_compatible_with = select({
+ "//conditions:default": ["//tools/platforms/rust:has_support"],
+ "//tools:has_msan": ["@platforms//:incompatible"],
+ }),
+ deps = [
+ ":event_loop_runtime",
+ ":ping_rust_fbs",
+ ":pong_rust_fbs",
+ ":shm_event_loop_rs",
+ "//aos:configuration_rs",
+ "//aos:configuration_rust_fbs",
+ "//aos:flatbuffers_rs",
+ "//aos:init_rs",
+ "@com_github_google_flatbuffers//rust",
+ "@crate_index//:futures",
+ ],
+)
+
+rust_binary(
+ name = "pong_rs",
+ srcs = [
+ "pong.rs",
+ ],
+ data = [
+ ":pingpong_config",
+ ],
+ rustc_flags = ["-Crelocation-model=static"],
+ target_compatible_with = select({
+ "//conditions:default": ["//tools/platforms/rust:has_support"],
+ "//tools:has_msan": ["@platforms//:incompatible"],
+ }),
+ deps = [
+ ":event_loop_runtime",
+ ":ping_rust_fbs",
+ ":pong_rust_fbs",
+ ":shm_event_loop_rs",
+ "//aos:configuration_rs",
+ "//aos:configuration_rust_fbs",
+ "//aos:flatbuffers_rs",
+ "//aos:init_rs",
+ "@com_github_google_flatbuffers//rust",
+ "@crate_index//:futures",
+ ],
+)
+
aos_config(
name = "aos_config",
src = "aos.json",
diff --git a/aos/events/event_loop_runtime.cc b/aos/events/event_loop_runtime.cc
index 727d806..e3d73c1 100644
--- a/aos/events/event_loop_runtime.cc
+++ b/aos/events/event_loop_runtime.cc
@@ -8,4 +8,22 @@
OnRunForRust::~OnRunForRust() { --runtime_->child_count_; }
bool OnRunForRust::is_running() const { return runtime_->is_running(); }
+std::unique_ptr<TimerForRust> TimerForRust::Make(EventLoopRuntime *runtime) {
+ auto handler = std::unique_ptr<TimerForRust>(new TimerForRust());
+ TimerForRust *inner = handler.get();
+ handler->timer_ = runtime->event_loop()->AddTimer([inner, runtime] {
+ inner->expired_ = true;
+ runtime->DoPoll();
+ });
+ return handler;
+}
+
+bool TimerForRust::Poll() {
+ if (expired_) {
+ // Reset it for next poll.
+ expired_ = false;
+ return true;
+ }
+ return false;
+}
} // namespace aos
diff --git a/aos/events/event_loop_runtime.h b/aos/events/event_loop_runtime.h
index 325560f..7cc551f 100644
--- a/aos/events/event_loop_runtime.h
+++ b/aos/events/event_loop_runtime.h
@@ -6,6 +6,7 @@
// particularly ergonomic for C++. See the Rust wrapper for detailed
// documentation.
+#include <chrono>
#include <memory>
#include <optional>
@@ -139,6 +140,41 @@
EventLoopRuntime *const runtime_;
};
+class TimerForRust {
+ public:
+ static std::unique_ptr<TimerForRust> Make(EventLoopRuntime *runtime);
+
+ TimerForRust(const TimerForRust &) = delete;
+ TimerForRust(TimerForRust &&) = delete;
+
+ TimerForRust &operator=(const TimerForRust &) = delete;
+ TimerForRust &operator=(TimerForRust &&) = delete;
+
+ ~TimerForRust() { timer_->Disable(); }
+
+ void Schedule(int64_t base, int64_t repeat_offset) {
+ timer_->Schedule(
+ monotonic_clock::time_point(std::chrono::nanoseconds(base)),
+ std::chrono::nanoseconds(repeat_offset));
+ }
+
+ void Disable() { timer_->Disable(); }
+
+ bool IsDisabled() const { return timer_->IsDisabled(); }
+
+ void set_name(rust::Str name) { timer_->set_name(RustStrToStringView(name)); }
+ rust::Str name() const { return StringViewToRustStr(timer_->name()); }
+
+ // If true, the timer is expired.
+ bool Poll();
+
+ private:
+ TimerForRust() = default;
+
+ TimerHandler *timer_;
+ bool expired_ = false;
+};
+
class EventLoopRuntime {
public:
EventLoopRuntime(EventLoop *event_loop) : event_loop_(event_loop) {}
@@ -199,8 +235,11 @@
OnRunForRust MakeOnRun() { return OnRunForRust(this); }
+ std::unique_ptr<TimerForRust> AddTimer() { return TimerForRust::Make(this); }
+
private:
friend class OnRunForRust;
+ friend class TimerForRust;
// Polls the top-level future once. This is what all the callbacks should do.
void DoPoll() {
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index 76c1e2e..35a4225 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -47,6 +47,7 @@
future::Future,
marker::PhantomData,
mem::ManuallyDrop,
+ ops::Add,
panic::{catch_unwind, AssertUnwindSafe},
pin::Pin,
slice,
@@ -84,6 +85,7 @@
generate!("aos::OnRunForRust")
generate!("aos::EventLoopRuntime")
generate!("aos::ExitHandle")
+generate!("aos::TimerForRust")
subclass!("aos::ApplicationFuture", RustApplicationFuture)
@@ -681,6 +683,115 @@
pub fn is_running(&self) -> bool {
self.0.is_running()
}
+
+ /// Returns an unarmed timer.
+ pub fn add_timer(&mut self) -> Timer {
+ Timer(self.0.as_mut().AddTimer())
+ }
+
+ /// Returns a timer that goes off every `duration`-long ticks.
+ pub fn add_interval(&mut self, duration: Duration) -> Timer {
+ let mut timer = self.add_timer();
+ timer.setup(self.monotonic_now(), Some(duration));
+ timer
+ }
+}
+
+/// An event loop primitive that allows sleeping asynchronously.
+///
+/// # Examples
+///
+/// ```no_run
+/// # use aos_events_event_loop_runtime::EventLoopRuntime;
+/// # use std::time::Duration;
+/// # fn compile_check(runtime: &mut EventLoopRuntime<'_>) {
+/// # let mut timer = runtime.add_timer();
+/// // Goes as soon as awaited.
+/// timer.setup(runtime.monotonic_now(), None);
+/// // Goes off once in 2 seconds.
+/// timer.setup(runtime.monotonic_now() + Duration::from_secs(2), None);
+/// // Goes off as soon as awaited and every 2 seconds afterwards.
+/// timer.setup(runtime.monotonic_now(), Some(Duration::from_secs(1)));
+/// async {
+/// for i in 0..10 {
+/// timer.tick().await;
+/// }
+/// // Timer won't off anymore. Next `tick` will never return.
+/// timer.disable();
+/// timer.tick().await;
+/// };
+/// # }
+/// ```
+pub struct Timer(UniquePtr<ffi::aos::TimerForRust>);
+
+/// A "tick" for a [`Timer`].
+///
+/// This is the raw future generated by the [`Timer::tick`] function.
+pub struct TimerTick<'a>(&'a mut Timer);
+
+impl Timer {
+ /// Arms the timer.
+ ///
+ /// The timer should sleep until `base`, `base + repeat`, `base + repeat * 2`, ...
+ /// If `repeat` is `None`, then the timer only expires once at `base`.
+ pub fn setup(&mut self, base: MonotonicInstant, repeat: Option<Duration>) {
+ self.0.pin_mut().Schedule(
+ base.0,
+ repeat
+ .unwrap_or(Duration::from_nanos(0))
+ .as_nanos()
+ .try_into()
+ .expect("Out of range: Internal clock uses 64 bits"),
+ );
+ }
+
+ /// Disarms the timer.
+ ///
+ /// Can be re-enabled by calling `setup` again.
+ pub fn disable(&mut self) {
+ self.0.pin_mut().Disable();
+ }
+
+ /// Returns `true` if the timer is enabled.
+ pub fn is_enabled(&self) -> bool {
+ !self.0.IsDisabled()
+ }
+
+ /// Sets the name of the timer.
+ ///
+ /// This can be useful to get a descriptive name in the timing reports.
+ pub fn set_name(&mut self, name: &str) {
+ self.0.pin_mut().set_name(name);
+ }
+
+ /// Gets the name of the timer.
+ pub fn name(&self) -> &str {
+ self.0.name()
+ }
+
+ /// Returns a tick which can be `.await`ed.
+ ///
+ /// This tick will resolve on the next timer expired.
+ pub fn tick(&mut self) -> TimerTick {
+ TimerTick(self)
+ }
+
+ /// Polls the timer, returning `[Poll::Ready]` only once the timer expired.
+ fn poll(&mut self) -> Poll<()> {
+ if self.0.pin_mut().Poll() {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+impl Future for TimerTick<'_> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, _: &mut std::task::Context) -> Poll<()> {
+ self.0.poll()
+ }
}
/// Provides async blocking access to messages on a channel. This will return every message on the
@@ -1342,6 +1453,14 @@
}
}
+impl Add<Duration> for MonotonicInstant {
+ type Output = MonotonicInstant;
+
+ fn add(self, rhs: Duration) -> Self::Output {
+ Self(self.0 + i64::try_from(rhs.as_nanos()).unwrap())
+ }
+}
+
impl fmt::Debug for MonotonicInstant {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.duration_since_epoch().fmt(f)
diff --git a/aos/events/ping.rs b/aos/events/ping.rs
new file mode 100644
index 0000000..b9725b8
--- /dev/null
+++ b/aos/events/ping.rs
@@ -0,0 +1,87 @@
+use aos_configuration as config;
+use aos_events_event_loop_runtime::{EventLoopRuntime, Sender, Watcher};
+use aos_events_shm_event_loop::ShmEventLoop;
+use core::cell::Cell;
+use core::future::Future;
+use core::time::Duration;
+use futures::never::Never;
+use std::path::Path;
+
+use ping_rust_fbs::aos::examples as ping;
+use pong_rust_fbs::aos::examples as pong;
+
+fn main() {
+ aos_init::init();
+ let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
+ let ping = PingTask::new();
+ ShmEventLoop::new(&config).run_with(|runtime| {
+ let task = ping.tasks(runtime);
+ runtime.spawn(task);
+ });
+}
+
+#[derive(Debug)]
+struct PingTask {
+ counter: Cell<i32>,
+}
+
+impl PingTask {
+ pub fn new() -> Self {
+ Self {
+ counter: Cell::new(0),
+ }
+ }
+
+ /// Returns a future with all the tasks for the ping process
+ pub fn tasks(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+ let ping = self.ping(event_loop);
+ let handle_pong = self.handle_pong(event_loop);
+
+ async move {
+ futures::join!(ping, handle_pong);
+ unreachable!("Let's hope `never_type` gets stabilized soon :)");
+ }
+ }
+
+ fn ping(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+ // The sender is used to send messages back to the pong channel.
+ let mut ping_sender: Sender<ping::Ping> = event_loop.make_sender("/test").unwrap();
+ let startup = event_loop.on_run();
+
+ let mut interval = event_loop.add_interval(Duration::from_secs(1));
+
+ async move {
+ // Wait for startup.
+ startup.await;
+ loop {
+ interval.tick().await;
+ self.counter.set(self.counter.get() + 1);
+ let mut builder = ping_sender.make_builder();
+ let mut ping = ping::PingBuilder::new(builder.fbb());
+ let iter = self.counter.get();
+ ping.add_value(iter);
+ let ping = ping.finish();
+ builder.send(ping).expect("Can't send ping");
+ }
+ }
+ }
+
+ fn handle_pong(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+ // The watcher gives us incoming ping messages.
+ let mut pong_watcher: Watcher<pong::Pong> = event_loop.make_watcher("/test").unwrap();
+ let startup = event_loop.on_run();
+
+ async move {
+ // Wait for startup.
+ startup.await;
+ loop {
+ let pong = dbg!(pong_watcher.next().await);
+ assert_eq!(
+ pong.message().unwrap().value(),
+ self.counter.get(),
+ "Missed a reply"
+ );
+ }
+ }
+ }
+}
diff --git a/aos/events/pong.rs b/aos/events/pong.rs
new file mode 100644
index 0000000..b817ac2
--- /dev/null
+++ b/aos/events/pong.rs
@@ -0,0 +1,42 @@
+use aos_configuration as config;
+use aos_events_event_loop_runtime::{EventLoopRuntime, Sender, Watcher};
+use aos_events_shm_event_loop::ShmEventLoop;
+use core::future::Future;
+use futures::never::Never;
+use std::path::Path;
+
+use ping_rust_fbs::aos::examples as ping;
+use pong_rust_fbs::aos::examples as pong;
+
+fn main() {
+ aos_init::init();
+ let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
+ ShmEventLoop::new(&config).run_with(|runtime| {
+ let task = pong(runtime);
+ runtime.spawn(task);
+ });
+}
+
+/// Responds to ping messages with an equivalent pong.
+fn pong(event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> {
+ // The watcher gives us incoming ping messages.
+ let mut ping_watcher: Watcher<ping::Ping> = event_loop.make_watcher("/test").unwrap();
+
+ // The sender is used to send messages back to the pong channel.
+ let mut pong_sender: Sender<pong::Pong> = event_loop.make_sender("/test").unwrap();
+ // Wait for startup.
+ let startup = event_loop.on_run();
+
+ async move {
+ startup.await;
+ loop {
+ let ping = dbg!(ping_watcher.next().await);
+
+ let mut builder = pong_sender.make_builder();
+ let mut pong = pong::PongBuilder::new(builder.fbb());
+ pong.add_value(ping.message().unwrap().value());
+ let pong = pong.finish();
+ builder.send(pong).expect("Can't send pong reponse");
+ }
+ }
+}
diff --git a/aos/init.rs b/aos/init.rs
index 8a5f262..9ac62f1 100644
--- a/aos/init.rs
+++ b/aos/init.rs
@@ -16,6 +16,12 @@
///
/// Panics if non-test initialization has already been performed.
pub fn test_init() {
+ init();
+ // TODO(Brian): Do we want any of the other stuff that `:gtest_main` has?
+ // TODO(Brian): Call `aos::SetShmBase` like `:gtest_main` does.
+}
+
+pub fn init() {
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let argv0 = std::env::args().next().expect("must have argv[0]");
@@ -23,7 +29,4 @@
// SAFETY: argv0 is a NUL-terminated string.
unsafe { ffi::aos::InitFromRust(argv0.as_ptr()) };
});
-
- // TODO(Brian): Do we want any of the other stuff that `:gtest_main` has?
- // TODO(Brian): Call `aos::SetShmBase` like `:gtest_main` does.
}