blob: 3e387effb2345a3bae07601cd8800f1fc4464ffc [file] [log] [blame]
Adam Snaiderf2a45852023-06-24 14:23:32 -07001pub use aos_configuration::{Configuration, ConfigurationExt};
2pub use aos_events_event_loop_runtime::EventLoop;
3pub use aos_events_event_loop_runtime::{CppExitHandle, EventLoopRuntime, ExitHandle};
4
5use aos_configuration_fbs::aos::Configuration as RustConfiguration;
6use aos_flatbuffers::{transmute_table_to, Flatbuffer};
7use autocxx::WithinBox;
8use core::marker::PhantomData;
9use core::pin::Pin;
10use std::boxed::Box;
11use std::ops::{Deref, DerefMut};
12
13autocxx::include_cpp! (
14#include "aos/events/shm_event_loop.h"
15#include "aos/events/shm_event_loop_for_rust.h"
16
17safety!(unsafe)
18
19generate!("aos::ShmEventLoopForRust")
20
21extern_cpp_type!("aos::ExitHandle", crate::CppExitHandle)
22extern_cpp_type!("aos::Configuration", crate::Configuration)
23extern_cpp_type!("aos::EventLoop", crate::EventLoop)
24);
25
26/// A Rust-owned C++ `ShmEventLoop` object.
27pub struct ShmEventLoop<'config> {
28 inner: Pin<Box<ffi::aos::ShmEventLoopForRust>>,
29 _config: PhantomData<&'config Configuration>,
30}
31
32impl<'config> ShmEventLoop<'config> {
33 /// Creates a Rust-owned ShmEventLoop.
34 pub fn new(config: &'config impl Flatbuffer<RustConfiguration<'static>>) -> Self {
35 // SAFETY: The `_config` represents the lifetime of this pointer we're handing off to c++ to
36 // store.
37 let event_loop = unsafe {
38 ffi::aos::ShmEventLoopForRust::new(transmute_table_to::<Configuration>(
39 &config.message()._tab,
40 ))
41 }
42 .within_box();
43
44 Self {
45 inner: event_loop,
46 _config: PhantomData,
47 }
48 }
49
50 /// Provides a runtime to construct the application and runs the event loop.
51 ///
52 /// The runtime is the only way to interact with the event loop. It provides the functionality
53 /// to spawn a task, construct timers, watchers, fetchers, and so on.
54 ///
55 /// Making an [`EventLoopRuntime`] is tricky since the lifetime of the runtime is invariant
56 /// w.r.t the event loop. In other words, the runtime and the event loop must have the same
57 /// lifetime. By providing access to the runtime through an [`FnOnce`], we can guarantee
58 /// that the runtime and the event loop both have the same lifetime.
59 ///
60 /// # Examples
61 ///
62 /// A ping application might do something like the following
63 ///
64 /// ```no_run
65 /// # use aos_events_shm_event_loop::*;
66 /// use ping_rust_fbs::aos::examples as ping;
67 /// use pong_rust_fbs::aos::examples as pong;
68 /// use std::cell::Cell;
69 /// use std::path::Path;
70 /// use aos_configuration::read_config_from;
71 /// use aos_events_event_loop_runtime::{Sender, Watcher};
72 ///
73 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
74 /// let event_loop = ShmEventLoop::new(&config);
75 /// event_loop.run_with(|runtime| {
76 /// // One task will send a ping, the other will listen to pong messages.
77 /// let mut sender: Sender<ping::Ping> = runtime
78 /// .make_sender("/test")
79 /// .expect("Can't create `Ping` sender");
80 ///
81 /// let on_run = runtime.on_run();
82 /// // Sends a single ping message.
83 /// let send_task = async move {
84 /// on_run.await;
85 /// let mut builder = sender.make_builder();
86 /// let mut ping = ping::PingBuilder::new(builder.fbb());
87 /// ping.add_value(10);
88 /// let ping = ping.finish();
89 /// builder.send(ping).expect("Can't send ping");
90 /// };
91 ///
92 /// let mut watcher: Watcher<pong::Pong> = runtime
93 /// .make_watcher("/test")
94 /// .expect("Can't create `Ping` watcher");
95 ///
96 /// // Listens to pong messages and prints them.
97 /// let receive_task = async move {
98 /// loop {
99 /// let pong = dbg!(watcher.next().await);
100 /// }
101 /// };
102 ///
103 /// runtime.spawn(async move {
104 /// futures::join!(send_task, receive_task);
105 /// std::future::pending().await
106 /// });
107 /// }); // Event loop starts runnning...
108 /// unreachable!("This can't be reached since no ExitHandle was made");
109 /// ```
110 ///
111 /// `run_with` can also borrow data from the outer scope that can be used in the async task.
112 ///
113 /// ```no_run
114 /// # use aos_events_shm_event_loop::*;
115 /// # use std::cell::Cell;
116 /// # use std::path::Path;
117 /// # use aos_configuration::read_config_from;
118 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
119 /// let shared_data = Cell::new(971);
120 /// let shared_data = &shared_data;
121 /// let event_loop = ShmEventLoop::new(&config);
122 /// event_loop.run_with(|runtime| {
123 /// // Note how `Cell` is enough since the event loop is single threaded.
124 /// let t1 = async move {
125 /// shared_data.set(shared_data.get() + 1);
126 /// };
127 /// let t2 = async move {
128 /// shared_data.set(shared_data.get() + 1);
129 /// };
130 ///
131 /// runtime.spawn(async move {
132 /// futures::join!(t1, t2);
133 /// std::future::pending().await
134 /// });
135 /// });
136 /// unreachable!("This can't be reached since no ExitHandle was made");
137 /// ```
138 ///
139 /// However, the spawned future must outlive `run_with`.
140 ///
141 /// ```compile_fail
142 /// # use aos_events_shm_event_loop::*;
143 /// # use std::cell::Cell;
144 /// # use std::path::Path;
145 /// # use aos_configuration::read_config_from;
146 /// let config = read_config_from(Path::new("path/to/aos_config.json")).unwrap();
147 /// let event_loop = ShmEventLoop::new(&config);
148 /// event_loop.run_with(|runtime| {
149 /// // ERROR: `shared_data` doesn't live long enough.
150 /// let shared_data = Cell::new(971);
151 /// let t1 = async {
152 /// shared_data.set(shared_data.get() + 1);
153 /// };
154 /// let t2 = async {
155 /// shared_data.set(shared_data.get() + 1);
156 /// };
157 ///
158 /// runtime.spawn(async move {
159 /// futures::join!(t1, t2);
160 /// std::future::pending().await
161 /// });
162 /// });
163 /// ```
164 pub fn run_with<'env, F>(mut self, fun: F)
165 where
166 F: for<'event_loop> FnOnce(&mut Scoped<'event_loop, 'env, EventLoopRuntime<'event_loop>>),
167 {
168 // SAFETY: The runtime and the event loop (i.e. self) both get destroyed at the end of this
169 // scope: first the runtime followed by the event loop. The runtime gets exclusive access
170 // during initialization in `fun` while the event loop remains unused.
171 let runtime = unsafe { EventLoopRuntime::new(self.inner.as_mut().event_loop_mut()) };
172 let mut runtime = Scoped::new(runtime);
173 fun(&mut runtime);
174 self.run();
175 }
176
177 /// Makes an exit handle.
178 ///
179 /// Awaiting on the exit handle is the only way to actually exit the event loop
180 /// task, other than panicking.
181 pub fn make_exit_handle(&mut self) -> ExitHandle {
182 self.inner.as_mut().MakeExitHandle().into()
183 }
184
185 /// Runs the spawned task to completion.
186 fn run(&mut self) {
187 self.inner.as_mut().Run();
188 }
189}
190
191/// A wrapper over some data that lives for the duration of a scope.
192///
193/// This struct ensures the existence of some `'env` which outlives `'scope`. In
194/// the presence of higher-ranked trait bounds which require types that work for
195/// any `'scope`, this allows the compiler to propagate lifetime bounds which
196/// outlive any of the possible `'scope`. This is the simplest way to express
197/// this concept to the compiler right now.
198pub struct Scoped<'scope, 'env: 'scope, T: 'scope> {
199 data: T,
200 _env: PhantomData<fn(&'env ()) -> &'env ()>,
201 _scope: PhantomData<fn(&'scope ()) -> &'scope ()>,
202}
203
204impl<'scope, 'env: 'scope, T: 'scope> Scoped<'scope, 'env, T> {
205 /// Makes the [`Scoped`].
206 pub fn new(data: T) -> Self {
207 Self {
208 data,
209 _env: PhantomData,
210 _scope: PhantomData,
211 }
212 }
213}
214
215impl<'scope, 'env: 'scope, T: 'scope> Deref for Scoped<'scope, 'env, T> {
216 type Target = T;
217 fn deref(&self) -> &Self::Target {
218 &self.data
219 }
220}
221
222impl<'scope, 'env: 'scope, T: 'scope> DerefMut for Scoped<'scope, 'env, T> {
223 fn deref_mut(&mut self) -> &mut Self::Target {
224 &mut self.data
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 use runfiles::Runfiles;
233
234 use aos_configuration::read_config_from;
235 use aos_events_event_loop_runtime::{Sender, Watcher};
236 use aos_init::test_init;
237 use ping_rust_fbs::aos::examples as ping;
238 use std::sync::atomic::{AtomicUsize, Ordering};
239 use std::sync::Barrier;
240
241 /// Tests basic functionality with 2 threads operating their own event loops.
242 #[test]
243 fn smoke_test() {
244 test_init();
245
246 let r = Runfiles::create().unwrap();
247 let config =
248 read_config_from(&r.rlocation("org_frc971/aos/events/pingpong_config.json")).unwrap();
249
250 const VALUE: i32 = 971;
251 let barrier = Barrier::new(2);
252 let count = AtomicUsize::new(0);
253
254 std::thread::scope(|s| {
255 let config = &config;
256 let barrier = &barrier;
257 let count = &count;
258 s.spawn(move || {
259 let mut event_loop = ShmEventLoop::new(config);
260 let exit_handle = event_loop.make_exit_handle();
261 event_loop.run_with(|runtime| {
262 let mut watcher: Watcher<ping::Ping> = runtime
263 .make_watcher("/test")
264 .expect("Can't create `Ping` watcher");
265 let on_run = runtime.on_run();
266 runtime.spawn(async move {
267 on_run.await;
268 barrier.wait();
269 let ping = watcher.next().await;
270 assert_eq!(ping.message().unwrap().value(), VALUE);
271 count.fetch_add(1, Ordering::Relaxed);
272 exit_handle.exit().await
273 });
274 });
275 });
276 s.spawn(move || {
277 let mut event_loop = ShmEventLoop::new(config);
278 let exit_handle = event_loop.make_exit_handle();
279 event_loop.run_with(|runtime| {
280 let mut sender: Sender<ping::Ping> = runtime
281 .make_sender("/test")
282 .expect("Can't create `Ping` sender");
283 let on_run = runtime.on_run();
284 runtime.spawn(async move {
285 on_run.await;
286 // Give the waiting thread a chance to start.
287 barrier.wait();
288 let mut sender = sender.make_builder();
289 let mut ping = ping::PingBuilder::new(sender.fbb());
290 ping.add_value(VALUE);
291 let ping = ping.finish();
292 sender.send(ping).expect("send should succeed");
293 count.fetch_add(1, Ordering::Relaxed);
294 exit_handle.exit().await
295 });
296 });
297 });
298 });
299
300 assert_eq!(count.into_inner(), 2, "Not all event loops ran.");
301 }
302}