Sort wakeups and add send latency.
We don't want to rely on epoll to trigger wakeups in order. Timers will
likely wake up before watchers because they see fewer kernel delays. So,
maintain a time-sorted heap of pending events and always handle the
earliest one first. This also merges wakeups nicely.
This also gives us the infrastructure to handle the same problem in
simulation, where we can have perfect timers and senders with latency.
If a timer wakes up while a message is still being sent, the watcher's
wakeup is moved up and the message is handled in time order.
Change-Id: I8675ec5221dd2603b4aa5b1a3729907a599813b4
diff --git a/aos/controls/control_loop_test.h b/aos/controls/control_loop_test.h
index a0a844f..a2dbb7b 100644
--- a/aos/controls/control_loop_test.h
+++ b/aos/controls/control_loop_test.h
@@ -93,6 +93,10 @@
return event_loop_factory_.MakeEventLoop(name);
}
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ event_loop_factory_.set_send_delay(send_delay);
+ }
+
void RunFor(monotonic_clock::duration duration) {
event_loop_factory_.RunFor(duration);
}
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 6ffa526..dfad4ff 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -45,6 +45,7 @@
name = "event_loop",
srcs = [
"event_loop.cc",
+ "event_loop_event.h",
"event_loop_tmpl.h",
],
hdrs = [
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 01f6a3e..2c3a215 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -65,6 +65,7 @@
EventLoop::~EventLoop() {
CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
+ CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
}
int EventLoop::ChannelIndex(const Channel *channel) {
@@ -377,6 +378,39 @@
}
}
+void EventLoop::ReserveEvents() {  // One slot per timer, phased loop, watcher.
+ events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
+}
+// Heap comparator: '>' keeps the earliest event_time() at events_.front().
+namespace {
+bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
+ return first->event_time() > second->event_time();
+}
+} // namespace
+// Queues event by event_time(). It must be valid() and not already queued.
+void EventLoop::AddEvent(EventLoopEvent *event) {
+ DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
+ events_.push_back(event);
+ std::push_heap(events_.begin(), events_.end(), CompareEvents);
+}
+// Removes and invalidates event if it is queued; otherwise does nothing.
+void EventLoop::RemoveEvent(EventLoopEvent *event) {
+ auto e = std::find(events_.begin(), events_.end(), event);
+ if (e != events_.end()) {
+ events_.erase(e);
+ std::make_heap(events_.begin(), events_.end(), CompareEvents);  // Re-heapify.
+ event->Invalidate();
+ }
+}
+// Pops and invalidates the earliest event. Requires a non-empty events_.
+EventLoopEvent *EventLoop::PopEvent() {
+ EventLoopEvent *result = events_.front();
+ std::pop_heap(events_.begin(), events_.end(), CompareEvents);
+ events_.pop_back();
+ result->Invalidate();
+ return result;
+}
+
void WatcherState::set_timing_report(timing::Watcher *watcher) {
CHECK_NOTNULL(watcher);
watcher_ = watcher;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 16980dd..68b1b5e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,6 +7,7 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
+#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
@@ -424,9 +425,10 @@
WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
std::vector<RawSender *> senders_;
+ std::vector<RawFetcher *> fetchers_;
+
std::vector<std::unique_ptr<TimerHandler>> timers_;
std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
- std::vector<RawFetcher *> fetchers_;
std::vector<std::unique_ptr<WatcherState>> watchers_;
void SendTimingReport();
@@ -435,6 +437,19 @@
std::unique_ptr<RawSender> timing_report_sender_;
+ // Tracks which event sources (timers and watchers) have data, and which
+ // don't. Added events may not change their event_time().
+ // TODO(austin): Test case 1: timer triggers at t1, handler takes until after
+ // t2 to run, t2 should then be picked up without a context switch.
+ void AddEvent(EventLoopEvent *event);
+ void RemoveEvent(EventLoopEvent *event);
+ size_t EventCount() const { return events_.size(); }
+ EventLoopEvent *PopEvent();
+ EventLoopEvent *PeekEvent() { return events_.front(); }
+ void ReserveEvents();
+
+ std::vector<EventLoopEvent *> events_;
+
private:
virtual pid_t GetTid() = 0;
diff --git a/aos/events/event_loop_event.h b/aos/events/event_loop_event.h
new file mode 100644
index 0000000..6438da1
--- /dev/null
+++ b/aos/events/event_loop_event.h
@@ -0,0 +1,46 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_EVENT_H
+#define AOS_EVENTS_EVENT_LOOP_EVENT_H
+
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Common interface to track when callbacks and timers should have happened.
+class EventLoopEvent {
+ public:
+ virtual ~EventLoopEvent() {}
+ // An event is valid while its event time is anything other than max_time.
+ bool valid() const { return event_time_ != monotonic_clock::max_time; }
+ void Invalidate() { event_time_ = monotonic_clock::max_time; }
+ // Returns the event time. DCHECKs that the event is valid().
+ monotonic_clock::time_point event_time() const {
+ DCHECK(valid());
+ return event_time_;
+ }
+ // Implemented by subclasses to handle the event.
+ virtual void HandleEvent() = 0;
+ // Sets the event time; any value other than max_time makes this valid().
+ void set_event_time(monotonic_clock::time_point event_time) {
+ event_time_ = event_time;
+ }
+
+ private:
+ monotonic_clock::time_point event_time_ = monotonic_clock::max_time;
+};
+
+// Adapter class to implement EventLoopEvent by calling HandleEvent on T.
+template <typename T>
+class EventHandler final : public EventLoopEvent {
+ public:
+ explicit EventHandler(T *t) : t_(t) {}  // t is stored, not owned.
+ ~EventHandler() override = default;
+ void HandleEvent() override { t_->HandleEvent(); }  // Forwards to T.
+
+ private:
+ T *const t_;  // Must stay valid whenever HandleEvent() may be called.
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_EVENT_LOOP_EVENT_H
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index b69f28a..f4cf2ec 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -90,8 +90,7 @@
// Ends the given event loop at the given time from now.
void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
auto end_timer = loop->AddTimer([this]() { this->Exit(); });
- end_timer->Setup(loop->monotonic_now() +
- ::std::chrono::milliseconds(duration));
+ end_timer->Setup(loop->monotonic_now() + duration);
end_timer->set_name("end");
}
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index f5feea9..e549d57 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -179,8 +179,6 @@
}
bool FetchNext() {
- // TODO(austin): Write a test which starts with nothing in the queue,
- // and then calls FetchNext() after something is sent.
// TODO(austin): Get behind and make sure it dies both here and with
// Fetch.
ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
@@ -340,41 +338,41 @@
class WatcherState : public aos::WatcherState {
public:
WatcherState(
- EventLoop *event_loop, const Channel *channel,
+ ShmEventLoop *event_loop, const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: aos::WatcherState(event_loop, channel, std::move(fn)),
+ event_loop_(event_loop),
+ event_(this),
simple_shm_fetcher_(channel) {}
- ~WatcherState() override {}
+ ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
void Startup(EventLoop *event_loop) override {
- PointAtNextQueueIndex();
+ simple_shm_fetcher_.PointAtNextQueueIndex();
CHECK(RegisterWakeup(event_loop->priority()));
}
- // Points the next message to fetch at the queue index which will be populated
- // next.
- void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
-
// Returns true if there is new data available.
- bool HasNewData() {
+ bool CheckForNewData() {
if (!has_new_data_) {
has_new_data_ = simple_shm_fetcher_.FetchNext();
+
+ if (has_new_data_) {
+ event_.set_event_time(
+ simple_shm_fetcher_.context().monotonic_sent_time);
+ event_loop_->AddEvent(&event_);
+ }
}
return has_new_data_;
}
- // Returns the time of the current data sample.
- aos::monotonic_clock::time_point event_time() const {
- return simple_shm_fetcher_.context().monotonic_sent_time;
- }
-
// Consumes the data by calling the callback.
- void CallCallback() {
+ void HandleEvent() {
CHECK(has_new_data_);
DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
has_new_data_ = false;
+ CheckForNewData();
}
// Registers us to receive a signal on event reception.
@@ -387,37 +385,68 @@
private:
bool has_new_data_ = false;
+ ShmEventLoop *event_loop_;
+ EventHandler<WatcherState> event_;
SimpleShmFetcher simple_shm_fetcher_;
};
// Adapter class to adapt a timerfd to a TimerHandler.
-class TimerHandlerState : public TimerHandler {
+class TimerHandlerState final : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
: TimerHandler(shm_event_loop, std::move(fn)),
- shm_event_loop_(shm_event_loop) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- const uint64_t elapsed_cycles = timerfd_.Read();
-
- Call(monotonic_clock::now, base_);
-
- base_ += repeat_offset_ * elapsed_cycles;
- });
+ shm_event_loop_(shm_event_loop),
+ event_(this) {
+ shm_event_loop_->epoll_.OnReadable(
+ timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
}
- ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+ ~TimerHandlerState() {
+ Disable();
+ shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+ }
+ // Runs the callback and, for a repeating timer, queues the next expiration.
+ void HandleEvent() {
+ uint64_t elapsed_cycles = timerfd_.Read();
+ if (elapsed_cycles == 0u) {
+ // We got called before the timer interrupt could happen, but because we
+ // are checking the time, we got called on time. Push the timer out by 1
+ // cycle.
+ elapsed_cycles = 1u;
+ timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
+ }
+ // NOTE(review): a one-shot timer is re-armed at base_ above; confirm intent.
+ Call(monotonic_clock::now, base_);
+
+ base_ += repeat_offset_ * elapsed_cycles;
+ // Only repeating timers are re-queued.
+ if (repeat_offset_ != chrono::seconds(0)) {
+ event_.set_event_time(base_);
+ shm_event_loop_->AddEvent(&event_);
+ }
+ }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
+ if (event_.valid()) {
+ shm_event_loop_->RemoveEvent(&event_);
+ }
+
timerfd_.SetTime(base, repeat_offset);
base_ = base;
repeat_offset_ = repeat_offset;
+ event_.set_event_time(base_);
+ shm_event_loop_->AddEvent(&event_);
}
- void Disable() override { timerfd_.Disable(); }
+ void Disable() override {
+ shm_event_loop_->RemoveEvent(&event_);
+ timerfd_.Disable();
+ }
private:
ShmEventLoop *shm_event_loop_;
+ EventHandler<TimerHandlerState> event_;
TimerFd timerfd_;
@@ -426,33 +455,52 @@
};
// Adapter class to the timerfd and PhasedLoop.
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
public:
PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
- shm_event_loop_(shm_event_loop) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- timerfd_.Read();
- Call(monotonic_clock::now,
- [this](monotonic_clock::time_point sleep_time) {
- Schedule(sleep_time);
- });
+ shm_event_loop_(shm_event_loop),
+ event_(this) {
+ shm_event_loop_->epoll_.OnReadable(
+ timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
+ }
+
+ void HandleEvent() {
+ // The return value for read is the number of cycles that have elapsed.
+ // Because we check to see when this event *should* have happened, there are
+ // cases where Read() will return 0, when 1 cycle has actually happened.
+ // This occurs when the timer interrupt hasn't triggered yet. Therefore,
+ // ignore it. Call handles rescheduling and calculating elapsed cycles
+ // without any extra help.
+ timerfd_.Read();
+ event_.Invalidate();
+
+ Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
+ Schedule(sleep_time);
});
}
~PhasedLoopHandler() override {
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+ shm_event_loop_->RemoveEvent(&event_);
}
private:
// Reschedules the timer.
void Schedule(monotonic_clock::time_point sleep_time) override {
+ if (event_.valid()) {
+ shm_event_loop_->RemoveEvent(&event_);
+ }
+
timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
+ event_.set_event_time(sleep_time);
+ shm_event_loop_->AddEvent(&event_);
}
ShmEventLoop *shm_event_loop_;
+ EventHandler<PhasedLoopHandler> event_;
TimerFd timerfd_;
};
@@ -497,33 +545,23 @@
on_run_.push_back(::std::move(on_run));
}
-void ShmEventLoop::HandleWatcherSignal() {
+void ShmEventLoop::HandleEvent() {
+ // Update all the times for handlers.
+ for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ internal::WatcherState *watcher =
+ reinterpret_cast<internal::WatcherState *>(base_watcher.get());
+
+ watcher->CheckForNewData();
+ }
+
while (true) {
- // Call the handlers in time order of their messages.
- aos::monotonic_clock::time_point min_event_time =
- aos::monotonic_clock::max_time;
- size_t min_watcher_index = -1;
- size_t watcher_index = 0;
- for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
- internal::WatcherState *watcher =
- reinterpret_cast<internal::WatcherState *>(base_watcher.get());
-
- if (watcher->HasNewData()) {
- if (watcher->event_time() < min_event_time) {
- min_watcher_index = watcher_index;
- min_event_time = watcher->event_time();
- }
- }
- ++watcher_index;
- }
-
- if (min_event_time == aos::monotonic_clock::max_time) {
+ if (EventCount() == 0 ||
+ PeekEvent()->event_time() > monotonic_clock::now()) {
break;
}
- reinterpret_cast<internal::WatcherState *>(
- watchers_[min_watcher_index].get())
- ->CallCallback();
+ EventLoopEvent *event = PopEvent();
+ event->HandleEvent();
}
}
@@ -635,12 +673,14 @@
// watchers, and calling the oldest thing first. That will improve
// determinism a lot.
- HandleWatcherSignal();
+ HandleEvent();
});
}
MaybeScheduleTimingReports();
+ ReserveEvents();
+
// Now, all the callbacks are setup. Lock everything into memory and go RT.
if (priority_ != 0) {
::aos::InitRT();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 89a2721..5063186 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -77,7 +77,7 @@
friend class internal::ShmSender;
friend class internal::ShmFetcher;
- void HandleWatcherSignal();
+ void HandleEvent();
// Tracks that we can't have multiple watchers or a sender and a watcher (or
// multiple senders) on a single queue (path).
@@ -94,6 +94,7 @@
internal::EPoll epoll_;
};
+
} // namespace aos
#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a9dd3e5..29c9edd 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -29,41 +29,38 @@
OveralignedChar actual_data[];
};
+class SimulatedEventLoop;
class SimulatedFetcher;
class SimulatedChannel;
-class ShmWatcher : public WatcherState {
+class SimulatedWatcher : public WatcherState {
public:
- ShmWatcher(
- EventLoop *event_loop, const Channel *channel,
- std::function<void(const Context &context, const void *message)> fn)
- : WatcherState(event_loop, channel, std::move(fn)),
- event_loop_(event_loop) {}
+ SimulatedWatcher(
+ SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn);
- ~ShmWatcher() override;
-
- void Call(const Context &context) {
- const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
- DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
- }
+ ~SimulatedWatcher() override;
void Startup(EventLoop * /*event_loop*/) override {}
- void Schedule(EventScheduler *scheduler,
- std::shared_ptr<SimulatedMessage> message) {
- // TODO(austin): Track the token once we schedule in the future.
- // TODO(austin): Schedule wakeup in the future so we don't have 0 latency.
- scheduler->Schedule(scheduler->monotonic_now(),
- [this, message]() { Call(message->context); });
- }
+ void Schedule(std::shared_ptr<SimulatedMessage> message);
+
+ void HandleEvent();
void SetSimulatedChannel(SimulatedChannel *channel) {
simulated_channel_ = channel;
}
private:
- EventLoop *event_loop_;
+ void DoSchedule(monotonic_clock::time_point event_time);
+
+ ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+ SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedWatcher> event_;
+ EventScheduler *scheduler_;
+ EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -83,9 +80,9 @@
::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
// Registers a watcher for the queue.
- void MakeRawWatcher(ShmWatcher *watcher);
+ void MakeRawWatcher(SimulatedWatcher *watcher);
- void RemoveWatcher(ShmWatcher *watcher) {
+ void RemoveWatcher(SimulatedWatcher *watcher) {
watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
}
@@ -113,7 +110,7 @@
const Channel *channel_;
// List of all watchers.
- ::std::vector<ShmWatcher *> watchers_;
+ ::std::vector<SimulatedWatcher *> watchers_;
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
@@ -123,8 +120,6 @@
ipc_lib::QueueIndex next_queue_index_;
};
-ShmWatcher::~ShmWatcher() { simulated_channel_->RemoveWatcher(this); }
-
namespace {
// Creates a SimulatedMessage with size bytes of storage.
@@ -195,8 +190,6 @@
};
} // namespace
-class SimulatedEventLoop;
-
class SimulatedFetcher : public RawFetcher {
public:
explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
@@ -215,6 +208,8 @@
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
if (msgs_.size() == 0) {
+ // TODO(austin): Can we just do this logic unconditionally? It is a lot
+ // simpler. And call clear, obviously.
if (!msg_ && queue_->latest_message()) {
SetMsg(queue_->latest_message());
return std::make_pair(true, queue_->monotonic_now());
@@ -256,25 +251,22 @@
explicit SimulatedTimerHandler(EventScheduler *scheduler,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
- ~SimulatedTimerHandler() {}
+ ~SimulatedTimerHandler() { Disable(); }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override;
void HandleEvent();
- void Disable() override {
- if (token_ != scheduler_->InvalidToken()) {
- scheduler_->Deschedule(token_);
- token_ = scheduler_->InvalidToken();
- }
- }
+ void Disable() override;
::aos::monotonic_clock::time_point monotonic_now() const {
return scheduler_->monotonic_now();
}
private:
+ SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
EventScheduler::Token token_;
@@ -289,21 +281,15 @@
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset);
- ~SimulatedPhasedLoopHandler() {
- if (token_ != scheduler_->InvalidToken()) {
- scheduler_->Deschedule(token_);
- token_ = scheduler_->InvalidToken();
- }
- }
+ ~SimulatedPhasedLoopHandler();
- void HandleTimerWakeup();
+ void HandleEvent();
- void Schedule(monotonic_clock::time_point sleep_time) override {
- token_ = scheduler_->Schedule(sleep_time, [this]() { HandleTimerWakeup(); });
- }
+ void Schedule(monotonic_clock::time_point sleep_time) override;
private:
SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
EventScheduler::Token token_;
@@ -351,6 +337,11 @@
}
}
+ std::chrono::nanoseconds send_delay() const { return send_delay_; }
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ send_delay_ = send_delay;
+ }
+
::aos::monotonic_clock::time_point monotonic_now() override {
return scheduler_->monotonic_now();
}
@@ -407,6 +398,19 @@
private:
friend class SimulatedTimerHandler;
+ friend class SimulatedPhasedLoopHandler;
+ friend class SimulatedWatcher;
+ // Handles each queued event that is due (event time <= now), earliest first.
+ void HandleEvent() {
+ while (true) {
+ if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
+ break;
+ }
+
+ EventLoopEvent *event = PopEvent();
+ event->HandleEvent();
+ }
+ }
pid_t GetTid() override { return tid_; }
@@ -422,16 +426,28 @@
bool has_setup_ = false;
+ std::chrono::nanoseconds send_delay_;
+
const pid_t tid_;
};
+void SimulatedEventLoopFactory::set_send_delay(
+ std::chrono::nanoseconds send_delay) {
+ send_delay_ = send_delay;  // Also handed to event loops created later.
+ for (std::pair<EventLoop *, std::function<void(bool)>> &loop :
+ raw_event_loops_) {  // Apply to every loop in raw_event_loops_.
+ reinterpret_cast<SimulatedEventLoop *>(loop.first)
+ ->set_send_delay(send_delay_);
+ }
+}
+
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
ChannelIndex(channel);
Take(channel);
- std::unique_ptr<ShmWatcher> shm_watcher(
- new ShmWatcher(this, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(
+ new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -463,7 +479,65 @@
return it->second.get();
}
-void SimulatedChannel::MakeRawWatcher(ShmWatcher *watcher) {
+SimulatedWatcher::SimulatedWatcher(
+ SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : WatcherState(simulated_event_loop, channel, std::move(fn)),
+ simulated_event_loop_(simulated_event_loop),
+ event_(this),
+ scheduler_(scheduler),
+ token_(scheduler_->InvalidToken()) {}  // Starts with no wakeup scheduled.
+// Unqueues the event, cancels any pending wakeup, and leaves the channel.
+SimulatedWatcher::~SimulatedWatcher() {
+ simulated_event_loop_->RemoveEvent(&event_);
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ }
+ simulated_channel_->RemoveWatcher(this);  // NOTE(review): assumes non-null.
+}
+// Queues message for delivery to this watcher's callback.
+void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
+ monotonic_clock::time_point event_time = scheduler_->monotonic_now();
+
+ // Messages are queued in order. Only the first pending message registers the
+ // event and a wakeup; HandleEvent() re-registers for each one behind it.
+ if (msgs_.size() == 0) {
+ event_.set_event_time(message->context.monotonic_sent_time);
+ simulated_event_loop_->AddEvent(&event_);
+ // The wakeup fires send_delay() after the current time.
+ DoSchedule(event_time);
+ }
+
+ msgs_.emplace_back(message);
+}
+// Delivers the oldest queued message, then re-arms for the next one, if any.
+void SimulatedWatcher::HandleEvent() {
+ CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
+
+ const monotonic_clock::time_point monotonic_now =
+ simulated_event_loop_->monotonic_now();
+ DoCallCallback([monotonic_now]() { return monotonic_now; },
+ msgs_.front()->context);
+ // Re-queue at the next message's sent time while messages remain.
+ msgs_.pop_front();
+ if (msgs_.size() != 0) {
+ event_.set_event_time(msgs_.front()->context.monotonic_sent_time);
+ simulated_event_loop_->AddEvent(&event_);
+
+ DoSchedule(event_.event_time());
+ } else {
+ token_ = scheduler_->InvalidToken();  // NOTE(review): no Deschedule; confirm.
+ }
+}
+// Schedules an event loop wakeup send_delay() after event_time.
+void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
+ token_ =
+ scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
+ [this]() { simulated_event_loop_->HandleEvent(); });
+}
+
+void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
watcher->SetSimulatedChannel(this);
watchers_.emplace_back(watcher);
}
@@ -489,8 +563,8 @@
latest_message_ = message;
if (scheduler_->is_running()) {
- for (ShmWatcher *watcher : watchers_) {
- watcher->Schedule(scheduler_, message);
+ for (SimulatedWatcher *watcher : watchers_) {
+ watcher->Schedule(message);
}
}
for (auto &fetcher : fetchers_) {
@@ -510,6 +584,8 @@
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
+ simulated_event_loop_(simulated_event_loop),
+ event_(this),
scheduler_(scheduler),
token_(scheduler_->InvalidToken()) {}
@@ -521,10 +597,14 @@
base_ = base;
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
- token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
} else {
- token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ base, [this]() { simulated_event_loop_->HandleEvent(); });
}
+ event_.set_event_time(base_);
+ simulated_event_loop_->AddEvent(&event_);
}
void SimulatedTimerHandler::HandleEvent() {
@@ -533,7 +613,10 @@
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
- token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ event_.set_event_time(base_);
+ simulated_event_loop_->AddEvent(&event_);
} else {
token_ = scheduler_->InvalidToken();
}
@@ -541,16 +624,33 @@
Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
}
+void SimulatedTimerHandler::Disable() {  // Unqueues event, cancels any wakeup.
+ simulated_event_loop_->RemoveEvent(&event_);
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+}
+
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn, const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
+ event_(this),
scheduler_(scheduler),
token_(scheduler_->InvalidToken()) {}
-void SimulatedPhasedLoopHandler::HandleTimerWakeup() {
+SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
+ if (token_ != scheduler_->InvalidToken()) {  // Cancel any pending wakeup.
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+ simulated_event_loop_->RemoveEvent(&event_);  // No-op if not queued.
+}
+
+void SimulatedPhasedLoopHandler::HandleEvent() {
monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
Call(
@@ -558,6 +658,14 @@
[this](monotonic_clock::time_point sleep_time) { Schedule(sleep_time); });
}
+void SimulatedPhasedLoopHandler::Schedule(
+ monotonic_clock::time_point sleep_time) {  // Wake up and queue at sleep_time.
+ token_ = scheduler_->Schedule(
+ sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ event_.set_event_time(sleep_time);
+ simulated_event_loop_->AddEvent(&event_);  // NOTE(review): DCHECKs if queued.
+}
+
void SimulatedEventLoop::Take(const Channel *channel) {
CHECK(!is_running()) << ": Cannot add new objects while running.";
@@ -575,10 +683,11 @@
std::string_view name) {
pid_t tid = tid_;
++tid_;
- ::std::unique_ptr<EventLoop> result(new SimulatedEventLoop(
+ ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
&scheduler_, &channels_, configuration_, &raw_event_loops_, tid));
result->set_name(name);
- return result;
+ result->set_send_delay(send_delay_);
+ return std::move(result);
}
void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 79f1e46..7b2b9c7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -65,6 +65,9 @@
// loop handler.
void Exit() { scheduler_.Exit(); }
+ // Sets the simulated send delay for the factory.
+ void set_send_delay(std::chrono::nanoseconds send_delay);
+
monotonic_clock::time_point monotonic_now() const {
return scheduler_.monotonic_now();
}
@@ -81,6 +84,8 @@
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
raw_event_loops_;
+ std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+
pid_t tid_ = 0;
};
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 16a1923..44be581 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
#include "gtest/gtest.h"
namespace aos {
@@ -28,6 +29,10 @@
// I'm not sure how much that matters.
void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ event_loop_factory_.set_send_delay(send_delay);
+ }
+
private:
SimulatedEventLoopFactory event_loop_factory_;
};
@@ -114,5 +119,106 @@
EXPECT_EQ(counter, 10);
}
+// Tests that watchers have latency in simulation.
+TEST(SimulatedEventLoopTest, WatcherTimingReport) {
+ SimulatedEventLoopTestFactory factory;
+ factory.set_send_delay(std::chrono::microseconds(50));
+
+ FLAGS_timing_report_ms = 1000;
+ auto loop1 = factory.MakePrimary("primary");
+ loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+ auto loop2 = factory.Make("sender_loop");
+
+ auto loop3 = factory.Make("report_fetcher");
+
+ Fetcher<timing::Report> report_fetcher =
+ loop3->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+
+ // Send 10 messages in the middle of a timing report period so we get
+ // something interesting back.
+ auto test_timer = loop2->AddTimer([&sender]() {
+ for (int i = 0; i < 10; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200 + i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ // Quit after 1 timing report, mid way through the next cycle.
+ {
+ auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
+ end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
+ end_timer->set_name("end");
+ }
+
+ loop1->OnRun([&test_timer, &loop1]() {
+ test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+ });
+
+ factory.Run();
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> primary_report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+ if (report_fetcher->name()->string_view() == "primary") {
+ primary_report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ // Check the watcher report.
+ VLOG(1) << FlatbufferToJson(primary_report, true);
+
+ EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+ // Two timers on the primary loop: the timing report timer and "end".
+ ASSERT_NE(primary_report.message().timers(), nullptr);
+ EXPECT_EQ(primary_report.message().timers()->size(), 2);
+
+ // No phased loops
+ ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+ // Confirm the watcher got all 10 messages, each with 50 us wakeup latency.
+ ASSERT_NE(primary_report.message().watchers(), nullptr);
+ ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
+ 0.00005, 1e-9);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
+ 0.00005, 1e-9);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
+ 0.00005, 1e-9);
+ EXPECT_EQ(primary_report.message()
+ .watchers()
+ ->Get(0)
+ ->wakeup_latency()
+ ->standard_deviation(),
+ 0.0);
+
+ EXPECT_EQ(
+ primary_report.message().watchers()->Get(0)->handler_time()->average(),
+ 0.0);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
+ 0.0);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
+ 0.0);
+ EXPECT_EQ(primary_report.message()
+ .watchers()
+ ->Get(0)
+ ->handler_time()
+ ->standard_deviation(),
+ 0.0);
+}
+
} // namespace testing
} // namespace aos