Add a Rust ping/pong example

Change-Id: Ida4d8c5abcf31a5ae3eb7db5a556fcbc3e1cfe7d
Signed-off-by: Adam Snaider <adsnaider@gmail.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index eb77036..d78df3a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -3,7 +3,7 @@
 load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
 load("//aos:config.bzl", "aos_config")
 load("//tools/build_rules:autocxx.bzl", "autocxx_library")
-load("@rules_rust//rust:defs.bzl", "rust_doc", "rust_doc_test", "rust_test")
+load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_doc", "rust_doc_test", "rust_test")
 
 package(default_visibility = ["//visibility:public"])
 
@@ -184,7 +184,6 @@
 rust_doc(
     name = "event_loop_runtime_doc",
     crate = ":event_loop_runtime",
-    target_compatible_with = ["@platforms//cpu:x86_64"],
 )
 
 rust_doc_test(
@@ -265,6 +264,60 @@
     ],
 )
 
+rust_binary(
+    name = "ping_rs",
+    srcs = [
+        "ping.rs",
+    ],
+    data = [
+        ":pingpong_config",
+    ],
+    rustc_flags = ["-Crelocation-model=static"],
+    target_compatible_with = select({
+        "//conditions:default": ["//tools/platforms/rust:has_support"],
+        "//tools:has_msan": ["@platforms//:incompatible"],
+    }),
+    deps = [
+        ":event_loop_runtime",
+        ":ping_rust_fbs",
+        ":pong_rust_fbs",
+        ":shm_event_loop_rs",
+        "//aos:configuration_rs",
+        "//aos:configuration_rust_fbs",
+        "//aos:flatbuffers_rs",
+        "//aos:init_rs",
+        "@com_github_google_flatbuffers//rust",
+        "@crate_index//:futures",
+    ],
+)
+
+rust_binary(
+    name = "pong_rs",
+    srcs = [
+        "pong.rs",
+    ],
+    data = [
+        ":pingpong_config",
+    ],
+    rustc_flags = ["-Crelocation-model=static"],
+    target_compatible_with = select({
+        "//conditions:default": ["//tools/platforms/rust:has_support"],
+        "//tools:has_msan": ["@platforms//:incompatible"],
+    }),
+    deps = [
+        ":event_loop_runtime",
+        ":ping_rust_fbs",
+        ":pong_rust_fbs",
+        ":shm_event_loop_rs",
+        "//aos:configuration_rs",
+        "//aos:configuration_rust_fbs",
+        "//aos:flatbuffers_rs",
+        "//aos:init_rs",
+        "@com_github_google_flatbuffers//rust",
+        "@crate_index//:futures",
+    ],
+)
+
 aos_config(
     name = "aos_config",
     src = "aos.json",
diff --git a/aos/events/ping.rs b/aos/events/ping.rs
new file mode 100644
index 0000000..b9725b8
--- /dev/null
+++ b/aos/events/ping.rs
@@ -0,0 +1,87 @@
+use aos_configuration as config;
+use aos_events_event_loop_runtime::{EventLoopRuntime, Sender, Watcher};
+use aos_events_shm_event_loop::ShmEventLoop;
+use core::cell::Cell;
+use core::future::Future;
+use core::time::Duration;
+use futures::never::Never;
+use std::path::Path;
+
+use ping_rust_fbs::aos::examples as ping;
+use pong_rust_fbs::aos::examples as pong;
+
+fn main() {
+    aos_init::init();
+    let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
+    let ping = PingTask::new();
+    ShmEventLoop::new(&config).run_with(|runtime| {
+        let task = ping.tasks(runtime);
+        runtime.spawn(task);
+    });
+}
+
+#[derive(Debug)]
+struct PingTask {
+    counter: Cell<i32>,
+}
+
+impl PingTask {
+    pub fn new() -> Self {
+        Self {
+            counter: Cell::new(0),
+        }
+    }
+
+    /// Returns a future with all the tasks for the ping process
+    pub fn tasks(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+        let ping = self.ping(event_loop);
+        let handle_pong = self.handle_pong(event_loop);
+
+        async move {
+            futures::join!(ping, handle_pong);
+            unreachable!("Let's hope `never_type` gets stabilized soon :)");
+        }
+    }
+
+    fn ping(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+        // The sender is used to send messages back to the pong channel.
+        let mut ping_sender: Sender<ping::Ping> = event_loop.make_sender("/test").unwrap();
+        let startup = event_loop.on_run();
+
+        let mut interval = event_loop.add_interval(Duration::from_secs(1));
+
+        async move {
+            // Wait for startup.
+            startup.await;
+            loop {
+                interval.tick().await;
+                self.counter.set(self.counter.get() + 1);
+                let mut builder = ping_sender.make_builder();
+                let mut ping = ping::PingBuilder::new(builder.fbb());
+                let iter = self.counter.get();
+                ping.add_value(iter);
+                let ping = ping.finish();
+                builder.send(ping).expect("Can't send ping");
+            }
+        }
+    }
+
+    fn handle_pong(&self, event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> + '_ {
+        // The watcher gives us incoming ping messages.
+        let mut pong_watcher: Watcher<pong::Pong> = event_loop.make_watcher("/test").unwrap();
+        let startup = event_loop.on_run();
+
+        async move {
+            // Wait for startup.
+            startup.await;
+            loop {
+                let pong = dbg!(pong_watcher.next().await);
+                assert_eq!(
+                    pong.message().unwrap().value(),
+                    self.counter.get(),
+                    "Missed a reply"
+                );
+            }
+        }
+    }
+}
diff --git a/aos/events/pong.rs b/aos/events/pong.rs
new file mode 100644
index 0000000..b817ac2
--- /dev/null
+++ b/aos/events/pong.rs
@@ -0,0 +1,42 @@
+use aos_configuration as config;
+use aos_events_event_loop_runtime::{EventLoopRuntime, Sender, Watcher};
+use aos_events_shm_event_loop::ShmEventLoop;
+use core::future::Future;
+use futures::never::Never;
+use std::path::Path;
+
+use ping_rust_fbs::aos::examples as ping;
+use pong_rust_fbs::aos::examples as pong;
+
+fn main() {
+    aos_init::init();
+    let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
+    ShmEventLoop::new(&config).run_with(|runtime| {
+        let task = pong(runtime);
+        runtime.spawn(task);
+    });
+}
+
+/// Responds to ping messages with an equivalent pong.
+fn pong(event_loop: &mut EventLoopRuntime) -> impl Future<Output = Never> {
+    // The watcher gives us incoming ping messages.
+    let mut ping_watcher: Watcher<ping::Ping> = event_loop.make_watcher("/test").unwrap();
+
+    // The sender is used to send messages back to the pong channel.
+    let mut pong_sender: Sender<pong::Pong> = event_loop.make_sender("/test").unwrap();
+    // Wait for startup.
+    let startup = event_loop.on_run();
+
+    async move {
+        startup.await;
+        loop {
+            let ping = dbg!(ping_watcher.next().await);
+
+            let mut builder = pong_sender.make_builder();
+            let mut pong = pong::PongBuilder::new(builder.fbb());
+            pong.add_value(ping.message().unwrap().value());
+            let pong = pong.finish();
+            builder.send(pong).expect("Can't send pong reponse");
+        }
+    }
+}
diff --git a/aos/init.rs b/aos/init.rs
index 8a5f262..9ac62f1 100644
--- a/aos/init.rs
+++ b/aos/init.rs
@@ -16,6 +16,12 @@
 ///
 /// Panics if non-test initialization has already been performed.
 pub fn test_init() {
+    init();
+    // TODO(Brian): Do we want any of the other stuff that `:gtest_main` has?
+    // TODO(Brian): Call `aos::SetShmBase` like `:gtest_main` does.
+}
+
+pub fn init() {
     static ONCE: Once = Once::new();
     ONCE.call_once(|| {
         let argv0 = std::env::args().next().expect("must have argv[0]");
@@ -23,7 +29,4 @@
         // SAFETY: argv0 is a NUL-terminated string.
         unsafe { ffi::aos::InitFromRust(argv0.as_ptr()) };
     });
-
-    // TODO(Brian): Do we want any of the other stuff that `:gtest_main` has?
-    // TODO(Brian): Call `aos::SetShmBase` like `:gtest_main` does.
 }