blob: fde7273c9af85cf5d791d51819c4c187efaaff0b [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_LOOP_H_
2#define AOS_EVENTS_EVENT_LOOP_H_
3
4#include <atomic>
5#include <string>
James Kuszmaul3ae42262019-11-08 12:33:41 -08006#include <string_view>
Alex Perrycb7da4b2019-08-28 19:35:56 -07007
Alex Perrycb7da4b2019-08-28 19:35:56 -07008#include "aos/configuration.h"
9#include "aos/configuration_generated.h"
10#include "aos/flatbuffers.h"
11#include "aos/json_to_flatbuffer.h"
12#include "aos/time/time.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17
18// Struct available on Watchers and Fetchers with context about the current
19// message.
20struct Context {
21 // Time that the message was sent.
22 monotonic_clock::time_point monotonic_sent_time;
23 realtime_clock::time_point realtime_sent_time;
24 // Index in the queue.
25 uint32_t queue_index;
26 // Size of the data sent.
27 size_t size;
28 // Pointer to the data.
29 void *data;
30};
31
32// Raw version of fetcher. Contains a local variable that the fetcher will
33// update. This is used for reflection and as an interface to implement typed
34// fetchers.
35class RawFetcher {
36 public:
Austin Schuh54cf95f2019-11-29 13:14:18 -080037 RawFetcher(const Channel *channel) : channel_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070038 virtual ~RawFetcher() {}
39
40 // Non-blocking fetch of the next message in the queue. Returns true if there
41 // was a new message and we got it.
42 virtual bool FetchNext() = 0;
43
44 // Non-blocking fetch of the latest message:
45 virtual bool Fetch() = 0;
46
47 // Returns a pointer to data in the most recent message, or nullptr if there
48 // is no message.
49 const void *most_recent_data() const { return data_; }
50
51 const Context &context() const { return context_; }
52
Austin Schuh54cf95f2019-11-29 13:14:18 -080053 const Channel *channel() const { return channel_; }
54
Alex Perrycb7da4b2019-08-28 19:35:56 -070055 protected:
56 RawFetcher(const RawFetcher &) = delete;
57 RawFetcher &operator=(const RawFetcher &) = delete;
58
59 void *data_ = nullptr;
60 Context context_;
Austin Schuh54cf95f2019-11-29 13:14:18 -080061 const Channel *channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -070062};
63
64// Raw version of sender. Sends a block of data. This is used for reflection
65// and as a building block to implement typed senders.
66class RawSender {
67 public:
Austin Schuh54cf95f2019-11-29 13:14:18 -080068 RawSender(const Channel *channel) : channel_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070069 virtual ~RawSender() {}
70
71 // Sends a message without copying it. The users starts by copying up to
72 // size() bytes into the data backed by data(). They then call Send to send.
73 // Returns true on a successful send.
74 virtual void *data() = 0;
75 virtual size_t size() = 0;
76 virtual bool Send(size_t size) = 0;
77
78 // Sends a single block of data by copying it.
Austin Schuh4726ce92019-11-29 13:23:18 -080079 virtual bool Send(const void *data, size_t size) = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -070080
81 // Returns the name of this sender.
James Kuszmaul3ae42262019-11-08 12:33:41 -080082 virtual const std::string_view name() const = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -070083
Austin Schuh54cf95f2019-11-29 13:14:18 -080084 const Channel *channel() const { return channel_; }
85
Alex Perrycb7da4b2019-08-28 19:35:56 -070086 protected:
87 RawSender(const RawSender &) = delete;
88 RawSender &operator=(const RawSender &) = delete;
Austin Schuh54cf95f2019-11-29 13:14:18 -080089
90 const Channel *channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -070091};
92
93
94// Fetches the newest message from a channel.
95// This provides a polling based interface for channels.
96template <typename T>
97class Fetcher {
98 public:
99 Fetcher() {}
100
101 // Fetches the next message. Returns true if it fetched a new message. This
102 // method will only return messages sent after the Fetcher was created.
103 bool FetchNext() { return fetcher_->FetchNext(); }
104
105 // Fetches the most recent message. Returns true if it fetched a new message.
106 // This will return the latest message regardless of if it was sent before or
107 // after the fetcher was created.
108 bool Fetch() { return fetcher_->Fetch(); }
109
110 // Returns a pointer to the contained flatbuffer, or nullptr if there is no
111 // available message.
112 const T *get() const {
113 return fetcher_->most_recent_data() != nullptr
114 ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
115 fetcher_->most_recent_data()))
116 : nullptr;
117 }
118
119 // Returns the context holding timestamps and other metadata about the
120 // message.
121 const Context &context() const { return fetcher_->context(); }
122
123 const T &operator*() const { return *get(); }
124 const T *operator->() const { return get(); }
125
126 private:
127 friend class EventLoop;
128 Fetcher(::std::unique_ptr<RawFetcher> fetcher)
129 : fetcher_(::std::move(fetcher)) {}
130 ::std::unique_ptr<RawFetcher> fetcher_;
131};
132
133// Sends messages to a channel.
134template <typename T>
135class Sender {
136 public:
137 Sender() {}
138
139 // Represents a single message about to be sent to the queue.
140 // The lifecycle goes:
141 //
142 // Builder builder = sender.MakeBuilder();
143 // T::Builder t_builder = builder.MakeBuilder<T>();
144 // Populate(&t_builder);
145 // builder.Send(t_builder.Finish());
146 class Builder {
147 public:
148 Builder(RawSender *sender, void *data, size_t size)
149 : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
150 fbb_.ForceDefaults(1);
151 }
152
153 flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
154
155 template <typename T2>
156 typename T2::Builder MakeBuilder() {
157 return typename T2::Builder(fbb_);
158 }
159
160 bool Send(flatbuffers::Offset<T> offset) {
161 fbb_.Finish(offset);
162 return sender_->Send(fbb_.GetSize());
163 }
164
165 // CHECKs that this message was sent.
166 void CheckSent() { fbb_.Finished(); }
167
168 private:
169 PreallocatedAllocator alloc_;
170 flatbuffers::FlatBufferBuilder fbb_;
171 RawSender *sender_;
172 };
173
174 // Constructs an above builder.
175 Builder MakeBuilder();
176
177 // Returns the name of the underlying queue.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800178 const std::string_view name() const { return sender_->name(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700179
180 private:
181 friend class EventLoop;
182 Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
183 std::unique_ptr<RawSender> sender_;
184};
185
186// Interface for timers
187class TimerHandler {
188 public:
189 virtual ~TimerHandler() {}
190
191 // Timer should sleep until base, base + offset, base + offset * 2, ...
192 // If repeat_offset isn't set, the timer only expires once.
193 virtual void Setup(monotonic_clock::time_point base,
194 monotonic_clock::duration repeat_offset =
195 ::aos::monotonic_clock::zero()) = 0;
196
197 // Stop future calls to callback().
198 virtual void Disable() = 0;
199};
200
201// Interface for phased loops. They are built on timers.
202class PhasedLoopHandler {
203 public:
204 virtual ~PhasedLoopHandler() {}
205
206 // Sets the interval and offset. Any changes to interval and offset only take
207 // effect when the handler finishes running.
208 virtual void set_interval_and_offset(
209 const monotonic_clock::duration interval,
210 const monotonic_clock::duration offset) = 0;
211};
212
213// TODO(austin): Ping pong example apps, and then start doing introspection.
214// TODO(austin): Timing reporter. Publish statistics on latencies of
215// handlers.
216class EventLoop {
217 public:
218 EventLoop(const Configuration *configuration)
219 : configuration_(configuration) {}
220
221 virtual ~EventLoop() {}
222
223 // Current time.
224 virtual monotonic_clock::time_point monotonic_now() = 0;
225 virtual realtime_clock::time_point realtime_now() = 0;
226
227 // Note, it is supported to create:
228 // multiple fetchers, and (one sender or one watcher) per <name, type>
229 // tuple.
230
231 // Makes a class that will always fetch the most recent value
232 // sent to the provided channel.
233 template <typename T>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800234 Fetcher<T> MakeFetcher(const std::string_view channel_name) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700235 const Channel *channel = configuration::GetChannel(
236 configuration_, channel_name, T::GetFullyQualifiedName(), name());
237 CHECK(channel != nullptr)
238 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
239 << T::GetFullyQualifiedName() << "\" } not found in config.";
240
241 return Fetcher<T>(MakeRawFetcher(channel));
242 }
243
244 // Makes class that allows constructing and sending messages to
245 // the provided channel.
246 template <typename T>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800247 Sender<T> MakeSender(const std::string_view channel_name) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700248 const Channel *channel = configuration::GetChannel(
249 configuration_, channel_name, T::GetFullyQualifiedName(), name());
250 CHECK(channel != nullptr)
251 << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
252 << T::GetFullyQualifiedName() << "\" } not found in config.";
253
254 return Sender<T>(MakeRawSender(channel));
255 }
256
257 // This will watch messages sent to the provided channel.
258 //
259 // Watch is a functor that have a call signature like so:
260 // void Event(const MessageType& type);
261 //
262 // TODO(parker): Need to support ::std::bind. For now, use lambdas.
263 // TODO(austin): Do we need a functor? Or is a std::function good enough?
264 template <typename Watch>
James Kuszmaul3ae42262019-11-08 12:33:41 -0800265 void MakeWatcher(const std::string_view name, Watch &&w);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700266
267 // The passed in function will be called when the event loop starts.
268 // Use this to run code once the thread goes into "real-time-mode",
269 virtual void OnRun(::std::function<void()> on_run) = 0;
270
271 // Sets the name of the event loop. This is the application name.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800272 virtual void set_name(const std::string_view name) = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700273 // Gets the name of the event loop.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800274 virtual const std::string_view name() const = 0;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700275
276 // Creates a timer that executes callback when the timer expires
277 // Returns a TimerHandle for configuration of the timer
278 virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
279
280 // Creates a timer that executes callback periodically at the specified
281 // interval and offset. Returns a PhasedLoopHandler for interacting with the
282 // timer.
283 virtual PhasedLoopHandler *AddPhasedLoop(
284 ::std::function<void(int)> callback,
285 const monotonic_clock::duration interval,
286 const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
287
288 // TODO(austin): OnExit
289
290 // Threadsafe.
291 bool is_running() const { return is_running_.load(); }
292
293 // Sets the scheduler priority to run the event loop at. This may not be
294 // called after we go into "real-time-mode".
295 virtual void SetRuntimeRealtimePriority(int priority) = 0;
296
297 // Fetches new messages from the provided channel (path, type). Note: this
298 // channel must be a member of the exact configuration object this was built
299 // with.
300 virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
301 const Channel *channel) = 0;
302
303 // Will watch channel (name, type) for new messages
304 virtual void MakeRawWatcher(
305 const Channel *channel,
306 std::function<void(const Context &context, const void *message)>
307 watcher) = 0;
308
309 // Returns the context for the current message.
310 // TODO(austin): Fill out whatever is useful for timers.
311 const Context &context() const { return context_; }
312
313 // Returns the configuration that this event loop was built with.
314 const Configuration *configuration() const { return configuration_; }
315
Austin Schuh54cf95f2019-11-29 13:14:18 -0800316 // Will send new messages from channel (path, type).
317 virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
318
Alex Perrycb7da4b2019-08-28 19:35:56 -0700319 protected:
320 void set_is_running(bool value) { is_running_.store(value); }
321
Austin Schuh54cf95f2019-11-29 13:14:18 -0800322 // Validates that channel exists inside configuration_.
323 void ValidateChannel(const Channel *channel);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700324
325 private:
326 ::std::atomic<bool> is_running_{false};
327
328 // Context available for watchers.
329 Context context_;
330
331 const Configuration *configuration_;
332};
333
334} // namespace aos
335
336#include "aos/events/event_loop_tmpl.h"
337
338#endif // AOS_EVENTS_EVENT_LOOP_H