blob: ce08a00e9858570a277233581b4b544453be452d [file] [log] [blame]
#include "aos/events/simulated-event-loop.h"

#include <algorithm>
#include <deque>
#include <utility>

#include "aos/logging/logging.h"
#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08008
9namespace aos {
10namespace {
Neil Balchc8f41ed2018-01-20 22:06:53 -080011
12class SimulatedSender : public RawSender {
13 public:
Austin Schuh7267c532019-05-19 19:55:53 -070014 SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
15 : queue_(queue), event_loop_(event_loop) {}
Neil Balchc8f41ed2018-01-20 22:06:53 -080016 ~SimulatedSender() {}
17
James Kuszmaulcd1db352019-05-26 16:42:29 -070018 aos::Message *GetMessage() override {
19 return RefCountedBuffer(queue_->size()).release();
Neil Balchc8f41ed2018-01-20 22:06:53 -080020 }
21
James Kuszmaulcd1db352019-05-26 16:42:29 -070022 void Free(aos::Message *msg) override { RefCountedBuffer tmp(msg); }
Neil Balchc8f41ed2018-01-20 22:06:53 -080023
James Kuszmaulcd1db352019-05-26 16:42:29 -070024 bool Send(aos::Message *msg) override {
Austin Schuh7267c532019-05-19 19:55:53 -070025 {
James Kuszmaulcd1db352019-05-26 16:42:29 -070026 if (msg->sent_time == monotonic_clock::min_time) {
27 msg->sent_time = event_loop_->monotonic_now();
Austin Schuh7267c532019-05-19 19:55:53 -070028 }
29 }
James Kuszmaulcd1db352019-05-26 16:42:29 -070030 queue_->Send(RefCountedBuffer(msg));
Austin Schuhbbce72d2019-05-26 15:11:46 -070031 return true;
Neil Balchc8f41ed2018-01-20 22:06:53 -080032 }
33
Austin Schuhd681bbd2019-02-02 12:03:32 -080034 const char *name() const override { return queue_->name(); }
35
Neil Balchc8f41ed2018-01-20 22:06:53 -080036 private:
37 SimulatedQueue *queue_;
Austin Schuh7267c532019-05-19 19:55:53 -070038 EventLoop *event_loop_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080039};
40} // namespace
41
Austin Schuhbbce72d2019-05-26 15:11:46 -070042class SimulatedFetcher : public RawFetcher {
43 public:
44 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
45 ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
46
47 bool FetchNext() override {
48 if (msgs_.size() == 0) return false;
49
50 msg_ = msgs_.front();
51 msgs_.pop_front();
52 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
53 return true;
54 }
55
56 bool Fetch() override {
57 if (msgs_.size() == 0) {
58 if (!msg_ && queue_->latest_message()) {
59 msg_ = queue_->latest_message();
60 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
61 return true;
62 } else {
63 return false;
64 }
65 }
66
67 // We've had a message enqueued, so we don't need to go looking for the
68 // latest message from before we started.
69 msg_ = msgs_.back();
70 msgs_.clear();
71 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
72 return true;
73 }
74
75 private:
76 friend class SimulatedQueue;
77
78 // Internal method for Simulation to add a message to the buffer.
79 void Enqueue(RefCountedBuffer buffer) {
80 msgs_.emplace_back(buffer);
81 }
82
83 SimulatedQueue *queue_;
84 RefCountedBuffer msg_;
85
86 // Messages queued up but not in use.
87 ::std::deque<RefCountedBuffer> msgs_;
88};
89
90
Austin Schuh7267c532019-05-19 19:55:53 -070091class SimulatedTimerHandler : public TimerHandler {
92 public:
93 explicit SimulatedTimerHandler(EventScheduler *scheduler,
94 ::std::function<void()> fn)
95 : scheduler_(scheduler), fn_(fn) {}
96 ~SimulatedTimerHandler() {}
97
98 void Setup(monotonic_clock::time_point base,
99 monotonic_clock::duration repeat_offset) override {
100 Disable();
101 const ::aos::monotonic_clock::time_point monotonic_now =
102 scheduler_->monotonic_now();
103 base_ = base;
104 repeat_offset_ = repeat_offset;
105 if (base < monotonic_now) {
106 token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
107 } else {
108 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
109 }
110 }
111
112 void HandleEvent() {
113 const ::aos::monotonic_clock::time_point monotonic_now =
114 scheduler_->monotonic_now();
115 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
116 // Reschedule.
117 while (base_ <= monotonic_now) base_ += repeat_offset_;
118 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
119 } else {
120 token_ = EventScheduler::Token();
121 }
122 fn_();
123 }
124
125 void Disable() override {
126 if (token_ != EventScheduler::Token()) {
127 scheduler_->Deschedule(token_);
128 token_ = EventScheduler::Token();
129 }
130 }
131
132 private:
133 EventScheduler *scheduler_;
134 EventScheduler::Token token_;
135 // Function to be run on the thread
136 ::std::function<void()> fn_;
137 monotonic_clock::time_point base_;
138 monotonic_clock::duration repeat_offset_;
139};
140
141
142class SimulatedEventLoop : public EventLoop {
143 public:
144 explicit SimulatedEventLoop(
145 EventScheduler *scheduler,
146 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
147 *queues)
Austin Schuh44019f92019-05-19 19:58:27 -0700148 : scheduler_(scheduler), queues_(queues) {
149 scheduler_->AddRawEventLoop(this);
150 }
151 ~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
Austin Schuh7267c532019-05-19 19:55:53 -0700152
153 ::aos::monotonic_clock::time_point monotonic_now() override {
154 return scheduler_->monotonic_now();
155 }
156
157 ::std::unique_ptr<RawSender> MakeRawSender(
158 const ::std::string &path, const QueueTypeInfo &type) override;
159
160 ::std::unique_ptr<RawFetcher> MakeRawFetcher(
161 const ::std::string &path, const QueueTypeInfo &type) override;
162
163 void MakeRawWatcher(
164 const ::std::string &path, const QueueTypeInfo &type,
165 ::std::function<void(const ::aos::Message *message)> watcher) override;
166
167 TimerHandler *AddTimer(::std::function<void()> callback) override {
168 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
169 return timers_.back().get();
170 }
171
172 void OnRun(::std::function<void()> on_run) override {
173 scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
174 }
175 void Run() override {
Austin Schuh44019f92019-05-19 19:58:27 -0700176 LOG(FATAL, "Run from the factory instead\n");
Austin Schuh7267c532019-05-19 19:55:53 -0700177 scheduler_->Run();
178 }
179 void Exit() override {
Austin Schuh7267c532019-05-19 19:55:53 -0700180 scheduler_->Exit();
181 }
182
183 SimulatedQueue *GetSimulatedQueue(
184 const ::std::pair<::std::string, QueueTypeInfo> &);
185
186 void Take(const ::std::string &path);
187
188 private:
189 EventScheduler *scheduler_;
190 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
191 *queues_;
192 ::std::vector<std::string> taken_;
193 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
194};
195
Neil Balchc8f41ed2018-01-20 22:06:53 -0800196EventScheduler::Token EventScheduler::Schedule(
197 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
198 return events_list_.emplace(time, callback);
199}
200
201void EventScheduler::Deschedule(EventScheduler::Token token) {
202 events_list_.erase(token);
203}
204
Austin Schuh44019f92019-05-19 19:58:27 -0700205void EventScheduler::RunFor(monotonic_clock::duration duration) {
206 const ::aos::monotonic_clock::time_point end_time =
207 monotonic_now() + duration;
208 for (RawEventLoop *event_loop : raw_event_loops_) {
209 event_loop->set_is_running(true);
210 }
211 is_running_ = true;
212 while (!events_list_.empty() && is_running_) {
213 auto iter = events_list_.begin();
214 ::aos::monotonic_clock::time_point next_time = iter->first;
215 if (next_time > end_time) {
216 break;
217 }
218 now_ = iter->first;
219 ::std::function<void()> callback = ::std::move(iter->second);
220 events_list_.erase(iter);
221 callback();
222 }
223 now_ = end_time;
224 if (!is_running_) {
225 for (RawEventLoop *event_loop : raw_event_loops_) {
226 event_loop->set_is_running(false);
227 }
228 }
229}
230
Neil Balchc8f41ed2018-01-20 22:06:53 -0800231void EventScheduler::Run() {
Austin Schuh44019f92019-05-19 19:58:27 -0700232 for (RawEventLoop *event_loop : raw_event_loops_) {
233 event_loop->set_is_running(true);
234 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800235 is_running_ = true;
236 while (!events_list_.empty() && is_running_) {
237 auto iter = events_list_.begin();
238 now_ = iter->first;
239 ::std::function<void()> callback = ::std::move(iter->second);
240 events_list_.erase(iter);
241 callback();
242 }
Austin Schuh44019f92019-05-19 19:58:27 -0700243 if (!is_running_) {
244 for (RawEventLoop *event_loop : raw_event_loops_) {
245 event_loop->set_is_running(false);
246 }
247 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800248}
249
250void SimulatedEventLoop::MakeRawWatcher(
251 const std::string &path, const QueueTypeInfo &type,
252 std::function<void(const aos::Message *message)> watcher) {
253 Take(path);
254 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
255 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
256}
257
258std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
259 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800260 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800261 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
Austin Schuh7267c532019-05-19 19:55:53 -0700262 return GetSimulatedQueue(key)->MakeRawSender(this);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800263}
264
265std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
266 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800267 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
268 return GetSimulatedQueue(key)->MakeRawFetcher();
269}
270
271SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
272 const ::std::pair<::std::string, QueueTypeInfo> &type) {
273 auto it = queues_->find(type);
274 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800275 it =
276 queues_
277 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
278 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800279 }
280 return &it->second;
281}
282
283void SimulatedQueue::MakeRawWatcher(
Austin Schuhbbce72d2019-05-26 15:11:46 -0700284 ::std::function<void(const aos::Message *message)> watcher) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800285 watchers_.push_back(watcher);
286}
287
Austin Schuhbbce72d2019-05-26 15:11:46 -0700288::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
Austin Schuh7267c532019-05-19 19:55:53 -0700289 EventLoop *event_loop) {
Austin Schuhbbce72d2019-05-26 15:11:46 -0700290 return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800291}
292
Austin Schuhbbce72d2019-05-26 15:11:46 -0700293::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
294 ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
295 fetchers_.push_back(fetcher.get());
296 return ::std::move(fetcher);
297}
298
299void SimulatedQueue::Send(RefCountedBuffer message) {
300 latest_message_ = message;
301 for (auto &watcher : watchers_) {
302 scheduler_->Schedule(scheduler_->monotonic_now(),
303 [watcher, message]() { watcher(message.get()); });
304 }
305 for (auto &fetcher : fetchers_) {
306 fetcher->Enqueue(message);
307 }
308}
309
310void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
311 fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800312}
313
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800314void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800315 if (is_running()) {
316 ::aos::Die("Cannot add new objects while running.\n");
317 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800318 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
319 if (prior != taken_.end()) {
320 ::aos::Die("%s is already being used.", path.c_str());
321 } else {
322 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800323 }
324}
Austin Schuh7267c532019-05-19 19:55:53 -0700325
326::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
327 return ::std::unique_ptr<EventLoop>(
328 new SimulatedEventLoop(&scheduler_, &queues_));
329}
330
Neil Balchc8f41ed2018-01-20 22:06:53 -0800331} // namespace aos