blob: a23b18c31a596bc522b50efbe3453fdad9b4b43f [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -08002
3#include <algorithm>
4
John Park33858a32018-09-28 23:05:48 -07005#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08006
7namespace aos {
8namespace {
9class SimulatedFetcher : public RawFetcher {
10 public:
11 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
12 ~SimulatedFetcher() {}
13
14 bool Fetch() override {
15 if (index_ == queue_->index()) return false;
16
17 // Fetched message is newer
18 msg_ = queue_->latest_message();
19 index_ = queue_->index();
20 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
21 return true;
22 }
23
24 private:
25 int64_t index_ = -1;
26 SimulatedQueue *queue_;
27 RefCountedBuffer msg_;
28};
29
30class SimulatedSender : public RawSender {
31 public:
32 SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
33 ~SimulatedSender() {}
34
35 SendContext *GetContext() override {
36 return reinterpret_cast<SendContext *>(
37 RefCountedBuffer(queue_->size()).release());
38 }
39
40 void Free(SendContext *context) override {
41 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
42 }
43
44 bool Send(SendContext *context) override {
45 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
46 return true; // Maybe false instead? :)
47 }
48
Austin Schuhd681bbd2019-02-02 12:03:32 -080049 const char *name() const override { return queue_->name(); }
50
Neil Balchc8f41ed2018-01-20 22:06:53 -080051 private:
52 SimulatedQueue *queue_;
53};
54} // namespace
55
56EventScheduler::Token EventScheduler::Schedule(
57 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
58 return events_list_.emplace(time, callback);
59}
60
61void EventScheduler::Deschedule(EventScheduler::Token token) {
62 events_list_.erase(token);
63}
64
65void EventScheduler::Run() {
66 is_running_ = true;
67 while (!events_list_.empty() && is_running_) {
68 auto iter = events_list_.begin();
69 now_ = iter->first;
70 ::std::function<void()> callback = ::std::move(iter->second);
71 events_list_.erase(iter);
72 callback();
73 }
74}
75
76void SimulatedEventLoop::MakeRawWatcher(
77 const std::string &path, const QueueTypeInfo &type,
78 std::function<void(const aos::Message *message)> watcher) {
79 Take(path);
80 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
81 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
82}
83
84std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
85 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -080086 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -080087 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
88 return GetSimulatedQueue(key)->MakeRawSender();
89}
90
91std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
92 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -080093 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
94 return GetSimulatedQueue(key)->MakeRawFetcher();
95}
96
97SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
98 const ::std::pair<::std::string, QueueTypeInfo> &type) {
99 auto it = queues_->find(type);
100 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800101 it =
102 queues_
103 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
104 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800105 }
106 return &it->second;
107}
108
109void SimulatedQueue::MakeRawWatcher(
110 std::function<void(const aos::Message *message)> watcher) {
111 watchers_.push_back(watcher);
112}
113
114std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
115 return std::unique_ptr<RawSender>(new SimulatedSender(this));
116}
117
118std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
119 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
120}
121
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800122void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800123 if (is_running()) {
124 ::aos::Die("Cannot add new objects while running.\n");
125 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800126 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
127 if (prior != taken_.end()) {
128 ::aos::Die("%s is already being used.", path.c_str());
129 } else {
130 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800131 }
132}
133} // namespace aos