Create an async Rust runtime with EventLoop
This is the API I'm envisioning Rust applications using. It doesn't have
everything wrapped yet, but it's enough to write some tests. It also
doesn't have an API managing the types like the C++ templates yet.
Change-Id: I3389cf18ea99f18c14dc8c4e732156d3cf8c8012
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index deca15b..123a832 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,6 +1,8 @@
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_rust_library", "flatbuffer_ts_library")
load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
load("//aos:config.bzl", "aos_config")
+load("//tools/build_rules:autocxx.bzl", "autocxx_library")
+load("@rules_rust//rust:defs.bzl", "rust_doc_test")
package(default_visibility = ["//visibility:public"])
@@ -46,6 +48,18 @@
target_compatible_with = ["@platforms//os:linux"],
)
+flatbuffer_rust_library(
+ name = "ping_rust_fbs",
+ srcs = ["ping.fbs"],
+ target_compatible_with = ["//tools/platforms/rust:has_support"],
+)
+
+flatbuffer_rust_library(
+ name = "pong_rust_fbs",
+ srcs = ["pong.fbs"],
+ target_compatible_with = ["//tools/platforms/rust:has_support"],
+)
+
flatbuffer_cc_library(
name = "pong_fbs",
srcs = ["pong.fbs"],
@@ -107,6 +121,79 @@
)
cc_library(
+ name = "event_loop_runtime_cc",
+ hdrs = [
+ "event_loop_runtime.h",
+ ],
+ deps = [
+ ":event_loop",
+ "//aos:for_rust",
+ "//third_party/cargo:cxx_cc",
+ ],
+)
+
+autocxx_library(
+ name = "event_loop_runtime",
+ srcs = ["event_loop_runtime.rs"],
+ crate_name = "aos_events_event_loop_runtime",
+ libs = [
+ ":event_loop_runtime_cc",
+ ],
+ override_cc_toolchain = "@llvm_toolchain//:cc-clang-x86_64-linux",
+ rs_deps = [
+ "@com_github_google_flatbuffers//rust",
+ "//third_party/cargo:uuid",
+ "//third_party/cargo:futures",
+ "//third_party/cargo:thiserror",
+ ],
+ target_compatible_with = ["//tools/platforms/rust:has_support"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//aos:configuration_rs",
+ "//aos:uuid_rs",
+ ],
+)
+
+rust_doc_test(
+ name = "event_loop_runtime_doc_test",
+ crate = ":event_loop_runtime",
+ deps = [
+ ":pong_rust_fbs",
+ ],
+)
+
+autocxx_library(
+ name = "event_loop_runtime_test_lib_rs",
+ testonly = True,
+ srcs = ["event_loop_runtime_test_lib.rs"],
+ libs = [
+ ":event_loop",
+ ],
+ override_cc_toolchain = "@llvm_toolchain//:cc-clang-x86_64-linux",
+ rs_deps = [
+ ":event_loop_runtime",
+ ":ping_rust_fbs",
+ ":pong_rust_fbs",
+ ],
+ target_compatible_with = ["//tools/platforms/rust:has_support"],
+)
+
+cc_test(
+ name = "event_loop_runtime_test",
+ srcs = ["event_loop_runtime_test.cc"],
+ data = [":pingpong_config"],
+ deps = [
+ ":event_loop_runtime_test_lib_rs",
+ ":ping_fbs",
+ ":pong_fbs",
+ ":simulated_event_loop",
+ "//aos/testing:googletest",
+ "//aos/testing:path",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
name = "ping_lib",
srcs = [
"ping_lib.cc",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index ff1183f..ab5d868 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -159,6 +159,8 @@
virtual ~RawSender();
+ // Returns the buffer to write new messages into. This is always valid, and
+ // may change after calling any of the Send functions.
virtual void *data() = 0;
virtual size_t size() = 0;
diff --git a/aos/events/event_loop_runtime.h b/aos/events/event_loop_runtime.h
new file mode 100644
index 0000000..df76d67
--- /dev/null
+++ b/aos/events/event_loop_runtime.h
@@ -0,0 +1,193 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_RUNTIME_H_
+#define AOS_EVENTS_EVENT_LOOP_RUNTIME_H_
+
+// Exposes the primitives to implement an async Rust runtime on top of an
+// EventLoop. This is not intended to be used directly, so the APIs are not
+// particularly ergonomic for C++. See the Rust wrapper for detailed
+// documentation.
+
+#include <memory>
+#include <optional>
+
+#include "aos/events/event_loop.h"
+#include "aos/for_rust.h"
+#include "cxx.h"
+
+namespace aos {
+
+// An alternative version of Context to feed autocxx, to work around
+// https://github.com/google/autocxx/issues/787.
+/// <div rustbindgen replaces="aos::Context"></div>
+struct RustContext {
+ int64_t monotonic_event_time;
+ int64_t realtime_event_time;
+
+ int64_t monotonic_remote_time;
+ int64_t realtime_remote_time;
+
+ uint32_t queue_index;
+ uint32_t remote_queue_index;
+
+ size_t size;
+ const void *data;
+
+ int buffer_index;
+
+ // Work around https://github.com/google/autocxx/issues/266.
+ uint8_t source_boot_uuid[16];
+};
+
+static_assert(sizeof(Context) == sizeof(RustContext));
+static_assert(alignof(Context) == alignof(RustContext));
+static_assert(offsetof(Context, monotonic_event_time) ==
+ offsetof(RustContext, monotonic_event_time));
+static_assert(offsetof(Context, realtime_event_time) ==
+ offsetof(RustContext, realtime_event_time));
+static_assert(offsetof(Context, monotonic_remote_time) ==
+ offsetof(RustContext, monotonic_remote_time));
+static_assert(offsetof(Context, realtime_remote_time) ==
+ offsetof(RustContext, realtime_remote_time));
+static_assert(offsetof(Context, queue_index) ==
+ offsetof(RustContext, queue_index));
+static_assert(offsetof(Context, remote_queue_index) ==
+ offsetof(RustContext, remote_queue_index));
+static_assert(offsetof(Context, size) == offsetof(RustContext, size));
+static_assert(offsetof(Context, data) == offsetof(RustContext, data));
+static_assert(offsetof(Context, buffer_index) ==
+ offsetof(RustContext, buffer_index));
+static_assert(offsetof(Context, source_boot_uuid) ==
+ offsetof(RustContext, source_boot_uuid));
+static_assert(sizeof(Context::source_boot_uuid) ==
+ sizeof(RustContext::source_boot_uuid));
+static_assert(sizeof(RustContext) == sizeof(Context),
+ "Update this when adding or removing fields");
+
+// Similar to Rust's `Future<Output = Never>`.
+class ApplicationFuture {
+ public:
+ ApplicationFuture() = default;
+ virtual ~ApplicationFuture() = default;
+
+ // Calls a Rust `Future::poll`, with a waker that will panic if used. Because
+ // our Future's Output is Never, the inner Rust implementation can only return
+ // Poll::Pending, which is equivalent to void.
+ virtual void Poll() = 0;
+};
+
+// Similar to Rust's `Stream<Item = const Option&>`.
+class WatcherForRust {
+ public:
+ WatcherForRust(std::unique_ptr<RawFetcher> fetcher)
+ : fetcher_(std::move(fetcher)) {}
+ ~WatcherForRust() = default;
+
+ const Context *PollNext() {
+ if (!fetcher_->FetchNext()) {
+ return nullptr;
+ }
+ return &fetcher_->context();
+ }
+
+ private:
+ const std::unique_ptr<RawFetcher> fetcher_;
+};
+
+class SenderForRust {
+ public:
+ SenderForRust(std::unique_ptr<RawSender> sender)
+ : sender_(std::move(sender)) {}
+ ~SenderForRust() = default;
+
+ uint8_t *data() { return reinterpret_cast<uint8_t *>(sender_->data()); }
+ size_t size() { return sender_->size(); }
+ RawSender::Error SendBuffer(size_t size) { return sender_->Send(size); }
+ RawSender::Error CopyAndSend(const uint8_t *data, size_t size) {
+ return sender_->Send(data, size);
+ }
+
+ private:
+ const std::unique_ptr<RawSender> sender_;
+};
+
+class FetcherForRust {
+ public:
+ FetcherForRust(std::unique_ptr<RawFetcher> fetcher)
+ : fetcher_(std::move(fetcher)) {}
+ ~FetcherForRust() = default;
+
+ bool FetchNext() { return fetcher_->FetchNext(); }
+ bool Fetch() { return fetcher_->Fetch(); }
+
+ const Context &context() const { return fetcher_->context(); }
+
+ private:
+ const std::unique_ptr<RawFetcher> fetcher_;
+};
+
+class EventLoopRuntime {
+ public:
+ EventLoopRuntime(EventLoop *event_loop) : event_loop_(event_loop) {}
+ ~EventLoopRuntime() = default;
+
+ EventLoop *event_loop() { return event_loop_; }
+
+ void spawn(std::unique_ptr<ApplicationFuture> task) {
+ CHECK(!task_) << ": May only call spawn once";
+ task_ = std::move(task);
+ // TODO(Brian): Do this once we've got OnRun support.
+ // DoPoll();
+ // TODO(Brian): Once we have OnRun support, should this move there or stay
+ // here unconditionally?
+ event_loop_->OnRun([this] { DoPoll(); });
+ }
+
+ const Configuration *configuration() const {
+ return event_loop_->configuration();
+ }
+ const Node *node() const { return event_loop_->node(); }
+
+ // autocxx generates broken C++ code for `time_point`, see
+ // https://github.com/google/autocxx/issues/787.
+ int64_t monotonic_now() const {
+ return std::chrono::nanoseconds(
+ event_loop_->monotonic_now().time_since_epoch())
+ .count();
+ }
+ int64_t realtime_now() const {
+ return std::chrono::nanoseconds(
+ event_loop_->realtime_now().time_since_epoch())
+ .count();
+ }
+
+ rust::Str name() const { return StringViewToRustStr(event_loop_->name()); }
+
+ WatcherForRust MakeWatcher(const Channel *channel) {
+ event_loop_->MakeRawNoArgWatcher(channel,
+ [this](const Context &) { DoPoll(); });
+ return WatcherForRust(event_loop_->MakeRawFetcher(channel));
+ }
+
+ SenderForRust MakeSender(const Channel *channel) {
+ return SenderForRust(event_loop_->MakeRawSender(channel));
+ }
+
+ FetcherForRust MakeFetcher(const Channel *channel) {
+ return FetcherForRust(event_loop_->MakeRawFetcher(channel));
+ }
+
+ private:
+ // Polls the top-level future once. This is what all the callbacks should do.
+ void DoPoll() {
+ if (task_) {
+ task_->Poll();
+ }
+ }
+
+ EventLoop *const event_loop_;
+
+ std::unique_ptr<ApplicationFuture> task_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_EVENT_LOOP_RUNTIME_H_
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
new file mode 100644
index 0000000..6daa4eb
--- /dev/null
+++ b/aos/events/event_loop_runtime.rs
@@ -0,0 +1,759 @@
+#![warn(unsafe_op_in_unsafe_fn)]
+
+//! This module provides a Rust async runtime on top of the C++ `aos::EventLoop` interface.
+//!
+//! # Rust async with `aos::EventLoop`
+//!
+//! The async runtimes we create are not general-purpose. They may only await the objects provided
+//! by this module. Awaiting anything else will hang, until it is woken which will panic. Also,
+//! doing any long-running task (besides await) will block the C++ EventLoop thread, which is
+//! usually bad.
+//!
+//! ## Multiple tasks
+//!
+//! This runtime only supports a single task (aka a single [`Future`]) at a time. For many use
+//! cases, this is sufficient. If you want more than that, one of these may be appropriate:
+//!
+//! 1. If you have a small number of tasks determined at compile time, [`futures::join`] can await
+//! them all simultaneously.
+//! 2. [`futures::stream::FuturesUnordered`] can wait on a variable number of futures. It also
+//! supports adding them at runtime. Consider something like
+//! `FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>` if you want a generic "container of any
+//! future".
+//! 3. Multiple applications are better suited to multiple `EventLoopRuntime`s, on separate
+//! `aos::EventLoop`s. Otherwise they can't send messages to each other, among other
+//! restrictions. https://github.com/frc971/971-Robot-Code/issues/12 covers creating an adapter
+//! that provides multiple `EventLoop`s on top of a single underlying implementation.
+//!
+//! ## Design
+//!
+//! The design of this is tricky. This is a complicated API interface between C++ and Rust. The big
+//! considerations in arriving at this design include:
+//! * `EventLoop` implementations alias the objects they're returning from C++, which means
+//! creating Rust unique references to them is unsound. See
+//! https://github.com/google/autocxx/issues/1146 for details.
+//! * For various reasons autocxx can't directly wrap APIs using types ergonomic for C++. This and
+//! the previous point mean we wrap all of the C++ objects specifically for this class.
+//! * Keeping track of all the lifetimes and creating appropriate references for the callbacks is
+//! really hard in Rust. Even doing it for the library implementation turned out to be hard
+//! enough to look for alternatives. I think you'd have to make extensive use of pointers, but
+//! Rust makes that hard, and it's easy to create references in ways that violate Rust's
+//! aliasing rules.
+//! * We can't use [`futures::stream::Stream`] and all of its nice [`futures::stream::StreamExt`]
+//! helpers for watchers because we need lifetime-generic `Item` types. Effectively we're making
+//! a lending stream. This is very close to lending iterators, which is one of the motivating
+//! examples for generic associated types (https://github.com/rust-lang/rust/issues/44265).
+
+use std::{fmt, future::Future, marker::PhantomData, pin::Pin, slice, task::Poll, time::Duration};
+
+use autocxx::{
+ subclass::{is_subclass, CppSubclass},
+ WithinBox,
+};
+use cxx::UniquePtr;
+use futures::{future::FusedFuture, never::Never};
+use thiserror::Error;
+use uuid::Uuid;
+
+pub use aos_configuration::{Channel, ChannelLookupError, Configuration, ConfigurationExt, Node};
+pub use aos_uuid::UUID;
+
+autocxx::include_cpp! (
+#include "aos/events/event_loop_runtime.h"
+
+safety!(unsafe)
+
+generate_pod!("aos::Context")
+generate!("aos::WatcherForRust")
+generate!("aos::RawSender_Error")
+generate!("aos::SenderForRust")
+generate!("aos::FetcherForRust")
+generate!("aos::EventLoopRuntime")
+
+subclass!("aos::ApplicationFuture", RustApplicationFuture)
+
+extern_cpp_type!("aos::Configuration", crate::Configuration)
+extern_cpp_type!("aos::Channel", crate::Channel)
+extern_cpp_type!("aos::Node", crate::Node)
+extern_cpp_type!("aos::UUID", crate::UUID)
+);
+
+pub type EventLoop = ffi::aos::EventLoop;
+
+/// # Safety
+///
+/// This should have a `'event_loop` lifetime and `future` should include that in its type, but
+/// autocxx's subclass doesn't support that. Even if it did, it wouldn't be enforced. C++ is
+/// enforcing the lifetime: it destroys this object along with the C++ `EventLoopRuntime`, which
+/// must be outlived by the EventLoop.
+#[doc(hidden)]
+#[is_subclass(superclass("aos::ApplicationFuture"))]
+pub struct RustApplicationFuture {
+ /// This logically has a `'event_loop` bound, see the class comment for details.
+ future: Pin<Box<dyn Future<Output = Never>>>,
+}
+
+impl ffi::aos::ApplicationFuture_methods for RustApplicationFuture {
+ fn Poll(&mut self) {
+ // This is always allowed because it can never create a value of type `Ready<Never>` to
+ // return, so it must always return `Pending`. That also means the value it returns doesn't
+ // mean anything, so we ignore it.
+ let _ =
+ Pin::new(&mut self.future).poll(&mut std::task::Context::from_waker(&panic_waker()));
+ }
+}
+
+impl RustApplicationFuture {
+ pub fn new<'event_loop>(
+ future: impl Future<Output = Never> + 'event_loop,
+ ) -> UniquePtr<ffi::aos::ApplicationFuture> {
+ /// # Safety
+ ///
+ /// This completely removes the `'event_loop` lifetime, the caller must ensure that is
+ /// sound.
+ unsafe fn remove_lifetime<'event_loop>(
+ future: Pin<Box<dyn Future<Output = Never> + 'event_loop>>,
+ ) -> Pin<Box<dyn Future<Output = Never>>> {
+ // SAFETY: Caller is responsible.
+ unsafe { std::mem::transmute(future) }
+ }
+
+ Self::as_ApplicationFuture_unique_ptr(Self::new_cpp_owned(Self {
+ // SAFETY: C++ manages observing the lifetime, see [`RustApplicationFuture`] for
+ // details.
+ future: unsafe { remove_lifetime(Box::pin(future)) },
+ cpp_peer: Default::default(),
+ }))
+ }
+}
+
+pub struct EventLoopRuntime<'event_loop>(
+ Pin<Box<ffi::aos::EventLoopRuntime>>,
+ // This is the lifetime of the underlying EventLoop, which is held in C++ via `.0`.
+ PhantomData<&'event_loop mut ()>,
+);
+
+/// Manages the Rust interface to a *single* `aos::EventLoop`. This is intended to be used by a
+/// single application.
+impl<'event_loop> EventLoopRuntime<'event_loop> {
+ /// Creates a new runtime. This must be the only user of the underlying `aos::EventLoop`, or
+ /// things may panic unexpectedly.
+ ///
+ /// Call [`spawn`] to respond to events. The non-event-driven APIs may be used without calling
+ /// this.
+ ///
+ /// This is an async runtime, but it's a somewhat unusual one. See the module-level
+ /// documentation for details.
+ ///
+ /// # Safety
+ ///
+ /// `event_loop` must be valid for `'event_loop`. Effectively we want the argument to be
+ /// `&'event_loop mut EventLoop`, but we can't do that (see the module-level documentation for
+ /// details).
+ ///
+ /// This is a tricky thing to guarantee, be very cautious calling this function. It's an unbound
+ /// lifetime so you should probably wrap it in a function that directly attaches a known
+ /// lifetime. One common pattern is calling this in the constructor of an object whose lifetime
+ /// is managed by C++; C++ doesn't inherit the Rust lifetime but we do have a lot of C++ code
+ /// that obeys the rule of destroying the object before the EventLoop, which is equivalent to
+ /// this restriction.
+ ///
+ /// In Rust terms, this is equivalent to storing `event_loop` in the returned object, which
+ /// will dereference it throughout its lifetime, and the caller must guarantee this is sound.
+ pub unsafe fn new(event_loop: *mut ffi::aos::EventLoop) -> Self {
+ Self(
+ // SAFETY: We push all the validity requirements for this up to our caller.
+ unsafe { ffi::aos::EventLoopRuntime::new(event_loop) }.within_box(),
+ PhantomData,
+ )
+ }
+
+ /// Returns the pointer passed into the constructor.
+ ///
+ /// The returned value should only be used for destroying it (_after_ `self` is dropped) or
+ /// calling other C++ APIs.
+ pub fn raw_event_loop(&mut self) -> *mut ffi::aos::EventLoop {
+ self.0.as_mut().event_loop()
+ }
+
+ // TODO(Brian): Expose `name`. Need to sort out the lifetimes. C++ can reallocate the pointer
+ // independent of Rust. Use it in `get_raw_channel` instead of passing the name in.
+
+ pub fn get_raw_channel(
+ &self,
+ name: &str,
+ typename: &str,
+ application_name: &str,
+ ) -> Result<&'event_loop Channel, ChannelLookupError> {
+ self.configuration()
+ .get_channel(name, typename, application_name, self.node())
+ }
+
+ // TODO(Brian): `get_channel<T>`.
+
+ /// Starts running the given `task`, which may not return (as specified by its type). If you
+ /// want your task to stop, return the result of awaiting [`futures::future::pending`], which
+ /// will never complete. `task` will not be polled after the underlying `aos::EventLoop` exits.
+ ///
+ /// TODO(Brian): Make this paragraph true:
+ /// Note that task will be polled immediately. If you want to defer work until the event loop
+ /// starts running, await TODO in the task.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called more than once. See the module-level documentation for alternatives if you
+ /// want to do this.
+ ///
+ /// # Examples with interesting return types
+ ///
+ /// These are all valid futures which never return:
+ /// ```
+ /// # fn compile_check(mut runtime: aos_events_event_loop_runtime::EventLoopRuntime) {
+ /// # use futures::{never::Never, future::pending};
+ /// async fn pending_wrapper() -> Never {
+ /// pending().await
+ /// }
+ /// async fn loop_forever() -> Never {
+ /// loop {}
+ /// }
+ ///
+ /// runtime.spawn(pending());
+ /// runtime.spawn(async { pending().await });
+ /// runtime.spawn(pending_wrapper());
+ /// runtime.spawn(async { loop {} });
+ /// runtime.spawn(loop_forever());
+ /// runtime.spawn(async { println!("all done"); pending().await });
+ /// # }
+ /// ```
+ /// but this is not:
+ /// ```compile_fail
+ /// # fn compile_check(mut runtime: aos_events_event_loop_runtime::EventLoopRuntime) {
+ /// # use futures::ready;
+ /// runtime.spawn(ready());
+ /// # }
+ /// ```
+ /// and neither is this:
+ /// ```compile_fail
+ /// # fn compile_check(mut runtime: aos_events_event_loop_runtime::EventLoopRuntime) {
+ /// # use futures::ready;
+ /// runtime.spawn(async { println!("all done") });
+ /// # }
+ /// ```
+ ///
+ /// # Examples with capturing
+ ///
+ /// The future can capture things. This is important to access other objects created from the
+ /// runtime, either before calling this function:
+ /// ```
+ /// # fn compile_check<'event_loop>(
+ /// # mut runtime: aos_events_event_loop_runtime::EventLoopRuntime<'event_loop>,
+ /// # channel1: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # channel2: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # ) {
+ /// let mut watcher1 = runtime.make_raw_watcher(channel1);
+ /// let mut watcher2 = runtime.make_raw_watcher(channel2);
+ /// runtime.spawn(async move { loop {
+ /// watcher1.next().await;
+ /// watcher2.next().await;
+ /// }});
+ /// # }
+ /// ```
+ /// or after:
+ /// ```
+ /// # fn compile_check<'event_loop>(
+ /// # mut runtime: aos_events_event_loop_runtime::EventLoopRuntime<'event_loop>,
+ /// # channel1: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # channel2: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # ) {
+ /// # use std::{cell::RefCell, rc::Rc};
+ /// let runtime = Rc::new(RefCell::new(runtime));
+ /// runtime.borrow_mut().spawn({
+ /// let mut runtime = runtime.clone();
+ /// async move {
+ /// let mut runtime = runtime.borrow_mut();
+ /// let mut watcher1 = runtime.make_raw_watcher(channel1);
+ /// let mut watcher2 = runtime.make_raw_watcher(channel2);
+ /// loop {
+ /// watcher1.next().await;
+ /// watcher2.next().await;
+ /// }
+ /// }
+ /// });
+ /// # }
+ /// ```
+ /// or both:
+ /// ```
+ /// # fn compile_check<'event_loop>(
+ /// # mut runtime: aos_events_event_loop_runtime::EventLoopRuntime<'event_loop>,
+ /// # channel1: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # channel2: &'event_loop aos_events_event_loop_runtime::Channel,
+ /// # ) {
+ /// # use std::{cell::RefCell, rc::Rc};
+ /// let mut watcher1 = runtime.make_raw_watcher(channel1);
+ /// let runtime = Rc::new(RefCell::new(runtime));
+ /// runtime.borrow_mut().spawn({
+ /// let mut runtime = runtime.clone();
+ /// async move {
+ /// let mut runtime = runtime.borrow_mut();
+ /// let mut watcher2 = runtime.make_raw_watcher(channel2);
+ /// loop {
+ /// watcher1.next().await;
+ /// watcher2.next().await;
+ /// }
+ /// }
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// But you cannot capture local variables:
+ /// ```compile_fail
+ /// # fn compile_check<'event_loop>(
+ /// # mut runtime: aos_events_event_loop_runtime::EventLoopRuntime<'event_loop>,
+ /// # ) {
+ /// let mut local: i32 = 971;
+ /// let local = &mut local;
+ /// runtime.spawn(async move { loop {
+ /// println!("have: {}", local);
+ /// }});
+ /// # }
+ /// ```
+ pub fn spawn(&mut self, task: impl Future<Output = Never> + 'event_loop) {
+ self.0.as_mut().spawn(RustApplicationFuture::new(task));
+ }
+
+ pub fn configuration(&self) -> &'event_loop Configuration {
+ // SAFETY: It's always a pointer valid for longer than the underlying EventLoop.
+ unsafe { &*self.0.configuration() }
+ }
+
+ pub fn node(&self) -> Option<&'event_loop Node> {
+ // SAFETY: It's always a pointer valid for longer than the underlying EventLoop, or null.
+ unsafe { self.0.node().as_ref() }
+ }
+
+ pub fn monotonic_now(&self) -> MonotonicInstant {
+ MonotonicInstant(self.0.monotonic_now())
+ }
+
+ /// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
+ /// part of `self.configuration()`, which will always have this lifetime.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_raw_watcher(&mut self, channel: &'event_loop Channel) -> RawWatcher {
+ // SAFETY: `channel` is valid for the necessary lifetime, all other requirements fall under
+ // the usual autocxx heuristics.
+ RawWatcher(unsafe { self.0.as_mut().MakeWatcher(channel) }.within_box())
+ }
+
+ /// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
+ /// part of `self.configuration()`, which will always have this lifetime.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_raw_sender(&mut self, channel: &'event_loop Channel) -> RawSender {
+ // SAFETY: `channel` is valid for the necessary lifetime, all other requirements fall under
+ // the usual autocxx heuristics.
+ RawSender(unsafe { self.0.as_mut().MakeSender(channel) }.within_box())
+ }
+
+ /// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
+ /// part of `self.configuration()`, which will always have this lifetime.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_raw_fetcher(&mut self, channel: &'event_loop Channel) -> RawFetcher {
+ // SAFETY: `channel` is valid for the necessary lifetime, all other requirements fall under
+ // the usual autocxx heuristics.
+ RawFetcher(unsafe { self.0.as_mut().MakeFetcher(channel) }.within_box())
+ }
+
+ // TODO(Brian): Expose timers and phased loops. Should we have `sleep`-style methods for those,
+ // instead of / in addition to mirroring C++ with separate setup and wait?
+
+ // TODO(Brian): Expose OnRun. That should only be called once, so coalesce and have it return
+ // immediately afterwards.
+}
+
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawWatcher(Pin<Box<ffi::aos::WatcherForRust>>);
+
+/// Provides async blocking access to messages on a channel. This will return every message on the
+/// channel, in order.
+///
+/// Use [`EventLoopRuntime::make_raw_watcher`] to create one of these.
+///
+/// This is the non-typed API, which is mainly useful for reflection and does not provide safe APIs
+/// for actually interpreting messages. You probably want a [`Watcher`] instead.
+///
+/// This is the same concept as [`futures::stream::Stream`], but can't follow that API for technical
+/// reasons.
+///
+/// # Design
+///
+/// We can't use [`futures::stream::Stream`] because our `Item` type is `Context<'_>`, which means
+/// it's different for each `self` lifetime so we can't write a single type alias for it. We could
+/// write an intermediate type with a generic lifetime that implements `Stream` and is returned
+/// from a `make_stream` method, but that's what `Stream` is doing in the first place so adding
+/// another level doesn't help anything.
+///
+/// We also drop the extraneous `cx` argument that isn't used by this implementation anyways.
+///
+/// We also run into some limitations in the borrow checker trying to implement `poll`, I think it's
+/// the same one mentioned here:
+/// https://blog.rust-lang.org/2022/08/05/nll-by-default.html#looking-forward-what-can-we-expect-for-the-borrow-checker-of-the-future
+/// We get around that one by moving the unbounded lifetime from the pointer dereference into the
+/// function with the if statement.
+impl RawWatcher {
+ /// Returns a Future to await the next value. This can be canceled (ie dropped) at will,
+ /// without skipping any messages.
+ ///
+ /// Remember not to call `poll` after it returns `Poll::Ready`, just like any other future. You
+ /// will need to call this function again to get the succeeding message.
+ ///
+ /// # Examples
+ ///
+ /// The common use case is immediately awaiting the next message:
+ /// ```
+ /// # async fn await_message(mut watcher: aos_events_event_loop_runtime::RawWatcher) {
+ /// println!("received: {:?}", watcher.next().await);
+ /// # }
+ /// ```
+ ///
+ /// You can also await the first message from any of a set of channels:
+ /// ```
+ /// # async fn select(
+ /// # mut watcher1: aos_events_event_loop_runtime::RawWatcher,
+ /// # mut watcher2: aos_events_event_loop_runtime::RawWatcher,
+ /// # ) {
+ /// futures::select! {
+ /// message1 = watcher1.next() => println!("channel 1: {:?}", message1),
+ /// message2 = watcher2.next() => println!("channel 2: {:?}", message2),
+ /// }
+ /// # }
+ /// ```
+ ///
+ /// Note that due to the returned object borrowing the `self` reference, the borrow checker will
+ /// enforce only having a single of these returned objects at a time. Drop the previous message
+ /// before asking for the next one. That means this will not compile:
+ /// ```compile_fail
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::RawWatcher) {
+ /// let first = watcher.next();
+ /// let second = watcher.next();
+ /// first.await;
+ /// # }
+ /// ```
+ /// and nor will this:
+ /// ```compile_fail
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::RawWatcher) {
+ /// let first = watcher.next().await;
+ /// watcher.next();
+ /// println!("still have: {:?}", first);
+ /// # }
+ /// ```
+ /// but this is fine:
+ /// ```
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::RawWatcher) {
+ /// let first = watcher.next().await;
+ /// println!("have: {:?}", first);
+ /// watcher.next();
+ /// # }
+ /// ```
+ pub fn next(&mut self) -> RawWatcherNext {
+ RawWatcherNext(Some(self))
+ }
+}
+
+/// The type returned from [`RawWatcher::next`], see there for details.
+pub struct RawWatcherNext<'a>(Option<&'a mut RawWatcher>);
+
+impl<'a> Future for RawWatcherNext<'a> {
+ type Output = Context<'a>;
+ fn poll(mut self: Pin<&mut Self>, _: &mut std::task::Context) -> Poll<Context<'a>> {
+ let inner = self
+ .0
+ .take()
+ .expect("May not call poll after it returns Ready");
+ let maybe_context = inner.0.as_mut().PollNext();
+ if maybe_context.is_null() {
+ // We're not returning a reference into it, so we can safely replace the reference to
+ // use again in the future.
+ self.0.replace(inner);
+ Poll::Pending
+ } else {
+ // SAFETY: We just checked if it's null. If not, it will be a valid pointer. It will
+ // remain a valid pointer for the borrow of the underlying `RawWatcher` (ie `'a`)
+ // because we're dropping `inner` (which is that reference), so it will need to be
+ // borrowed again which cannot happen before the end of `'a`.
+ Poll::Ready(Context(unsafe { &*maybe_context }))
+ }
+ }
+}
+
+impl FusedFuture for RawWatcherNext<'_> {
+ fn is_terminated(&self) -> bool {
+ self.0.is_none()
+ }
+}
+
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawFetcher(Pin<Box<ffi::aos::FetcherForRust>>);
+
+/// Provides access to messages on a channel, without the ability to wait for a new one. This
+/// provides APIs to get the latest message at some time, and to follow along and retrieve each
+/// message in order.
+///
+/// Use [`EventLoopRuntime::make_raw_fetcher`] to create one of these.
+///
+/// This is the non-typed API, which is mainly useful for reflection and does not provide safe APIs
+/// for actually interpreting messages. You probably want a [`Fetcher`] instead.
+impl RawFetcher {
+ pub fn fetch_next(&mut self) -> bool {
+ self.0.as_mut().FetchNext()
+ }
+
+ pub fn fetch(&mut self) -> bool {
+ self.0.as_mut().Fetch()
+ }
+
+ pub fn context(&self) -> Context {
+ Context(self.0.context())
+ }
+}
+
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawSender(Pin<Box<ffi::aos::SenderForRust>>);
+
+/// Allows sending messages on a channel.
+///
+/// This is the non-typed API, which is mainly useful for reflection and does not provide safe APIs
+/// for actually creating messages to send. You probably want a [`Sender`] instead.
+///
+/// Use [`EventLoopRuntime::make_raw_sender`] to create one of these.
+impl RawSender {
+ fn buffer(&mut self) -> &mut [u8] {
+ // SAFETY: This is a valid slice, and `u8` doesn't have any alignment requirements.
+ unsafe { slice::from_raw_parts_mut(self.0.as_mut().data(), self.0.as_mut().size()) }
+ }
+
+ /// Returns an object which can be used to build a message.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::PongBuilder;
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::RawSender) {
+ /// # unsafe {
+ /// let mut builder = sender.make_builder();
+ /// let pong = PongBuilder::new(builder.fbb()).finish();
+ /// builder.send(pong);
+ /// # }
+ /// # }
+ /// ```
+ ///
+ /// You can bail out of building a message and build another one:
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::PongBuilder;
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::RawSender) {
+ /// # unsafe {
+ /// let mut builder1 = sender.make_builder();
+ /// builder1.fbb();
+ /// let mut builder2 = sender.make_builder();
+ /// let pong = PongBuilder::new(builder2.fbb()).finish();
+ /// builder2.send(pong);
+ /// # }
+ /// # }
+ /// ```
+ /// but you cannot build two messages at the same time with a single builder:
+ /// ```compile_fail
+ /// # use pong_rust_fbs::aos::examples::PongBuilder;
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::RawSender) {
+ /// # unsafe {
+ /// let mut builder1 = sender.make_builder();
+ /// let mut builder2 = sender.make_builder();
+ /// PongBuilder::new(builder2.fbb()).finish();
+ /// PongBuilder::new(builder1.fbb()).finish();
+ /// # }
+ /// # }
+ /// ```
+ pub fn make_builder(&mut self) -> RawBuilder {
+ // TODO(Brian): Actually use the provided buffer instead of just using its
+ // size to allocate a separate one.
+ //
+ // See https://github.com/google/flatbuffers/issues/7385.
+ let fbb = flatbuffers::FlatBufferBuilder::with_capacity(self.buffer().len());
+ RawBuilder {
+ raw_sender: self,
+ fbb,
+ }
+ }
+}
+
+#[derive(Clone, Copy, Eq, PartialEq, Debug, Error)]
+pub enum SendError {
+ #[error("messages have been sent too fast on this channel")]
+ MessagesSentTooFast,
+ #[error("invalid redzone data, shared memory corruption detected")]
+ InvalidRedzone,
+}
+
+/// Used for building a message. See [`RawSender::make_builder`] for details.
+pub struct RawBuilder<'sender> {
+ raw_sender: &'sender mut RawSender,
+ fbb: flatbuffers::FlatBufferBuilder<'sender>,
+}
+
+impl<'sender> RawBuilder<'sender> {
+ pub fn fbb(&mut self) -> &mut flatbuffers::FlatBufferBuilder<'sender> {
+ &mut self.fbb
+ }
+
+ /// # Safety
+ ///
+ /// `T` must match the type of the channel of the sender this builder was created from.
+ pub unsafe fn send<T>(mut self, root: flatbuffers::WIPOffset<T>) -> Result<(), SendError> {
+ self.fbb.finish_minimal(root);
+ let data = self.fbb.finished_data();
+
+ use ffi::aos::RawSender_Error as FfiError;
+ // SAFETY: This is a valid buffer we're passing.
+ match unsafe {
+ self.raw_sender
+ .0
+ .as_mut()
+ .CopyAndSend(data.as_ptr(), data.len())
+ } {
+ FfiError::kOk => Ok(()),
+ FfiError::kMessagesSentTooFast => Err(SendError::MessagesSentTooFast),
+ FfiError::kInvalidRedzone => Err(SendError::InvalidRedzone),
+ }
+ }
+}
+
+#[repr(transparent)]
+#[derive(Clone, Copy)]
+pub struct Context<'context>(&'context ffi::aos::Context);
+
+impl fmt::Debug for Context<'_> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ // TODO(Brian): Add the realtime timestamps here.
+ f.debug_struct("Context")
+ .field("monotonic_event_time", &self.monotonic_event_time())
+ .field("monotonic_remote_time", &self.monotonic_remote_time())
+ .field("queue_index", &self.queue_index())
+ .field("remote_queue_index", &self.remote_queue_index())
+ .field("size", &self.data().map(|data| data.len()))
+ .field("buffer_index", &self.buffer_index())
+ .field("source_boot_uuid", &self.source_boot_uuid())
+ .finish()
+ }
+}
+
+// TODO(Brian): Add the realtime timestamps here.
+impl<'context> Context<'context> {
+ pub fn monotonic_event_time(self) -> MonotonicInstant {
+ MonotonicInstant(self.0.monotonic_event_time)
+ }
+
+ pub fn monotonic_remote_time(self) -> MonotonicInstant {
+ MonotonicInstant(self.0.monotonic_remote_time)
+ }
+
+ pub fn queue_index(self) -> u32 {
+ self.0.queue_index
+ }
+ pub fn remote_queue_index(self) -> u32 {
+ self.0.remote_queue_index
+ }
+
+ pub fn data(self) -> Option<&'context [u8]> {
+ if self.0.data.is_null() {
+ None
+ } else {
+ // SAFETY:
+ // * `u8` has no alignment requirements
+ // * It must be a single initialized flatbuffers buffer
+ // * The borrow in `self.0` guarantees it won't be modified for `'context`
+ Some(unsafe { slice::from_raw_parts(self.0.data as *const u8, self.0.size) })
+ }
+ }
+
+ pub fn buffer_index(self) -> i32 {
+ self.0.buffer_index
+ }
+
+ pub fn source_boot_uuid(self) -> &'context Uuid {
+ // SAFETY: `self` has a valid C++ object. C++ guarantees that the return value will be
+ // valid until something changes the context, which is `'context`.
+ Uuid::from_bytes_ref(&self.0.source_boot_uuid)
+ }
+}
+
+/// Represents a `aos::monotonic_clock::time_point` in a natural Rust way. This
+/// is intended to have the same API as [`std::time::Instant`], any missing
+/// functionality can be added if useful.
+///
+/// TODO(Brian): Do RealtimeInstant too. Use a macro? Integer as a generic
+/// parameter to distinguish them? Or just copy/paste?
+#[repr(transparent)]
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct MonotonicInstant(i64);
+
+impl MonotonicInstant {
+ /// `aos::monotonic_clock::min_time`, commonly used as a sentinel value.
+ pub const MIN_TIME: Self = Self(i64::MIN);
+
+ pub fn is_min_time(self) -> bool {
+ self == Self::MIN_TIME
+ }
+
+ pub fn duration_since_epoch(self) -> Option<Duration> {
+ if self.is_min_time() {
+ None
+ } else {
+ Some(Duration::from_nanos(self.0.try_into().expect(
+ "monotonic_clock::time_point should always be after the epoch",
+ )))
+ }
+ }
+}
+
+impl fmt::Debug for MonotonicInstant {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.duration_since_epoch().fmt(f)
+ }
+}
+
+mod panic_waker {
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ unsafe fn clone_panic_waker(_data: *const ()) -> RawWaker {
+ raw_panic_waker()
+ }
+
+ unsafe fn noop(_data: *const ()) {}
+
+ unsafe fn wake_panic(_data: *const ()) {
+ panic!("Nothing should wake EventLoopRuntime's waker");
+ }
+
+ const PANIC_WAKER_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(clone_panic_waker, wake_panic, wake_panic, noop);
+
+ fn raw_panic_waker() -> RawWaker {
+ RawWaker::new(std::ptr::null(), &PANIC_WAKER_VTABLE)
+ }
+
+ pub fn panic_waker() -> Waker {
+ // SAFETY: The implementations of the RawWakerVTable functions do what is required of them.
+ unsafe { Waker::from_raw(raw_panic_waker()) }
+ }
+}
+
+use panic_waker::panic_waker;
diff --git a/aos/events/event_loop_runtime_test.cc b/aos/events/event_loop_runtime_test.cc
new file mode 100644
index 0000000..c27a27f
--- /dev/null
+++ b/aos/events/event_loop_runtime_test.cc
@@ -0,0 +1,61 @@
+#include "aos/events/event_loop_runtime_test_lib_rs_cxxgen.h"
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/testing/path.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos::events::testing {
+namespace {
+
+void MakeAndTestApplication(int value) {
+ const int32_t starting_count = completed_test_count();
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ aos::testing::ArtifactPath("aos/events/pingpong_config.json"));
+ SimulatedEventLoopFactory factory{&config.message()};
+ const auto ping_event_loop = factory.MakeEventLoop("ping");
+ auto ping_sender = ping_event_loop->MakeSender<examples::Ping>("/test");
+ auto pong_fetcher = ping_event_loop->MakeFetcher<examples::Pong>("/test");
+ const auto rust_event_loop = factory.MakeEventLoop("pong");
+ {
+ auto test_application = make_test_application(rust_event_loop.get());
+ int iteration = 0;
+ ping_event_loop
+ ->AddTimer([&]() {
+ if (iteration++ > 0) {
+ test_application->after_sending();
+ factory.Exit();
+ return;
+ }
+ test_application->before_sending();
+ ASSERT_FALSE(pong_fetcher.Fetch());
+ {
+ auto builder = ping_sender.MakeBuilder();
+ examples::Ping::Builder ping(*builder.fbb());
+ ping.add_value(value);
+ builder.CheckOk(builder.Send(ping.Finish()));
+ }
+ })
+ ->Setup(
+ ping_event_loop->monotonic_now() + std::chrono::milliseconds(10),
+ std::chrono::milliseconds(10));
+ factory.Run();
+ EXPECT_EQ(2, iteration);
+ }
+ ASSERT_EQ(starting_count + 1, completed_test_count());
+ ASSERT_TRUE(pong_fetcher.Fetch());
+ ASSERT_EQ(value, pong_fetcher->value());
+}
+
+} // namespace
+
+TEST(EventLoopRustTest, TestApplicationOnce) { MakeAndTestApplication(971); }
+
+TEST(EventLoopRustTest, TestApplicationTwice) {
+ MakeAndTestApplication(971);
+ MakeAndTestApplication(254);
+}
+
+} // namespace aos::events::testing
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
new file mode 100644
index 0000000..ffc8ce8
--- /dev/null
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -0,0 +1,178 @@
+//! These test helpers have to live in a separate file because autocxx only generates one set of
+//! outputs per file, and that needs to be the non-`#[cfg(test)]` stuff.
+
+use aos_events_event_loop_runtime::{EventLoop, EventLoopRuntime, RawFetcher};
+use ping_rust_fbs::aos::examples::root_as_ping;
+use pong_rust_fbs::aos::examples::PongBuilder;
+
+mod tests {
+ use super::*;
+
+ use std::cell::RefCell;
+
+ #[derive(Debug, Default)]
+ struct GlobalState {
+ creation_count: u32,
+ drop_count: u32,
+ on_run_count: u32,
+ before_count: u32,
+ watcher_count: u32,
+ after_count: u32,
+ }
+
+ thread_local!(static GLOBAL_STATE: RefCell<GlobalState> = Default::default());
+
+ fn completed_test_count() -> u32 {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ let count = g.creation_count;
+ assert_eq!(count, g.on_run_count);
+ assert_eq!(count, g.before_count);
+ assert_eq!(count, g.watcher_count);
+ assert_eq!(count, g.after_count);
+ assert_eq!(count, g.drop_count);
+ count
+ })
+ }
+
+ pub struct TestApplication<'event_loop> {
+ _runtime: EventLoopRuntime<'event_loop>,
+ raw_ping_fetcher: RawFetcher,
+ }
+
+ impl<'event_loop> TestApplication<'event_loop> {
+ fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+ let ping_channel = runtime
+ .get_raw_channel("/test", "aos.examples.Ping", "test")
+ .expect("Should have Ping channel");
+ let mut raw_ping_watcher = runtime.make_raw_watcher(ping_channel);
+ let mut raw_pong_sender = runtime.make_raw_sender(
+ runtime
+ .get_raw_channel("/test", "aos.examples.Pong", "test")
+ .expect("Should have Pong channel"),
+ );
+ runtime.spawn(async move {
+ // TODO(Brian): Wait for OnRun here.
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.drop_count, g.on_run_count);
+ assert_eq!(g.drop_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.on_run_count += 1;
+ });
+ loop {
+ let context = raw_ping_watcher.next().await;
+ assert!(!context.monotonic_event_time().is_min_time());
+ assert!(!context.data().is_none());
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.watcher_count += 1;
+ });
+ let ping = root_as_ping(context.data().expect("should have the data"))
+ .expect("Ping should be valid");
+
+ let mut builder = raw_pong_sender.make_builder();
+ let mut pong = PongBuilder::new(builder.fbb());
+ pong.add_value(ping.value());
+ let pong = pong.finish();
+ // SAFETY: We're sending the correct type here.
+ unsafe { builder.send(pong) }.expect("send should succeed");
+ }
+ });
+ let raw_ping_fetcher = runtime.make_raw_fetcher(ping_channel);
+ Self {
+ _runtime: runtime,
+ raw_ping_fetcher,
+ }
+ }
+
+ fn before_sending(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.drop_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.before_count += 1;
+ });
+ assert!(
+ !self.raw_ping_fetcher.fetch(),
+ "should not have message yet"
+ );
+ assert!(
+ !self.raw_ping_fetcher.fetch_next(),
+ "should not have message yet"
+ );
+ let context = self.raw_ping_fetcher.context();
+ assert!(context.monotonic_event_time().is_min_time());
+ assert!(context.data().is_none());
+ }
+
+ fn after_sending(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.creation_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.after_count += 1;
+ });
+ assert!(self.raw_ping_fetcher.fetch(), "should have message now");
+ let context = self.raw_ping_fetcher.context();
+ assert!(!context.monotonic_event_time().is_min_time());
+ }
+ }
+
+ impl Drop for TestApplication<'_> {
+ fn drop(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.creation_count, g.watcher_count);
+ assert_eq!(g.creation_count, g.after_count);
+ g.drop_count += 1;
+ });
+ }
+ }
+
+ unsafe fn make_test_application(event_loop: *mut EventLoop) -> Box<TestApplication<'static>> {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ g.creation_count += 1;
+ });
+ Box::new(TestApplication::new(EventLoopRuntime::new(event_loop)))
+ }
+
+ #[cxx::bridge(namespace = "aos::events::testing")]
+ mod ffi_bridge {
+ extern "Rust" {
+ type TestApplication<'a>;
+
+ unsafe fn make_test_application(
+ event_loop: *mut EventLoop,
+ ) -> Box<TestApplication<'static>>;
+
+ fn before_sending(&mut self);
+ fn after_sending(&mut self);
+
+ fn completed_test_count() -> u32;
+ }
+
+ unsafe extern "C++" {
+ include!("aos/events/event_loop.h");
+ #[namespace = "aos"]
+ type EventLoop = crate::EventLoop;
+ }
+ }
+}
diff --git a/tools/build_rules/autocxx.bzl b/tools/build_rules/autocxx.bzl
index 82d6d90..4679a6f 100644
--- a/tools/build_rules/autocxx.bzl
+++ b/tools/build_rules/autocxx.bzl
@@ -87,7 +87,9 @@
out_env_file = ctx.actions.declare_file("%s/rustc_env" % gendir)
ctx.actions.write(
output = out_env_file,
- content = "AUTOCXX_RS_JSON_ARCHIVE=%s" % out_rs_json.path,
+ # The first path is valid for rust_library/rust_binary/rust_test/etc, the second one
+ # is valid for rust_doc_test due to working directory differences.
+ content = "AUTOCXX_RS_JSON_ARCHIVE=%s:%s" % (out_rs_json.path, out_rs_json.short_path),
)
out_h = ctx.actions.declare_file("%s_cxxgen.h" % ctx.label.name.rstrip("__gen"))
diff --git a/tools/lint/rustfmt.sh b/tools/lint/rustfmt.sh
index 42bb654..8539101 100755
--- a/tools/lint/rustfmt.sh
+++ b/tools/lint/rustfmt.sh
@@ -23,5 +23,5 @@
# If we have any Rust files, format them.
if ((${#rust_files[@]} > 0)); then
- exec "${RUSTFMT}" "${rust_files[@]}"
+ exec "${RUSTFMT}" --edition=2021 "${rust_files[@]}"
fi