blob: 90b78cdab5063555d7801944edc2966664ff6db1 [file] [log] [blame]
#include "aos/events/simulated-event-loop.h"
#include "aos/common/queue.h"
namespace aos {
namespace {
class SimulatedFetcher : public RawFetcher {
public:
explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
~SimulatedFetcher() {}
bool Fetch() override {
if (index_ == queue_->index()) return false;
// Fetched message is newer
msg_ = queue_->latest_message();
index_ = queue_->index();
set_most_recent(reinterpret_cast<FetchValue *>(msg_.get()));
return true;
}
private:
int64_t index_ = -1;
SimulatedQueue *queue_;
RefCountedBuffer msg_;
};
class SimulatedSender : public RawSender {
public:
SimulatedSender(SimulatedQueue *queue) : queue_(queue) {}
~SimulatedSender() {}
SendContext *GetContext() override {
return reinterpret_cast<SendContext *>(
RefCountedBuffer(queue_->size()).release());
}
void Free(SendContext *context) override {
RefCountedBuffer(reinterpret_cast<aos::Message *>(context));
}
bool Send(SendContext *context) override {
queue_->Send(RefCountedBuffer(reinterpret_cast<aos::Message *>(context)));
return true; // Maybe false instead? :)
}
private:
SimulatedQueue *queue_;
};
} // namespace
EventScheduler::Token EventScheduler::Schedule(
::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
return events_list_.emplace(time, callback);
}
void EventScheduler::Deschedule(EventScheduler::Token token) {
events_list_.erase(token);
}
void EventScheduler::Run() {
is_running_ = true;
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
now_ = iter->first;
::std::function<void()> callback = ::std::move(iter->second);
events_list_.erase(iter);
callback();
}
}
void SimulatedEventLoop::MakeRawWatcher(
const std::string &path, const QueueTypeInfo &type,
std::function<void(const aos::Message *message)> watcher) {
Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
GetSimulatedQueue(key)->MakeRawWatcher(watcher);
}
std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
const std::string &path, const QueueTypeInfo &type) {
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawSender();
}
std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
const std::string &path, const QueueTypeInfo &type) {
Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawFetcher();
}
SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
const ::std::pair<::std::string, QueueTypeInfo> &type) {
auto it = queues_->find(type);
if (it == queues_->end()) {
it = queues_->emplace(type, SimulatedQueue(type.second, scheduler_))
.first;
}
return &it->second;
}
void SimulatedQueue::MakeRawWatcher(
std::function<void(const aos::Message *message)> watcher) {
watchers_.push_back(watcher);
}
std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender() {
return std::unique_ptr<RawSender>(new SimulatedSender(this));
}
std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
}
void SimulatedEventLoop::Take(const std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
if (!taken_.emplace(path).second) {
::aos::Die("%s already has a listener / watcher.", path.c_str());
}
}
} // namespace aos