blob: b5c94bf5a5cd4bfde0a36da6dffbec43e0175c90 [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
2#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
3
4#include <map>
5#include <memory>
6#include <unordered_set>
7#include <utility>
8#include <vector>
9
10#include "aos/events/event-loop.h"
11
12namespace aos {
13
14class RefCountedBuffer {
15 public:
16 RefCountedBuffer() {}
17 ~RefCountedBuffer() { clear(); }
18
19 explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
20
21 explicit RefCountedBuffer(size_t size) {
22 data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
23 kRefCountSize;
24 // Initialize the allocated memory with an integer
25 *GetRefCount() = 1;
26 }
27
28 RefCountedBuffer(const RefCountedBuffer &other) {
29 data_ = other.data_;
30 ++*GetRefCount();
31 }
32
33 RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
34
35 RefCountedBuffer &operator=(const RefCountedBuffer &other) {
36 if (this == &other) return *this;
37 clear();
38 data_ = other.data_;
39 ++*GetRefCount();
40 return *this;
41 }
42
43 RefCountedBuffer &operator=(RefCountedBuffer &&other) {
44 if (this == &other) return *this;
45 std::swap(data_, other.data_);
46 return *this;
47 }
48
49 aos::Message *get() const { return static_cast<aos::Message *>(data_); }
50
51 aos::Message *release() {
52 auto tmp = get();
53 data_ = nullptr;
54 return tmp;
55 }
56
57 void clear() {
58 if (data_ != nullptr) {
59 if (--*GetRefCount() == 0) {
60 // Free memory block from the start of the allocated block
61 free(GetRefCount());
62 }
63 data_ = nullptr;
64 }
65 }
66
67 private:
68 void *data_ = nullptr;
69 // Qty. memory to be allocated to the ref counter
70 static constexpr size_t kRefCountSize = sizeof(int64_t);
71
72 int64_t *GetRefCount() {
73 // Need to cast the void* to an 8 bit long object (size of void* is
74 // technically 0)
75 return reinterpret_cast<int64_t *>(static_cast<void *>(
76 reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
77 }
78};
79
80class EventScheduler {
81 public:
82 using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
83 ::std::function<void()>>;
84 using Token = QueueType::iterator;
85
86 // Schedule an event with a callback function
87 // Returns an iterator to the event
88 Token Schedule(::aos::monotonic_clock::time_point time,
89 ::std::function<void()> callback);
90
91 // Deschedule an event by its iterator
92 void Deschedule(Token token);
93
94 void Run();
95
96 void Exit() { is_running_ = false; }
97
98 ::aos::monotonic_clock::time_point now() { return now_; }
99
100 private:
101 ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
102 QueueType events_list_;
103 bool is_running_ = false;
104};
105
106class SimulatedQueue {
107 public:
Austin Schuhd681bbd2019-02-02 12:03:32 -0800108 explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
109 EventScheduler *scheduler)
110 : type_(type), name_(name), scheduler_(scheduler){};
Neil Balchc8f41ed2018-01-20 22:06:53 -0800111
112 std::unique_ptr<RawSender> MakeRawSender();
113
114 std::unique_ptr<RawFetcher> MakeRawFetcher();
115
116 void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
117
118 void Send(RefCountedBuffer message) {
119 index_++;
120 latest_message_ = message;
121 for (auto &watcher : watchers_) {
122 scheduler_->Schedule(scheduler_->now(),
123 [watcher, message]() { watcher(message.get()); });
124 }
125 }
126
127 const RefCountedBuffer &latest_message() { return latest_message_; }
128
129 int64_t index() { return index_; }
130
131 size_t size() { return type_.size; }
132
Austin Schuhd681bbd2019-02-02 12:03:32 -0800133 const char *name() const { return name_.c_str(); }
134
Neil Balchc8f41ed2018-01-20 22:06:53 -0800135 private:
136 int64_t index_ = -1;
137 QueueTypeInfo type_;
Austin Schuhd681bbd2019-02-02 12:03:32 -0800138 const ::std::string name_;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800139 ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
140 RefCountedBuffer latest_message_;
141 EventScheduler *scheduler_;
142};
143
144class SimulatedTimerHandler : public TimerHandler {
145 public:
146 explicit SimulatedTimerHandler(EventScheduler *scheduler,
147 ::std::function<void()> fn)
148 : scheduler_(scheduler), fn_(fn) {}
149 ~SimulatedTimerHandler() {}
150
151 void Setup(monotonic_clock::time_point base,
152 monotonic_clock::duration repeat_offset) override {
153 Disable();
154 auto now = scheduler_->now();
155 base_ = base;
156 repeat_offset_ = repeat_offset;
157 if (base < now) {
158 token_ = scheduler_->Schedule(now, [this]() { HandleEvent(); });
159 } else {
160 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
161 }
162 }
163
164 void HandleEvent() {
165 auto now = scheduler_->now();
166 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
167 // Reschedule.
168 while (base_ <= now) base_ += repeat_offset_;
169 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
170 } else {
171 token_ = EventScheduler::Token();
172 }
173 fn_();
174 }
175
176 void Disable() override {
177 if (token_ != EventScheduler::Token()) {
178 scheduler_->Deschedule(token_);
179 token_ = EventScheduler::Token();
180 }
181 }
182
183 private:
184 EventScheduler *scheduler_;
185 EventScheduler::Token token_;
186 // Function to be run on the thread
187 ::std::function<void()> fn_;
188 monotonic_clock::time_point base_;
189 monotonic_clock::duration repeat_offset_;
190};
191
192class SimulatedEventLoop : public EventLoop {
193 public:
194 explicit SimulatedEventLoop(
195 EventScheduler *scheduler,
196 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
197 *queues) : scheduler_(scheduler), queues_(queues){};
198 ~SimulatedEventLoop() override{};
199
200 ::aos::monotonic_clock::time_point monotonic_now() override {
201 return scheduler_->now();
202 }
203
204 std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
205 const QueueTypeInfo &type) override;
206
207 std::unique_ptr<RawFetcher> MakeRawFetcher(
208 const std::string &path, const QueueTypeInfo &type) override;
209
210 void MakeRawWatcher(
211 const std::string &path, const QueueTypeInfo &type,
212 std::function<void(const aos::Message *message)> watcher) override;
213
214 TimerHandler *AddTimer(::std::function<void()> callback) override {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800215 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800216 return timers_.back().get();
217 }
218
219 void OnRun(std::function<void()> on_run) override {
220 scheduler_->Schedule(scheduler_->now(), on_run);
221 }
222 void Run() override {
223 set_is_running(true);
224 scheduler_->Run();
225 }
226 void Exit() override {
227 set_is_running(false);
228 scheduler_->Exit();
229 }
230
231 SimulatedQueue *GetSimulatedQueue(
232 const ::std::pair<::std::string, QueueTypeInfo> &);
233
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800234 void Take(const ::std::string &path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800235
236 private:
237 EventScheduler *scheduler_;
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800238 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
239 *queues_;
240 ::std::vector<std::string> taken_;
241 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800242};
243
244class SimulatedEventLoopFactory {
245 public:
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800246 ::std::unique_ptr<EventLoop> CreateEventLoop() {
247 return ::std::unique_ptr<EventLoop>(
Neil Balchc8f41ed2018-01-20 22:06:53 -0800248 new SimulatedEventLoop(&scheduler_, &queues_));
249 }
250
251 private:
252 EventScheduler scheduler_;
253 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
254};
255} // namespace aos
256#endif //_AOS_EVENTS_TEST_EVENT_LOOP_H_