blob: 9410b9682647fab4b84c64a537ce297d43c210ff [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -08002
3#include <algorithm>
4
James Kuszmaulc79768b2019-02-18 15:08:44 -08005#include "aos/logging/logging.h"
John Park33858a32018-09-28 23:05:48 -07006#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08007
8namespace aos {
9namespace {
10class SimulatedFetcher : public RawFetcher {
11 public:
12 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
13 ~SimulatedFetcher() {}
14
James Kuszmaulc79768b2019-02-18 15:08:44 -080015 bool FetchNext() override {
16 LOG(FATAL, "Simulated event loops do not support FetchNext.");
17 return false;
18 }
19
Neil Balchc8f41ed2018-01-20 22:06:53 -080020 bool Fetch() override {
21 if (index_ == queue_->index()) return false;
22
23 // Fetched message is newer
24 msg_ = queue_->latest_message();
25 index_ = queue_->index();
26 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
27 return true;
28 }
29
30 private:
31 int64_t index_ = -1;
32 SimulatedQueue *queue_;
33 RefCountedBuffer msg_;
34};
35
36class SimulatedSender : public RawSender {
37 public:
Austin Schuh7267c532019-05-19 19:55:53 -070038 SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
39 : queue_(queue), event_loop_(event_loop) {}
Neil Balchc8f41ed2018-01-20 22:06:53 -080040 ~SimulatedSender() {}
41
James Kuszmaulcd1db352019-05-26 16:42:29 -070042 aos::Message *GetMessage() override {
43 return RefCountedBuffer(queue_->size()).release();
Neil Balchc8f41ed2018-01-20 22:06:53 -080044 }
45
James Kuszmaulcd1db352019-05-26 16:42:29 -070046 void Free(aos::Message *msg) override { RefCountedBuffer tmp(msg); }
Neil Balchc8f41ed2018-01-20 22:06:53 -080047
James Kuszmaulcd1db352019-05-26 16:42:29 -070048 bool Send(aos::Message *msg) override {
Austin Schuh7267c532019-05-19 19:55:53 -070049 {
James Kuszmaulcd1db352019-05-26 16:42:29 -070050 if (msg->sent_time == monotonic_clock::min_time) {
51 msg->sent_time = event_loop_->monotonic_now();
Austin Schuh7267c532019-05-19 19:55:53 -070052 }
53 }
James Kuszmaulcd1db352019-05-26 16:42:29 -070054 queue_->Send(RefCountedBuffer(msg));
Neil Balchc8f41ed2018-01-20 22:06:53 -080055 return true; // Maybe false instead? :)
56 }
57
Austin Schuhd681bbd2019-02-02 12:03:32 -080058 const char *name() const override { return queue_->name(); }
59
Neil Balchc8f41ed2018-01-20 22:06:53 -080060 private:
61 SimulatedQueue *queue_;
Austin Schuh7267c532019-05-19 19:55:53 -070062 EventLoop *event_loop_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080063};
64} // namespace
65
Austin Schuh7267c532019-05-19 19:55:53 -070066class SimulatedTimerHandler : public TimerHandler {
67 public:
68 explicit SimulatedTimerHandler(EventScheduler *scheduler,
69 ::std::function<void()> fn)
70 : scheduler_(scheduler), fn_(fn) {}
71 ~SimulatedTimerHandler() {}
72
73 void Setup(monotonic_clock::time_point base,
74 monotonic_clock::duration repeat_offset) override {
75 Disable();
76 const ::aos::monotonic_clock::time_point monotonic_now =
77 scheduler_->monotonic_now();
78 base_ = base;
79 repeat_offset_ = repeat_offset;
80 if (base < monotonic_now) {
81 token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
82 } else {
83 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
84 }
85 }
86
87 void HandleEvent() {
88 const ::aos::monotonic_clock::time_point monotonic_now =
89 scheduler_->monotonic_now();
90 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
91 // Reschedule.
92 while (base_ <= monotonic_now) base_ += repeat_offset_;
93 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
94 } else {
95 token_ = EventScheduler::Token();
96 }
97 fn_();
98 }
99
100 void Disable() override {
101 if (token_ != EventScheduler::Token()) {
102 scheduler_->Deschedule(token_);
103 token_ = EventScheduler::Token();
104 }
105 }
106
107 private:
108 EventScheduler *scheduler_;
109 EventScheduler::Token token_;
110 // Function to be run on the thread
111 ::std::function<void()> fn_;
112 monotonic_clock::time_point base_;
113 monotonic_clock::duration repeat_offset_;
114};
115
116
117class SimulatedEventLoop : public EventLoop {
118 public:
119 explicit SimulatedEventLoop(
120 EventScheduler *scheduler,
121 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
122 *queues)
Austin Schuh44019f92019-05-19 19:58:27 -0700123 : scheduler_(scheduler), queues_(queues) {
124 scheduler_->AddRawEventLoop(this);
125 }
126 ~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
Austin Schuh7267c532019-05-19 19:55:53 -0700127
128 ::aos::monotonic_clock::time_point monotonic_now() override {
129 return scheduler_->monotonic_now();
130 }
131
132 ::std::unique_ptr<RawSender> MakeRawSender(
133 const ::std::string &path, const QueueTypeInfo &type) override;
134
135 ::std::unique_ptr<RawFetcher> MakeRawFetcher(
136 const ::std::string &path, const QueueTypeInfo &type) override;
137
138 void MakeRawWatcher(
139 const ::std::string &path, const QueueTypeInfo &type,
140 ::std::function<void(const ::aos::Message *message)> watcher) override;
141
142 TimerHandler *AddTimer(::std::function<void()> callback) override {
143 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
144 return timers_.back().get();
145 }
146
147 void OnRun(::std::function<void()> on_run) override {
148 scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
149 }
150 void Run() override {
Austin Schuh44019f92019-05-19 19:58:27 -0700151 LOG(FATAL, "Run from the factory instead\n");
Austin Schuh7267c532019-05-19 19:55:53 -0700152 scheduler_->Run();
153 }
154 void Exit() override {
Austin Schuh7267c532019-05-19 19:55:53 -0700155 scheduler_->Exit();
156 }
157
158 SimulatedQueue *GetSimulatedQueue(
159 const ::std::pair<::std::string, QueueTypeInfo> &);
160
161 void Take(const ::std::string &path);
162
163 private:
164 EventScheduler *scheduler_;
165 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
166 *queues_;
167 ::std::vector<std::string> taken_;
168 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
169};
170
Neil Balchc8f41ed2018-01-20 22:06:53 -0800171EventScheduler::Token EventScheduler::Schedule(
172 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
173 return events_list_.emplace(time, callback);
174}
175
176void EventScheduler::Deschedule(EventScheduler::Token token) {
177 events_list_.erase(token);
178}
179
Austin Schuh44019f92019-05-19 19:58:27 -0700180void EventScheduler::RunFor(monotonic_clock::duration duration) {
181 const ::aos::monotonic_clock::time_point end_time =
182 monotonic_now() + duration;
183 for (RawEventLoop *event_loop : raw_event_loops_) {
184 event_loop->set_is_running(true);
185 }
186 is_running_ = true;
187 while (!events_list_.empty() && is_running_) {
188 auto iter = events_list_.begin();
189 ::aos::monotonic_clock::time_point next_time = iter->first;
190 if (next_time > end_time) {
191 break;
192 }
193 now_ = iter->first;
194 ::std::function<void()> callback = ::std::move(iter->second);
195 events_list_.erase(iter);
196 callback();
197 }
198 now_ = end_time;
199 if (!is_running_) {
200 for (RawEventLoop *event_loop : raw_event_loops_) {
201 event_loop->set_is_running(false);
202 }
203 }
204}
205
Neil Balchc8f41ed2018-01-20 22:06:53 -0800206void EventScheduler::Run() {
Austin Schuh44019f92019-05-19 19:58:27 -0700207 for (RawEventLoop *event_loop : raw_event_loops_) {
208 event_loop->set_is_running(true);
209 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800210 is_running_ = true;
211 while (!events_list_.empty() && is_running_) {
212 auto iter = events_list_.begin();
213 now_ = iter->first;
214 ::std::function<void()> callback = ::std::move(iter->second);
215 events_list_.erase(iter);
216 callback();
217 }
Austin Schuh44019f92019-05-19 19:58:27 -0700218 if (!is_running_) {
219 for (RawEventLoop *event_loop : raw_event_loops_) {
220 event_loop->set_is_running(false);
221 }
222 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800223}
224
225void SimulatedEventLoop::MakeRawWatcher(
226 const std::string &path, const QueueTypeInfo &type,
227 std::function<void(const aos::Message *message)> watcher) {
228 Take(path);
229 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
230 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
231}
232
233std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
234 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800235 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800236 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
Austin Schuh7267c532019-05-19 19:55:53 -0700237 return GetSimulatedQueue(key)->MakeRawSender(this);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800238}
239
240std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
241 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800242 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
243 return GetSimulatedQueue(key)->MakeRawFetcher();
244}
245
246SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
247 const ::std::pair<::std::string, QueueTypeInfo> &type) {
248 auto it = queues_->find(type);
249 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800250 it =
251 queues_
252 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
253 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800254 }
255 return &it->second;
256}
257
258void SimulatedQueue::MakeRawWatcher(
259 std::function<void(const aos::Message *message)> watcher) {
260 watchers_.push_back(watcher);
261}
262
Austin Schuh7267c532019-05-19 19:55:53 -0700263std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
264 EventLoop *event_loop) {
265 return std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800266}
267
268std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
269 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
270}
271
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800272void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800273 if (is_running()) {
274 ::aos::Die("Cannot add new objects while running.\n");
275 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800276 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
277 if (prior != taken_.end()) {
278 ::aos::Die("%s is already being used.", path.c_str());
279 } else {
280 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800281 }
282}
Austin Schuh7267c532019-05-19 19:55:53 -0700283
284::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
285 return ::std::unique_ptr<EventLoop>(
286 new SimulatedEventLoop(&scheduler_, &queues_));
287}
288
Neil Balchc8f41ed2018-01-20 22:06:53 -0800289} // namespace aos