blob: 02d15a696529ea7226be05c12087b752d66f28b5 [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -08002
3#include <algorithm>
4
James Kuszmaulc79768b2019-02-18 15:08:44 -08005#include "aos/logging/logging.h"
John Park33858a32018-09-28 23:05:48 -07006#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08007
8namespace aos {
9namespace {
10class SimulatedFetcher : public RawFetcher {
11 public:
12 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
13 ~SimulatedFetcher() {}
14
James Kuszmaulc79768b2019-02-18 15:08:44 -080015 bool FetchNext() override {
16 LOG(FATAL, "Simulated event loops do not support FetchNext.");
17 return false;
18 }
19
Neil Balchc8f41ed2018-01-20 22:06:53 -080020 bool Fetch() override {
21 if (index_ == queue_->index()) return false;
22
23 // Fetched message is newer
24 msg_ = queue_->latest_message();
25 index_ = queue_->index();
26 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
27 return true;
28 }
29
30 private:
31 int64_t index_ = -1;
32 SimulatedQueue *queue_;
33 RefCountedBuffer msg_;
34};
35
36class SimulatedSender : public RawSender {
37 public:
Austin Schuh7267c532019-05-19 19:55:53 -070038 SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
39 : queue_(queue), event_loop_(event_loop) {}
Neil Balchc8f41ed2018-01-20 22:06:53 -080040 ~SimulatedSender() {}
41
42 SendContext *GetContext() override {
43 return reinterpret_cast<SendContext *>(
44 RefCountedBuffer(queue_->size()).release());
45 }
46
47 void Free(SendContext *context) override {
48 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
49 }
50
51 bool Send(SendContext *context) override {
Austin Schuh7267c532019-05-19 19:55:53 -070052 {
53 ::aos::Message *aos_msg = reinterpret_cast<Message *>(context);
54 if (aos_msg->sent_time == monotonic_clock::min_time) {
55 aos_msg->sent_time = event_loop_->monotonic_now();
56 }
57 }
Neil Balchc8f41ed2018-01-20 22:06:53 -080058 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
59 return true; // Maybe false instead? :)
60 }
61
Austin Schuhd681bbd2019-02-02 12:03:32 -080062 const char *name() const override { return queue_->name(); }
63
Neil Balchc8f41ed2018-01-20 22:06:53 -080064 private:
65 SimulatedQueue *queue_;
Austin Schuh7267c532019-05-19 19:55:53 -070066 EventLoop *event_loop_;
Neil Balchc8f41ed2018-01-20 22:06:53 -080067};
68} // namespace
69
Austin Schuh7267c532019-05-19 19:55:53 -070070class SimulatedTimerHandler : public TimerHandler {
71 public:
72 explicit SimulatedTimerHandler(EventScheduler *scheduler,
73 ::std::function<void()> fn)
74 : scheduler_(scheduler), fn_(fn) {}
75 ~SimulatedTimerHandler() {}
76
77 void Setup(monotonic_clock::time_point base,
78 monotonic_clock::duration repeat_offset) override {
79 Disable();
80 const ::aos::monotonic_clock::time_point monotonic_now =
81 scheduler_->monotonic_now();
82 base_ = base;
83 repeat_offset_ = repeat_offset;
84 if (base < monotonic_now) {
85 token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
86 } else {
87 token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
88 }
89 }
90
91 void HandleEvent() {
92 const ::aos::monotonic_clock::time_point monotonic_now =
93 scheduler_->monotonic_now();
94 if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
95 // Reschedule.
96 while (base_ <= monotonic_now) base_ += repeat_offset_;
97 token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
98 } else {
99 token_ = EventScheduler::Token();
100 }
101 fn_();
102 }
103
104 void Disable() override {
105 if (token_ != EventScheduler::Token()) {
106 scheduler_->Deschedule(token_);
107 token_ = EventScheduler::Token();
108 }
109 }
110
111 private:
112 EventScheduler *scheduler_;
113 EventScheduler::Token token_;
114 // Function to be run on the thread
115 ::std::function<void()> fn_;
116 monotonic_clock::time_point base_;
117 monotonic_clock::duration repeat_offset_;
118};
119
120
121class SimulatedEventLoop : public EventLoop {
122 public:
123 explicit SimulatedEventLoop(
124 EventScheduler *scheduler,
125 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
126 *queues)
127 : scheduler_(scheduler), queues_(queues) {}
128 ~SimulatedEventLoop() override {};
129
130 ::aos::monotonic_clock::time_point monotonic_now() override {
131 return scheduler_->monotonic_now();
132 }
133
134 ::std::unique_ptr<RawSender> MakeRawSender(
135 const ::std::string &path, const QueueTypeInfo &type) override;
136
137 ::std::unique_ptr<RawFetcher> MakeRawFetcher(
138 const ::std::string &path, const QueueTypeInfo &type) override;
139
140 void MakeRawWatcher(
141 const ::std::string &path, const QueueTypeInfo &type,
142 ::std::function<void(const ::aos::Message *message)> watcher) override;
143
144 TimerHandler *AddTimer(::std::function<void()> callback) override {
145 timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
146 return timers_.back().get();
147 }
148
149 void OnRun(::std::function<void()> on_run) override {
150 scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
151 }
152 void Run() override {
153 set_is_running(true);
154 scheduler_->Run();
155 }
156 void Exit() override {
157 set_is_running(false);
158 scheduler_->Exit();
159 }
160
161 SimulatedQueue *GetSimulatedQueue(
162 const ::std::pair<::std::string, QueueTypeInfo> &);
163
164 void Take(const ::std::string &path);
165
166 private:
167 EventScheduler *scheduler_;
168 ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
169 *queues_;
170 ::std::vector<std::string> taken_;
171 ::std::vector<std::unique_ptr<TimerHandler>> timers_;
172};
173
Neil Balchc8f41ed2018-01-20 22:06:53 -0800174EventScheduler::Token EventScheduler::Schedule(
175 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
176 return events_list_.emplace(time, callback);
177}
178
179void EventScheduler::Deschedule(EventScheduler::Token token) {
180 events_list_.erase(token);
181}
182
183void EventScheduler::Run() {
184 is_running_ = true;
185 while (!events_list_.empty() && is_running_) {
186 auto iter = events_list_.begin();
187 now_ = iter->first;
188 ::std::function<void()> callback = ::std::move(iter->second);
189 events_list_.erase(iter);
190 callback();
191 }
192}
193
194void SimulatedEventLoop::MakeRawWatcher(
195 const std::string &path, const QueueTypeInfo &type,
196 std::function<void(const aos::Message *message)> watcher) {
197 Take(path);
198 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
199 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
200}
201
202std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
203 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800204 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800205 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
Austin Schuh7267c532019-05-19 19:55:53 -0700206 return GetSimulatedQueue(key)->MakeRawSender(this);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800207}
208
209std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
210 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800211 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
212 return GetSimulatedQueue(key)->MakeRawFetcher();
213}
214
215SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
216 const ::std::pair<::std::string, QueueTypeInfo> &type) {
217 auto it = queues_->find(type);
218 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800219 it =
220 queues_
221 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
222 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800223 }
224 return &it->second;
225}
226
227void SimulatedQueue::MakeRawWatcher(
228 std::function<void(const aos::Message *message)> watcher) {
229 watchers_.push_back(watcher);
230}
231
Austin Schuh7267c532019-05-19 19:55:53 -0700232std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
233 EventLoop *event_loop) {
234 return std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
Neil Balchc8f41ed2018-01-20 22:06:53 -0800235}
236
237std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
238 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
239}
240
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800241void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800242 if (is_running()) {
243 ::aos::Die("Cannot add new objects while running.\n");
244 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800245 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
246 if (prior != taken_.end()) {
247 ::aos::Die("%s is already being used.", path.c_str());
248 } else {
249 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800250 }
251}
Austin Schuh7267c532019-05-19 19:55:53 -0700252
253::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
254 return ::std::unique_ptr<EventLoop>(
255 new SimulatedEventLoop(&scheduler_, &queues_));
256}
257
Neil Balchc8f41ed2018-01-20 22:06:53 -0800258} // namespace aos