blob: f614517a76ceea286114f75a6a5bba3cfe62e411 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_LOOP_H_
2#define AOS_EVENTS_EVENT_LOOP_H_
3
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "absl/strings/string_view.h"
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
15
16namespace aos {
17
18// Struct available on Watchers and Fetchers with context about the current
19// message.
20struct Context {
21 // Time that the message was sent.
22 monotonic_clock::time_point monotonic_sent_time;
23 realtime_clock::time_point realtime_sent_time;
24 // Index in the queue.
25 uint32_t queue_index;
26 // Size of the data sent.
27 size_t size;
28 // Pointer to the data.
29 void *data;
30};
31
32// Raw version of fetcher. Contains a local variable that the fetcher will
33// update. This is used for reflection and as an interface to implement typed
34// fetchers.
35class RawFetcher {
36 public:
37 RawFetcher() {}
38 virtual ~RawFetcher() {}
39
40 // Non-blocking fetch of the next message in the queue. Returns true if there
41 // was a new message and we got it.
42 virtual bool FetchNext() = 0;
43
44 // Non-blocking fetch of the latest message:
45 virtual bool Fetch() = 0;
46
47 // Returns a pointer to data in the most recent message, or nullptr if there
48 // is no message.
49 const void *most_recent_data() const { return data_; }
50
51 const Context &context() const { return context_; }
52
53 protected:
54 RawFetcher(const RawFetcher &) = delete;
55 RawFetcher &operator=(const RawFetcher &) = delete;
56
57 void *data_ = nullptr;
58 Context context_;
59};
60
61// Raw version of sender. Sends a block of data. This is used for reflection
62// and as a building block to implement typed senders.
63class RawSender {
64 public:
65 RawSender() {}
66 virtual ~RawSender() {}
67
68 // Sends a message without copying it. The users starts by copying up to
69 // size() bytes into the data backed by data(). They then call Send to send.
70 // Returns true on a successful send.
71 virtual void *data() = 0;
72 virtual size_t size() = 0;
73 virtual bool Send(size_t size) = 0;
74
75 // Sends a single block of data by copying it.
76 virtual bool Send(void *data, size_t size) = 0;
77
78 // Returns the name of this sender.
79 virtual const absl::string_view name() const = 0;
80
81 protected:
82 RawSender(const RawSender &) = delete;
83 RawSender &operator=(const RawSender &) = delete;
84};
85
86
87// Fetches the newest message from a channel.
88// This provides a polling based interface for channels.
89template <typename T>
90class Fetcher {
91 public:
92 Fetcher() {}
93
94 // Fetches the next message. Returns true if it fetched a new message. This
95 // method will only return messages sent after the Fetcher was created.
96 bool FetchNext() { return fetcher_->FetchNext(); }
97
98 // Fetches the most recent message. Returns true if it fetched a new message.
99 // This will return the latest message regardless of if it was sent before or
100 // after the fetcher was created.
101 bool Fetch() { return fetcher_->Fetch(); }
102
103 // Returns a pointer to the contained flatbuffer, or nullptr if there is no
104 // available message.
105 const T *get() const {
106 return fetcher_->most_recent_data() != nullptr
107 ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
108 fetcher_->most_recent_data()))
109 : nullptr;
110 }
111
112 // Returns the context holding timestamps and other metadata about the
113 // message.
114 const Context &context() const { return fetcher_->context(); }
115
116 const T &operator*() const { return *get(); }
117 const T *operator->() const { return get(); }
118
119 private:
120 friend class EventLoop;
121 Fetcher(::std::unique_ptr<RawFetcher> fetcher)
122 : fetcher_(::std::move(fetcher)) {}
123 ::std::unique_ptr<RawFetcher> fetcher_;
124};
125
126// Sends messages to a channel.
127template <typename T>
128class Sender {
129 public:
130 Sender() {}
131
132 // Represents a single message about to be sent to the queue.
133 // The lifecycle goes:
134 //
135 // Builder builder = sender.MakeBuilder();
136 // T::Builder t_builder = builder.MakeBuilder<T>();
137 // Populate(&t_builder);
138 // builder.Send(t_builder.Finish());
139 class Builder {
140 public:
141 Builder(RawSender *sender, void *data, size_t size)
142 : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
143 fbb_.ForceDefaults(1);
144 }
145
146 flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
147
148 template <typename T2>
149 typename T2::Builder MakeBuilder() {
150 return typename T2::Builder(fbb_);
151 }
152
153 bool Send(flatbuffers::Offset<T> offset) {
154 fbb_.Finish(offset);
155 return sender_->Send(fbb_.GetSize());
156 }
157
158 // CHECKs that this message was sent.
159 void CheckSent() { fbb_.Finished(); }
160
161 private:
162 PreallocatedAllocator alloc_;
163 flatbuffers::FlatBufferBuilder fbb_;
164 RawSender *sender_;
165 };
166
167 // Constructs an above builder.
168 Builder MakeBuilder();
169
170 // Returns the name of the underlying queue.
171 const absl::string_view name() const { return sender_->name(); }
172
173 private:
174 friend class EventLoop;
175 Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
176 std::unique_ptr<RawSender> sender_;
177};
178
179// Interface for timers
180class TimerHandler {
181 public:
182 virtual ~TimerHandler() {}
183
184 // Timer should sleep until base, base + offset, base + offset * 2, ...
185 // If repeat_offset isn't set, the timer only expires once.
186 virtual void Setup(monotonic_clock::time_point base,
187 monotonic_clock::duration repeat_offset =
188 ::aos::monotonic_clock::zero()) = 0;
189
190 // Stop future calls to callback().
191 virtual void Disable() = 0;
192};
193
194// Interface for phased loops. They are built on timers.
195class PhasedLoopHandler {
196 public:
197 virtual ~PhasedLoopHandler() {}
198
199 // Sets the interval and offset. Any changes to interval and offset only take
200 // effect when the handler finishes running.
201 virtual void set_interval_and_offset(
202 const monotonic_clock::duration interval,
203 const monotonic_clock::duration offset) = 0;
204};
205
206// TODO(austin): Ping pong example apps, and then start doing introspection.
207// TODO(austin): Timing reporter. Publish statistics on latencies of
208// handlers.
209class EventLoop {
210 public:
211 EventLoop(const Configuration *configuration)
212 : configuration_(configuration) {}
213
214 virtual ~EventLoop() {}
215
216 // Current time.
217 virtual monotonic_clock::time_point monotonic_now() = 0;
218 virtual realtime_clock::time_point realtime_now() = 0;
219
220 // Note, it is supported to create:
221 // multiple fetchers, and (one sender or one watcher) per <name, type>
222 // tuple.
223
224 // Makes a class that will always fetch the most recent value
225 // sent to the provided channel.
226 template <typename T>
227 Fetcher<T> MakeFetcher(const absl::string_view channel_name) {
228 const Channel *channel = configuration::GetChannel(
229 configuration_, channel_name, T::GetFullyQualifiedName(), name());
230 CHECK(channel != nullptr)
231 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
232 << T::GetFullyQualifiedName() << "\" } not found in config.";
233
234 return Fetcher<T>(MakeRawFetcher(channel));
235 }
236
237 // Makes class that allows constructing and sending messages to
238 // the provided channel.
239 template <typename T>
240 Sender<T> MakeSender(const absl::string_view channel_name) {
241 const Channel *channel = configuration::GetChannel(
242 configuration_, channel_name, T::GetFullyQualifiedName(), name());
243 CHECK(channel != nullptr)
244 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
245 << T::GetFullyQualifiedName() << "\" } not found in config.";
246
247 return Sender<T>(MakeRawSender(channel));
248 }
249
250 // This will watch messages sent to the provided channel.
251 //
252 // Watch is a functor that have a call signature like so:
253 // void Event(const MessageType& type);
254 //
255 // TODO(parker): Need to support ::std::bind. For now, use lambdas.
256 // TODO(austin): Do we need a functor? Or is a std::function good enough?
257 template <typename Watch>
258 void MakeWatcher(const absl::string_view name, Watch &&w);
259
260 // The passed in function will be called when the event loop starts.
261 // Use this to run code once the thread goes into "real-time-mode",
262 virtual void OnRun(::std::function<void()> on_run) = 0;
263
264 // Sets the name of the event loop. This is the application name.
265 virtual void set_name(const absl::string_view name) = 0;
266 // Gets the name of the event loop.
267 virtual const absl::string_view name() const = 0;
268
269 // Creates a timer that executes callback when the timer expires
270 // Returns a TimerHandle for configuration of the timer
271 virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
272
273 // Creates a timer that executes callback periodically at the specified
274 // interval and offset. Returns a PhasedLoopHandler for interacting with the
275 // timer.
276 virtual PhasedLoopHandler *AddPhasedLoop(
277 ::std::function<void(int)> callback,
278 const monotonic_clock::duration interval,
279 const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
280
281 // TODO(austin): OnExit
282
283 // Threadsafe.
284 bool is_running() const { return is_running_.load(); }
285
286 // Sets the scheduler priority to run the event loop at. This may not be
287 // called after we go into "real-time-mode".
288 virtual void SetRuntimeRealtimePriority(int priority) = 0;
289
290 // Fetches new messages from the provided channel (path, type). Note: this
291 // channel must be a member of the exact configuration object this was built
292 // with.
293 virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
294 const Channel *channel) = 0;
295
296 // Will watch channel (name, type) for new messages
297 virtual void MakeRawWatcher(
298 const Channel *channel,
299 std::function<void(const Context &context, const void *message)>
300 watcher) = 0;
301
302 // Returns the context for the current message.
303 // TODO(austin): Fill out whatever is useful for timers.
304 const Context &context() const { return context_; }
305
306 // Returns the configuration that this event loop was built with.
307 const Configuration *configuration() const { return configuration_; }
308
309 protected:
310 void set_is_running(bool value) { is_running_.store(value); }
311
312 // Will send new messages from channel (path, type).
313 virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
314
315 private:
316 ::std::atomic<bool> is_running_{false};
317
318 // Context available for watchers.
319 Context context_;
320
321 const Configuration *configuration_;
322};
323
324} // namespace aos
325
326#include "aos/events/event_loop_tmpl.h"
327
#endif  // AOS_EVENTS_EVENT_LOOP_H_