blob: cebf81b514a43f7e9765e7efd35078d942930d39 [file] [log] [blame]
Adam Snaiderf2a45852023-06-24 14:23:32 -07001pub use aos_configuration::{Configuration, ConfigurationExt};
2pub use aos_events_event_loop_runtime::EventLoop;
3pub use aos_events_event_loop_runtime::{CppExitHandle, EventLoopRuntime, ExitHandle};
4
5use aos_configuration_fbs::aos::Configuration as RustConfiguration;
6use aos_flatbuffers::{transmute_table_to, Flatbuffer};
7use autocxx::WithinBox;
8use core::marker::PhantomData;
9use core::pin::Pin;
10use std::boxed::Box;
11use std::ops::{Deref, DerefMut};
12
13autocxx::include_cpp! (
14#include "aos/events/shm_event_loop.h"
15#include "aos/events/shm_event_loop_for_rust.h"
16
17safety!(unsafe)
18
19generate!("aos::ShmEventLoopForRust")
20
21extern_cpp_type!("aos::ExitHandle", crate::CppExitHandle)
22extern_cpp_type!("aos::Configuration", crate::Configuration)
23extern_cpp_type!("aos::EventLoop", crate::EventLoop)
24);
25
26/// A Rust-owned C++ `ShmEventLoop` object.
27pub struct ShmEventLoop<'config> {
28 inner: Pin<Box<ffi::aos::ShmEventLoopForRust>>,
29 _config: PhantomData<&'config Configuration>,
30}
31
32impl<'config> ShmEventLoop<'config> {
33 /// Creates a Rust-owned ShmEventLoop.
34 pub fn new(config: &'config impl Flatbuffer<RustConfiguration<'static>>) -> Self {
35 // SAFETY: The `_config` represents the lifetime of this pointer we're handing off to c++ to
36 // store.
37 let event_loop = unsafe {
38 ffi::aos::ShmEventLoopForRust::new(transmute_table_to::<Configuration>(
39 &config.message()._tab,
40 ))
41 }
42 .within_box();
43
44 Self {
45 inner: event_loop,
46 _config: PhantomData,
47 }
48 }
49
50 /// Provides a runtime to construct the application and runs the event loop.
51 ///
52 /// The runtime is the only way to interact with the event loop. It provides the functionality
53 /// to spawn a task, construct timers, watchers, fetchers, and so on.
54 ///
55 /// Making an [`EventLoopRuntime`] is tricky since the lifetime of the runtime is invariant
56 /// w.r.t the event loop. In other words, the runtime and the event loop must have the same
57 /// lifetime. By providing access to the runtime through an [`FnOnce`], we can guarantee
58 /// that the runtime and the event loop both have the same lifetime.
59 ///
60 /// # Examples
61 ///
62 /// A ping application might do something like the following
63 ///
64 /// ```no_run
65 /// # use aos_events_shm_event_loop::*;
66 /// use ping_rust_fbs::aos::examples as ping;
67 /// use pong_rust_fbs::aos::examples as pong;
68 /// use std::cell::Cell;
69 /// use std::path::Path;
70 /// use aos_configuration::read_config_from;
71 /// use aos_events_event_loop_runtime::{Sender, Watcher};
72 ///
73 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
74 /// let event_loop = ShmEventLoop::new(&config);
75 /// event_loop.run_with(|runtime| {
76 /// // One task will send a ping, the other will listen to pong messages.
77 /// let mut sender: Sender<ping::Ping> = runtime
78 /// .make_sender("/test")
79 /// .expect("Can't create `Ping` sender");
80 ///
81 /// let on_run = runtime.on_run();
82 /// // Sends a single ping message.
83 /// let send_task = async move {
84 /// on_run.await;
85 /// let mut builder = sender.make_builder();
86 /// let mut ping = ping::PingBuilder::new(builder.fbb());
87 /// ping.add_value(10);
88 /// let ping = ping.finish();
89 /// builder.send(ping).expect("Can't send ping");
90 /// };
91 ///
92 /// let mut watcher: Watcher<pong::Pong> = runtime
93 /// .make_watcher("/test")
94 /// .expect("Can't create `Ping` watcher");
95 ///
96 /// // Listens to pong messages and prints them.
97 /// let receive_task = async move {
98 /// loop {
99 /// let pong = dbg!(watcher.next().await);
100 /// }
101 /// };
102 ///
103 /// runtime.spawn(async move {
104 /// futures::join!(send_task, receive_task);
105 /// std::future::pending().await
106 /// });
107 /// }); // Event loop starts runnning...
108 /// unreachable!("This can't be reached since no ExitHandle was made");
109 /// ```
110 ///
111 /// `run_with` can also borrow data from the outer scope that can be used in the async task.
112 ///
113 /// ```no_run
114 /// # use aos_events_shm_event_loop::*;
115 /// # use std::cell::Cell;
116 /// # use std::path::Path;
117 /// # use aos_configuration::read_config_from;
118 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
119 /// let shared_data = Cell::new(971);
120 /// let shared_data = &shared_data;
121 /// let event_loop = ShmEventLoop::new(&config);
122 /// event_loop.run_with(|runtime| {
123 /// // Note how `Cell` is enough since the event loop is single threaded.
124 /// let t1 = async move {
125 /// shared_data.set(shared_data.get() + 1);
126 /// };
127 /// let t2 = async move {
128 /// shared_data.set(shared_data.get() + 1);
129 /// };
130 ///
131 /// runtime.spawn(async move {
132 /// futures::join!(t1, t2);
133 /// std::future::pending().await
134 /// });
135 /// });
136 /// unreachable!("This can't be reached since no ExitHandle was made");
137 /// ```
138 ///
139 /// However, the spawned future must outlive `run_with`.
140 ///
141 /// ```compile_fail
142 /// # use aos_events_shm_event_loop::*;
143 /// # use std::cell::Cell;
144 /// # use std::path::Path;
145 /// # use aos_configuration::read_config_from;
146 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
147 /// let event_loop = ShmEventLoop::new(&config);
148 /// event_loop.run_with(|runtime| {
149 /// // ERROR: `shared_data` doesn't live long enough.
150 /// let shared_data = Cell::new(971);
151 /// let t1 = async {
152 /// shared_data.set(shared_data.get() + 1);
153 /// };
154 /// let t2 = async {
155 /// shared_data.set(shared_data.get() + 1);
156 /// };
157 ///
158 /// runtime.spawn(async move {
159 /// futures::join!(t1, t2);
160 /// std::future::pending().await
161 /// });
162 /// });
163 /// ```
164 pub fn run_with<'env, F>(mut self, fun: F)
165 where
Adam Snaiderde51c672023-09-28 21:55:43 -0700166 F: for<'event_loop> FnOnce(
167 &'event_loop mut Scoped<'event_loop, 'env, EventLoopRuntime<'event_loop>>,
168 ),
Adam Snaiderf2a45852023-06-24 14:23:32 -0700169 {
170 // SAFETY: The runtime and the event loop (i.e. self) both get destroyed at the end of this
171 // scope: first the runtime followed by the event loop. The runtime gets exclusive access
172 // during initialization in `fun` while the event loop remains unused.
173 let runtime = unsafe { EventLoopRuntime::new(self.inner.as_mut().event_loop_mut()) };
174 let mut runtime = Scoped::new(runtime);
175 fun(&mut runtime);
176 self.run();
177 }
178
179 /// Makes an exit handle.
180 ///
181 /// Awaiting on the exit handle is the only way to actually exit the event loop
182 /// task, other than panicking.
183 pub fn make_exit_handle(&mut self) -> ExitHandle {
184 self.inner.as_mut().MakeExitHandle().into()
185 }
186
187 /// Runs the spawned task to completion.
188 fn run(&mut self) {
189 self.inner.as_mut().Run();
190 }
191}
192
193/// A wrapper over some data that lives for the duration of a scope.
194///
195/// This struct ensures the existence of some `'env` which outlives `'scope`. In
196/// the presence of higher-ranked trait bounds which require types that work for
197/// any `'scope`, this allows the compiler to propagate lifetime bounds which
198/// outlive any of the possible `'scope`. This is the simplest way to express
199/// this concept to the compiler right now.
200pub struct Scoped<'scope, 'env: 'scope, T: 'scope> {
201 data: T,
202 _env: PhantomData<fn(&'env ()) -> &'env ()>,
203 _scope: PhantomData<fn(&'scope ()) -> &'scope ()>,
204}
205
206impl<'scope, 'env: 'scope, T: 'scope> Scoped<'scope, 'env, T> {
207 /// Makes the [`Scoped`].
208 pub fn new(data: T) -> Self {
209 Self {
210 data,
211 _env: PhantomData,
212 _scope: PhantomData,
213 }
214 }
215}
216
217impl<'scope, 'env: 'scope, T: 'scope> Deref for Scoped<'scope, 'env, T> {
218 type Target = T;
219 fn deref(&self) -> &Self::Target {
220 &self.data
221 }
222}
223
224impl<'scope, 'env: 'scope, T: 'scope> DerefMut for Scoped<'scope, 'env, T> {
225 fn deref_mut(&mut self) -> &mut Self::Target {
226 &mut self.data
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 use runfiles::Runfiles;
235
236 use aos_configuration::read_config_from;
237 use aos_events_event_loop_runtime::{Sender, Watcher};
Adam Snaiderc8b7e752023-09-14 14:27:53 -0700238 use aos_test_init::test_init;
Adam Snaiderf2a45852023-06-24 14:23:32 -0700239 use ping_rust_fbs::aos::examples as ping;
240 use std::sync::atomic::{AtomicUsize, Ordering};
241 use std::sync::Barrier;
242
243 /// Tests basic functionality with 2 threads operating their own event loops.
244 #[test]
245 fn smoke_test() {
246 test_init();
247
248 let r = Runfiles::create().unwrap();
249 let config =
250 read_config_from(&r.rlocation("org_frc971/aos/events/pingpong_config.json")).unwrap();
251
252 const VALUE: i32 = 971;
253 let barrier = Barrier::new(2);
254 let count = AtomicUsize::new(0);
255
256 std::thread::scope(|s| {
257 let config = &config;
258 let barrier = &barrier;
259 let count = &count;
260 s.spawn(move || {
261 let mut event_loop = ShmEventLoop::new(config);
262 let exit_handle = event_loop.make_exit_handle();
263 event_loop.run_with(|runtime| {
Adam Snaiderde51c672023-09-28 21:55:43 -0700264 runtime.spawn(async {
265 let mut watcher: Watcher<ping::Ping> = runtime
266 .make_watcher("/test")
267 .expect("Can't create `Ping` watcher");
268 runtime.on_run().await;
Adam Snaiderf2a45852023-06-24 14:23:32 -0700269 barrier.wait();
270 let ping = watcher.next().await;
271 assert_eq!(ping.message().unwrap().value(), VALUE);
272 count.fetch_add(1, Ordering::Relaxed);
273 exit_handle.exit().await
274 });
275 });
276 });
277 s.spawn(move || {
278 let mut event_loop = ShmEventLoop::new(config);
279 let exit_handle = event_loop.make_exit_handle();
280 event_loop.run_with(|runtime| {
Adam Snaiderde51c672023-09-28 21:55:43 -0700281 runtime.spawn(async {
282 let mut sender: Sender<ping::Ping> = runtime
283 .make_sender("/test")
284 .expect("Can't create `Ping` sender");
285 runtime.on_run().await;
Adam Snaiderf2a45852023-06-24 14:23:32 -0700286 // Give the waiting thread a chance to start.
287 barrier.wait();
288 let mut sender = sender.make_builder();
289 let mut ping = ping::PingBuilder::new(sender.fbb());
290 ping.add_value(VALUE);
291 let ping = ping.finish();
292 sender.send(ping).expect("send should succeed");
293 count.fetch_add(1, Ordering::Relaxed);
294 exit_handle.exit().await
295 });
296 });
297 });
298 });
299
300 assert_eq!(count.into_inner(), 2, "Not all event loops ran.");
301 }
302}