blob: b5c94bf5a5cd4bfde0a36da6dffbec43e0175c90 [file] [log] [blame]
#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
#include <map>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>
#include "aos/events/event-loop.h"
namespace aos {
class RefCountedBuffer {
public:
RefCountedBuffer() {}
~RefCountedBuffer() { clear(); }
explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
explicit RefCountedBuffer(size_t size) {
data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
kRefCountSize;
// Initialize the allocated memory with an integer
*GetRefCount() = 1;
}
RefCountedBuffer(const RefCountedBuffer &other) {
data_ = other.data_;
++*GetRefCount();
}
RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
RefCountedBuffer &operator=(const RefCountedBuffer &other) {
if (this == &other) return *this;
clear();
data_ = other.data_;
++*GetRefCount();
return *this;
}
RefCountedBuffer &operator=(RefCountedBuffer &&other) {
if (this == &other) return *this;
std::swap(data_, other.data_);
return *this;
}
aos::Message *get() const { return static_cast<aos::Message *>(data_); }
aos::Message *release() {
auto tmp = get();
data_ = nullptr;
return tmp;
}
void clear() {
if (data_ != nullptr) {
if (--*GetRefCount() == 0) {
// Free memory block from the start of the allocated block
free(GetRefCount());
}
data_ = nullptr;
}
}
private:
void *data_ = nullptr;
// Qty. memory to be allocated to the ref counter
static constexpr size_t kRefCountSize = sizeof(int64_t);
int64_t *GetRefCount() {
// Need to cast the void* to an 8 bit long object (size of void* is
// technically 0)
return reinterpret_cast<int64_t *>(static_cast<void *>(
reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
}
};
class EventScheduler {
public:
using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
::std::function<void()>>;
using Token = QueueType::iterator;
// Schedule an event with a callback function
// Returns an iterator to the event
Token Schedule(::aos::monotonic_clock::time_point time,
::std::function<void()> callback);
// Deschedule an event by its iterator
void Deschedule(Token token);
void Run();
void Exit() { is_running_ = false; }
::aos::monotonic_clock::time_point now() { return now_; }
private:
::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
QueueType events_list_;
bool is_running_ = false;
};
class SimulatedQueue {
public:
explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
EventScheduler *scheduler)
: type_(type), name_(name), scheduler_(scheduler){};
std::unique_ptr<RawSender> MakeRawSender();
std::unique_ptr<RawFetcher> MakeRawFetcher();
void MakeRawWatcher(std::function<void(const aos::Message *message)> watcher);
void Send(RefCountedBuffer message) {
index_++;
latest_message_ = message;
for (auto &watcher : watchers_) {
scheduler_->Schedule(scheduler_->now(),
[watcher, message]() { watcher(message.get()); });
}
}
const RefCountedBuffer &latest_message() { return latest_message_; }
int64_t index() { return index_; }
size_t size() { return type_.size; }
const char *name() const { return name_.c_str(); }
private:
int64_t index_ = -1;
QueueTypeInfo type_;
const ::std::string name_;
::std::vector<std::function<void(const aos::Message *message)>> watchers_;
RefCountedBuffer latest_message_;
EventScheduler *scheduler_;
};
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
::std::function<void()> fn)
: scheduler_(scheduler), fn_(fn) {}
~SimulatedTimerHandler() {}
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
Disable();
auto now = scheduler_->now();
base_ = base;
repeat_offset_ = repeat_offset;
if (base < now) {
token_ = scheduler_->Schedule(now, [this]() { HandleEvent(); });
} else {
token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
}
}
void HandleEvent() {
auto now = scheduler_->now();
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
} else {
token_ = EventScheduler::Token();
}
fn_();
}
void Disable() override {
if (token_ != EventScheduler::Token()) {
scheduler_->Deschedule(token_);
token_ = EventScheduler::Token();
}
}
private:
EventScheduler *scheduler_;
EventScheduler::Token token_;
// Function to be run on the thread
::std::function<void()> fn_;
monotonic_clock::time_point base_;
monotonic_clock::duration repeat_offset_;
};
class SimulatedEventLoop : public EventLoop {
public:
explicit SimulatedEventLoop(
EventScheduler *scheduler,
::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
*queues) : scheduler_(scheduler), queues_(queues){};
~SimulatedEventLoop() override{};
::aos::monotonic_clock::time_point monotonic_now() override {
return scheduler_->now();
}
std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
const QueueTypeInfo &type) override;
std::unique_ptr<RawFetcher> MakeRawFetcher(
const std::string &path, const QueueTypeInfo &type) override;
void MakeRawWatcher(
const std::string &path, const QueueTypeInfo &type,
std::function<void(const aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override {
timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
return timers_.back().get();
}
void OnRun(std::function<void()> on_run) override {
scheduler_->Schedule(scheduler_->now(), on_run);
}
void Run() override {
set_is_running(true);
scheduler_->Run();
}
void Exit() override {
set_is_running(false);
scheduler_->Exit();
}
SimulatedQueue *GetSimulatedQueue(
const ::std::pair<::std::string, QueueTypeInfo> &);
void Take(const ::std::string &path);
private:
EventScheduler *scheduler_;
::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
*queues_;
::std::vector<std::string> taken_;
::std::vector<std::unique_ptr<TimerHandler>> timers_;
};
class SimulatedEventLoopFactory {
public:
::std::unique_ptr<EventLoop> CreateEventLoop() {
return ::std::unique_ptr<EventLoop>(
new SimulatedEventLoop(&scheduler_, &queues_));
}
private:
EventScheduler scheduler_;
::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
};
} // namespace aos
#endif //_AOS_EVENTS_TEST_EVENT_LOOP_H_