blob: e82c38103015aa43cf3519f48b9a421843db2d7d [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
John Park33858a32018-09-28 23:05:48 -07002#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08003
4namespace aos {
5namespace {
6class SimulatedFetcher : public RawFetcher {
7 public:
8 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
9 ~SimulatedFetcher() {}
10
11 bool Fetch() override {
12 if (index_ == queue_->index()) return false;
13
14 // Fetched message is newer
15 msg_ = queue_->latest_message();
16 index_ = queue_->index();
17 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
18 return true;
19 }
20
21 private:
22 int64_t index_ = -1;
23 SimulatedQueue *queue_;
24 RefCountedBuffer msg_;
25};
26
27class SimulatedSender : public RawSender {
28 public:
29 SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
30 ~SimulatedSender() {}
31
32 SendContext *GetContext() override {
33 return reinterpret_cast<SendContext *>(
34 RefCountedBuffer(queue_->size()).release());
35 }
36
37 void Free(SendContext *context) override {
38 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
39 }
40
41 bool Send(SendContext *context) override {
42 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
43 return true; // Maybe false instead? :)
44 }
45
Austin Schuhd681bbd2019-02-02 12:03:32 -080046 const char *name() const override { return queue_->name(); }
47
Neil Balchc8f41ed2018-01-20 22:06:53 -080048 private:
49 SimulatedQueue *queue_;
50};
51} // namespace
52
53EventScheduler::Token EventScheduler::Schedule(
54 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
55 return events_list_.emplace(time, callback);
56}
57
58void EventScheduler::Deschedule(EventScheduler::Token token) {
59 events_list_.erase(token);
60}
61
62void EventScheduler::Run() {
63 is_running_ = true;
64 while (!events_list_.empty() && is_running_) {
65 auto iter = events_list_.begin();
66 now_ = iter->first;
67 ::std::function<void()> callback = ::std::move(iter->second);
68 events_list_.erase(iter);
69 callback();
70 }
71}
72
73void SimulatedEventLoop::MakeRawWatcher(
74 const std::string &path, const QueueTypeInfo &type,
75 std::function<void(const aos::Message *message)> watcher) {
76 Take(path);
77 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
78 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
79}
80
81std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
82 const std::string &path, const QueueTypeInfo &type) {
83 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
84 return GetSimulatedQueue(key)->MakeRawSender();
85}
86
87std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
88 const std::string &path, const QueueTypeInfo &type) {
89 Take(path);
90 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
91 return GetSimulatedQueue(key)->MakeRawFetcher();
92}
93
94SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
95 const ::std::pair<::std::string, QueueTypeInfo> &type) {
96 auto it = queues_->find(type);
97 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -080098 it =
99 queues_
100 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
101 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800102 }
103 return &it->second;
104}
105
106void SimulatedQueue::MakeRawWatcher(
107 std::function<void(const aos::Message *message)> watcher) {
108 watchers_.push_back(watcher);
109}
110
111std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
112 return std::unique_ptr<RawSender>(new SimulatedSender(this));
113}
114
115std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
116 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
117}
118
119void SimulatedEventLoop::Take(const std::string &path) {
120 if (is_running()) {
121 ::aos::Die("Cannot add new objects while running.\n");
122 }
123 if (!taken_.emplace(path).second) {
124 ::aos::Die("%s already has a listener / watcher.", path.c_str());
125 }
126}
127} // namespace aos