Add ShmEventLoop wrapper for Rust

Change-Id: I9960ce813bf2b9a6cd5fa1940ede06b6523bf5d5
Signed-off-by: Adam Snaider <adsnaider@gmail.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 07c1f3a..eb77036 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -3,7 +3,7 @@
 load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
 load("//aos:config.bzl", "aos_config")
 load("//tools/build_rules:autocxx.bzl", "autocxx_library")
-load("@rules_rust//rust:defs.bzl", "rust_doc_test", "rust_test")
+load("@rules_rust//rust:defs.bzl", "rust_doc", "rust_doc_test", "rust_test")
 
 package(default_visibility = ["//visibility:public"])
 
@@ -181,6 +181,12 @@
     ],
 )
 
+rust_doc(
+    name = "event_loop_runtime_doc",
+    crate = ":event_loop_runtime",
+    target_compatible_with = ["@platforms//cpu:x86_64"],
+)
+
 rust_doc_test(
     name = "event_loop_runtime_doc_test",
     crate = ":event_loop_runtime",
@@ -520,6 +526,15 @@
     ],
 )
 
+cc_library(
+    name = "shm_event_loop_for_rust",
+    hdrs = ["shm_event_loop_for_rust.h"],
+    deps = [
+        ":event_loop",
+        ":simulated_event_loop",
+    ],
+)
+
 autocxx_library(
     name = "simulated_event_loop_rs",
     srcs = ["simulated_event_loop.rs"],
@@ -560,6 +575,66 @@
     ],
 )
 
+autocxx_library(
+    name = "shm_event_loop_rs",
+    srcs = ["shm_event_loop.rs"],
+    crate_name = "aos_events_shm_event_loop",
+    libs = [
+        ":shm_event_loop",
+        ":shm_event_loop_for_rust",
+    ],
+    rs_deps = [
+        "@com_github_google_flatbuffers//rust",
+        "@crate_index//:futures",
+        "//aos:configuration_rust_fbs",
+        "//aos:flatbuffers_rs",
+    ],
+    target_compatible_with = select({
+        "//conditions:default": ["//tools/platforms/rust:has_support"],
+        "//tools:has_msan": ["@platforms//:incompatible"],
+    }),
+    visibility = ["//visibility:public"],
+    deps = [
+        ":event_loop_runtime",
+        "//aos:configuration_rs",
+    ],
+)
+
+rust_doc(
+    name = "shm_event_loop_rs_doc",
+    crate = ":shm_event_loop_rs",
+    target_compatible_with = ["@platforms//cpu:x86_64"],
+)
+
+rust_test(
+    name = "shm_event_loop_rs_test",
+    crate = ":shm_event_loop_rs",
+    data = [":pingpong_config"],
+    # TODO: Make Rust play happy with pic vs nopic. Details at:
+    # https://github.com/bazelbuild/rules_rust/issues/118
+    rustc_flags = ["-Crelocation-model=static"],
+    target_compatible_with = select({
+        "//conditions:default": ["//tools/platforms/rust:has_support"],
+        "//tools:has_msan": ["@platforms//:incompatible"],
+    }),
+    deps = [
+        ":ping_rust_fbs",
+        "//aos:init_rs",
+        "@crate_index//:futures",
+        "@rules_rust//tools/runfiles",
+    ],
+)
+
+rust_doc_test(
+    name = "shm_event_loop_rs_doc_test",
+    crate = ":shm_event_loop_rs",
+    target_compatible_with = ["@platforms//cpu:x86_64"],
+    deps = [
+        ":ping_rust_fbs",
+        ":pong_rust_fbs",
+    ],
+)
+
 cc_test(
     name = "event_scheduler_test",
     srcs = ["event_scheduler_test.cc"],
diff --git a/aos/events/shm_event_loop.rs b/aos/events/shm_event_loop.rs
new file mode 100644
index 0000000..3e387ef
--- /dev/null
+++ b/aos/events/shm_event_loop.rs
@@ -0,0 +1,302 @@
+pub use aos_configuration::{Configuration, ConfigurationExt};
+pub use aos_events_event_loop_runtime::EventLoop;
+pub use aos_events_event_loop_runtime::{CppExitHandle, EventLoopRuntime, ExitHandle};
+
+use aos_configuration_fbs::aos::Configuration as RustConfiguration;
+use aos_flatbuffers::{transmute_table_to, Flatbuffer};
+use autocxx::WithinBox;
+use core::marker::PhantomData;
+use core::pin::Pin;
+use std::boxed::Box;
+use std::ops::{Deref, DerefMut};
+
+autocxx::include_cpp! (
+#include "aos/events/shm_event_loop.h"
+#include "aos/events/shm_event_loop_for_rust.h"
+
+safety!(unsafe)
+
+generate!("aos::ShmEventLoopForRust")
+
+extern_cpp_type!("aos::ExitHandle", crate::CppExitHandle)
+extern_cpp_type!("aos::Configuration", crate::Configuration)
+extern_cpp_type!("aos::EventLoop", crate::EventLoop)
+);
+
+/// A Rust-owned C++ `ShmEventLoop` object.
+pub struct ShmEventLoop<'config> {
+    inner: Pin<Box<ffi::aos::ShmEventLoopForRust>>,
+    _config: PhantomData<&'config Configuration>,
+}
+
+impl<'config> ShmEventLoop<'config> {
+    /// Creates a Rust-owned ShmEventLoop.
+    pub fn new(config: &'config impl Flatbuffer<RustConfiguration<'static>>) -> Self {
+        // SAFETY: The `_config` represents the lifetime of this pointer we're handing off to c++ to
+        // store.
+        let event_loop = unsafe {
+            ffi::aos::ShmEventLoopForRust::new(transmute_table_to::<Configuration>(
+                &config.message()._tab,
+            ))
+        }
+        .within_box();
+
+        Self {
+            inner: event_loop,
+            _config: PhantomData,
+        }
+    }
+
+    /// Provides a runtime to construct the application and runs the event loop.
+    ///
+    /// The runtime is the only way to interact with the event loop. It provides the functionality
+    /// to spawn a task, construct timers, watchers, fetchers, and so on.
+    ///
+    /// Making an [`EventLoopRuntime`] is tricky since the lifetime of the runtime is invariant
+    /// w.r.t the event loop. In other words, the runtime and the event loop must have the same
+    /// lifetime. By providing access to the runtime through an [`FnOnce`], we can guarantee
+    /// that the runtime and the event loop both have the same lifetime.
+    ///
+    /// # Examples
+    ///
+    /// A ping application might do something like the following
+    ///
+    /// ```no_run
+    /// # use aos_events_shm_event_loop::*;
+    /// use ping_rust_fbs::aos::examples as ping;
+    /// use pong_rust_fbs::aos::examples as pong;
+    /// use std::cell::Cell;
+    /// use std::path::Path;
+    /// use aos_configuration::read_config_from;
+    /// use aos_events_event_loop_runtime::{Sender, Watcher};
+    ///
+    /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
+    /// let event_loop = ShmEventLoop::new(&config);
+    /// event_loop.run_with(|runtime| {
+    ///   // One task will send a ping, the other will listen to pong messages.
+    ///   let mut sender: Sender<ping::Ping> = runtime
+    ///       .make_sender("/test")
+    ///       .expect("Can't create `Ping` sender");
+    ///
+    ///   let on_run = runtime.on_run();
+    ///   // Sends a single ping message.
+    ///   let send_task = async move {
+    ///     on_run.await;
+    ///     let mut builder = sender.make_builder();
+    ///     let mut ping = ping::PingBuilder::new(builder.fbb());
+    ///     ping.add_value(10);
+    ///     let ping = ping.finish();
+    ///     builder.send(ping).expect("Can't send ping");
+    ///   };
+    ///
+    ///   let mut watcher: Watcher<pong::Pong> = runtime
+    ///       .make_watcher("/test")
+    ///       .expect("Can't create `Ping` watcher");
+    ///
+    ///   // Listens to pong messages and prints them.
+    ///   let receive_task = async move {
+    ///     loop {
+    ///       let pong = dbg!(watcher.next().await);
+    ///     }
+    ///   };
+    ///
+    ///   runtime.spawn(async move {
+    ///      futures::join!(send_task, receive_task);
+    ///      std::future::pending().await
+    ///   });
+    /// }); // Event loop starts runnning...
+    /// unreachable!("This can't be reached since no ExitHandle was made");
+    /// ```
+    ///
+    /// `run_with` can also borrow data from the outer scope that can be used in the async task.
+    ///
+    /// ```no_run
+    /// # use aos_events_shm_event_loop::*;
+    /// # use std::cell::Cell;
+    /// # use std::path::Path;
+    /// # use aos_configuration::read_config_from;
+    /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
+    /// let shared_data = Cell::new(971);
+    /// let shared_data = &shared_data;
+    /// let event_loop = ShmEventLoop::new(&config);
+    /// event_loop.run_with(|runtime| {
+    ///   // Note how `Cell` is enough since the event loop is single threaded.
+    ///   let t1 = async move {
+    ///     shared_data.set(shared_data.get() + 1);
+    ///   };
+    ///   let t2 = async move {
+    ///     shared_data.set(shared_data.get() + 1);
+    ///   };
+    ///
+    ///   runtime.spawn(async move {
+    ///      futures::join!(t1, t2);
+    ///      std::future::pending().await
+    ///   });
+    /// });
+    /// unreachable!("This can't be reached since no ExitHandle was made");
+    /// ```
+    ///
+    /// However, the spawned future must outlive `run_with`.
+    ///
+    /// ```compile_fail
+    /// # use aos_events_shm_event_loop::*;
+    /// # use std::cell::Cell;
+    /// # use std::path::Path;
+    /// # use aos_configuration::read_config_from;
+    /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
+    /// let event_loop = ShmEventLoop::new(&config);
+    /// event_loop.run_with(|runtime| {
+    ///   // ERROR: `shared_data` doesn't live long enough.
+    ///   let shared_data = Cell::new(971);
+    ///   let t1 = async {
+    ///     shared_data.set(shared_data.get() + 1);
+    ///   };
+    ///   let t2 = async {
+    ///     shared_data.set(shared_data.get() + 1);
+    ///   };
+    ///
+    ///   runtime.spawn(async move {
+    ///      futures::join!(t1, t2);
+    ///      std::future::pending().await
+    ///   });
+    /// });
+    /// ```
+    pub fn run_with<'env, F>(mut self, fun: F)
+    where
+        F: for<'event_loop> FnOnce(&mut Scoped<'event_loop, 'env, EventLoopRuntime<'event_loop>>),
+    {
+        // SAFETY: The runtime and the event loop (i.e. self) both get destroyed at the end of this
+        // scope: first the runtime followed by the event loop. The runtime gets exclusive access
+        // during initialization in `fun` while the event loop remains unused.
+        let runtime = unsafe { EventLoopRuntime::new(self.inner.as_mut().event_loop_mut()) };
+        let mut runtime = Scoped::new(runtime);
+        fun(&mut runtime);
+        self.run();
+    }
+
+    /// Makes an exit handle.
+    ///
+    /// Awaiting on the exit handle is the only way to actually exit the event loop
+    /// task, other than panicking.
+    pub fn make_exit_handle(&mut self) -> ExitHandle {
+        self.inner.as_mut().MakeExitHandle().into()
+    }
+
+    /// Runs the spawned task to completion.
+    fn run(&mut self) {
+        self.inner.as_mut().Run();
+    }
+}
+
+/// A wrapper over some data that lives for the duration of a scope.
+///
+/// This struct ensures the existence of some `'env` which outlives `'scope`. In
+/// the presence of higher-ranked trait bounds which require types that work for
+/// any `'scope`, this allows the compiler to propagate lifetime bounds which
+/// outlive any of the possible `'scope`. This is the simplest way to express
+/// this concept to the compiler right now.
+pub struct Scoped<'scope, 'env: 'scope, T: 'scope> {
+    data: T,
+    _env: PhantomData<fn(&'env ()) -> &'env ()>,
+    _scope: PhantomData<fn(&'scope ()) -> &'scope ()>,
+}
+
+impl<'scope, 'env: 'scope, T: 'scope> Scoped<'scope, 'env, T> {
+    /// Makes the [`Scoped`].
+    pub fn new(data: T) -> Self {
+        Self {
+            data,
+            _env: PhantomData,
+            _scope: PhantomData,
+        }
+    }
+}
+
+impl<'scope, 'env: 'scope, T: 'scope> Deref for Scoped<'scope, 'env, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.data
+    }
+}
+
+impl<'scope, 'env: 'scope, T: 'scope> DerefMut for Scoped<'scope, 'env, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.data
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use runfiles::Runfiles;
+
+    use aos_configuration::read_config_from;
+    use aos_events_event_loop_runtime::{Sender, Watcher};
+    use aos_init::test_init;
+    use ping_rust_fbs::aos::examples as ping;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Barrier;
+
+    /// Tests basic functionality with 2 threads operating their own event loops.
+    #[test]
+    fn smoke_test() {
+        test_init();
+
+        let r = Runfiles::create().unwrap();
+        let config =
+            read_config_from(&r.rlocation("org_frc971/aos/events/pingpong_config.json")).unwrap();
+
+        const VALUE: i32 = 971;
+        let barrier = Barrier::new(2);
+        let count = AtomicUsize::new(0);
+
+        std::thread::scope(|s| {
+            let config = &config;
+            let barrier = &barrier;
+            let count = &count;
+            s.spawn(move || {
+                let mut event_loop = ShmEventLoop::new(config);
+                let exit_handle = event_loop.make_exit_handle();
+                event_loop.run_with(|runtime| {
+                    let mut watcher: Watcher<ping::Ping> = runtime
+                        .make_watcher("/test")
+                        .expect("Can't create `Ping` watcher");
+                    let on_run = runtime.on_run();
+                    runtime.spawn(async move {
+                        on_run.await;
+                        barrier.wait();
+                        let ping = watcher.next().await;
+                        assert_eq!(ping.message().unwrap().value(), VALUE);
+                        count.fetch_add(1, Ordering::Relaxed);
+                        exit_handle.exit().await
+                    });
+                });
+            });
+            s.spawn(move || {
+                let mut event_loop = ShmEventLoop::new(config);
+                let exit_handle = event_loop.make_exit_handle();
+                event_loop.run_with(|runtime| {
+                    let mut sender: Sender<ping::Ping> = runtime
+                        .make_sender("/test")
+                        .expect("Can't create `Ping` sender");
+                    let on_run = runtime.on_run();
+                    runtime.spawn(async move {
+                        on_run.await;
+                        // Give the waiting thread a chance to start.
+                        barrier.wait();
+                        let mut sender = sender.make_builder();
+                        let mut ping = ping::PingBuilder::new(sender.fbb());
+                        ping.add_value(VALUE);
+                        let ping = ping.finish();
+                        sender.send(ping).expect("send should succeed");
+                        count.fetch_add(1, Ordering::Relaxed);
+                        exit_handle.exit().await
+                    });
+                });
+            });
+        });
+
+        assert_eq!(count.into_inner(), 2, "Not all event loops ran.");
+    }
+}
diff --git a/aos/events/shm_event_loop_for_rust.h b/aos/events/shm_event_loop_for_rust.h
new file mode 100644
index 0000000..3a815e1
--- /dev/null
+++ b/aos/events/shm_event_loop_for_rust.h
@@ -0,0 +1,31 @@
+#ifndef AOS_EVENTS_SHM_EVENT_LOOP_FOR_RUST_H_
+#define AOS_EVENTS_SHM_EVENT_LOOP_FOR_RUST_H_
+
+#include <memory>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/shm_event_loop.h"
+
+namespace aos {
+
+class ShmEventLoopForRust {
+ public:
+  ShmEventLoopForRust(const Configuration *configuration)
+      : event_loop_(configuration) {}
+
+  const EventLoop *event_loop() const { return &event_loop_; }
+  EventLoop *event_loop_mut() { return &event_loop_; }
+
+  std::unique_ptr<ExitHandle> MakeExitHandle() {
+    return event_loop_.MakeExitHandle();
+  }
+
+  void Run() { event_loop_.Run(); }
+
+ private:
+  ShmEventLoop event_loop_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_SHM_EVENT_LOOP_FOR_RUST_H_