Add type-safe Rust EventLoop APIs

Change-Id: I8c4ac13dec513fd03fe534f8f9449585962b970c
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index 6daa4eb..cd3ca6d 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -51,11 +51,14 @@
     WithinBox,
 };
 use cxx::UniquePtr;
+use flatbuffers::{root_unchecked, Follow, FollowWith, FullyQualifiedName};
 use futures::{future::FusedFuture, never::Never};
 use thiserror::Error;
 use uuid::Uuid;
 
-pub use aos_configuration::{Channel, ChannelLookupError, Configuration, ConfigurationExt, Node};
+pub use aos_configuration::{Channel, Configuration, Node};
+use aos_configuration::{ChannelLookupError, ConfigurationExt};
+
 pub use aos_uuid::UUID;
 
 autocxx::include_cpp! (
@@ -176,20 +179,40 @@
         self.0.as_mut().event_loop()
     }
 
-    // TODO(Brian): Expose `name`. Need to sort out the lifetimes. C++ can reallocate the pointer
-    // independent of Rust. Use it in `get_raw_channel` instead of passing the name in.
+    /// Returns a reference to the name of this EventLoop.
+    ///
+    /// TODO(Brian): Come up with a nice way to expose this safely, without memory allocations, for
+    /// logging etc.
+    ///
+    /// # Safety
+    ///
+    /// The result must not be used after C++ could change it. Unfortunately C++ can change this
+    /// name from most places, so you should be really careful what you do with the result.
+    pub unsafe fn raw_name(&self) -> &str {
+        self.0.name()
+    }
 
     pub fn get_raw_channel(
         &self,
         name: &str,
         typename: &str,
-        application_name: &str,
     ) -> Result<&'event_loop Channel, ChannelLookupError> {
-        self.configuration()
-            .get_channel(name, typename, application_name, self.node())
+        self.configuration().get_channel(
+            name,
+            typename,
+            // SAFETY: We're not calling any EventLoop methods while C++ is using this for the
+            // channel lookup.
+            unsafe { self.raw_name() },
+            self.node(),
+        )
     }
 
-    // TODO(Brian): `get_channel<T>`.
+    pub fn get_channel<T: FullyQualifiedName>(
+        &self,
+        name: &str,
+    ) -> Result<&'event_loop Channel, ChannelLookupError> {
+        self.get_raw_channel(name, T::get_fully_qualified_name())
+    }
 
     /// Starts running the given `task`, which may not return (as specified by its type). If you
     /// want your task to stop, return the result of awaiting [`futures::future::pending`], which
@@ -347,6 +370,23 @@
         RawWatcher(unsafe { self.0.as_mut().MakeWatcher(channel) }.within_box())
     }
 
+    /// Provides type-safe async blocking access to messages on a channel. `T` should be a
+    /// generated flatbuffers table type, the lifetime parameter does not matter, using `'static`
+    /// is easiest.
+    ///
+    /// # Panics
+    ///
+    /// Dropping `self` before the returned object is dropped will panic.
+    pub fn make_watcher<T>(&mut self, channel_name: &str) -> Result<Watcher<T>, ChannelLookupError>
+    where
+        for<'a> T: FollowWith<'a>,
+        for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+        T: FullyQualifiedName,
+    {
+        let channel = self.get_channel::<T>(channel_name)?;
+        Ok(Watcher(self.make_raw_watcher(channel), PhantomData))
+    }
+
     /// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
     /// part of `self.configuration()`, which will always have this lifetime.
     ///
@@ -359,6 +399,21 @@
         RawSender(unsafe { self.0.as_mut().MakeSender(channel) }.within_box())
     }
 
+    /// Allows sending messages on a channel with a type-safe API.
+    ///
+    /// # Panics
+    ///
+    /// Dropping `self` before the returned object is dropped will panic.
+    pub fn make_sender<T>(&mut self, channel_name: &str) -> Result<Sender<T>, ChannelLookupError>
+    where
+        for<'a> T: FollowWith<'a>,
+        for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+        T: FullyQualifiedName,
+    {
+        let channel = self.get_channel::<T>(channel_name)?;
+        Ok(Sender(self.make_raw_sender(channel), PhantomData))
+    }
+
     /// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
     /// part of `self.configuration()`, which will always have this lifetime.
     ///
@@ -371,6 +426,23 @@
         RawFetcher(unsafe { self.0.as_mut().MakeFetcher(channel) }.within_box())
     }
 
+    /// Provides type-safe access to messages on a channel, without the ability to wait for a new
+    /// one. This provides APIs to get the latest message, and to follow along and retrieve each
+    /// message in order.
+    ///
+    /// # Panics
+    ///
+    /// Dropping `self` before the returned object is dropped will panic.
+    pub fn make_fetcher<T>(&mut self, channel_name: &str) -> Result<Fetcher<T>, ChannelLookupError>
+    where
+        for<'a> T: FollowWith<'a>,
+        for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+        T: FullyQualifiedName,
+    {
+        let channel = self.get_channel::<T>(channel_name)?;
+        Ok(Fetcher(self.make_raw_fetcher(channel), PhantomData))
+    }
+
     // TODO(Brian): Expose timers and phased loops. Should we have `sleep`-style methods for those,
     // instead of / in addition to mirroring C++ with separate setup and wait?
 
@@ -378,10 +450,6 @@
     // immediately afterwards.
 }
 
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawWatcher(Pin<Box<ffi::aos::WatcherForRust>>);
-
 /// Provides async blocking access to messages on a channel. This will return every message on the
 /// channel, in order.
 ///
@@ -408,6 +476,10 @@
 /// https://blog.rust-lang.org/2022/08/05/nll-by-default.html#looking-forward-what-can-we-expect-for-the-borrow-checker-of-the-future
 /// We get around that one by moving the unbounded lifetime from the pointer dereference into the
 /// function with the if statement.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawWatcher(Pin<Box<ffi::aos::WatcherForRust>>);
+
 impl RawWatcher {
     /// Returns a Future to await the next value. This can be canceled (ie dropped) at will,
     /// without skipping any messages.
@@ -500,18 +572,187 @@
     }
 }
 
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawFetcher(Pin<Box<ffi::aos::FetcherForRust>>);
+/// Provides async blocking access to messages on a channel. This will return every message on the
+/// channel, in order.
+///
+/// Use [`EventLoopRuntime::make_watcher`] to create one of these.
+///
+/// This is the same concept as [`futures::stream::Stream`], but can't follow that API for technical
+/// reasons. See [`RawWatcher`]'s documentation for details.
+pub struct Watcher<T>(RawWatcher, PhantomData<*mut T>)
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Watcher<T>
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+    /// Returns a Future to await the next value. This can be canceled (ie dropped) at will,
+    /// without skipping any messages.
+    ///
+    /// Remember not to call `poll` after it returns `Poll::Ready`, just like any other future. You
+    /// will need to call this function again to get the succeeding message.
+    ///
+    /// # Examples
+    ///
+    /// The common use case is immediately awaiting the next message:
+    /// ```
+    /// # use pong_rust_fbs::aos::examples::Pong;
+    /// # async fn await_message(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+    /// println!("received: {:?}", watcher.next().await);
+    /// # }
+    /// ```
+    ///
+    /// You can also await the first message from any of a set of channels:
+    /// ```
+    /// # use pong_rust_fbs::aos::examples::Pong;
+    /// # async fn select(
+    /// #     mut watcher1: aos_events_event_loop_runtime::Watcher<Pong<'static>>,
+    /// #     mut watcher2: aos_events_event_loop_runtime::Watcher<Pong<'static>>,
+    /// # ) {
+    /// futures::select! {
+    ///     message1 = watcher1.next() => println!("channel 1: {:?}", message1),
+    ///     message2 = watcher2.next() => println!("channel 2: {:?}", message2),
+    /// }
+    /// # }
+    /// ```
+    ///
+    /// Note that due to the returned object borrowing the `self` reference, the borrow checker will
+    /// enforce only having a single of these returned objects at a time. Drop the previous message
+    /// before asking for the next one. That means this will not compile:
+    /// ```compile_fail
+    /// # use pong_rust_fbs::aos::examples::Pong;
+    /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+    /// let first = watcher.next();
+    /// let second = watcher.next();
+    /// first.await;
+    /// # }
+    /// ```
+    /// and nor will this:
+    /// ```compile_fail
+    /// # use pong_rust_fbs::aos::examples::Pong;
+    /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+    /// let first = watcher.next().await;
+    /// watcher.next();
+    /// println!("still have: {:?}", first);
+    /// # }
+    /// ```
+    /// but this is fine:
+    /// ```
+    /// # use pong_rust_fbs::aos::examples::Pong;
+    /// # async fn compile_check(mut watcher: aos_events_event_loop_runtime::Watcher<Pong<'static>>) {
+    /// let first = watcher.next().await;
+    /// println!("have: {:?}", first);
+    /// watcher.next();
+    /// # }
+    /// ```
+    pub fn next(&mut self) -> WatcherNext<'_, <T as FollowWith<'_>>::Inner> {
+        WatcherNext(self.0.next(), PhantomData)
+    }
+}
+
+/// The type returned from [`Watcher::next`], see there for details.
+pub struct WatcherNext<'watcher, T>(RawWatcherNext<'watcher>, PhantomData<*mut T>)
+where
+    T: Follow<'watcher> + 'watcher;
+
+impl<'watcher, T> Future for WatcherNext<'watcher, T>
+where
+    T: Follow<'watcher> + 'watcher,
+{
+    type Output = TypedContext<'watcher, T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
+        Pin::new(&mut self.get_mut().0).poll(cx).map(|context|
+                 // SAFETY: The Watcher this was created from verified that the channel is the
+                 // right type, and the C++ guarantees that the buffer's type matches.
+                 TypedContext(context, PhantomData))
+    }
+}
+
+impl<'watcher, T> FusedFuture for WatcherNext<'watcher, T>
+where
+    T: Follow<'watcher> + 'watcher,
+{
+    fn is_terminated(&self) -> bool {
+        self.0.is_terminated()
+    }
+}
+
+/// A wrapper around [`Context`] which exposes the flatbuffer message with the appropriate type.
+pub struct TypedContext<'a, T>(
+    // SAFETY: This must have a message, and it must be a valid `T` flatbuffer.
+    Context<'a>,
+    PhantomData<*mut T>,
+)
+where
+    T: Follow<'a> + 'a;
+
+// TODO(Brian): Add the realtime timestamps here.
+impl<'a, T> TypedContext<'a, T>
+where
+    T: Follow<'a> + 'a,
+{
+    pub fn message(&self) -> Option<T::Inner> {
+        self.0.data().map(|data| {
+            // SAFETY: C++ guarantees that this is a valid flatbuffer. We guarantee it's the right
+            // type based on invariants for our type.
+            unsafe { root_unchecked::<T>(data) }
+        })
+    }
+
+    pub fn monotonic_event_time(&self) -> MonotonicInstant {
+        self.0.monotonic_event_time()
+    }
+    pub fn monotonic_remote_time(&self) -> MonotonicInstant {
+        self.0.monotonic_remote_time()
+    }
+    pub fn queue_index(&self) -> u32 {
+        self.0.queue_index()
+    }
+    pub fn remote_queue_index(&self) -> u32 {
+        self.0.remote_queue_index()
+    }
+    pub fn buffer_index(&self) -> i32 {
+        self.0.buffer_index()
+    }
+    pub fn source_boot_uuid(&self) -> &Uuid {
+        self.0.source_boot_uuid()
+    }
+}
+
+impl<'a, T> fmt::Debug for TypedContext<'a, T>
+where
+    T: Follow<'a> + 'a,
+    T::Inner: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // TODO(Brian): Add the realtime timestamps here.
+        f.debug_struct("TypedContext")
+            .field("monotonic_event_time", &self.monotonic_event_time())
+            .field("monotonic_remote_time", &self.monotonic_remote_time())
+            .field("queue_index", &self.queue_index())
+            .field("remote_queue_index", &self.remote_queue_index())
+            .field("message", &self.message())
+            .field("buffer_index", &self.buffer_index())
+            .field("source_boot_uuid", &self.source_boot_uuid())
+            .finish()
+    }
+}
 
 /// Provides access to messages on a channel, without the ability to wait for a new one. This
-/// provides APIs to get the latest message at some time, and to follow along and retrieve each
-/// message in order.
+/// provides APIs to get the latest message, and to follow along and retrieve each message in order.
 ///
 /// Use [`EventLoopRuntime::make_raw_fetcher`] to create one of these.
 ///
 /// This is the non-typed API, which is mainly useful for reflection and does not provide safe APIs
 /// for actually interpreting messages. You probably want a [`Fetcher`] instead.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawFetcher(Pin<Box<ffi::aos::FetcherForRust>>);
+
 impl RawFetcher {
     pub fn fetch_next(&mut self) -> bool {
         self.0.as_mut().FetchNext()
@@ -526,9 +767,37 @@
     }
 }
 
-// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
-#[repr(transparent)]
-pub struct RawSender(Pin<Box<ffi::aos::SenderForRust>>);
+/// Provides access to messages on a channel, without the ability to wait for a new one. This
+/// provides APIs to get the latest message, and to follow along and retrieve each message in order.
+///
+/// Use [`EventLoopRuntime::make_fetcher`] to create one of these.
+pub struct Fetcher<T>(
+    // SAFETY: This must produce messages of type `T`.
+    RawFetcher,
+    PhantomData<*mut T>,
+)
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Fetcher<T>
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+    pub fn fetch_next(&mut self) -> bool {
+        self.0.fetch_next()
+    }
+    pub fn fetch(&mut self) -> bool {
+        self.0.fetch()
+    }
+
+    pub fn context(&self) -> TypedContext<'_, <T as FollowWith<'_>>::Inner> {
+        // SAFETY: We verified that this is the correct type, and C++ guarantees that the buffer's
+        // type matches.
+        TypedContext(self.0.context(), PhantomData)
+    }
+}
 
 /// Allows sending messages on a channel.
 ///
@@ -536,6 +805,10 @@
 /// for actually creating messages to send. You probably want a [`Sender`] instead.
 ///
 /// Use [`EventLoopRuntime::make_raw_sender`] to create one of these.
+// SAFETY: If this outlives the parent EventLoop, the C++ code will LOG(FATAL).
+#[repr(transparent)]
+pub struct RawSender(Pin<Box<ffi::aos::SenderForRust>>);
+
 impl RawSender {
     fn buffer(&mut self) -> &mut [u8] {
         // SAFETY: This is a valid slice, and `u8` doesn't have any alignment requirements.
@@ -595,14 +868,6 @@
     }
 }
 
-#[derive(Clone, Copy, Eq, PartialEq, Debug, Error)]
-pub enum SendError {
-    #[error("messages have been sent too fast on this channel")]
-    MessagesSentTooFast,
-    #[error("invalid redzone data, shared memory corruption detected")]
-    InvalidRedzone,
-}
-
 /// Used for building a message. See [`RawSender::make_builder`] for details.
 pub struct RawBuilder<'sender> {
     raw_sender: &'sender mut RawSender,
@@ -636,6 +901,98 @@
     }
 }
 
+/// Allows sending messages on a channel with a type-safe API.
+///
+/// Use [`EventLoopRuntime::make_raw_sender`] to create one of these.
+pub struct Sender<T>(
+    // SAFETY: This must accept messages of type `T`.
+    RawSender,
+    PhantomData<*mut T>,
+)
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<T> Sender<T>
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+    /// Returns an object which can be used to build a message.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+    /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+    /// let mut builder = sender.make_builder();
+    /// let pong = PongBuilder::new(builder.fbb()).finish();
+    /// builder.send(pong);
+    /// # }
+    /// ```
+    ///
+    /// You can bail out of building a message and build another one:
+    /// ```
+    /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+    /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+    /// let mut builder1 = sender.make_builder();
+    /// builder1.fbb();
+    /// let mut builder2 = sender.make_builder();
+    /// let pong = PongBuilder::new(builder2.fbb()).finish();
+    /// builder2.send(pong);
+    /// # }
+    /// ```
+    /// but you cannot build two messages at the same time with a single builder:
+    /// ```compile_fail
+    /// # use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
+    /// # fn compile_check(mut sender: aos_events_event_loop_runtime::Sender<Pong<'static>>) {
+    /// let mut builder1 = sender.make_builder();
+    /// let mut builder2 = sender.make_builder();
+    /// PongBuilder::new(builder2.fbb()).finish();
+    /// PongBuilder::new(builder1.fbb()).finish();
+    /// # }
+    /// ```
+    pub fn make_builder(&mut self) -> Builder<T> {
+        Builder(self.0.make_builder(), PhantomData)
+    }
+}
+
+/// Used for building a message. See [`Sender::make_builder`] for details.
+pub struct Builder<'sender, T>(
+    // SAFETY: This must accept messages of type `T`.
+    RawBuilder<'sender>,
+    PhantomData<*mut T>,
+)
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>;
+
+impl<'sender, T> Builder<'sender, T>
+where
+    for<'a> T: FollowWith<'a>,
+    for<'a> <T as FollowWith<'a>>::Inner: Follow<'a>,
+{
+    pub fn fbb(&mut self) -> &mut flatbuffers::FlatBufferBuilder<'sender> {
+        self.0.fbb()
+    }
+
+    pub fn send<'a>(
+        self,
+        root: flatbuffers::WIPOffset<<T as FollowWith<'a>>::Inner>,
+    ) -> Result<(), SendError> {
+        // SAFETY: We guarantee this is the right type based on invariants for our type.
+        unsafe { self.0.send(root) }
+    }
+}
+
+#[derive(Clone, Copy, Eq, PartialEq, Debug, Error)]
+pub enum SendError {
+    #[error("messages have been sent too fast on this channel")]
+    MessagesSentTooFast,
+    #[error("invalid redzone data, shared memory corruption detected")]
+    InvalidRedzone,
+}
+
 #[repr(transparent)]
 #[derive(Clone, Copy)]
 pub struct Context<'context>(&'context ffi::aos::Context);
diff --git a/aos/events/event_loop_runtime_test.cc b/aos/events/event_loop_runtime_test.cc
index c27a27f..81f3be3 100644
--- a/aos/events/event_loop_runtime_test.cc
+++ b/aos/events/event_loop_runtime_test.cc
@@ -9,7 +9,8 @@
 namespace aos::events::testing {
 namespace {
 
-void MakeAndTestApplication(int value) {
+template <typename F>
+void MakeAndTestApplication(int value, F constructor) {
   const int32_t starting_count = completed_test_count();
   const aos::FlatbufferDetachedBuffer<aos::Configuration> config =
       aos::configuration::ReadConfig(
@@ -20,7 +21,7 @@
   auto pong_fetcher = ping_event_loop->MakeFetcher<examples::Pong>("/test");
   const auto rust_event_loop = factory.MakeEventLoop("pong");
   {
-    auto test_application = make_test_application(rust_event_loop.get());
+    auto test_application = constructor(rust_event_loop.get());
     int iteration = 0;
     ping_event_loop
         ->AddTimer([&]() {
@@ -51,11 +52,18 @@
 
 }  // namespace
 
-TEST(EventLoopRustTest, TestApplicationOnce) { MakeAndTestApplication(971); }
+TEST(EventLoopRustTest, TestApplicationOnce) {
+  MakeAndTestApplication(971, &make_test_application);
+}
 
 TEST(EventLoopRustTest, TestApplicationTwice) {
-  MakeAndTestApplication(971);
-  MakeAndTestApplication(254);
+  MakeAndTestApplication(971, &make_test_application);
+  MakeAndTestApplication(254, &make_test_application);
+}
+
+TEST(EventLoopRustTest, TestTypedApplicationTwice) {
+  MakeAndTestApplication(971, &make_typed_test_application);
+  MakeAndTestApplication(254, &make_typed_test_application);
 }
 
 }  // namespace aos::events::testing
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
index ffc8ce8..62c7584 100644
--- a/aos/events/event_loop_runtime_test_lib.rs
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -1,9 +1,9 @@
 //! These test helpers have to live in a separate file because autocxx only generates one set of
 //! outputs per file, and that needs to be the non-`#[cfg(test)]` stuff.
 
-use aos_events_event_loop_runtime::{EventLoop, EventLoopRuntime, RawFetcher};
-use ping_rust_fbs::aos::examples::root_as_ping;
-use pong_rust_fbs::aos::examples::PongBuilder;
+use aos_events_event_loop_runtime::{EventLoop, EventLoopRuntime, Fetcher, RawFetcher};
+use ping_rust_fbs::aos::examples::{root_as_ping, Ping};
+use pong_rust_fbs::aos::examples::{Pong, PongBuilder};
 
 mod tests {
     use super::*;
@@ -43,12 +43,12 @@
     impl<'event_loop> TestApplication<'event_loop> {
         fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
             let ping_channel = runtime
-                .get_raw_channel("/test", "aos.examples.Ping", "test")
+                .get_raw_channel("/test", "aos.examples.Ping")
                 .expect("Should have Ping channel");
             let mut raw_ping_watcher = runtime.make_raw_watcher(ping_channel);
             let mut raw_pong_sender = runtime.make_raw_sender(
                 runtime
-                    .get_raw_channel("/test", "aos.examples.Pong", "test")
+                    .get_raw_channel("/test", "aos.examples.Pong")
                     .expect("Should have Pong channel"),
             );
             runtime.spawn(async move {
@@ -154,21 +154,143 @@
         Box::new(TestApplication::new(EventLoopRuntime::new(event_loop)))
     }
 
+    pub struct TypedTestApplication<'event_loop> {
+        _runtime: EventLoopRuntime<'event_loop>,
+        ping_fetcher: Fetcher<Ping<'static>>,
+    }
+
+    impl<'event_loop> TypedTestApplication<'event_loop> {
+        fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+            let mut ping_watcher = runtime.make_watcher::<Ping<'static>>("/test").unwrap();
+            let mut pong_sender = runtime.make_sender::<Pong<'static>>("/test").unwrap();
+            runtime.spawn(async move {
+                // TODO(Brian): Wait for OnRun here.
+                GLOBAL_STATE.with(|g| {
+                    let g = &mut *g.borrow_mut();
+                    assert_eq!(g.creation_count, g.drop_count + 1);
+                    assert_eq!(g.drop_count, g.on_run_count);
+                    assert_eq!(g.drop_count, g.before_count);
+                    assert_eq!(g.drop_count, g.watcher_count);
+                    assert_eq!(g.drop_count, g.after_count);
+                    g.on_run_count += 1;
+                });
+                loop {
+                    let context = ping_watcher.next().await;
+                    assert!(!context.monotonic_event_time().is_min_time());
+                    assert!(!context.message().is_none());
+                    GLOBAL_STATE.with(|g| {
+                        let g = &mut *g.borrow_mut();
+                        assert_eq!(g.creation_count, g.drop_count + 1);
+                        assert_eq!(g.creation_count, g.on_run_count);
+                        assert_eq!(g.creation_count, g.before_count);
+                        assert_eq!(g.drop_count, g.watcher_count);
+                        assert_eq!(g.drop_count, g.after_count);
+                        g.watcher_count += 1;
+                    });
+                    let ping: Ping<'_> = context.message().unwrap();
+
+                    let mut builder = pong_sender.make_builder();
+                    let mut pong = PongBuilder::new(builder.fbb());
+                    pong.add_value(ping.value());
+                    let pong = pong.finish();
+                    builder.send(pong).expect("send should succeed");
+                }
+            });
+            let ping_fetcher = runtime.make_fetcher("/test").unwrap();
+            Self {
+                _runtime: runtime,
+                ping_fetcher,
+            }
+        }
+
+        fn before_sending(&mut self) {
+            GLOBAL_STATE.with(|g| {
+                let g = &mut *g.borrow_mut();
+                assert_eq!(g.creation_count, g.drop_count + 1);
+                assert_eq!(g.creation_count, g.on_run_count);
+                assert_eq!(g.drop_count, g.before_count);
+                assert_eq!(g.drop_count, g.watcher_count);
+                assert_eq!(g.drop_count, g.after_count);
+                g.before_count += 1;
+            });
+            assert!(!self.ping_fetcher.fetch(), "should not have message yet");
+            assert!(
+                !self.ping_fetcher.fetch_next(),
+                "should not have message yet"
+            );
+            let context = self.ping_fetcher.context();
+            assert!(context.monotonic_event_time().is_min_time());
+            assert!(context.message().is_none());
+        }
+
+        fn after_sending(&mut self) {
+            GLOBAL_STATE.with(|g| {
+                let g = &mut *g.borrow_mut();
+                assert_eq!(g.creation_count, g.drop_count + 1);
+                assert_eq!(g.creation_count, g.on_run_count);
+                assert_eq!(g.creation_count, g.before_count);
+                assert_eq!(g.creation_count, g.watcher_count);
+                assert_eq!(g.drop_count, g.after_count);
+                g.after_count += 1;
+            });
+            assert!(self.ping_fetcher.fetch(), "should have message now");
+            let context = self.ping_fetcher.context();
+            assert!(!context.monotonic_event_time().is_min_time());
+        }
+    }
+
+    impl Drop for TypedTestApplication<'_> {
+        fn drop(&mut self) {
+            GLOBAL_STATE.with(|g| {
+                let g = &mut *g.borrow_mut();
+                assert_eq!(g.creation_count, g.drop_count + 1);
+                assert_eq!(g.creation_count, g.on_run_count);
+                assert_eq!(g.creation_count, g.before_count);
+                assert_eq!(g.creation_count, g.watcher_count);
+                assert_eq!(g.creation_count, g.after_count);
+                g.drop_count += 1;
+            });
+        }
+    }
+
+    unsafe fn make_typed_test_application(
+        event_loop: *mut EventLoop,
+    ) -> Box<TypedTestApplication<'static>> {
+        GLOBAL_STATE.with(|g| {
+            let g = &mut *g.borrow_mut();
+            g.creation_count += 1;
+        });
+        Box::new(TypedTestApplication::new(EventLoopRuntime::new(event_loop)))
+    }
+
     #[cxx::bridge(namespace = "aos::events::testing")]
     mod ffi_bridge {
         extern "Rust" {
-            type TestApplication<'a>;
-
             unsafe fn make_test_application(
                 event_loop: *mut EventLoop,
             ) -> Box<TestApplication<'static>>;
 
-            fn before_sending(&mut self);
-            fn after_sending(&mut self);
+            unsafe fn make_typed_test_application(
+                event_loop: *mut EventLoop,
+            ) -> Box<TypedTestApplication<'static>>;
 
             fn completed_test_count() -> u32;
         }
 
+        extern "Rust" {
+            type TestApplication<'a>;
+
+            fn before_sending(&mut self);
+            fn after_sending(&mut self);
+        }
+
+        extern "Rust" {
+            type TypedTestApplication<'a>;
+
+            fn before_sending(&mut self);
+            fn after_sending(&mut self);
+        }
+
         unsafe extern "C++" {
             include!("aos/events/event_loop.h");
             #[namespace = "aos"]
diff --git a/third_party/flatbuffers/build_defs.bzl b/third_party/flatbuffers/build_defs.bzl
index c3e2bb0..4ebd872 100644
--- a/third_party/flatbuffers/build_defs.bzl
+++ b/third_party/flatbuffers/build_defs.bzl
@@ -43,6 +43,7 @@
 DEFAULT_FLATC_RUST_ARGS = [
     "--gen-object-api",
     "--require-explicit-ids",
+    "--gen-name-strings",
 ]
 
 DEFAULT_FLATC_TS_ARGS = [
diff --git a/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs b/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
index 72764b2..4c12af8 100644
--- a/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
+++ b/third_party/flatbuffers/rust/flatbuffers/src/primitives.rs
@@ -326,3 +326,12 @@
 impl_follow_for_endian_scalar!(i64);
 impl_follow_for_endian_scalar!(f32);
 impl_follow_for_endian_scalar!(f64);
+
+pub trait FullyQualifiedName {
+    /// Returns the fully-qualified name for this table. This will include all namespaces,like
+    /// `MyGame.Monster`.
+    ///
+    /// Use `--gen-name-strings` when running the flatbuffers compile to generate implementations
+    /// of this trait for all tables.
+    fn get_fully_qualified_name() -> &'static str;
+}
diff --git a/third_party/flatbuffers/src/idl_gen_rust.cpp b/third_party/flatbuffers/src/idl_gen_rust.cpp
index 787d03a..0b0ad9a 100644
--- a/third_party/flatbuffers/src/idl_gen_rust.cpp
+++ b/third_party/flatbuffers/src/idl_gen_rust.cpp
@@ -1566,12 +1566,16 @@
 
   // Generates a fully-qualified name getter for use with --gen-name-strings
   void GenFullyQualifiedNameGetter(const StructDef &struct_def,
-                                   const std::string &name) {
+                                   const std::string &name, bool is_struct) {
     const std::string fully_qualified_name =
         struct_def.defined_namespace->GetFullyQualifiedName(name);
-    code_ += "  pub const fn get_fully_qualified_name() -> &'static str {";
+    code_ += "impl flatbuffers::FullyQualifiedName for {{STRUCT_TY}}\\";
+    if (!is_struct) { code_ += "<'_>\\"; }
+    code_ += "{";
+    code_ += "  fn get_fully_qualified_name() -> &'static str {";
     code_ += "    \"" + fully_qualified_name + "\"";
     code_ += "  }";
+    code_ += "}";
     code_ += "";
   }
 
@@ -1659,10 +1663,6 @@
     });
     code_ += "";
 
-    if (parser_.opts.generate_name_strings) {
-      GenFullyQualifiedNameGetter(struct_def, struct_def.name);
-    }
-
     code_ += "  #[inline]";
     code_ +=
         "  pub fn init_from_table(table: flatbuffers::Table<'a>) -> "
@@ -1918,6 +1918,10 @@
     code_ += "}";  // End of table impl.
     code_ += "";
 
+    if (parser_.opts.generate_name_strings) {
+      GenFullyQualifiedNameGetter(struct_def, struct_def.name, false);
+    }
+
     // Generate Verifier;
     code_ += "impl flatbuffers::Verifiable for {{STRUCT_TY}}<'_> {";
     code_ += "  #[inline]";
@@ -2735,10 +2739,6 @@
     code_ += "  }";
     code_ += "";
 
-    if (parser_.opts.generate_name_strings) {
-      GenFullyQualifiedNameGetter(struct_def, struct_def.name);
-    }
-
     // Generate accessor methods for the struct.
     ForAllStructFields(struct_def, [&](const FieldDef &field) {
       this->GenComment(field.doc_comment);
@@ -2849,6 +2849,10 @@
     code_ += "}";  // End impl Struct methods.
     code_ += "";
 
+    if (parser_.opts.generate_name_strings) {
+      GenFullyQualifiedNameGetter(struct_def, struct_def.name, true);
+    }
+
     // Generate Struct Object.
     if (parser_.opts.generate_object_based_api) {
       // Struct declaration