blob: 04cf42c0c6b43137f0361a4363bae71e5145e54f [file] [log] [blame]
Adam Snaiderf2a45852023-06-24 14:23:32 -07001pub use aos_configuration::{Configuration, ConfigurationExt};
2pub use aos_events_event_loop_runtime::EventLoop;
3pub use aos_events_event_loop_runtime::{CppExitHandle, EventLoopRuntime, ExitHandle};
4
5use aos_configuration_fbs::aos::Configuration as RustConfiguration;
6use aos_flatbuffers::{transmute_table_to, Flatbuffer};
7use autocxx::WithinBox;
8use core::marker::PhantomData;
9use core::pin::Pin;
10use std::boxed::Box;
11use std::ops::{Deref, DerefMut};
12
// Binding declaration for the C++ side of this crate. The `ffi::aos::ShmEventLoopForRust` type
// used further down in this file comes from the `generate!` directive here.
//
// The `extern_cpp_type!` directives point the listed C++ types at the Rust types this crate
// re-exports (`crate::CppExitHandle`, `crate::Configuration`, `crate::EventLoop`), presumably so
// that the bindings reuse those existing definitions instead of producing new ones.
// NOTE(review): see the autocxx documentation for the exact meaning of `safety!(unsafe)`.
autocxx::include_cpp! (
#include "aos/events/shm_event_loop.h"
#include "aos/events/shm_event_loop_for_rust.h"

safety!(unsafe)

generate!("aos::ShmEventLoopForRust")

extern_cpp_type!("aos::ExitHandle", crate::CppExitHandle)
extern_cpp_type!("aos::Configuration", crate::Configuration)
extern_cpp_type!("aos::EventLoop", crate::EventLoop)
);
25
26/// A Rust-owned C++ `ShmEventLoop` object.
27pub struct ShmEventLoop<'config> {
28 inner: Pin<Box<ffi::aos::ShmEventLoopForRust>>,
29 _config: PhantomData<&'config Configuration>,
30}
31
32impl<'config> ShmEventLoop<'config> {
33 /// Creates a Rust-owned ShmEventLoop.
34 pub fn new(config: &'config impl Flatbuffer<RustConfiguration<'static>>) -> Self {
35 // SAFETY: The `_config` represents the lifetime of this pointer we're handing off to c++ to
36 // store.
37 let event_loop = unsafe {
38 ffi::aos::ShmEventLoopForRust::new(transmute_table_to::<Configuration>(
39 &config.message()._tab,
40 ))
41 }
42 .within_box();
43
44 Self {
45 inner: event_loop,
46 _config: PhantomData,
47 }
48 }
49
50 /// Provides a runtime to construct the application and runs the event loop.
51 ///
52 /// The runtime is the only way to interact with the event loop. It provides the functionality
53 /// to spawn a task, construct timers, watchers, fetchers, and so on.
54 ///
55 /// Making an [`EventLoopRuntime`] is tricky since the lifetime of the runtime is invariant
56 /// w.r.t the event loop. In other words, the runtime and the event loop must have the same
57 /// lifetime. By providing access to the runtime through an [`FnOnce`], we can guarantee
58 /// that the runtime and the event loop both have the same lifetime.
59 ///
60 /// # Examples
61 ///
62 /// A ping application might do something like the following
63 ///
64 /// ```no_run
65 /// # use aos_events_shm_event_loop::*;
66 /// use ping_rust_fbs::aos::examples as ping;
67 /// use pong_rust_fbs::aos::examples as pong;
Adam Snaidera3317c82023-10-02 16:02:36 -070068 /// use std::borrow::Borrow;
Adam Snaiderf2a45852023-06-24 14:23:32 -070069 /// use std::cell::Cell;
70 /// use std::path::Path;
71 /// use aos_configuration::read_config_from;
72 /// use aos_events_event_loop_runtime::{Sender, Watcher};
73 ///
74 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
75 /// let event_loop = ShmEventLoop::new(&config);
76 /// event_loop.run_with(|runtime| {
77 /// // One task will send a ping, the other will listen to pong messages.
78 /// let mut sender: Sender<ping::Ping> = runtime
79 /// .make_sender("/test")
80 /// .expect("Can't create `Ping` sender");
81 ///
82 /// let on_run = runtime.on_run();
83 /// // Sends a single ping message.
84 /// let send_task = async move {
Adam Snaidera3317c82023-10-02 16:02:36 -070085 /// on_run.borrow().await;
Adam Snaiderf2a45852023-06-24 14:23:32 -070086 /// let mut builder = sender.make_builder();
87 /// let mut ping = ping::PingBuilder::new(builder.fbb());
88 /// ping.add_value(10);
89 /// let ping = ping.finish();
90 /// builder.send(ping).expect("Can't send ping");
91 /// };
92 ///
93 /// let mut watcher: Watcher<pong::Pong> = runtime
94 /// .make_watcher("/test")
95 /// .expect("Can't create `Ping` watcher");
96 ///
97 /// // Listens to pong messages and prints them.
98 /// let receive_task = async move {
99 /// loop {
100 /// let pong = dbg!(watcher.next().await);
101 /// }
102 /// };
103 ///
104 /// runtime.spawn(async move {
105 /// futures::join!(send_task, receive_task);
106 /// std::future::pending().await
107 /// });
108 /// }); // Event loop starts runnning...
109 /// unreachable!("This can't be reached since no ExitHandle was made");
110 /// ```
111 ///
112 /// `run_with` can also borrow data from the outer scope that can be used in the async task.
113 ///
114 /// ```no_run
115 /// # use aos_events_shm_event_loop::*;
116 /// # use std::cell::Cell;
117 /// # use std::path::Path;
118 /// # use aos_configuration::read_config_from;
119 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
120 /// let shared_data = Cell::new(971);
121 /// let shared_data = &shared_data;
122 /// let event_loop = ShmEventLoop::new(&config);
123 /// event_loop.run_with(|runtime| {
124 /// // Note how `Cell` is enough since the event loop is single threaded.
125 /// let t1 = async move {
126 /// shared_data.set(shared_data.get() + 1);
127 /// };
128 /// let t2 = async move {
129 /// shared_data.set(shared_data.get() + 1);
130 /// };
131 ///
132 /// runtime.spawn(async move {
133 /// futures::join!(t1, t2);
134 /// std::future::pending().await
135 /// });
136 /// });
137 /// unreachable!("This can't be reached since no ExitHandle was made");
138 /// ```
139 ///
140 /// However, the spawned future must outlive `run_with`.
141 ///
142 /// ```compile_fail
143 /// # use aos_events_shm_event_loop::*;
144 /// # use std::cell::Cell;
145 /// # use std::path::Path;
146 /// # use aos_configuration::read_config_from;
147 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
148 /// let event_loop = ShmEventLoop::new(&config);
149 /// event_loop.run_with(|runtime| {
150 /// // ERROR: `shared_data` doesn't live long enough.
151 /// let shared_data = Cell::new(971);
152 /// let t1 = async {
153 /// shared_data.set(shared_data.get() + 1);
154 /// };
155 /// let t2 = async {
156 /// shared_data.set(shared_data.get() + 1);
157 /// };
158 ///
159 /// runtime.spawn(async move {
160 /// futures::join!(t1, t2);
161 /// std::future::pending().await
162 /// });
163 /// });
164 /// ```
165 pub fn run_with<'env, F>(mut self, fun: F)
166 where
Adam Snaiderde51c672023-09-28 21:55:43 -0700167 F: for<'event_loop> FnOnce(
168 &'event_loop mut Scoped<'event_loop, 'env, EventLoopRuntime<'event_loop>>,
169 ),
Adam Snaiderf2a45852023-06-24 14:23:32 -0700170 {
171 // SAFETY: The runtime and the event loop (i.e. self) both get destroyed at the end of this
172 // scope: first the runtime followed by the event loop. The runtime gets exclusive access
173 // during initialization in `fun` while the event loop remains unused.
174 let runtime = unsafe { EventLoopRuntime::new(self.inner.as_mut().event_loop_mut()) };
175 let mut runtime = Scoped::new(runtime);
176 fun(&mut runtime);
177 self.run();
178 }
179
180 /// Makes an exit handle.
181 ///
182 /// Awaiting on the exit handle is the only way to actually exit the event loop
183 /// task, other than panicking.
184 pub fn make_exit_handle(&mut self) -> ExitHandle {
185 self.inner.as_mut().MakeExitHandle().into()
186 }
187
188 /// Runs the spawned task to completion.
189 fn run(&mut self) {
190 self.inner.as_mut().Run();
191 }
192}
193
/// Wraps a value that is only meant to live for the duration of some scope.
///
/// The type records two lifetimes and requires `'env: 'scope`, i.e. that an `'env` exists
/// which outlives `'scope`. When a higher-ranked trait bound demands a type that works for
/// *every* `'scope`, mentioning `Scoped` lets the compiler carry along lifetime bounds that
/// outlive all of those possible `'scope`s. At the moment this is the simplest way to express
/// that relationship to the compiler.
pub struct Scoped<'scope, 'env: 'scope, T: 'scope> {
    /// The wrapped value, reachable through `Deref`/`DerefMut`.
    data: T,
    /// Zero-sized marker that mentions `'env`; the `fn(&'a ()) -> &'a ()` shape is invariant
    /// in its lifetime.
    _env: PhantomData<fn(&'env ()) -> &'env ()>,
    /// Zero-sized marker that mentions `'scope`, with the same invariant shape.
    _scope: PhantomData<fn(&'scope ()) -> &'scope ()>,
}

impl<'scope, 'env, T> Scoped<'scope, 'env, T>
where
    'env: 'scope,
    T: 'scope,
{
    /// Wraps `data` in a [`Scoped`].
    pub fn new(data: T) -> Self {
        Scoped {
            data,
            _env: PhantomData,
            _scope: PhantomData,
        }
    }
}

impl<'scope, 'env, T> Deref for Scoped<'scope, 'env, T>
where
    'env: 'scope,
    T: 'scope,
{
    type Target = T;

    /// Gives shared access to the wrapped value.
    fn deref(&self) -> &T {
        let Self { data, .. } = self;
        data
    }
}

impl<'scope, 'env, T> DerefMut for Scoped<'scope, 'env, T>
where
    'env: 'scope,
    T: 'scope,
{
    /// Gives exclusive access to the wrapped value.
    fn deref_mut(&mut self) -> &mut T {
        let Self { data, .. } = self;
        data
    }
}
230
#[cfg(test)]
mod tests {
    use super::*;

    use runfiles::Runfiles;

    use aos_configuration::read_config_from;
    use aos_events_event_loop_runtime::{Sender, Watcher};
    use aos_test_init::test_init;
    use ping_rust_fbs::aos::examples as ping;
    use std::borrow::Borrow;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Barrier;

    /// Smoke test: two threads each run their own event loop. One watches `/test` for a
    /// `Ping`, the other sends one, and both must finish.
    #[test]
    fn smoke_test() {
        test_init();

        let runfiles = Runfiles::create().unwrap();
        let config_path = runfiles.rlocation("org_frc971/aos/events/pingpong_config.json");
        let config = read_config_from(&config_path).unwrap();

        const VALUE: i32 = 971;
        let barrier = Barrier::new(2);
        let completed = AtomicUsize::new(0);

        std::thread::scope(|scope| {
            // Shared borrows that the `move` closures below can copy.
            let config = &config;
            let barrier = &barrier;
            let completed = &completed;

            // Watching side: waits for the ping and checks its value.
            scope.spawn(move || {
                let mut watcher_loop = ShmEventLoop::new(config);
                let exit_handle = watcher_loop.make_exit_handle();
                watcher_loop.run_with(|runtime| {
                    runtime.spawn(async {
                        let mut watcher: Watcher<ping::Ping> = runtime
                            .make_watcher("/test")
                            .expect("Can't create `Ping` watcher");
                        runtime.on_run().borrow().await;
                        barrier.wait();
                        let received = watcher.next().await;
                        assert_eq!(received.message().unwrap().value(), VALUE);
                        completed.fetch_add(1, Ordering::Relaxed);
                        exit_handle.exit().await
                    });
                });
            });

            // Sending side: publishes a single ping carrying `VALUE`.
            scope.spawn(move || {
                let mut sender_loop = ShmEventLoop::new(config);
                let exit_handle = sender_loop.make_exit_handle();
                sender_loop.run_with(|runtime| {
                    runtime.spawn(async {
                        let mut sender: Sender<ping::Ping> = runtime
                            .make_sender("/test")
                            .expect("Can't create `Ping` sender");
                        runtime.on_run().borrow().await;
                        // Give the waiting thread a chance to start.
                        barrier.wait();
                        let mut builder = sender.make_builder();
                        let mut ping_builder = ping::PingBuilder::new(builder.fbb());
                        ping_builder.add_value(VALUE);
                        let offset = ping_builder.finish();
                        builder.send(offset).expect("send should succeed");
                        completed.fetch_add(1, Ordering::Relaxed);
                        exit_handle.exit().await
                    });
                });
            });
        });

        assert_eq!(completed.into_inner(), 2, "Not all event loops ran.");
    }
}