Add type-safe Rust EventLoop APIs
Change-Id: I8c4ac13dec513fd03fe534f8f9449585962b970c
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index 6daa4eb..cd3ca6d 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -51,11 +51,14 @@
WithinBox,
};
use cxx::UniquePtr;
+use flatbuffers::{root_unchecked, Follow, FollowWith, FullyQualifiedName};
use futures::{future::FusedFuture, never::Never};
use thiserror::Error;
use uuid::Uuid;
-pub use aos_configuration::{Channel, ChannelLookupError, Configuration, ConfigurationExt, Node};
+pub use aos_configuration::{Channel, Configuration, Node};
+use aos_configuration::{ChannelLookupError, ConfigurationExt};
+
pub use aos_uuid::UUID;
autocxx::include_cpp! (
@@ -176,20 +179,40 @@
self.0.as_mut().event_loop()
}
- // TODO(Brian): Expose `name`. Need to sort out the lifetimes. C++ can reallocate the pointer
- // independent of Rust. Use it in `get_raw_channel` instead of passing the name in.
+ /// Returns a reference to the name of this EventLoop.
+ ///
+ /// TODO(Brian): Come up with a nice way to expose this safely, without memory allocations, for
+ /// logging etc.
+ ///
+ /// # Safety
+ ///
+ /// The result must not be used after C++ could change it. Unfortunately C++ can change this
+ /// name from most places, so you should be really careful what you do with the result.
+ pub unsafe fn raw_name(&self) -> &str {
+ self.0.name()
+ }
pub fn get_raw_channel(
&self,
name: &str,
typename: &str,
- application_name: &str,
) -> Result<&'event_loop Channel, ChannelLookupError> {
- self.configuration()
- .get_channel(name, typename, application_name, self.node())
+ self.configuration().get_channel(
+ name,
+ typename,
+ // SAFETY: We're not calling any EventLoop methods while C++ is using this for the
+ // channel lookup.
+ unsafe { self.raw_name() },
+ self.node(),
+ )
}
- // TODO(Brian): `get_channel<T>`.
+ pub fn get_channel<T: FullyQualifiedName>(
+ &self,
+ name: &str,
+ ) -> Result<&'event_loop Channel, ChannelLookupError> {
+ self.get_raw_channel(name, T::get_fully_qualified_name())
+ }
/// Starts running the given `task`, which may not return (as specified by its type). If you
/// want your task to stop, return the result of awaiting [`futures::future::pending`], which
@@ -347,6 +370,23 @@
RawWatcher(unsafe { self.0.as_mut().MakeWatcher(channel) }.within_box())
}
+ /// Provides type-safe async blocking access to messages on a channel. `T` should be a
+ /// generated flatbuffers table type, the lifetime parameter does not matter, using `'static`
+ /// is easiest.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_watcher<T>(&mut self, channel_name: &str) -> Result<Watcher<T>, ChannelLookupError>
+ where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+ T: FullyQualifiedName,
+ {
+ let channel = self.get_channel::<T>(channel_name)?;
+ Ok(Watcher(self.make_raw_watcher(channel), PhantomData))
+ }
+
/// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
/// part of `self.configuration()`, which will always have this lifetime.
///
@@ -359,6 +399,21 @@
RawSender(unsafe { self.0.as_mut().MakeSender(channel) }.within_box())
}
+ /// Allows sending messages on a channel with a type-safe API.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_sender<T>(&mut self, channel_name: &str) -> Result<Sender<T>, ChannelLookupError>
+ where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+ T: FullyQualifiedName,
+ {
+ let channel = self.get_channel::<T>(channel_name)?;
+ Ok(Sender(self.make_raw_sender(channel), PhantomData))
+ }
+
/// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
/// part of `self.configuration()`, which will always have this lifetime.
///
@@ -371,6 +426,23 @@
RawFetcher(unsafe { self.0.as_mut().MakeFetcher(channel) }.within_box())
}
+ /// Provides type-safe access to messages on a channel, without the ability to wait for a new
+ /// one. This provides APIs to get the latest message, and to follow along and retrieve each
+ /// message in order.
+ ///
+ /// # Panics
+ ///
+ /// Dropping `self` before the returned object is dropped will panic.
+ pub fn make_fetcher<T>(&mut self, channel_name: &str) -> Result<Fetcher<T>, ChannelLookupError>
+ where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+ T: FullyQualifiedName,
+ {
+ let channel = self.get_channel::<T>(channel_name)?;
+ Ok(Fetcher(self.make_raw_fetcher(channel), PhantomData))
+ }
+
// TODO(Brian): Expose timers and phased loops. Should we have `sleep`-style methods for those,
// instead of / in addition to mirroring C++ with separate setup and wait?
@@ -378,10 +450,6 @@
// immediately afterwards.
}
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawWatcher(Pin<Box<ffi::aos::WatcherForRust>>);
-
/// Provides async blocking access to messages on a channel. This will return every message on the
/// channel, in order.
///
@@ -408,6 +476,10 @@
/// https://blog.rust-lang.org/2022/08/05/nll-by-default.html#looking-forward-what-can-we-expect-for-the-borrow-checker-of-the-future
/// We get around that one by moving the unbounded lifetime from the pointer dereference into the
/// function with the if statement.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawWatcher(Pin<Box<ffi::aos::WatcherForRust>>);
+
impl RawWatcher {
/// Returns a Future to await the next value. This can be canceled (ie dropped) at will,
/// without skipping any messages.
@@ -500,18 +572,187 @@
}
}
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawFetcher(Pin<Box<ffi::aos::FetcherForRust>>);
+/// Provides async blocking access to messages on a channel. This will return every message on the
+/// channel, in order.
+///
+/// Use [`EventLoopRuntime::make_watcher`] to create one of these.
+///
+/// This is the same concept as [`futures::stream::Stream`], but can't follow that API for technical
+/// reasons. See [`RawWatcher`]'s documentation for details.
+pub struct Watcher<T>(RawWatcher, PhantomData<*mut T>)
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Watcher<T>
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+ /// Returns a Future to await the next value. This can be canceled (ie dropped) at will,
+ /// without skipping any messages.
+ ///
+ /// Remember not to call `poll` after it returns `Poll::Ready`, just like any other future. You
+ /// will need to call this function again to get the succeeding message.
+ ///
+ /// # Examples
+ ///
+ /// The common use case is immediately awaiting the next message:
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::Pong;
+ /// # async fn await_message(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+ /// println!("received: {:?}", watcher.next().await);
+ /// # }
+ /// ```
+ ///
+ /// You can also await the first message from any of a set of channels:
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::Pong;
+ /// # async fn select(
+ /// # mut watcher1: aos_events_event_loop_runtime::Watcher<Pong<'static>>,
+ /// # mut watcher2: aos_events_event_loop_runtime::Watcher<Pong<'static>>,
+ /// # ) {
+ /// futures::select! {
+ /// message1 = watcher1.next() => println!("channel 1: {:?}", message1),
+ /// message2 = watcher2.next() => println!("channel 2: {:?}", message2),
+ /// }
+ /// # }
+ /// ```
+ ///
+ /// Note that due to the returned object borrowing the `self` reference, the borrow checker will
+ /// enforce only having a single of these returned objects at a time. Drop the previous message
+ /// before asking for the next one. That means this will not compile:
+ /// ```compile_fail
+ /// # use pong_rust_fbs::aos::examples::Pong;
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+ /// let first = watcher.next();
+ /// let second = watcher.next();
+ /// first.await;
+ /// # }
+ /// ```
+ /// and nor will this:
+ /// ```compile_fail
+ /// # use pong_rust_fbs::aos::examples::Pong;
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+ /// let first = watcher.next().await;
+ /// watcher.next();
+ /// println!("still have: {:?}", first);
+ /// # }
+ /// ```
+ /// but this is fine:
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::Pong;
+ /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+ /// let first = watcher.next().await;
+ /// println!("have: {:?}", first);
+ /// watcher.next();
+ /// # }
+ /// ```
+ pub fn next(&mut self) -> WatcherNext<'_, <T as FollowWith<'_>>::Inner> {
+ WatcherNext(self.0.next(), PhantomData)
+ }
+}
+
+/// The type returned from [`Watcher::next`], see there for details.
+pub struct WatcherNext<'watcher, T>(RawWatcherNext<'watcher>, PhantomData<*mut T>)
+where
+ T: Follow<'watcher> + 'watcher;
+
+impl<'watcher, T> Future for WatcherNext<'watcher, T>
+where
+ T: Follow<'watcher> + 'watcher,
+{
+ type Output = TypedContext<'watcher, T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
+ Pin::new(&mut self.get_mut().0).poll(cx).map(|context|
+ // SAFETY: The Watcher this was created from verified that the channel is the
+ // right type, and the C++ guarantees that the buffer's type matches.
+ TypedContext(context, PhantomData))
+ }
+}
+
+impl<'watcher, T> FusedFuture for WatcherNext<'watcher, T>
+where
+ T: Follow<'watcher> + 'watcher,
+{
+ fn is_terminated(&self) -> bool {
+ self.0.is_terminated()
+ }
+}
+
+/// A wrapper around [`Context`] which exposes the flatbuffer message with the appropriate type.
+pub struct TypedContext<'a, T>(
+ // SAFETY: This must have a message, and it must be a valid `T` flatbuffer.
+ Context<'a>,
+ PhantomData<*mut T>,
+)
+where
+ T: Follow<'a> + 'a;
+
+// TODO(Brian): Add the realtime timestamps here.
+impl<'a, T> TypedContext<'a, T>
+where
+ T: Follow<'a> + 'a,
+{
+ pub fn message(&self) -> Option<T::Inner> {
+ self.0.data().map(|data| {
+ // SAFETY: C++ guarantees that this is a valid flatbuffer. We guarantee it's the right
+ // type based on invariants for our type.
+ unsafe { root_unchecked::<T>(data) }
+ })
+ }
+
+ pub fn monotonic_event_time(&self) -> MonotonicInstant {
+ self.0.monotonic_event_time()
+ }
+ pub fn monotonic_remote_time(&self) -> MonotonicInstant {
+ self.0.monotonic_remote_time()
+ }
+ pub fn queue_index(&self) -> u32 {
+ self.0.queue_index()
+ }
+ pub fn remote_queue_index(&self) -> u32 {
+ self.0.remote_queue_index()
+ }
+ pub fn buffer_index(&self) -> i32 {
+ self.0.buffer_index()
+ }
+ pub fn source_boot_uuid(&self) -> &Uuid {
+ self.0.source_boot_uuid()
+ }
+}
+
+impl<'a, T> fmt::Debug for TypedContext<'a, T>
+where
+ T: Follow<'a> + 'a,
+ T::Inner: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ // TODO(Brian): Add the realtime timestamps here.
+ f.debug_struct("TypedContext")
+ .field("monotonic_event_time", &self.monotonic_event_time())
+ .field("monotonic_remote_time", &self.monotonic_remote_time())
+ .field("queue_index", &self.queue_index())
+ .field("remote_queue_index", &self.remote_queue_index())
+ .field("message", &self.message())
+ .field("buffer_index", &self.buffer_index())
+ .field("source_boot_uuid", &self.source_boot_uuid())
+ .finish()
+ }
+}
/// Provides access to messages on a channel, without the ability to wait for a new one. This
-/// provides APIs to get the latest message at some time, and to follow along and retrieve each
-/// message in order.
+/// provides APIs to get the latest message, and to follow along and retrieve each message in order.
///
/// Use [`EventLoopRuntime::make_raw_fetcher`] to create one of these.
///
/// This is the non-typed API, which is mainly useful for reflection and does not provide safe APIs
/// for actually interpreting messages. You probably want a [`Fetcher`] instead.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawFetcher(Pin<Box<ffi::aos::FetcherForRust>>);
+
impl RawFetcher {
pub fn fetch_next(&mut self) -> bool {
self.0.as_mut().FetchNext()
@@ -526,9 +767,37 @@
}
}
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawSender(Pin<Box<ffi::aos::SenderForRust>>);
+/// Provides access to messages on a channel, without the ability to wait for a new one. This
+/// provides APIs to get the latest message, and to follow along and retrieve each message in order.
+///
+/// Use [`EventLoopRuntime::make_fetcher`] to create one of these.
+pub struct Fetcher<T>(
+ // SAFETY: This must produce messages of type `T`.
+ RawFetcher,
+ PhantomData<*mut T>,
+)
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Fetcher<T>
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+ pub fn fetch_next(&mut self) -> bool {
+ self.0.fetch_next()
+ }
+ pub fn fetch(&mut self) -> bool {
+ self.0.fetch()
+ }
+
+ pub fn context(&self) -> TypedContext<'_, <T as FollowWith<'_>>::Inner> {
+ // SAFETY: We verified that this is the correct type, and C++ guarantees that the buffer's
+ // type matches.
+ TypedContext(self.0.context(), PhantomData)
+ }
+}
/// Allows sending messages on a channel.
///
@@ -536,6 +805,10 @@
/// for actually creating messages to send. You probably want a [`Sender`] instead.
///
/// Use [`EventLoopRuntime::make_raw_sender`] to create one of these.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawSender(Pin<Box<ffi::aos::SenderForRust>>);
+
impl RawSender {
fn buffer(&mut self) -> &mut [u8] {
// SAFETY: This is a valid slice, and `u8` doesn't have any alignment requirements.
@@ -595,14 +868,6 @@
}
}
-#[derive(Clone, Copy, Eq, PartialEq, Debug, Error)]
-pub enum SendError {
- #[error("messages have been sent too fast on this channel")]
- MessagesSentTooFast,
- #[error("invalid redzone data, shared memory corruption detected")]
- InvalidRedzone,
-}
-
/// Used for building a message. See [`RawSender::make_builder`] for details.
pub struct RawBuilder<'sender> {
raw_sender: &'sender mut RawSender,
@@ -636,6 +901,98 @@
}
}
+/// Allows sending messages on a channel with a type-safe API.
+///
+/// Use [`EventLoopRuntime::make_raw_sender`] to create one of these.
+pub struct Sender<T>(
+ // SAFETY: This must accept messages of type `T`.
+ RawSender,
+ PhantomData<*mut T>,
+)
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Sender<T>
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+ /// Returns an object which can be used to build a message.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+ /// let mut builder = sender.make_builder();
+ /// let pong = PongBuilder::new(builder.fbb()).finish();
+ /// builder.send(pong);
+ /// # }
+ /// ```
+ ///
+ /// You can bail out of building a message and build another one:
+ /// ```
+ /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+ /// let mut builder1 = sender.make_builder();
+ /// builder1.fbb();
+ /// let mut builder2 = sender.make_builder();
+ /// let pong = PongBuilder::new(builder2.fbb()).finish();
+ /// builder2.send(pong);
+ /// # }
+ /// ```
+ /// but you cannot build two messages at the same time with a single builder:
+ /// ```compile_fail
+ /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+ /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+ /// let mut builder1 = sender.make_builder();
+ /// let mut builder2 = sender.make_builder();
+ /// PongBuilder::new(builder2.fbb()).finish();
+ /// PongBuilder::new(builder1.fbb()).finish();
+ /// # }
+ /// ```
+ pub fn make_builder(&mut self) -> Builder<T> {
+ Builder(self.0.make_builder(), PhantomData)
+ }
+}
+
+/// Used for building a message. See [`Sender::make_builder`] for details.
+pub struct Builder<'sender, T>(
+ // SAFETY: This must accept messages of type `T`.
+ RawBuilder<'sender>,
+ PhantomData<*mut T>,
+)
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<'sender, T> Builder<'sender, T>
+where
+ for<'a> T: FollowWith<'a>,
+ for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+ pub fn fbb(&mut self) -> &mut flatbuffers::FlatBufferBuilder<'sender> {
+ self.0.fbb()
+ }
+
+ pub fn send<'a>(
+ self,
+ root: flatbuffers::WIPOffset<<T as FollowWith<'a>>::Inner>,
+ ) -> Result<(), SendError> {
+ // SAFETY: We guarantee this is the right type based on invariants for our type.
+ unsafe { self.0.send(root) }
+ }
+}
+
+#[derive(Clone, Copy, Eq, PartialEq, Debug, Error)]
+pub enum SendError {
+ #[error("messages have been sent too fast on this channel")]
+ MessagesSentTooFast,
+ #[error("invalid redzone data, shared memory corruption detected")]
+ InvalidRedzone,
+}
+
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct Context<'context>(&'context ffi::aos::Context);
diff --git a/aos/events/event_loop_runtime_test.cc b/aos/events/event_loop_runtime_test.cc
index c27a27f..81f3be3 100644
--- a/aos/events/event_loop_runtime_test.cc
+++ b/aos/events/event_loop_runtime_test.cc
@@ -9,7 +9,8 @@
namespace aos::events::testing {
namespace {
-void MakeAndTestApplication(int value) {
+template <typename F>
+void MakeAndTestApplication(int value, F constructor) {
const int32_t starting_count = completed_test_count();
const aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(
@@ -20,7 +21,7 @@
auto pong_fetcher = ping_event_loop->MakeFetcher<examples::Pong>("/test");
const auto rust_event_loop = factory.MakeEventLoop("pong");
{
- auto test_application = make_test_application(rust_event_loop.get());
+ auto test_application = constructor(rust_event_loop.get());
int iteration = 0;
ping_event_loop
->AddTimer([&]() {
@@ -51,11 +52,18 @@
} // namespace
-TEST(EventLoopRustTest, TestApplicationOnce) { MakeAndTestApplication(971); }
+TEST(EventLoopRustTest, TestApplicationOnce) {
+ MakeAndTestApplication(971, &make_test_application);
+}
TEST(EventLoopRustTest, TestApplicationTwice) {
- MakeAndTestApplication(971);
- MakeAndTestApplication(254);
+ MakeAndTestApplication(971, &make_test_application);
+ MakeAndTestApplication(254, &make_test_application);
+}
+
+TEST(EventLoopRustTest, TestTypedApplicationTwice) {
+ MakeAndTestApplication(971, &make_typed_test_application);
+ MakeAndTestApplication(254, &make_typed_test_application);
}
} // namespace aos::events::testing
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
index ffc8ce8..62c7584 100644
--- a/aos/events/event_loop_runtime_test_lib.rs
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -1,9 +1,9 @@
//! These test helpers have to live in a separate file because autocxx only generates one set of
//! outputs per file, and that needs to be the non-`#[cfg(test)]` stuff.
-use aos_events_event_loop_runtime::{EventLoop, EventLoopRuntime, RawFetcher};
-use ping_rust_fbs::aos::examples::root_as_ping;
-use pong_rust_fbs::aos::examples::PongBuilder;
+use aos_events_event_loop_runtime::{EventLoop, EventLoopRuntime, Fetcher, RawFetcher};
+use ping_rust_fbs::aos::examples::{root_as_ping, Ping};
+use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
mod tests {
use super::*;
@@ -43,12 +43,12 @@
impl<'event_loop> TestApplication<'event_loop> {
fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
let ping_channel = runtime
- .get_raw_channel("/test", "aos.examples.Ping", "test")
+ .get_raw_channel("/test", "aos.examples.Ping")
.expect("Should have Ping channel");
let mut raw_ping_watcher = runtime.make_raw_watcher(ping_channel);
let mut raw_pong_sender = runtime.make_raw_sender(
runtime
- .get_raw_channel("/test", "aos.examples.Pong", "test")
+ .get_raw_channel("/test", "aos.examples.Pong")
.expect("Should have Pong channel"),
);
runtime.spawn(async move {
@@ -154,21 +154,143 @@
Box::new(TestApplication::new(EventLoopRuntime::new(event_loop)))
}
+ pub struct TypedTestApplication<'event_loop> {
+ _runtime: EventLoopRuntime<'event_loop>,
+ ping_fetcher: Fetcher<Ping<'static>>,
+ }
+
+ impl<'event_loop> TypedTestApplication<'event_loop> {
+ fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+ let mut ping_watcher = runtime.make_watcher::<Ping<'static>>("/test").unwrap();
+ let mut pong_sender = runtime.make_sender::<Pong<'static>>("/test").unwrap();
+ runtime.spawn(async move {
+ // TODO(Brian): Wait for OnRun here.
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.drop_count, g.on_run_count);
+ assert_eq!(g.drop_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.on_run_count += 1;
+ });
+ loop {
+ let context = ping_watcher.next().await;
+ assert!(!context.monotonic_event_time().is_min_time());
+ assert!(!context.message().is_none());
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.watcher_count += 1;
+ });
+ let ping: Ping<'_> = context.message().unwrap();
+
+ let mut builder = pong_sender.make_builder();
+ let mut pong = PongBuilder::new(builder.fbb());
+ pong.add_value(ping.value());
+ let pong = pong.finish();
+ builder.send(pong).expect("send should succeed");
+ }
+ });
+ let ping_fetcher = runtime.make_fetcher("/test").unwrap();
+ Self {
+ _runtime: runtime,
+ ping_fetcher,
+ }
+ }
+
+ fn before_sending(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.drop_count, g.before_count);
+ assert_eq!(g.drop_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.before_count += 1;
+ });
+ assert!(!self.ping_fetcher.fetch(), "should not have message yet");
+ assert!(
+ !self.ping_fetcher.fetch_next(),
+ "should not have message yet"
+ );
+ let context = self.ping_fetcher.context();
+ assert!(context.monotonic_event_time().is_min_time());
+ assert!(context.message().is_none());
+ }
+
+ fn after_sending(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.creation_count, g.watcher_count);
+ assert_eq!(g.drop_count, g.after_count);
+ g.after_count += 1;
+ });
+ assert!(self.ping_fetcher.fetch(), "should have message now");
+ let context = self.ping_fetcher.context();
+ assert!(!context.monotonic_event_time().is_min_time());
+ }
+ }
+
+ impl Drop for TypedTestApplication<'_> {
+ fn drop(&mut self) {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ assert_eq!(g.creation_count, g.drop_count + 1);
+ assert_eq!(g.creation_count, g.on_run_count);
+ assert_eq!(g.creation_count, g.before_count);
+ assert_eq!(g.creation_count, g.watcher_count);
+ assert_eq!(g.creation_count, g.after_count);
+ g.drop_count += 1;
+ });
+ }
+ }
+
+ unsafe fn make_typed_test_application(
+ event_loop: *mut EventLoop,
+ ) -> Box<TypedTestApplication<'static>> {
+ GLOBAL_STATE.with(|g| {
+ let g = &mut *g.borrow_mut();
+ g.creation_count += 1;
+ });
+ Box::new(TypedTestApplication::new(EventLoopRuntime::new(event_loop)))
+ }
+
#[cxx::bridge(namespace = "aos::events::testing")]
mod ffi_bridge {
extern "Rust" {
- type TestApplication<'a>;
-
unsafe fn make_test_application(
event_loop: *mut EventLoop,
) -> Box<TestApplication<'static>>;
- fn before_sending(&mut self);
- fn after_sending(&mut self);
+ unsafe fn make_typed_test_application(
+ event_loop: *mut EventLoop,
+ ) -> Box<TypedTestApplication<'static>>;
fn completed_test_count() -> u32;
}
+ extern "Rust" {
+ type TestApplication<'a>;
+
+ fn before_sending(&mut self);
+ fn after_sending(&mut self);
+ }
+
+ extern "Rust" {
+ type TypedTestApplication<'a>;
+
+ fn before_sending(&mut self);
+ fn after_sending(&mut self);
+ }
+
unsafe extern "C++" {
include!("aos/events/event_loop.h");
#[namespace = "aos"]
diff --git a/third_party/flatbuffers/build_defs.bzl b/third_party/flatbuffers/build_defs.bzl
index c3e2bb0..4ebd872 100644
--- a/third_party/flatbuffers/build_defs.bzl
+++ b/third_party/flatbuffers/build_defs.bzl
@@ -43,6 +43,7 @@
DEFAULT_FLATC_RUST_ARGS = [
"--gen-object-api",
"--require-explicit-ids",
+ "--gen-name-strings",
]
DEFAULT_FLATC_TS_ARGS = [
diff --git a/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs b/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
index 72764b2..4c12af8 100644
--- a/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
+++ b/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
@@ -326,3 +326,12 @@
impl_follow_for_endian_scalar!(i64);
impl_follow_for_endian_scalar!(f32);
impl_follow_for_endian_scalar!(f64);
+
+pub trait FullyQualifiedName {
+ /// Returns the fully-qualified name for this table. This will include all namespaces,like
+ /// `MyGame.Monster`.
+ ///
+ /// Use `--gen-name-strings` when running the flatbuffers compile to generate implementations
+ /// of this trait for all tables.
+ fn get_fully_qualified_name() -> &'static str;
+}
diff --git a/third_party/flatbuffers/src/idl_gen_rust.cpp b/third_party/flatbuffers/src/idl_gen_rust.cpp
index 787d03a..0b0ad9a 100644
--- a/third_party/flatbuffers/src/idl_gen_rust.cpp
+++ b/third_party/flatbuffers/src/idl_gen_rust.cpp
@@ -1566,12 +1566,16 @@
// Generates a fully-qualified name getter for use with --gen-name-strings
void GenFullyQualifiedNameGetter(const StructDef &struct_def,
- const std::string &name) {
+ const std::string &name, bool is_struct) {
const std::string fully_qualified_name =
struct_def.defined_namespace->GetFullyQualifiedName(name);
- code_ += " pub const fn get_fully_qualified_name() -> &'static str {";
+ code_ += "impl flatbuffers::FullyQualifiedName for {{STRUCT_TY}}\\";
+ if (!is_struct) { code_ += "<'_>\\"; }
+ code_ += "{";
+ code_ += " fn get_fully_qualified_name() -> &'static str {";
code_ += " \"" + fully_qualified_name + "\"";
code_ += " }";
+ code_ += "}";
code_ += "";
}
@@ -1659,10 +1663,6 @@
});
code_ += "";
- if (parser_.opts.generate_name_strings) {
- GenFullyQualifiedNameGetter(struct_def, struct_def.name);
- }
-
code_ += " #[inline]";
code_ +=
" pub fn init_from_table(table: flatbuffers::Table<'a>) -> "
@@ -1918,6 +1918,10 @@
code_ += "}"; // End of table impl.
code_ += "";
+ if (parser_.opts.generate_name_strings) {
+ GenFullyQualifiedNameGetter(struct_def, struct_def.name, false);
+ }
+
// Generate Verifier;
code_ += "impl flatbuffers::Verifiable for {{STRUCT_TY}}<'_> {";
code_ += " #[inline]";
@@ -2735,10 +2739,6 @@
code_ += " }";
code_ += "";
- if (parser_.opts.generate_name_strings) {
- GenFullyQualifiedNameGetter(struct_def, struct_def.name);
- }
-
// Generate accessor methods for the struct.
ForAllStructFields(struct_def, [&](const FieldDef &field) {
this->GenComment(field.doc_comment);
@@ -2849,6 +2849,10 @@
code_ += "}"; // End impl Struct methods.
code_ += "";
+ if (parser_.opts.generate_name_strings) {
+ GenFullyQualifiedNameGetter(struct_def, struct_def.name, true);
+ }
+
// Generate Struct Object.
if (parser_.opts.generate_object_based_api) {
// Struct declaration