blob: 5a9960e51a600ff9a707cf67f12030c01e7b9e15 [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -08002
3#include <algorithm>
4
James Kuszmaulc79768b2019-02-18 15:08:44 -08005#include "aos/logging/logging.h"
John Park33858a32018-09-28 23:05:48 -07006#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08007
8namespace aos {
9namespace {
10class SimulatedFetcher : public RawFetcher {
11 public:
12 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
13 ~SimulatedFetcher() {}
14
James Kuszmaulc79768b2019-02-18 15:08:44 -080015 bool FetchNext() override {
16 LOG(FATAL, "Simulated event loops do not support FetchNext.");
17 return false;
18 }
19
Neil Balchc8f41ed2018-01-20 22:06:53 -080020 bool Fetch() override {
21 if (index_ == queue_->index()) return false;
22
23 // Fetched message is newer
24 msg_ = queue_->latest_message();
25 index_ = queue_->index();
26 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
27 return true;
28 }
29
30 private:
31 int64_t index_ = -1;
32 SimulatedQueue *queue_;
33 RefCountedBuffer msg_;
34};
35
36class SimulatedSender : public RawSender {
37 public:
Austin Schuh7267c532019-05-19 19:55:53 -070038 SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
39 : queue_(queue), event_loop_(event_loop) {}
Neil Balchc8f41ed2018-01-20 22:06:53 -080040 ~SimulatedSender() {}
41
42 SendContext *GetContext() override {
43 return reinterpret_cast<SendContext *>(
44 RefCountedBuffer(queue_->size()).release());
45 }
46
47 void Free(SendContext *context) override {
48 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
49 }
50
51 bool Send(SendContext *context) override {
Austin Schuh7267c532019-05-19 19:55:53 -070052 {
53 ::aos::Message *aos_msg = reinterpret_cast<Message *>(context);
54 if (aos_msg->sent_time == monotonic_clock::min_time) {
55 aos_msg->sent_time = event_loop_->monotonic_now();
56 }
57 }
Neil Balchc8f41ed2018-01-20 22:06:53 -080058 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
59 return true; // Maybe false instead? :)
60 }
61
Austin Schuhd681bbd2019-02-02 12:03:32 -080062 const char *name() const override { return queue_->name(); }
63
Neil Balchc8f41ed2018-01-20 22:06:53 -080064 private:
65 SimulatedQueue *queue_;
Austin Schuh7267c532019-05-19 19:55:53 -070066 EventLoop *event_loop_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080067};
68} // namespace
69
Austin Schuh7267c532019-05-19 19:55:53 -070070class SimulatedTimerHandler : public TimerHandler {
71 public:
72 explicit SimulatedTimerHandler(EventScheduler *scheduler,
73 ::std::function<void()> fn)
74 : scheduler_(scheduler), fn_(fn) {}
75 ~SimulatedTimerHandler() {}
76
77 void Setup(monotonic_clock::time_point base,
78 monotonic_clock::duration repeat_offset) override {
79 Disable();
80 const ::aos::monotonic_clock::time_point monotonic_now =
81 scheduler_->monotonic_now();
82 base_ = base;
83 repeat_offset_ = repeat_offset;
84 if (base < monotonic_now) {
85 token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
86 } else {
87 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
88 }
89 }
90
91 void HandleEvent() {
92 const ::aos::monotonic_clock::time_point monotonic_now =
93 scheduler_->monotonic_now();
94 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
95 // Reschedule.
96 while (base_ <= monotonic_now) base_ += repeat_offset_;
97 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
98 } else {
99 token_ = EventScheduler::Token();
100 }
101 fn_();
102 }
103
104 void Disable() override {
105 if (token_ != EventScheduler::Token()) {
106 scheduler_->Deschedule(token_);
107 token_ = EventScheduler::Token();
108 }
109 }
110
111 private:
112 EventScheduler *scheduler_;
113 EventScheduler::Token token_;
114 // Function to be run on the thread
115 ::std::function<void()> fn_;
116 monotonic_clock::time_point base_;
117 monotonic_clock::duration repeat_offset_;
118};
119
120
121class SimulatedEventLoop : public EventLoop {
122 public:
123 explicit SimulatedEventLoop(
124 EventScheduler *scheduler,
125 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
126 *queues)
Austin Schuh44019f92019-05-19 19:58:27 -0700127 : scheduler_(scheduler), queues_(queues) {
128 scheduler_->AddRawEventLoop(this);
129 }
130 ~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
Austin Schuh7267c532019-05-19 19:55:53 -0700131
132 ::aos::monotonic_clock::time_point monotonic_now() override {
133 return scheduler_->monotonic_now();
134 }
135
136 ::std::unique_ptr<RawSender> MakeRawSender(
137 const ::std::string &path, const QueueTypeInfo &type) override;
138
139 ::std::unique_ptr<RawFetcher> MakeRawFetcher(
140 const ::std::string &path, const QueueTypeInfo &type) override;
141
142 void MakeRawWatcher(
143 const ::std::string &path, const QueueTypeInfo &type,
144 ::std::function<void(const ::aos::Message *message)> watcher) override;
145
146 TimerHandler *AddTimer(::std::function<void()> callback) override {
147 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
148 return timers_.back().get();
149 }
150
151 void OnRun(::std::function<void()> on_run) override {
152 scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
153 }
154 void Run() override {
Austin Schuh44019f92019-05-19 19:58:27 -0700155 LOG(FATAL, "Run from the factory instead\n");
Austin Schuh7267c532019-05-19 19:55:53 -0700156 scheduler_->Run();
157 }
158 void Exit() override {
Austin Schuh7267c532019-05-19 19:55:53 -0700159 scheduler_->Exit();
160 }
161
162 SimulatedQueue *GetSimulatedQueue(
163 const ::std::pair<::std::string, QueueTypeInfo> &);
164
165 void Take(const ::std::string &path);
166
167 private:
168 EventScheduler *scheduler_;
169 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
170 *queues_;
171 ::std::vector<std::string> taken_;
172 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
173};
174
Neil Balchc8f41ed2018-01-20 22:06:53 -0800175EventScheduler::Token EventScheduler::Schedule(
176 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
177 return events_list_.emplace(time, callback);
178}
179
180void EventScheduler::Deschedule(EventScheduler::Token token) {
181 events_list_.erase(token);
182}
183
Austin Schuh44019f92019-05-19 19:58:27 -0700184void EventScheduler::RunFor(monotonic_clock::duration duration) {
185 const ::aos::monotonic_clock::time_point end_time =
186 monotonic_now() + duration;
187 for (RawEventLoop *event_loop : raw_event_loops_) {
188 event_loop->set_is_running(true);
189 }
190 is_running_ = true;
191 while (!events_list_.empty() && is_running_) {
192 auto iter = events_list_.begin();
193 ::aos::monotonic_clock::time_point next_time = iter->first;
194 if (next_time > end_time) {
195 break;
196 }
197 now_ = iter->first;
198 ::std::function<void()> callback = ::std::move(iter->second);
199 events_list_.erase(iter);
200 callback();
201 }
202 now_ = end_time;
203 if (!is_running_) {
204 for (RawEventLoop *event_loop : raw_event_loops_) {
205 event_loop->set_is_running(false);
206 }
207 }
208}
209
Neil Balchc8f41ed2018-01-20 22:06:53 -0800210void EventScheduler::Run() {
Austin Schuh44019f92019-05-19 19:58:27 -0700211 for (RawEventLoop *event_loop : raw_event_loops_) {
212 event_loop->set_is_running(true);
213 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800214 is_running_ = true;
215 while (!events_list_.empty() && is_running_) {
216 auto iter = events_list_.begin();
217 now_ = iter->first;
218 ::std::function<void()> callback = ::std::move(iter->second);
219 events_list_.erase(iter);
220 callback();
221 }
Austin Schuh44019f92019-05-19 19:58:27 -0700222 if (!is_running_) {
223 for (RawEventLoop *event_loop : raw_event_loops_) {
224 event_loop->set_is_running(false);
225 }
226 }
Neil Balchc8f41ed2018-01-20 22:06:53 -0800227}
228
229void SimulatedEventLoop::MakeRawWatcher(
230 const std::string &path, const QueueTypeInfo &type,
231 std::function<void(const aos::Message *message)> watcher) {
232 Take(path);
233 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
234 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
235}
236
237std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
238 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800239 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800240 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
Austin Schuh7267c532019-05-19 19:55:53 -0700241 return GetSimulatedQueue(key)->MakeRawSender(this);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800242}
243
244std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
245 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800246 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
247 return GetSimulatedQueue(key)->MakeRawFetcher();
248}
249
250SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
251 const ::std::pair<::std::string, QueueTypeInfo> &type) {
252 auto it = queues_->find(type);
253 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800254 it =
255 queues_
256 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
257 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800258 }
259 return &it->second;
260}
261
262void SimulatedQueue::MakeRawWatcher(
263 std::function<void(const aos::Message *message)> watcher) {
264 watchers_.push_back(watcher);
265}
266
Austin Schuh7267c532019-05-19 19:55:53 -0700267std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
268 EventLoop *event_loop) {
269 return std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800270}
271
272std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
273 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
274}
275
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800276void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800277 if (is_running()) {
278 ::aos::Die("Cannot add new objects while running.\n");
279 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800280 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
281 if (prior != taken_.end()) {
282 ::aos::Die("%s is already being used.", path.c_str());
283 } else {
284 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800285 }
286}
Austin Schuh7267c532019-05-19 19:55:53 -0700287
288::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
289 return ::std::unique_ptr<EventLoop>(
290 new SimulatedEventLoop(&scheduler_, &queues_));
291}
292
Neil Balchc8f41ed2018-01-20 22:06:53 -0800293} // namespace aos