blob: 829ee8c8d2dcf665ddfd826e3143a8059c35a17f [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
2#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
3
4#include <map>
5#include <memory>
6#include <unordered_set>
7#include <utility>
8#include <vector>
9
10#include "aos/events/event-loop.h"
11
12namespace aos {
13
14class RefCountedBuffer {
15 public:
16 RefCountedBuffer() {}
17 ~RefCountedBuffer() { clear(); }
18
19 explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
20
21 explicit RefCountedBuffer(size_t size) {
22 data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
23 kRefCountSize;
24 // Initialize the allocated memory with an integer
25 *GetRefCount() = 1;
26 }
27
28 RefCountedBuffer(const RefCountedBuffer &other) {
29 data_ = other.data_;
30 ++*GetRefCount();
31 }
32
33 RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
34
35 RefCountedBuffer &operator=(const RefCountedBuffer &other) {
36 if (this == &other) return *this;
37 clear();
38 data_ = other.data_;
39 ++*GetRefCount();
40 return *this;
41 }
42
43 RefCountedBuffer &operator=(RefCountedBuffer &&other) {
44 if (this == &other) return *this;
45 std::swap(data_, other.data_);
46 return *this;
47 }
48
49 aos::Message *get() const { return static_cast<aos::Message *>(data_); }
50
51 aos::Message *release() {
52 auto tmp = get();
53 data_ = nullptr;
54 return tmp;
55 }
56
57 void clear() {
58 if (data_ != nullptr) {
59 if (--*GetRefCount() == 0) {
60 // Free memory block from the start of the allocated block
61 free(GetRefCount());
62 }
63 data_ = nullptr;
64 }
65 }
66
67 private:
68 void *data_ = nullptr;
69 // Qty. memory to be allocated to the ref counter
70 static constexpr size_t kRefCountSize = sizeof(int64_t);
71
72 int64_t *GetRefCount() {
73 // Need to cast the void* to an 8 bit long object (size of void* is
74 // technically 0)
75 return reinterpret_cast<int64_t *>(static_cast<void *>(
76 reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
77 }
78};
79
80class EventScheduler {
81 public:
82 using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
83 ::std::function<void()>>;
84 using Token = QueueType::iterator;
85
86 // Schedule an event with a callback function
87 // Returns an iterator to the event
88 Token Schedule(::aos::monotonic_clock::time_point time,
89 ::std::function<void()> callback);
90
91 // Deschedule an event by its iterator
92 void Deschedule(Token token);
93
94 void Run();
95
96 void Exit() { is_running_ = false; }
97
98 ::aos::monotonic_clock::time_point now() { return now_; }
99
100 private:
101 ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
102 QueueType events_list_;
103 bool is_running_ = false;
104};
105
106class SimulatedQueue {
107 public:
108 explicit SimulatedQueue(const QueueTypeInfo &type, EventScheduler *scheduler)
109 : type_(type), scheduler_(scheduler){};
110
111 std::unique_ptr<RawSender> MakeRawSender();
112
113 std::unique_ptr<RawFetcher> MakeRawFetcher();
114
115 void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
116
117 void Send(RefCountedBuffer message) {
118 index_++;
119 latest_message_ = message;
120 for (auto &watcher : watchers_) {
121 scheduler_->Schedule(scheduler_->now(),
122 [watcher, message]() { watcher(message.get()); });
123 }
124 }
125
126 const RefCountedBuffer &latest_message() { return latest_message_; }
127
128 int64_t index() { return index_; }
129
130 size_t size() { return type_.size; }
131
132 private:
133 int64_t index_ = -1;
134 QueueTypeInfo type_;
135 ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
136 RefCountedBuffer latest_message_;
137 EventScheduler *scheduler_;
138};
139
140class SimulatedTimerHandler : public TimerHandler {
141 public:
142 explicit SimulatedTimerHandler(EventScheduler *scheduler,
143 ::std::function<void()> fn)
144 : scheduler_(scheduler), fn_(fn) {}
145 ~SimulatedTimerHandler() {}
146
147 void Setup(monotonic_clock::time_point base,
148 monotonic_clock::duration repeat_offset) override {
149 Disable();
150 auto now = scheduler_->now();
151 base_ = base;
152 repeat_offset_ = repeat_offset;
153 if (base < now) {
154 token_ = scheduler_->Schedule(now, [this]() { HandleEvent(); });
155 } else {
156 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
157 }
158 }
159
160 void HandleEvent() {
161 auto now = scheduler_->now();
162 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
163 // Reschedule.
164 while (base_ <= now) base_ += repeat_offset_;
165 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
166 } else {
167 token_ = EventScheduler::Token();
168 }
169 fn_();
170 }
171
172 void Disable() override {
173 if (token_ != EventScheduler::Token()) {
174 scheduler_->Deschedule(token_);
175 token_ = EventScheduler::Token();
176 }
177 }
178
179 private:
180 EventScheduler *scheduler_;
181 EventScheduler::Token token_;
182 // Function to be run on the thread
183 ::std::function<void()> fn_;
184 monotonic_clock::time_point base_;
185 monotonic_clock::duration repeat_offset_;
186};
187
188class SimulatedEventLoop : public EventLoop {
189 public:
190 explicit SimulatedEventLoop(
191 EventScheduler *scheduler,
192 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
193 *queues) : scheduler_(scheduler), queues_(queues){};
194 ~SimulatedEventLoop() override{};
195
196 ::aos::monotonic_clock::time_point monotonic_now() override {
197 return scheduler_->now();
198 }
199
200 std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
201 const QueueTypeInfo &type) override;
202
203 std::unique_ptr<RawFetcher> MakeRawFetcher(
204 const std::string &path, const QueueTypeInfo &type) override;
205
206 void MakeRawWatcher(
207 const std::string &path, const QueueTypeInfo &type,
208 std::function<void(const aos::Message *message)> watcher) override;
209
210 TimerHandler *AddTimer(::std::function<void()> callback) override {
211 timers_.emplace_back(
212 new SimulatedTimerHandler(scheduler_, callback));
213 return timers_.back().get();
214 }
215
216 void OnRun(std::function<void()> on_run) override {
217 scheduler_->Schedule(scheduler_->now(), on_run);
218 }
219 void Run() override {
220 set_is_running(true);
221 scheduler_->Run();
222 }
223 void Exit() override {
224 set_is_running(false);
225 scheduler_->Exit();
226 }
227
228 SimulatedQueue *GetSimulatedQueue(
229 const ::std::pair<::std::string, QueueTypeInfo> &);
230
231 void Take(const std::string &path);
232
233 private:
234 EventScheduler *scheduler_;
235 std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
236 std::unordered_set<std::string> taken_;
237 std::vector<std::unique_ptr<TimerHandler>> timers_;
238};
239
240class SimulatedEventLoopFactory {
241 public:
242 std::unique_ptr<EventLoop> CreateEventLoop() {
243 return std::unique_ptr<EventLoop>(
244 new SimulatedEventLoop(&scheduler_, &queues_));
245 }
246
247 private:
248 EventScheduler scheduler_;
249 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
250};
251} // namespace aos
#endif  // _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_