blob: 5fe1bf5cff13b9a1aca2fb76b669646d148575f0 [file] [log] [blame]
Neil Balchc8f41ed2018-01-20 22:06:53 -08001#include "aos/events/simulated-event-loop.h"
Austin Schuh81fc9cc2019-02-02 23:25:47 -08002
3#include <algorithm>
4
James Kuszmaulc79768b2019-02-18 15:08:44 -08005#include "aos/logging/logging.h"
John Park33858a32018-09-28 23:05:48 -07006#include "aos/queue.h"
Neil Balchc8f41ed2018-01-20 22:06:53 -08007
8namespace aos {
9namespace {
10class SimulatedFetcher : public RawFetcher {
11 public:
12 explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
13 ~SimulatedFetcher() {}
14
James Kuszmaulc79768b2019-02-18 15:08:44 -080015 bool FetchNext() override {
16 LOG(FATAL, "Simulated event loops do not support FetchNext.");
17 return false;
18 }
19
Neil Balchc8f41ed2018-01-20 22:06:53 -080020 bool Fetch() override {
21 if (index_ == queue_->index()) return false;
22
23 // Fetched message is newer
24 msg_ = queue_->latest_message();
25 index_ = queue_->index();
26 set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
27 return true;
28 }
29
30 private:
31 int64_t index_ = -1;
32 SimulatedQueue *queue_;
33 RefCountedBuffer msg_;
34};
35
36class SimulatedSender : public RawSender {
37 public:
38 SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
39 ~SimulatedSender() {}
40
41 SendContext *GetContext() override {
42 return reinterpret_cast<SendContext *>(
43 RefCountedBuffer(queue_->size()).release());
44 }
45
46 void Free(SendContext *context) override {
47 RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
48 }
49
50 bool Send(SendContext *context) override {
51 queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
52 return true; // Maybe false instead? :)
53 }
54
Austin Schuhd681bbd2019-02-02 12:03:32 -080055 const char *name() const override { return queue_->name(); }
56
Neil Balchc8f41ed2018-01-20 22:06:53 -080057 private:
58 SimulatedQueue *queue_;
59};
60} // namespace
61
62EventScheduler::Token EventScheduler::Schedule(
63 ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
64 return events_list_.emplace(time, callback);
65}
66
67void EventScheduler::Deschedule(EventScheduler::Token token) {
68 events_list_.erase(token);
69}
70
71void EventScheduler::Run() {
72 is_running_ = true;
73 while (!events_list_.empty() && is_running_) {
74 auto iter = events_list_.begin();
75 now_ = iter->first;
76 ::std::function<void()> callback = ::std::move(iter->second);
77 events_list_.erase(iter);
78 callback();
79 }
80}
81
82void SimulatedEventLoop::MakeRawWatcher(
83 const std::string &path, const QueueTypeInfo &type,
84 std::function<void(const aos::Message *message)> watcher) {
85 Take(path);
86 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
87 GetSimulatedQueue(key)->MakeRawWatcher(watcher);
88}
89
90std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
91 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -080092 Take(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -080093 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
94 return GetSimulatedQueue(key)->MakeRawSender();
95}
96
97std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
98 const std::string &path, const QueueTypeInfo &type) {
Neil Balchc8f41ed2018-01-20 22:06:53 -080099 ::std::pair<::std::string, QueueTypeInfo> key(path, type);
100 return GetSimulatedQueue(key)->MakeRawFetcher();
101}
102
103SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
104 const ::std::pair<::std::string, QueueTypeInfo> &type) {
105 auto it = queues_->find(type);
106 if (it == queues_->end()) {
Austin Schuhd681bbd2019-02-02 12:03:32 -0800107 it =
108 queues_
109 ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
110 .first;
Neil Balchc8f41ed2018-01-20 22:06:53 -0800111 }
112 return &it->second;
113}
114
115void SimulatedQueue::MakeRawWatcher(
116 std::function<void(const aos::Message *message)> watcher) {
117 watchers_.push_back(watcher);
118}
119
120std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
121 return std::unique_ptr<RawSender>(new SimulatedSender(this));
122}
123
124std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
125 return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
126}
127
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800128void SimulatedEventLoop::Take(const ::std::string &path) {
Neil Balchc8f41ed2018-01-20 22:06:53 -0800129 if (is_running()) {
130 ::aos::Die("Cannot add new objects while running.\n");
131 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800132 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
133 if (prior != taken_.end()) {
134 ::aos::Die("%s is already being used.", path.c_str());
135 } else {
136 taken_.emplace_back(path);
Neil Balchc8f41ed2018-01-20 22:06:53 -0800137 }
138}
139} // namespace aos