blob: 85ea21b53ccdc2f226d6f2080dadb8077d2cb19a [file] [log] [blame]
#include "aos/events/simulated-event-loop.h"
#include <algorithm>
#include <deque>
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/testing/test_logging.h"
#include "aos/util/phased_loop.h"
namespace aos {
namespace {
class SimulatedSender : public RawSender {
public:
SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
: queue_(queue), event_loop_(event_loop) {
testing::EnableTestLogging();
}
~SimulatedSender() {}
aos::Message *GetMessage() override {
return RefCountedBuffer(queue_->size()).release();
}
void Free(aos::Message *msg) override { RefCountedBuffer tmp(msg); }
bool Send(aos::Message *msg) override {
{
if (msg->sent_time == monotonic_clock::min_time) {
msg->sent_time = event_loop_->monotonic_now();
}
}
queue_->Send(RefCountedBuffer(msg));
return true;
}
const char *name() const override { return queue_->name(); }
private:
SimulatedQueue *queue_;
EventLoop *event_loop_;
};
} // namespace
class SimulatedFetcher : public RawFetcher {
public:
explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
bool FetchNext() override {
if (msgs_.size() == 0) return false;
msg_ = msgs_.front();
msgs_.pop_front();
set_most_recent(msg_.get());
return true;
}
bool Fetch() override {
if (msgs_.size() == 0) {
if (!msg_ && queue_->latest_message()) {
msg_ = queue_->latest_message();
set_most_recent(msg_.get());
return true;
} else {
return false;
}
}
// We've had a message enqueued, so we don't need to go looking for the
// latest message from before we started.
msg_ = msgs_.back();
msgs_.clear();
set_most_recent(msg_.get());
return true;
}
private:
friend class SimulatedQueue;
// Internal method for Simulation to add a message to the buffer.
void Enqueue(RefCountedBuffer buffer) {
msgs_.emplace_back(buffer);
}
SimulatedQueue *queue_;
RefCountedBuffer msg_;
// Messages queued up but not in use.
::std::deque<RefCountedBuffer> msgs_;
};
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
::std::function<void()> fn)
: scheduler_(scheduler), fn_(fn) {}
~SimulatedTimerHandler() {}
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
Disable();
const ::aos::monotonic_clock::time_point monotonic_now =
scheduler_->monotonic_now();
base_ = base;
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
} else {
token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
}
}
void HandleEvent() {
const ::aos::monotonic_clock::time_point monotonic_now =
scheduler_->monotonic_now();
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
} else {
token_ = EventScheduler::Token();
}
fn_();
}
void Disable() override {
if (token_ != EventScheduler::Token()) {
scheduler_->Deschedule(token_);
token_ = EventScheduler::Token();
}
}
::aos::monotonic_clock::time_point monotonic_now() const {
return scheduler_->monotonic_now();
}
private:
EventScheduler *scheduler_;
EventScheduler::Token token_;
// Function to be run on the thread
::std::function<void()> fn_;
monotonic_clock::time_point base_;
monotonic_clock::duration repeat_offset_;
};
class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
public:
SimulatedPhasedLoopHandler(EventScheduler *scheduler,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
offset),
fn_(fn) {
// TODO(austin): This assumes time doesn't change between when the
// constructor is called and when we start running. It's probably a safe
// assumption.
Reschedule();
}
void HandleTimerWakeup() {
fn_(cycles_elapsed_);
Reschedule();
}
void set_interval_and_offset(
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) override {
phased_loop_.set_interval_and_offset(interval, offset);
}
void Reschedule() {
cycles_elapsed_ =
phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
::aos::monotonic_clock::zero());
}
private:
SimulatedTimerHandler simulated_timer_handler_;
time::PhasedLoop phased_loop_;
int cycles_elapsed_ = 1;
::std::function<void(int)> fn_;
};
class SimulatedEventLoop : public EventLoop {
public:
explicit SimulatedEventLoop(
EventScheduler *scheduler,
::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
*queues)
: scheduler_(scheduler), queues_(queues) {
scheduler_->AddRawEventLoop(this);
}
~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
::aos::monotonic_clock::time_point monotonic_now() override {
return scheduler_->monotonic_now();
}
::std::unique_ptr<RawSender> MakeRawSender(
const ::std::string &path, const QueueTypeInfo &type) override;
::std::unique_ptr<RawFetcher> MakeRawFetcher(
const ::std::string &path, const QueueTypeInfo &type) override;
void MakeRawWatcher(
const ::std::string &path, const QueueTypeInfo &type,
::std::function<void(const ::aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override {
timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
return timers_.back().get();
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
phased_loops_.emplace_back(
new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
return phased_loops_.back().get();
}
void OnRun(::std::function<void()> on_run) override {
scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
}
void set_name(const char *name) override { name_ = name; }
SimulatedQueue *GetSimulatedQueue(
const ::std::pair<::std::string, QueueTypeInfo> &);
void Take(const ::std::string &path);
void SetRuntimeRealtimePriority(int /*priority*/) override {
if (is_running()) {
::aos::Die("Cannot set realtime priority while running.");
}
}
private:
EventScheduler *scheduler_;
::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
*queues_;
::std::vector<std::string> taken_;
::std::vector<std::unique_ptr<TimerHandler>> timers_;
::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
::std::string name_;
};
EventScheduler::Token EventScheduler::Schedule(
::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
return events_list_.emplace(time, callback);
}
void EventScheduler::Deschedule(EventScheduler::Token token) {
events_list_.erase(token);
}
void EventScheduler::RunFor(monotonic_clock::duration duration) {
const ::aos::monotonic_clock::time_point end_time =
monotonic_now() + duration;
testing::MockTime(monotonic_now());
for (RawEventLoop *event_loop : raw_event_loops_) {
event_loop->set_is_running(true);
}
is_running_ = true;
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
::aos::monotonic_clock::time_point next_time = iter->first;
if (next_time > end_time) {
break;
}
now_ = iter->first;
testing::MockTime(now_);
::std::function<void()> callback = ::std::move(iter->second);
events_list_.erase(iter);
callback();
}
now_ = end_time;
if (!is_running_) {
for (RawEventLoop *event_loop : raw_event_loops_) {
event_loop->set_is_running(false);
}
}
testing::UnMockTime();
}
void EventScheduler::Run() {
testing::MockTime(monotonic_now());
for (RawEventLoop *event_loop : raw_event_loops_) {
event_loop->set_is_running(true);
}
is_running_ = true;
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
now_ = iter->first;
testing::MockTime(now_);
::std::function<void()> callback = ::std::move(iter->second);
events_list_.erase(iter);
callback();
}
if (!is_running_) {
for (RawEventLoop *event_loop : raw_event_loops_) {
event_loop->set_is_running(false);
}
}
testing::UnMockTime();
}
void SimulatedEventLoop::MakeRawWatcher(
const std::string &path, const QueueTypeInfo &type,
std::function<void(const aos::Message *message)> watcher) {
Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
GetSimulatedQueue(key)->MakeRawWatcher(watcher);
}
std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
const std::string &path, const QueueTypeInfo &type) {
Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawSender(this);
}
std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
const std::string &path, const QueueTypeInfo &type) {
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawFetcher();
}
SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
const ::std::pair<::std::string, QueueTypeInfo> &type) {
auto it = queues_->find(type);
if (it == queues_->end()) {
it =
queues_
->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
.first;
}
return &it->second;
}
void SimulatedQueue::MakeRawWatcher(
::std::function<void(const aos::Message *message)> watcher) {
watchers_.push_back(watcher);
}
::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
EventLoop *event_loop) {
return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
}
::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
fetchers_.push_back(fetcher.get());
return ::std::move(fetcher);
}
void SimulatedQueue::Send(RefCountedBuffer message) {
latest_message_ = message;
if (scheduler_->is_running()) {
for (auto &watcher : watchers_) {
scheduler_->Schedule(scheduler_->monotonic_now(),
[watcher, message]() { watcher(message.get()); });
}
}
for (auto &fetcher : fetchers_) {
fetcher->Enqueue(message);
}
}
void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
}
void SimulatedEventLoop::Take(const ::std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
if (prior != taken_.end()) {
::aos::Die("%s is already being used.", path.c_str());
} else {
taken_.emplace_back(path);
}
}
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
return ::std::unique_ptr<EventLoop>(
new SimulatedEventLoop(&scheduler_, &queues_));
}
} // namespace aos