blob: a83cfb1f7db89bd7561e5363409a5d382b6d0081 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_LOOP_H_
2#define AOS_EVENTS_EVENT_LOOP_H_
3
4#include <atomic>
5#include <string>
James Kuszmaul3ae42262019-11-08 12:33:41 -08006#include <string_view>
Alex Perrycb7da4b2019-08-28 19:35:56 -07007
Alex Perrycb7da4b2019-08-28 19:35:56 -07008#include "aos/configuration.h"
9#include "aos/configuration_generated.h"
10#include "aos/flatbuffers.h"
11#include "aos/json_to_flatbuffer.h"
12#include "aos/time/time.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17
18// Struct available on Watchers and Fetchers with context about the current
19// message.
20struct Context {
21 // Time that the message was sent.
22 monotonic_clock::time_point monotonic_sent_time;
23 realtime_clock::time_point realtime_sent_time;
24 // Index in the queue.
25 uint32_t queue_index;
26 // Size of the data sent.
27 size_t size;
28 // Pointer to the data.
29 void *data;
30};
31
32// Raw version of fetcher. Contains a local variable that the fetcher will
33// update. This is used for reflection and as an interface to implement typed
34// fetchers.
35class RawFetcher {
36 public:
Austin Schuh54cf95f2019-11-29 13:14:18 -080037 RawFetcher(const Channel *channel) : channel_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070038 virtual ~RawFetcher() {}
39
40 // Non-blocking fetch of the next message in the queue. Returns true if there
41 // was a new message and we got it.
42 virtual bool FetchNext() = 0;
43
44 // Non-blocking fetch of the latest message:
45 virtual bool Fetch() = 0;
46
47 // Returns a pointer to data in the most recent message, or nullptr if there
48 // is no message.
49 const void *most_recent_data() const { return data_; }
50
51 const Context &context() const { return context_; }
52
Austin Schuh54cf95f2019-11-29 13:14:18 -080053 const Channel *channel() const { return channel_; }
54
Alex Perrycb7da4b2019-08-28 19:35:56 -070055 protected:
56 RawFetcher(const RawFetcher &) = delete;
57 RawFetcher &operator=(const RawFetcher &) = delete;
58
59 void *data_ = nullptr;
60 Context context_;
Austin Schuh54cf95f2019-11-29 13:14:18 -080061 const Channel *channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -070062};
63
64// Raw version of sender. Sends a block of data. This is used for reflection
65// and as a building block to implement typed senders.
66class RawSender {
67 public:
Austin Schuh54cf95f2019-11-29 13:14:18 -080068 RawSender(const Channel *channel) : channel_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070069 virtual ~RawSender() {}
70
71 // Sends a message without copying it. The users starts by copying up to
72 // size() bytes into the data backed by data(). They then call Send to send.
73 // Returns true on a successful send.
74 virtual void *data() = 0;
75 virtual size_t size() = 0;
76 virtual bool Send(size_t size) = 0;
77
78 // Sends a single block of data by copying it.
Austin Schuh4726ce92019-11-29 13:23:18 -080079 virtual bool Send(const void *data, size_t size) = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -070080
81 // Returns the name of this sender.
James Kuszmaul3ae42262019-11-08 12:33:41 -080082 virtual const std::string_view name() const = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -070083
Austin Schuh54cf95f2019-11-29 13:14:18 -080084 const Channel *channel() const { return channel_; }
85
Alex Perrycb7da4b2019-08-28 19:35:56 -070086 protected:
87 RawSender(const RawSender &) = delete;
88 RawSender &operator=(const RawSender &) = delete;
Austin Schuh54cf95f2019-11-29 13:14:18 -080089
90 const Channel *channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -070091};
92
93
94// Fetches the newest message from a channel.
95// This provides a polling based interface for channels.
96template <typename T>
97class Fetcher {
98 public:
99 Fetcher() {}
100
101 // Fetches the next message. Returns true if it fetched a new message. This
102 // method will only return messages sent after the Fetcher was created.
103 bool FetchNext() { return fetcher_->FetchNext(); }
104
105 // Fetches the most recent message. Returns true if it fetched a new message.
106 // This will return the latest message regardless of if it was sent before or
107 // after the fetcher was created.
108 bool Fetch() { return fetcher_->Fetch(); }
109
110 // Returns a pointer to the contained flatbuffer, or nullptr if there is no
111 // available message.
112 const T *get() const {
113 return fetcher_->most_recent_data() != nullptr
114 ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
115 fetcher_->most_recent_data()))
116 : nullptr;
117 }
118
119 // Returns the context holding timestamps and other metadata about the
120 // message.
121 const Context &context() const { return fetcher_->context(); }
122
123 const T &operator*() const { return *get(); }
124 const T *operator->() const { return get(); }
125
126 private:
127 friend class EventLoop;
128 Fetcher(::std::unique_ptr<RawFetcher> fetcher)
129 : fetcher_(::std::move(fetcher)) {}
130 ::std::unique_ptr<RawFetcher> fetcher_;
131};
132
133// Sends messages to a channel.
134template <typename T>
135class Sender {
136 public:
137 Sender() {}
138
139 // Represents a single message about to be sent to the queue.
140 // The lifecycle goes:
141 //
142 // Builder builder = sender.MakeBuilder();
143 // T::Builder t_builder = builder.MakeBuilder<T>();
144 // Populate(&t_builder);
145 // builder.Send(t_builder.Finish());
146 class Builder {
147 public:
148 Builder(RawSender *sender, void *data, size_t size)
149 : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
150 fbb_.ForceDefaults(1);
151 }
152
153 flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
154
155 template <typename T2>
156 typename T2::Builder MakeBuilder() {
157 return typename T2::Builder(fbb_);
158 }
159
160 bool Send(flatbuffers::Offset<T> offset) {
161 fbb_.Finish(offset);
162 return sender_->Send(fbb_.GetSize());
163 }
164
165 // CHECKs that this message was sent.
166 void CheckSent() { fbb_.Finished(); }
167
168 private:
169 PreallocatedAllocator alloc_;
170 flatbuffers::FlatBufferBuilder fbb_;
171 RawSender *sender_;
172 };
173
174 // Constructs an above builder.
175 Builder MakeBuilder();
176
177 // Returns the name of the underlying queue.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800178 const std::string_view name() const { return sender_->name(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700179
180 private:
181 friend class EventLoop;
182 Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
183 std::unique_ptr<RawSender> sender_;
184};
185
186// Interface for timers
187class TimerHandler {
188 public:
189 virtual ~TimerHandler() {}
190
191 // Timer should sleep until base, base + offset, base + offset * 2, ...
192 // If repeat_offset isn't set, the timer only expires once.
193 virtual void Setup(monotonic_clock::time_point base,
194 monotonic_clock::duration repeat_offset =
195 ::aos::monotonic_clock::zero()) = 0;
196
197 // Stop future calls to callback().
198 virtual void Disable() = 0;
Austin Schuh1540c2f2019-11-29 21:59:29 -0800199
200 // Sets and gets the name of the timer. Set this if you want a descriptive
201 // name in the timing report.
202 void set_name(std::string_view name) { name_ = std::string(name); }
203 const std::string_view name() const { return name_; }
204
205 private:
206 std::string name_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700207};
208
209// Interface for phased loops. They are built on timers.
210class PhasedLoopHandler {
211 public:
212 virtual ~PhasedLoopHandler() {}
213
214 // Sets the interval and offset. Any changes to interval and offset only take
215 // effect when the handler finishes running.
216 virtual void set_interval_and_offset(
217 const monotonic_clock::duration interval,
218 const monotonic_clock::duration offset) = 0;
Austin Schuh1540c2f2019-11-29 21:59:29 -0800219
220 // Sets and gets the name of the timer. Set this if you want a descriptive
221 // name in the timing report.
222 void set_name(std::string_view name) { name_ = std::string(name); }
223 const std::string_view name() const { return name_; }
224
225 private:
226 std::string name_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700227};
228
229// TODO(austin): Ping pong example apps, and then start doing introspection.
230// TODO(austin): Timing reporter. Publish statistics on latencies of
231// handlers.
232class EventLoop {
233 public:
234 EventLoop(const Configuration *configuration)
235 : configuration_(configuration) {}
236
237 virtual ~EventLoop() {}
238
239 // Current time.
240 virtual monotonic_clock::time_point monotonic_now() = 0;
241 virtual realtime_clock::time_point realtime_now() = 0;
242
243 // Note, it is supported to create:
244 // multiple fetchers, and (one sender or one watcher) per <name, type>
245 // tuple.
246
247 // Makes a class that will always fetch the most recent value
248 // sent to the provided channel.
249 template <typename T>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800250 Fetcher<T> MakeFetcher(const std::string_view channel_name) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700251 const Channel *channel = configuration::GetChannel(
252 configuration_, channel_name, T::GetFullyQualifiedName(), name());
253 CHECK(channel != nullptr)
254 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
255 << T::GetFullyQualifiedName() << "\" } not found in config.";
256
257 return Fetcher<T>(MakeRawFetcher(channel));
258 }
259
260 // Makes class that allows constructing and sending messages to
261 // the provided channel.
262 template <typename T>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800263 Sender<T> MakeSender(const std::string_view channel_name) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700264 const Channel *channel = configuration::GetChannel(
265 configuration_, channel_name, T::GetFullyQualifiedName(), name());
266 CHECK(channel != nullptr)
267 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
268 << T::GetFullyQualifiedName() << "\" } not found in config.";
269
270 return Sender<T>(MakeRawSender(channel));
271 }
272
273 // This will watch messages sent to the provided channel.
274 //
275 // Watch is a functor that have a call signature like so:
276 // void Event(const MessageType& type);
277 //
278 // TODO(parker): Need to support ::std::bind. For now, use lambdas.
279 // TODO(austin): Do we need a functor? Or is a std::function good enough?
280 template <typename Watch>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800281 void MakeWatcher(const std::string_view name, Watch &&w);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700282
283 // The passed in function will be called when the event loop starts.
284 // Use this to run code once the thread goes into "real-time-mode",
285 virtual void OnRun(::std::function<void()> on_run) = 0;
286
287 // Sets the name of the event loop. This is the application name.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800288 virtual void set_name(const std::string_view name) = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700289 // Gets the name of the event loop.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800290 virtual const std::string_view name() const = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700291
292 // Creates a timer that executes callback when the timer expires
293 // Returns a TimerHandle for configuration of the timer
294 virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
295
296 // Creates a timer that executes callback periodically at the specified
297 // interval and offset. Returns a PhasedLoopHandler for interacting with the
298 // timer.
299 virtual PhasedLoopHandler *AddPhasedLoop(
300 ::std::function<void(int)> callback,
301 const monotonic_clock::duration interval,
302 const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
303
304 // TODO(austin): OnExit
305
306 // Threadsafe.
307 bool is_running() const { return is_running_.load(); }
308
309 // Sets the scheduler priority to run the event loop at. This may not be
310 // called after we go into "real-time-mode".
311 virtual void SetRuntimeRealtimePriority(int priority) = 0;
312
313 // Fetches new messages from the provided channel (path, type). Note: this
314 // channel must be a member of the exact configuration object this was built
315 // with.
316 virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
317 const Channel *channel) = 0;
318
319 // Will watch channel (name, type) for new messages
320 virtual void MakeRawWatcher(
321 const Channel *channel,
322 std::function<void(const Context &context, const void *message)>
323 watcher) = 0;
324
325 // Returns the context for the current message.
326 // TODO(austin): Fill out whatever is useful for timers.
327 const Context &context() const { return context_; }
328
329 // Returns the configuration that this event loop was built with.
330 const Configuration *configuration() const { return configuration_; }
331
Austin Schuh54cf95f2019-11-29 13:14:18 -0800332 // Will send new messages from channel (path, type).
333 virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
334
Alex Perrycb7da4b2019-08-28 19:35:56 -0700335 protected:
336 void set_is_running(bool value) { is_running_.store(value); }
337
Austin Schuh54cf95f2019-11-29 13:14:18 -0800338 // Validates that channel exists inside configuration_.
339 void ValidateChannel(const Channel *channel);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700340
341 private:
342 ::std::atomic<bool> is_running_{false};
343
344 // Context available for watchers.
345 Context context_;
346
347 const Configuration *configuration_;
348};
349
350} // namespace aos
351
352#include "aos/events/event_loop_tmpl.h"
353
#endif  // AOS_EVENTS_EVENT_LOOP_H_