blob: 90b78cdab5063555d7801944edc2966664ff6db1 [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
2#include "aos/common/queue.h"
3
4namespace aos {
5namespace {
6class SimulatedFetcher : public RawFetcher {
7 public:
8 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
9 ~SimulatedFetcher() {}
10
11 bool Fetch() override {
12 if (index_ == queue_->index()) return false;
13
14 // Fetched message is newer
15 msg_ = queue_->latest_message();
16 index_ = queue_->index();
17 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
18 return true;
19 }
20
21 private:
22 int64_t index_ = -1;
23 SimulatedQueue *queue_;
24 RefCountedBuffer msg_;
25};
26
27class SimulatedSender : public RawSender {
28 public:
29 SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
30 ~SimulatedSender() {}
31
32 SendContext *GetContext() override {
33 return reinterpret_cast<SendContext *>(
34 RefCountedBuffer(queue_->size()).release());
35 }
36
37 void Free(SendContext *context) override {
38 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
39 }
40
41 bool Send(SendContext *context) override {
42 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
43 return true; // Maybe false instead? :)
44 }
45
46 private:
47 SimulatedQueue *queue_;
48};
49} // namespace
50
51EventScheduler::Token EventScheduler::Schedule(
52 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
53 return events_list_.emplace(time, callback);
54}
55
56void EventScheduler::Deschedule(EventScheduler::Token token) {
57 events_list_.erase(token);
58}
59
60void EventScheduler::Run() {
61 is_running_ = true;
62 while (!events_list_.empty() && is_running_) {
63 auto iter = events_list_.begin();
64 now_ = iter->first;
65 ::std::function<void()> callback = ::std::move(iter->second);
66 events_list_.erase(iter);
67 callback();
68 }
69}
70
71void SimulatedEventLoop::MakeRawWatcher(
72 const std::string &path, const QueueTypeInfo &type,
73 std::function<void(const aos::Message *message)> watcher) {
74 Take(path);
75 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
76 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
77}
78
79std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
80 const std::string &path, const QueueTypeInfo &type) {
81 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
82 return GetSimulatedQueue(key)->MakeRawSender();
83}
84
85std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
86 const std::string &path, const QueueTypeInfo &type) {
87 Take(path);
88 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
89 return GetSimulatedQueue(key)->MakeRawFetcher();
90}
91
92SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
93 const ::std::pair<::std::string, QueueTypeInfo> &type) {
94 auto it = queues_->find(type);
95 if (it == queues_->end()) {
96 it = queues_->emplace(type, SimulatedQueue(type.second, scheduler_))
97 .first;
98 }
99 return &it->second;
100}
101
102void SimulatedQueue::MakeRawWatcher(
103 std::function<void(const aos::Message *message)> watcher) {
104 watchers_.push_back(watcher);
105}
106
107std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
108 return std::unique_ptr<RawSender>(new SimulatedSender(this));
109}
110
111std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
112 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
113}
114
115void SimulatedEventLoop::Take(const std::string &path) {
116 if (is_running()) {
117 ::aos::Die("Cannot add new objects while running.\n");
118 }
119 if (!taken_.emplace(path).second) {
120 ::aos::Die("%s already has a listener / watcher.", path.c_str());
121 }
122}
123} // namespace aos