Make ping/pong rust example realtime

Change-Id: I9d1a639afe0b87dd9e303a70f9d3ed1989465139
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index fbce7c9..6bcca6a 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -1426,7 +1426,7 @@
 #[repr(transparent)]
 pub struct OnRun(Pin<Box<ffi::aos::OnRunForRust>>);
 
-impl Future for OnRun {
+impl Future for &'_ OnRun {
     type Output = ();
 
     fn poll(self: Pin<&mut Self>, _: &mut std::task::Context) -> Poll<()> {
diff --git a/aos/events/event_loop_runtime_test_lib.rs b/aos/events/event_loop_runtime_test_lib.rs
index 56dc9ef..120f8aa 100644
--- a/aos/events/event_loop_runtime_test_lib.rs
+++ b/aos/events/event_loop_runtime_test_lib.rs
@@ -8,7 +8,7 @@
 mod tests {
     use super::*;
 
-    use std::cell::RefCell;
+    use std::{borrow::Borrow, cell::RefCell};
 
     #[derive(Debug, Default)]
     struct GlobalState {
@@ -45,7 +45,7 @@
     }
 
     impl<'event_loop> TestApplication<'event_loop> {
-        fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+        fn new(runtime: EventLoopRuntime<'event_loop>) -> Self {
             let ping_channel = runtime
                 .get_raw_channel("/test", "aos.examples.Ping")
                 .expect("Should have Ping channel");
@@ -57,7 +57,7 @@
             );
             let on_run = runtime.on_run();
             runtime.spawn(async move {
-                on_run.await;
+                on_run.borrow().await;
                 GLOBAL_STATE.with(|g| {
                     let g = &mut *g.borrow_mut();
                     assert_eq!(g.creation_count, g.drop_count + 1);
@@ -165,12 +165,12 @@
     }
 
     impl<'event_loop> TypedTestApplication<'event_loop> {
-        fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+        fn new(runtime: EventLoopRuntime<'event_loop>) -> Self {
             let mut ping_watcher = runtime.make_watcher::<Ping<'static>>("/test").unwrap();
             let mut pong_sender = runtime.make_sender::<Pong<'static>>("/test").unwrap();
             let on_run = runtime.on_run();
             runtime.spawn(async move {
-                on_run.await;
+                on_run.borrow().await;
                 GLOBAL_STATE.with(|g| {
                     let g = &mut *g.borrow_mut();
                     assert_eq!(g.creation_count, g.drop_count + 1);
@@ -274,7 +274,7 @@
     }
 
     impl<'event_loop> PanicApplication<'event_loop> {
-        fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+        fn new(runtime: EventLoopRuntime<'event_loop>) -> Self {
             runtime.spawn(async move {
                 panic!("Test Rust panic");
             });
@@ -292,10 +292,10 @@
     }
 
     impl<'event_loop> PanicOnRunApplication<'event_loop> {
-        fn new(mut runtime: EventLoopRuntime<'event_loop>) -> Self {
+        fn new(runtime: EventLoopRuntime<'event_loop>) -> Self {
             let on_run = runtime.on_run();
             runtime.spawn(async move {
-                on_run.await;
+                on_run.borrow().await;
                 panic!("Test Rust panic");
             });
 
diff --git a/aos/events/ping.rs b/aos/events/ping.rs
index d699d3c..e85ae06 100644
--- a/aos/events/ping.rs
+++ b/aos/events/ping.rs
@@ -6,6 +6,7 @@
 use core::cell::Cell;
 use core::time::Duration;
 use futures::never::Never;
+use std::borrow::Borrow;
 use std::path::Path;
 
 use ping_rust_fbs::aos::examples as ping;
@@ -26,6 +27,7 @@
     let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
     let ping = PingTask::new();
     ShmEventLoop::new(&config).run_with(|runtime| {
+        runtime.set_realtime_priority(5);
         runtime.spawn(ping.tasks(runtime, app.sleep));
     });
 }
@@ -48,12 +50,14 @@
         unreachable!("Let's hope `never_type` gets stabilized soon :)");
     }
 
-    async fn ping(&self, event_loop: &EventLoopRuntime<'_>, sleep: u64) -> Never {
+    pub async fn ping(&self, event_loop: &EventLoopRuntime<'_>, sleep: u64) -> Never {
         // The sender is used to send messages back to the pong channel.
         let mut ping_sender: Sender<ping::Ping> = event_loop.make_sender("/test").unwrap();
         let mut interval = event_loop.add_interval(Duration::from_micros(sleep));
 
-        event_loop.on_run().await;
+        let on_run = event_loop.on_run();
+        on_run.borrow().await;
+
         loop {
             interval.tick().await;
             self.counter.set(self.counter.get() + 1);
@@ -67,13 +71,14 @@
         }
     }
 
-    async fn handle_pong(&self, event_loop: &EventLoopRuntime<'_>) -> Never {
+    pub async fn handle_pong(&self, event_loop: &EventLoopRuntime<'_>) -> Never {
         // The watcher gives us incoming ping messages.
         let mut pong_watcher: Watcher<pong::Pong> = event_loop.make_watcher("/test").unwrap();
 
-        event_loop.on_run().await;
+        let on_run = event_loop.on_run();
+        on_run.borrow().await;
         loop {
-            let pong = dbg!(pong_watcher.next().await);
+            let pong = pong_watcher.next().await;
             assert_eq!(
                 pong.message().unwrap().value(),
                 self.counter.get(),
diff --git a/aos/events/pong.rs b/aos/events/pong.rs
index 01af56f..d030dc7 100644
--- a/aos/events/pong.rs
+++ b/aos/events/pong.rs
@@ -4,7 +4,7 @@
 use aos_init::WithCppFlags;
 use clap::Parser;
 use futures::never::Never;
-use std::path::Path;
+use std::{borrow::Borrow, path::Path};
 
 use ping_rust_fbs::aos::examples as ping;
 use pong_rust_fbs::aos::examples as pong;
@@ -20,6 +20,7 @@
     let config = config::read_config_from(Path::new("pingpong_config.json")).unwrap();
     ShmEventLoop::new(&config).run_with(|runtime| {
         let task = pong(runtime);
+        runtime.set_realtime_priority(5);
         runtime.spawn(task);
     });
 }
@@ -32,7 +33,8 @@
     // The sender is used to send messages back to the pong channel.
     let mut pong_sender: Sender<pong::Pong> = event_loop.make_sender("/test").unwrap();
 
-    event_loop.on_run().await;
+    let on_run = event_loop.on_run();
+    on_run.borrow().await;
     loop {
         let ping = dbg!(ping_watcher.next().await);
 
diff --git a/aos/events/shm_event_loop.rs b/aos/events/shm_event_loop.rs
index cebf81b..04cf42c 100644
--- a/aos/events/shm_event_loop.rs
+++ b/aos/events/shm_event_loop.rs
@@ -65,6 +65,7 @@
     /// # use aos_events_shm_event_loop::*;
     /// use ping_rust_fbs::aos::examples as ping;
     /// use pong_rust_fbs::aos::examples as pong;
+    /// use std::borrow::Borrow;
     /// use std::cell::Cell;
     /// use std::path::Path;
     /// use aos_configuration::read_config_from;
@@ -81,7 +82,7 @@
     ///   let on_run = runtime.on_run();
     ///   // Sends a single ping message.
     ///   let send_task = async move {
-    ///     on_run.await;
+    ///     on_run.borrow().await;
     ///     let mut builder = sender.make_builder();
     ///     let mut ping = ping::PingBuilder::new(builder.fbb());
     ///     ping.add_value(10);
@@ -237,6 +238,7 @@
     use aos_events_event_loop_runtime::{Sender, Watcher};
     use aos_test_init::test_init;
     use ping_rust_fbs::aos::examples as ping;
+    use std::borrow::Borrow;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Barrier;
 
@@ -265,7 +267,7 @@
                         let mut watcher: Watcher<ping::Ping> = runtime
                             .make_watcher("/test")
                             .expect("Can't create `Ping` watcher");
-                        runtime.on_run().await;
+                        runtime.on_run().borrow().await;
                         barrier.wait();
                         let ping = watcher.next().await;
                         assert_eq!(ping.message().unwrap().value(), VALUE);
@@ -282,7 +284,7 @@
                         let mut sender: Sender<ping::Ping> = runtime
                             .make_sender("/test")
                             .expect("Can't create `Ping` sender");
-                        runtime.on_run().await;
+                        runtime.on_run().borrow().await;
                         // Give the waiting thread a chance to start.
                         barrier.wait();
                         let mut sender = sender.make_builder();