// Source blob: cd611daae3aed0a6bd570c39403f5a5966161a2c
Adam Snaiderf2a45852023-06-24 14:23:32 -07001pub use aos_configuration::{Configuration, ConfigurationExt};
Adam Snaider88097232023-10-17 18:43:14 -07002pub use aos_events_event_loop_runtime::{
Adam Snaidere98c2482023-10-17 19:02:03 -07003 CppEventLoop, CppEventLoopRuntime, CppExitHandle, EventLoopRuntime, ExitHandle,
Adam Snaider88097232023-10-17 18:43:14 -07004};
Adam Snaiderf2a45852023-06-24 14:23:32 -07005
6use aos_configuration_fbs::aos::Configuration as RustConfiguration;
7use aos_flatbuffers::{transmute_table_to, Flatbuffer};
8use autocxx::WithinBox;
9use core::marker::PhantomData;
10use core::pin::Pin;
11use std::boxed::Box;
12use std::ops::{Deref, DerefMut};
13
14autocxx::include_cpp! (
15#include "aos/events/shm_event_loop.h"
16#include "aos/events/shm_event_loop_for_rust.h"
17
18safety!(unsafe)
19
20generate!("aos::ShmEventLoopForRust")
21
22extern_cpp_type!("aos::ExitHandle", crate::CppExitHandle)
23extern_cpp_type!("aos::Configuration", crate::Configuration)
Adam Snaider88097232023-10-17 18:43:14 -070024extern_cpp_type!("aos::EventLoop", crate::CppEventLoop)
Adam Snaiderf2a45852023-06-24 14:23:32 -070025);
26
27/// A Rust-owned C++ `ShmEventLoop` object.
28pub struct ShmEventLoop<'config> {
29 inner: Pin<Box<ffi::aos::ShmEventLoopForRust>>,
30 _config: PhantomData<&'config Configuration>,
31}
32
33impl<'config> ShmEventLoop<'config> {
34 /// Creates a Rust-owned ShmEventLoop.
35 pub fn new(config: &'config impl Flatbuffer<RustConfiguration<'static>>) -> Self {
36 // SAFETY: The `_config` represents the lifetime of this pointer we're handing off to c++ to
37 // store.
38 let event_loop = unsafe {
39 ffi::aos::ShmEventLoopForRust::new(transmute_table_to::<Configuration>(
40 &config.message()._tab,
41 ))
42 }
43 .within_box();
44
45 Self {
46 inner: event_loop,
47 _config: PhantomData,
48 }
49 }
50
51 /// Provides a runtime to construct the application and runs the event loop.
52 ///
53 /// The runtime is the only way to interact with the event loop. It provides the functionality
54 /// to spawn a task, construct timers, watchers, fetchers, and so on.
55 ///
56 /// Making an [`EventLoopRuntime`] is tricky since the lifetime of the runtime is invariant
57 /// w.r.t the event loop. In other words, the runtime and the event loop must have the same
58 /// lifetime. By providing access to the runtime through an [`FnOnce`], we can guarantee
59 /// that the runtime and the event loop both have the same lifetime.
60 ///
61 /// # Examples
62 ///
63 /// A ping application might do something like the following
64 ///
65 /// ```no_run
66 /// # use aos_events_shm_event_loop::*;
67 /// use ping_rust_fbs::aos::examples as ping;
68 /// use pong_rust_fbs::aos::examples as pong;
Adam Snaidera3317c82023-10-02 16:02:36 -070069 /// use std::borrow::Borrow;
Adam Snaiderf2a45852023-06-24 14:23:32 -070070 /// use std::cell::Cell;
71 /// use std::path::Path;
72 /// use aos_configuration::read_config_from;
73 /// use aos_events_event_loop_runtime::{Sender, Watcher};
74 ///
75 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
76 /// let event_loop = ShmEventLoop::new(&config);
77 /// event_loop.run_with(|runtime| {
78 /// // One task will send a ping, the other will listen to pong messages.
79 /// let mut sender: Sender<ping::Ping> = runtime
80 /// .make_sender("/test")
81 /// .expect("Can't create `Ping` sender");
82 ///
83 /// let on_run = runtime.on_run();
84 /// // Sends a single ping message.
85 /// let send_task = async move {
Adam Snaidera3317c82023-10-02 16:02:36 -070086 /// on_run.borrow().await;
Adam Snaiderf2a45852023-06-24 14:23:32 -070087 /// let mut builder = sender.make_builder();
88 /// let mut ping = ping::PingBuilder::new(builder.fbb());
89 /// ping.add_value(10);
90 /// let ping = ping.finish();
91 /// builder.send(ping).expect("Can't send ping");
92 /// };
93 ///
94 /// let mut watcher: Watcher<pong::Pong> = runtime
95 /// .make_watcher("/test")
96 /// .expect("Can't create `Ping` watcher");
97 ///
98 /// // Listens to pong messages and prints them.
99 /// let receive_task = async move {
100 /// loop {
101 /// let pong = dbg!(watcher.next().await);
102 /// }
103 /// };
104 ///
105 /// runtime.spawn(async move {
106 /// futures::join!(send_task, receive_task);
107 /// std::future::pending().await
108 /// });
109 /// }); // Event loop starts runnning...
110 /// unreachable!("This can't be reached since no ExitHandle was made");
111 /// ```
112 ///
113 /// `run_with` can also borrow data from the outer scope that can be used in the async task.
114 ///
115 /// ```no_run
116 /// # use aos_events_shm_event_loop::*;
117 /// # use std::cell::Cell;
118 /// # use std::path::Path;
119 /// # use aos_configuration::read_config_from;
120 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
121 /// let shared_data = Cell::new(971);
122 /// let shared_data = &shared_data;
123 /// let event_loop = ShmEventLoop::new(&config);
124 /// event_loop.run_with(|runtime| {
125 /// // Note how `Cell` is enough since the event loop is single threaded.
126 /// let t1 = async move {
127 /// shared_data.set(shared_data.get() + 1);
128 /// };
129 /// let t2 = async move {
130 /// shared_data.set(shared_data.get() + 1);
131 /// };
132 ///
133 /// runtime.spawn(async move {
134 /// futures::join!(t1, t2);
135 /// std::future::pending().await
136 /// });
137 /// });
138 /// unreachable!("This can't be reached since no ExitHandle was made");
139 /// ```
140 ///
141 /// However, the spawned future must outlive `run_with`.
142 ///
143 /// ```compile_fail
144 /// # use aos_events_shm_event_loop::*;
145 /// # use std::cell::Cell;
146 /// # use std::path::Path;
147 /// # use aos_configuration::read_config_from;
148 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
149 /// let event_loop = ShmEventLoop::new(&config);
150 /// event_loop.run_with(|runtime| {
151 /// // ERROR: `shared_data` doesn't live long enough.
152 /// let shared_data = Cell::new(971);
153 /// let t1 = async {
154 /// shared_data.set(shared_data.get() + 1);
155 /// };
156 /// let t2 = async {
157 /// shared_data.set(shared_data.get() + 1);
158 /// };
159 ///
160 /// runtime.spawn(async move {
161 /// futures::join!(t1, t2);
162 /// std::future::pending().await
163 /// });
164 /// });
165 /// ```
166 pub fn run_with<'env, F>(mut self, fun: F)
167 where
Adam Snaiderde51c672023-09-28 21:55:43 -0700168 F: for<'event_loop> FnOnce(
169 &'event_loop mut Scoped<'event_loop, 'env, EventLoopRuntime<'event_loop>>,
170 ),
Adam Snaiderf2a45852023-06-24 14:23:32 -0700171 {
172 // SAFETY: The runtime and the event loop (i.e. self) both get destroyed at the end of this
173 // scope: first the runtime followed by the event loop. The runtime gets exclusive access
174 // during initialization in `fun` while the event loop remains unused.
Adam Snaidere98c2482023-10-17 19:02:03 -0700175 let cpp_runtime =
176 unsafe { CppEventLoopRuntime::new(self.inner.as_mut().event_loop_mut()).within_box() };
177 let runtime = unsafe { EventLoopRuntime::new(&cpp_runtime) };
Adam Snaiderf2a45852023-06-24 14:23:32 -0700178 let mut runtime = Scoped::new(runtime);
179 fun(&mut runtime);
180 self.run();
181 }
182
183 /// Makes an exit handle.
184 ///
185 /// Awaiting on the exit handle is the only way to actually exit the event loop
186 /// task, other than panicking.
187 pub fn make_exit_handle(&mut self) -> ExitHandle {
188 self.inner.as_mut().MakeExitHandle().into()
189 }
190
191 /// Runs the spawned task to completion.
192 fn run(&mut self) {
193 self.inner.as_mut().Run();
194 }
195}
196
/// A wrapper over some data that lives for the duration of a scope.
///
/// This struct ensures the existence of some `'env` which outlives `'scope`. In
/// the presence of higher-ranked trait bounds which require types that work for
/// any `'scope`, this allows the compiler to propagate lifetime bounds which
/// outlive any of the possible `'scope`. This is the simplest way to express
/// this concept to the compiler right now.
///
/// The wrapper stores nothing besides `data`; both lifetimes are carried by zero-sized
/// markers, and the wrapped value is reached through [`Deref`] / [`DerefMut`].
pub struct Scoped<'scope, 'env: 'scope, T: 'scope> {
    data: T,
    // `fn(&'a ()) -> &'a ()` makes the struct invariant over each lifetime.
    _env: PhantomData<fn(&'env ()) -> &'env ()>,
    _scope: PhantomData<fn(&'scope ()) -> &'scope ()>,
}

impl<'scope, 'env: 'scope, T: 'scope> Scoped<'scope, 'env, T> {
    /// Makes the [`Scoped`].
    pub fn new(data: T) -> Self {
        Self {
            data,
            _env: PhantomData,
            _scope: PhantomData,
        }
    }
}

impl<'scope, 'env: 'scope, T: 'scope> Deref for Scoped<'scope, 'env, T> {
    type Target = T;

    /// Borrows the wrapped value.
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl<'scope, 'env: 'scope, T: 'scope> DerefMut for Scoped<'scope, 'env, T> {
    /// Mutably borrows the wrapped value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use runfiles::Runfiles;

    use aos_configuration::read_config_from;
    use aos_events_event_loop_runtime::{Sender, Watcher};
    use aos_test_init::test_init;
    use ping_rust_fbs::aos::examples as ping;
    use std::borrow::Borrow;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Barrier;

    /// Tests basic functionality with 2 threads operating their own event loops.
    ///
    /// One thread watches `/test` for a `Ping`, the other sends one. Each bumps `count` before
    /// exiting its event loop, so a final count of 2 shows both loops ran.
    #[test]
    fn smoke_test() {
        test_init();

        let r = Runfiles::create().unwrap();
        let config =
            read_config_from(&r.rlocation("org_frc971/aos/events/pingpong_config.json")).unwrap();

        const VALUE: i32 = 971;
        let barrier = Barrier::new(2);
        let count = AtomicUsize::new(0);

        std::thread::scope(|s| {
            // Rebind as references so the `move` closures below copy the borrows
            // rather than taking ownership.
            let config = &config;
            let barrier = &barrier;
            let count = &count;
            // Receiving side.
            s.spawn(move || {
                let mut event_loop = ShmEventLoop::new(config);
                let exit_handle = event_loop.make_exit_handle();
                event_loop.run_with(|runtime| {
                    runtime.spawn(async {
                        let mut watcher: Watcher<ping::Ping> = runtime
                            .make_watcher("/test")
                            .expect("Can't create `Ping` watcher");
                        runtime.on_run().borrow().await;
                        barrier.wait();
                        let ping = watcher.next().await;
                        assert_eq!(ping.message().unwrap().value(), VALUE);
                        count.fetch_add(1, Ordering::Relaxed);
                        exit_handle.exit().await
                    });
                });
            });
            // Sending side.
            s.spawn(move || {
                let mut event_loop = ShmEventLoop::new(config);
                let exit_handle = event_loop.make_exit_handle();
                event_loop.run_with(|runtime| {
                    runtime.spawn(async {
                        let mut sender: Sender<ping::Ping> = runtime
                            .make_sender("/test")
                            .expect("Can't create `Ping` sender");
                        runtime.on_run().borrow().await;
                        // Give the waiting thread a chance to start.
                        barrier.wait();
                        let mut sender = sender.make_builder();
                        let mut ping = ping::PingBuilder::new(sender.fbb());
                        ping.add_value(VALUE);
                        let ping = ping.finish();
                        sender.send(ping).expect("send should succeed");
                        count.fetch_add(1, Ordering::Relaxed);
                        exit_handle.exit().await
                    });
                });
            });
        });

        assert_eq!(count.into_inner(), 2, "Not all event loops ran.");
    }
}