Merge changes I8675ec52,I2d8fd5b6
* changes:
Sort wakeups and add send latency.
Make raw senders able to send flatbuffers
diff --git a/aos/controls/control_loop_test.h b/aos/controls/control_loop_test.h
index a0a844f..a2dbb7b 100644
--- a/aos/controls/control_loop_test.h
+++ b/aos/controls/control_loop_test.h
@@ -93,6 +93,10 @@
return event_loop_factory_.MakeEventLoop(name);
}
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ event_loop_factory_.set_send_delay(send_delay);
+ }
+
void RunFor(monotonic_clock::duration duration) {
event_loop_factory_.RunFor(duration);
}
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 6ffa526..dfad4ff 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -45,6 +45,7 @@
name = "event_loop",
srcs = [
"event_loop.cc",
+ "event_loop_event.h",
"event_loop_tmpl.h",
],
hdrs = [
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 01f6a3e..2c3a215 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -65,6 +65,7 @@
EventLoop::~EventLoop() {
CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
+ CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
}
int EventLoop::ChannelIndex(const Channel *channel) {
@@ -377,6 +378,39 @@
}
}
+void EventLoop::ReserveEvents() {
+ events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
+}
+
+namespace {
+bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
+ return first->event_time() > second->event_time();
+}
+} // namespace
+
+void EventLoop::AddEvent(EventLoopEvent *event) {
+ DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
+ events_.push_back(event);
+ std::push_heap(events_.begin(), events_.end(), CompareEvents);
+}
+
+void EventLoop::RemoveEvent(EventLoopEvent *event) {
+ auto e = std::find(events_.begin(), events_.end(), event);
+ if (e != events_.end()) {
+ events_.erase(e);
+ std::make_heap(events_.begin(), events_.end(), CompareEvents);
+ event->Invalidate();
+ }
+}
+
+EventLoopEvent *EventLoop::PopEvent() {
+ EventLoopEvent *result = events_.front();
+ std::pop_heap(events_.begin(), events_.end(), CompareEvents);
+ events_.pop_back();
+ result->Invalidate();
+ return result;
+}
+
void WatcherState::set_timing_report(timing::Watcher *watcher) {
CHECK_NOTNULL(watcher);
watcher_ = watcher;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 16980dd..68b1b5e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,6 +7,7 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
+#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
@@ -424,9 +425,10 @@
WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
std::vector<RawSender *> senders_;
+ std::vector<RawFetcher *> fetchers_;
+
std::vector<std::unique_ptr<TimerHandler>> timers_;
std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
- std::vector<RawFetcher *> fetchers_;
std::vector<std::unique_ptr<WatcherState>> watchers_;
void SendTimingReport();
@@ -435,6 +437,19 @@
std::unique_ptr<RawSender> timing_report_sender_;
+ // Tracks which event sources (timers and watchers) have data, and which
+ // don't. Added events may not change their event_time().
+ // TODO(austin): Test case 1: timer triggers at t1, handler takes until after
+ // t2 to run, t2 should then be picked up without a context switch.
+ void AddEvent(EventLoopEvent *event);
+ void RemoveEvent(EventLoopEvent *event);
+ size_t EventCount() const { return events_.size(); }
+ EventLoopEvent *PopEvent();
+ EventLoopEvent *PeekEvent() { return events_.front(); }
+ void ReserveEvents();
+
+ std::vector<EventLoopEvent *> events_;
+
private:
virtual pid_t GetTid() = 0;
diff --git a/aos/events/event_loop_event.h b/aos/events/event_loop_event.h
new file mode 100644
index 0000000..6438da1
--- /dev/null
+++ b/aos/events/event_loop_event.h
@@ -0,0 +1,46 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_EVENT_H
+#define AOS_EVENTS_EVENT_LOOP_EVENT_H
+
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Common interface to track when callbacks and timers should have happened.
+class EventLoopEvent {
+ public:
+ virtual ~EventLoopEvent() {}
+
+ bool valid() const { return event_time_ != monotonic_clock::max_time; }
+ void Invalidate() { event_time_ = monotonic_clock::max_time; }
+
+ monotonic_clock::time_point event_time() const {
+ DCHECK(valid());
+ return event_time_;
+ }
+
+ virtual void HandleEvent() = 0;
+
+ void set_event_time(monotonic_clock::time_point event_time) {
+ event_time_ = event_time;
+ }
+
+ private:
+ monotonic_clock::time_point event_time_ = monotonic_clock::max_time;
+};
+
+// Adapter class to implement EventLoopEvent by calling HandleEvent on T.
+template <typename T>
+class EventHandler final : public EventLoopEvent {
+ public:
+ EventHandler(T *t) : t_(t) {}
+ ~EventHandler() = default;
+ void HandleEvent() override { t_->HandleEvent(); }
+
+ private:
+ T *const t_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_EVENT_LOOP_EVENT_H
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 834ca16..cf3b6df 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1099,5 +1099,51 @@
EXPECT_EQ(primary_report.message().fetchers()->Get(1)->count(), 10);
}
+// Tests that a raw watcher and raw fetcher can receive messages from a raw
+// sender without messing up offsets.
+TEST_P(AbstractEventLoopTest, RawBasic) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+ auto loop3 = Make();
+
+ const std::string kData("971 is the best");
+
+ std::unique_ptr<aos::RawSender> sender =
+ loop1->MakeRawSender(loop1->configuration()->channels()->Get(1));
+
+ std::unique_ptr<aos::RawFetcher> fetcher =
+ loop3->MakeRawFetcher(loop3->configuration()->channels()->Get(1));
+
+ loop2->OnRun(
+ [&]() { EXPECT_TRUE(sender->Send(kData.data(), kData.size())); });
+
+ bool happened = false;
+ loop2->MakeRawWatcher(
+ loop2->configuration()->channels()->Get(1),
+ [this, &kData, &fetcher, &happened](const Context &context,
+ const void *message) {
+ happened = true;
+ EXPECT_EQ(std::string_view(kData),
+ std::string_view(reinterpret_cast<const char *>(message),
+ context.size));
+ EXPECT_EQ(std::string_view(kData),
+ std::string_view(reinterpret_cast<const char *>(context.data),
+ context.size));
+
+ ASSERT_TRUE(fetcher->Fetch());
+
+ EXPECT_EQ(std::string_view(kData),
+ std::string_view(
+ reinterpret_cast<const char *>(fetcher->context().data),
+ fetcher->context().size));
+
+ this->Exit();
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index b69f28a..f4cf2ec 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -90,8 +90,7 @@
// Ends the given event loop at the given time from now.
void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
auto end_timer = loop->AddTimer([this]() { this->Exit(); });
- end_timer->Setup(loop->monotonic_now() +
- ::std::chrono::milliseconds(duration));
+ end_timer->Setup(loop->monotonic_now() + duration);
end_timer->set_name("end");
}
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index f5feea9..e549d57 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -179,8 +179,6 @@
}
bool FetchNext() {
- // TODO(austin): Write a test which starts with nothing in the queue,
- // and then calls FetchNext() after something is sent.
// TODO(austin): Get behind and make sure it dies both here and with
// Fetch.
ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
@@ -340,41 +338,41 @@
class WatcherState : public aos::WatcherState {
public:
WatcherState(
- EventLoop *event_loop, const Channel *channel,
+ ShmEventLoop *event_loop, const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: aos::WatcherState(event_loop, channel, std::move(fn)),
+ event_loop_(event_loop),
+ event_(this),
simple_shm_fetcher_(channel) {}
- ~WatcherState() override {}
+ ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
void Startup(EventLoop *event_loop) override {
- PointAtNextQueueIndex();
+ simple_shm_fetcher_.PointAtNextQueueIndex();
CHECK(RegisterWakeup(event_loop->priority()));
}
- // Points the next message to fetch at the queue index which will be populated
- // next.
- void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
-
// Returns true if there is new data available.
- bool HasNewData() {
+ bool CheckForNewData() {
if (!has_new_data_) {
has_new_data_ = simple_shm_fetcher_.FetchNext();
+
+ if (has_new_data_) {
+ event_.set_event_time(
+ simple_shm_fetcher_.context().monotonic_sent_time);
+ event_loop_->AddEvent(&event_);
+ }
}
return has_new_data_;
}
- // Returns the time of the current data sample.
- aos::monotonic_clock::time_point event_time() const {
- return simple_shm_fetcher_.context().monotonic_sent_time;
- }
-
// Consumes the data by calling the callback.
- void CallCallback() {
+ void HandleEvent() {
CHECK(has_new_data_);
DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
has_new_data_ = false;
+ CheckForNewData();
}
// Registers us to receive a signal on event reception.
@@ -387,37 +385,68 @@
private:
bool has_new_data_ = false;
+ ShmEventLoop *event_loop_;
+ EventHandler<WatcherState> event_;
SimpleShmFetcher simple_shm_fetcher_;
};
// Adapter class to adapt a timerfd to a TimerHandler.
-class TimerHandlerState : public TimerHandler {
+class TimerHandlerState final : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
: TimerHandler(shm_event_loop, std::move(fn)),
- shm_event_loop_(shm_event_loop) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- const uint64_t elapsed_cycles = timerfd_.Read();
-
- Call(monotonic_clock::now, base_);
-
- base_ += repeat_offset_ * elapsed_cycles;
- });
+ shm_event_loop_(shm_event_loop),
+ event_(this) {
+ shm_event_loop_->epoll_.OnReadable(
+ timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
}
- ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+ ~TimerHandlerState() {
+ Disable();
+ shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+ }
+
+ void HandleEvent() {
+ uint64_t elapsed_cycles = timerfd_.Read();
+ if (elapsed_cycles == 0u) {
+ // We got called before the timer interrupt could happen, but because we
+ // are checking the time, we got called on time. Push the timer out by 1
+ // cycle.
+ elapsed_cycles = 1u;
+ timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
+ }
+
+ Call(monotonic_clock::now, base_);
+
+ base_ += repeat_offset_ * elapsed_cycles;
+
+ if (repeat_offset_ != chrono::seconds(0)) {
+ event_.set_event_time(base_);
+ shm_event_loop_->AddEvent(&event_);
+ }
+ }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
+ if (event_.valid()) {
+ shm_event_loop_->RemoveEvent(&event_);
+ }
+
timerfd_.SetTime(base, repeat_offset);
base_ = base;
repeat_offset_ = repeat_offset;
+ event_.set_event_time(base_);
+ shm_event_loop_->AddEvent(&event_);
}
- void Disable() override { timerfd_.Disable(); }
+ void Disable() override {
+ shm_event_loop_->RemoveEvent(&event_);
+ timerfd_.Disable();
+ }
private:
ShmEventLoop *shm_event_loop_;
+ EventHandler<TimerHandlerState> event_;
TimerFd timerfd_;
@@ -426,33 +455,52 @@
};
// Adapter class to the timerfd and PhasedLoop.
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
public:
PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
- shm_event_loop_(shm_event_loop) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- timerfd_.Read();
- Call(monotonic_clock::now,
- [this](monotonic_clock::time_point sleep_time) {
- Schedule(sleep_time);
- });
+ shm_event_loop_(shm_event_loop),
+ event_(this) {
+ shm_event_loop_->epoll_.OnReadable(
+ timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
+ }
+
+ void HandleEvent() {
+ // The return value for read is the number of cycles that have elapsed.
+ // Because we check to see when this event *should* have happened, there are
+ // cases where Read() will return 0, when 1 cycle has actually happened.
+ // This occurs when the timer interrupt hasn't triggered yet. Therefore,
+ // ignore it. Call handles rescheduling and calculating elapsed cycles
+ // without any extra help.
+ timerfd_.Read();
+ event_.Invalidate();
+
+ Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
+ Schedule(sleep_time);
});
}
~PhasedLoopHandler() override {
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
+ shm_event_loop_->RemoveEvent(&event_);
}
private:
// Reschedules the timer.
void Schedule(monotonic_clock::time_point sleep_time) override {
+ if (event_.valid()) {
+ shm_event_loop_->RemoveEvent(&event_);
+ }
+
timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
+ event_.set_event_time(sleep_time);
+ shm_event_loop_->AddEvent(&event_);
}
ShmEventLoop *shm_event_loop_;
+ EventHandler<PhasedLoopHandler> event_;
TimerFd timerfd_;
};
@@ -497,33 +545,23 @@
on_run_.push_back(::std::move(on_run));
}
-void ShmEventLoop::HandleWatcherSignal() {
+void ShmEventLoop::HandleEvent() {
+ // Update all the times for handlers.
+ for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ internal::WatcherState *watcher =
+ reinterpret_cast<internal::WatcherState *>(base_watcher.get());
+
+ watcher->CheckForNewData();
+ }
+
while (true) {
- // Call the handlers in time order of their messages.
- aos::monotonic_clock::time_point min_event_time =
- aos::monotonic_clock::max_time;
- size_t min_watcher_index = -1;
- size_t watcher_index = 0;
- for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
- internal::WatcherState *watcher =
- reinterpret_cast<internal::WatcherState *>(base_watcher.get());
-
- if (watcher->HasNewData()) {
- if (watcher->event_time() < min_event_time) {
- min_watcher_index = watcher_index;
- min_event_time = watcher->event_time();
- }
- }
- ++watcher_index;
- }
-
- if (min_event_time == aos::monotonic_clock::max_time) {
+ if (EventCount() == 0 ||
+ PeekEvent()->event_time() > monotonic_clock::now()) {
break;
}
- reinterpret_cast<internal::WatcherState *>(
- watchers_[min_watcher_index].get())
- ->CallCallback();
+ EventLoopEvent *event = PopEvent();
+ event->HandleEvent();
}
}
@@ -635,12 +673,14 @@
// watchers, and calling the oldest thing first. That will improve
// determinism a lot.
- HandleWatcherSignal();
+ HandleEvent();
});
}
MaybeScheduleTimingReports();
+ ReserveEvents();
+
// Now, all the callbacks are setup. Lock everything into memory and go RT.
if (priority_ != 0) {
::aos::InitRT();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 89a2721..5063186 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -77,7 +77,7 @@
friend class internal::ShmSender;
friend class internal::ShmFetcher;
- void HandleWatcherSignal();
+ void HandleEvent();
// Tracks that we can't have multiple watchers or a sender and a watcher (or
// multiple senders) on a single queue (path).
@@ -94,6 +94,7 @@
internal::EPoll epoll_;
};
+
} // namespace aos
#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a9dd3e5..29c9edd 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -29,41 +29,38 @@
OveralignedChar actual_data[];
};
+class SimulatedEventLoop;
class SimulatedFetcher;
class SimulatedChannel;
-class ShmWatcher : public WatcherState {
+class SimulatedWatcher : public WatcherState {
public:
- ShmWatcher(
- EventLoop *event_loop, const Channel *channel,
- std::function<void(const Context &context, const void *message)> fn)
- : WatcherState(event_loop, channel, std::move(fn)),
- event_loop_(event_loop) {}
+ SimulatedWatcher(
+ SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn);
- ~ShmWatcher() override;
-
- void Call(const Context &context) {
- const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
- DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
- }
+ ~SimulatedWatcher() override;
void Startup(EventLoop * /*event_loop*/) override {}
- void Schedule(EventScheduler *scheduler,
- std::shared_ptr<SimulatedMessage> message) {
- // TODO(austin): Track the token once we schedule in the future.
- // TODO(austin): Schedule wakeup in the future so we don't have 0 latency.
- scheduler->Schedule(scheduler->monotonic_now(),
- [this, message]() { Call(message->context); });
- }
+ void Schedule(std::shared_ptr<SimulatedMessage> message);
+
+ void HandleEvent();
void SetSimulatedChannel(SimulatedChannel *channel) {
simulated_channel_ = channel;
}
private:
- EventLoop *event_loop_;
+ void DoSchedule(monotonic_clock::time_point event_time);
+
+ ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+ SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedWatcher> event_;
+ EventScheduler *scheduler_;
+ EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -83,9 +80,9 @@
::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
// Registers a watcher for the queue.
- void MakeRawWatcher(ShmWatcher *watcher);
+ void MakeRawWatcher(SimulatedWatcher *watcher);
- void RemoveWatcher(ShmWatcher *watcher) {
+ void RemoveWatcher(SimulatedWatcher *watcher) {
watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
}
@@ -113,7 +110,7 @@
const Channel *channel_;
// List of all watchers.
- ::std::vector<ShmWatcher *> watchers_;
+ ::std::vector<SimulatedWatcher *> watchers_;
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
@@ -123,8 +120,6 @@
ipc_lib::QueueIndex next_queue_index_;
};
-ShmWatcher::~ShmWatcher() { simulated_channel_->RemoveWatcher(this); }
-
namespace {
// Creates a SimulatedMessage with size bytes of storage.
@@ -195,8 +190,6 @@
};
} // namespace
-class SimulatedEventLoop;
-
class SimulatedFetcher : public RawFetcher {
public:
explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
@@ -215,6 +208,8 @@
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
if (msgs_.size() == 0) {
+ // TODO(austin): Can we just do this logic unconditionally? It is a lot
+ // simpler. And call clear, obviously.
if (!msg_ && queue_->latest_message()) {
SetMsg(queue_->latest_message());
return std::make_pair(true, queue_->monotonic_now());
@@ -256,25 +251,22 @@
explicit SimulatedTimerHandler(EventScheduler *scheduler,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
- ~SimulatedTimerHandler() {}
+ ~SimulatedTimerHandler() { Disable(); }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override;
void HandleEvent();
- void Disable() override {
- if (token_ != scheduler_->InvalidToken()) {
- scheduler_->Deschedule(token_);
- token_ = scheduler_->InvalidToken();
- }
- }
+ void Disable() override;
::aos::monotonic_clock::time_point monotonic_now() const {
return scheduler_->monotonic_now();
}
private:
+ SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
EventScheduler::Token token_;
@@ -289,21 +281,15 @@
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset);
- ~SimulatedPhasedLoopHandler() {
- if (token_ != scheduler_->InvalidToken()) {
- scheduler_->Deschedule(token_);
- token_ = scheduler_->InvalidToken();
- }
- }
+ ~SimulatedPhasedLoopHandler();
- void HandleTimerWakeup();
+ void HandleEvent();
- void Schedule(monotonic_clock::time_point sleep_time) override {
- token_ = scheduler_->Schedule(sleep_time, [this]() { HandleTimerWakeup(); });
- }
+ void Schedule(monotonic_clock::time_point sleep_time) override;
private:
SimulatedEventLoop *simulated_event_loop_;
+ EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
EventScheduler::Token token_;
@@ -351,6 +337,11 @@
}
}
+ std::chrono::nanoseconds send_delay() const { return send_delay_; }
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ send_delay_ = send_delay;
+ }
+
::aos::monotonic_clock::time_point monotonic_now() override {
return scheduler_->monotonic_now();
}
@@ -407,6 +398,19 @@
private:
friend class SimulatedTimerHandler;
+ friend class SimulatedPhasedLoopHandler;
+ friend class SimulatedWatcher;
+
+ void HandleEvent() {
+ while (true) {
+ if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
+ break;
+ }
+
+ EventLoopEvent *event = PopEvent();
+ event->HandleEvent();
+ }
+ }
pid_t GetTid() override { return tid_; }
@@ -422,16 +426,28 @@
bool has_setup_ = false;
+ std::chrono::nanoseconds send_delay_;
+
const pid_t tid_;
};
+void SimulatedEventLoopFactory::set_send_delay(
+ std::chrono::nanoseconds send_delay) {
+ send_delay_ = send_delay;
+ for (std::pair<EventLoop *, std::function<void(bool)>> &loop :
+ raw_event_loops_) {
+ reinterpret_cast<SimulatedEventLoop *>(loop.first)
+ ->set_send_delay(send_delay_);
+ }
+}
+
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
ChannelIndex(channel);
Take(channel);
- std::unique_ptr<ShmWatcher> shm_watcher(
- new ShmWatcher(this, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(
+ new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -463,7 +479,65 @@
return it->second.get();
}
-void SimulatedChannel::MakeRawWatcher(ShmWatcher *watcher) {
+SimulatedWatcher::SimulatedWatcher(
+ SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : WatcherState(simulated_event_loop, channel, std::move(fn)),
+ simulated_event_loop_(simulated_event_loop),
+ event_(this),
+ scheduler_(scheduler),
+ token_(scheduler_->InvalidToken()) {}
+
+SimulatedWatcher::~SimulatedWatcher() {
+ simulated_event_loop_->RemoveEvent(&event_);
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ }
+ simulated_channel_->RemoveWatcher(this);
+}
+
+void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
+ monotonic_clock::time_point event_time = scheduler_->monotonic_now();
+
+ // Messages are queued in order. If we are the first, add ourselves.
+ // Otherwise, don't.
+ if (msgs_.size() == 0) {
+ event_.set_event_time(message->context.monotonic_sent_time);
+ simulated_event_loop_->AddEvent(&event_);
+
+ DoSchedule(event_time);
+ }
+
+ msgs_.emplace_back(message);
+}
+
+void SimulatedWatcher::HandleEvent() {
+ CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
+
+ const monotonic_clock::time_point monotonic_now =
+ simulated_event_loop_->monotonic_now();
+ DoCallCallback([monotonic_now]() { return monotonic_now; },
+ msgs_.front()->context);
+
+ msgs_.pop_front();
+ if (msgs_.size() != 0) {
+ event_.set_event_time(msgs_.front()->context.monotonic_sent_time);
+ simulated_event_loop_->AddEvent(&event_);
+
+ DoSchedule(event_.event_time());
+ } else {
+ token_ = scheduler_->InvalidToken();
+ }
+}
+
+void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
+ token_ =
+ scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
+ [this]() { simulated_event_loop_->HandleEvent(); });
+}
+
+void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
watcher->SetSimulatedChannel(this);
watchers_.emplace_back(watcher);
}
@@ -489,8 +563,8 @@
latest_message_ = message;
if (scheduler_->is_running()) {
- for (ShmWatcher *watcher : watchers_) {
- watcher->Schedule(scheduler_, message);
+ for (SimulatedWatcher *watcher : watchers_) {
+ watcher->Schedule(message);
}
}
for (auto &fetcher : fetchers_) {
@@ -510,6 +584,8 @@
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
+ simulated_event_loop_(simulated_event_loop),
+ event_(this),
scheduler_(scheduler),
token_(scheduler_->InvalidToken()) {}
@@ -521,10 +597,14 @@
base_ = base;
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
- token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
} else {
- token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ base, [this]() { simulated_event_loop_->HandleEvent(); });
}
+ event_.set_event_time(base_);
+ simulated_event_loop_->AddEvent(&event_);
}
void SimulatedTimerHandler::HandleEvent() {
@@ -533,7 +613,10 @@
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
- token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ event_.set_event_time(base_);
+ simulated_event_loop_->AddEvent(&event_);
} else {
token_ = scheduler_->InvalidToken();
}
@@ -541,16 +624,33 @@
Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
}
+void SimulatedTimerHandler::Disable() {
+ simulated_event_loop_->RemoveEvent(&event_);
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+}
+
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn, const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
+ event_(this),
scheduler_(scheduler),
token_(scheduler_->InvalidToken()) {}
-void SimulatedPhasedLoopHandler::HandleTimerWakeup() {
+SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+ simulated_event_loop_->RemoveEvent(&event_);
+}
+
+void SimulatedPhasedLoopHandler::HandleEvent() {
monotonic_clock::time_point monotonic_now =
simulated_event_loop_->monotonic_now();
Call(
@@ -558,6 +658,14 @@
[this](monotonic_clock::time_point sleep_time) { Schedule(sleep_time); });
}
+void SimulatedPhasedLoopHandler::Schedule(
+ monotonic_clock::time_point sleep_time) {
+ token_ = scheduler_->Schedule(
+ sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ event_.set_event_time(sleep_time);
+ simulated_event_loop_->AddEvent(&event_);
+}
+
void SimulatedEventLoop::Take(const Channel *channel) {
CHECK(!is_running()) << ": Cannot add new objects while running.";
@@ -575,10 +683,11 @@
std::string_view name) {
pid_t tid = tid_;
++tid_;
- ::std::unique_ptr<EventLoop> result(new SimulatedEventLoop(
+ ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
&scheduler_, &channels_, configuration_, &raw_event_loops_, tid));
result->set_name(name);
- return result;
+ result->set_send_delay(send_delay_);
+ return std::move(result);
}
void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 79f1e46..7b2b9c7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -65,6 +65,9 @@
// loop handler.
void Exit() { scheduler_.Exit(); }
+ // Sets the simulated send delay for the factory.
+ void set_send_delay(std::chrono::nanoseconds send_delay);
+
monotonic_clock::time_point monotonic_now() const {
return scheduler_.monotonic_now();
}
@@ -81,6 +84,8 @@
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
raw_event_loops_;
+ std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+
pid_t tid_ = 0;
};
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 16a1923..44be581 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
#include "gtest/gtest.h"
namespace aos {
@@ -28,6 +29,10 @@
// I'm not sure how much that matters.
void SleepFor(::std::chrono::nanoseconds /*duration*/) override {}
+ void set_send_delay(std::chrono::nanoseconds send_delay) {
+ event_loop_factory_.set_send_delay(send_delay);
+ }
+
private:
SimulatedEventLoopFactory event_loop_factory_;
};
@@ -114,5 +119,106 @@
EXPECT_EQ(counter, 10);
}
+// Tests that watchers have latency in simulation.
+TEST(SimulatedEventLoopTest, WatcherTimingReport) {
+ SimulatedEventLoopTestFactory factory;
+ factory.set_send_delay(std::chrono::microseconds(50));
+
+ FLAGS_timing_report_ms = 1000;
+ auto loop1 = factory.MakePrimary("primary");
+ loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+ auto loop2 = factory.Make("sender_loop");
+
+ auto loop3 = factory.Make("report_fetcher");
+
+ Fetcher<timing::Report> report_fetcher =
+ loop3->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+
+ // Send 10 messages in the middle of a timing report period so we get
+ // something interesting back.
+ auto test_timer = loop2->AddTimer([&sender]() {
+ for (int i = 0; i < 10; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200 + i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ // Quit after 1 timing report, mid way through the next cycle.
+ {
+ auto end_timer = loop1->AddTimer([&factory]() { factory.Exit(); });
+ end_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(2500));
+ end_timer->set_name("end");
+ }
+
+ loop1->OnRun([&test_timer, &loop1]() {
+ test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+ });
+
+ factory.Run();
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> primary_report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+ if (report_fetcher->name()->string_view() == "primary") {
+ primary_report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ // Check the watcher report.
+ VLOG(1) << FlatbufferToJson(primary_report, true);
+
+ EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+ // Just the timing report timer.
+ ASSERT_NE(primary_report.message().timers(), nullptr);
+ EXPECT_EQ(primary_report.message().timers()->size(), 2);
+
+ // No phased loops
+ ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+ // And now confirm that the watcher received all 10 messages, and has latency.
+ ASSERT_NE(primary_report.message().watchers(), nullptr);
+ ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->average(),
+ 0.00005, 1e-9);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->min(),
+ 0.00005, 1e-9);
+ EXPECT_NEAR(
+ primary_report.message().watchers()->Get(0)->wakeup_latency()->max(),
+ 0.00005, 1e-9);
+ EXPECT_EQ(primary_report.message()
+ .watchers()
+ ->Get(0)
+ ->wakeup_latency()
+ ->standard_deviation(),
+ 0.0);
+
+ EXPECT_EQ(
+ primary_report.message().watchers()->Get(0)->handler_time()->average(),
+ 0.0);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->min(),
+ 0.0);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->handler_time()->max(),
+ 0.0);
+ EXPECT_EQ(primary_report.message()
+ .watchers()
+ ->Get(0)
+ ->handler_time()
+ ->standard_deviation(),
+ 0.0);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a4539f8..0c87e4d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -549,7 +549,10 @@
void LocklessQueue::Sender::Send(const char *data, size_t length) {
CHECK_LE(length, size());
- memcpy(Data(), data, length);
+ // Flatbuffers write from the back of the buffer to the front. If we are
+ // going to write an explicit chunk of memory into the buffer, we need to
+ // adhere to this convention and place it at the end.
+ memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
Send(length);
}
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 6b3f7c5..e3d6c5e 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -595,8 +595,10 @@
break;
}
- EXPECT_GT(read_data[6], last_data) << ": Got " << read_data;
- last_data = read_data[6];
+ EXPECT_GT(read_data[queue.message_data_size() - length + 6],
+ last_data)
+ << ": Got " << read_data;
+ last_data = read_data[queue.message_data_size() - length + 6];
++i;
}
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7f59e96..350350c 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -282,7 +282,8 @@
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc, read_data, sizeof(ThreadPlusCount));
+ memcpy(&tpc, read_data + queue.message_data_size() - length,
+ sizeof(ThreadPlusCount));
if (will_wrap) {
// The queue won't chang out from under us, so we should get some amount
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index b2a533e..9fd79fc 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -53,6 +53,8 @@
drivetrain_(dt_config_, drivetrain_event_loop_.get(), &localizer_),
drivetrain_plant_event_loop_(MakeEventLoop("drivetrain_plant")),
drivetrain_plant_(drivetrain_plant_event_loop_.get(), dt_config_) {
+ // Too many tests care...
+ set_send_delay(chrono::nanoseconds(0));
set_battery_voltage(12.0);
}
virtual ~DrivetrainTest() {}
diff --git a/y2016/control_loops/shooter/shooter_lib_test.cc b/y2016/control_loops/shooter/shooter_lib_test.cc
index 6b3e2d6..c20526c 100644
--- a/y2016/control_loops/shooter/shooter_lib_test.cc
+++ b/y2016/control_loops/shooter/shooter_lib_test.cc
@@ -178,7 +178,7 @@
EXPECT_TRUE(builder.Send(goal_builder.Finish()));
}
- RunFor(dt());
+ RunFor(dt() * 3 / 2);
VerifyNearGoal();
diff --git a/y2019/control_loops/superstructure/superstructure_lib_test.cc b/y2019/control_loops/superstructure/superstructure_lib_test.cc
index f0e1a70..58bac76 100644
--- a/y2019/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2019/control_loops/superstructure/superstructure_lib_test.cc
@@ -445,7 +445,11 @@
superstructure_status_fetcher_.Fetch();
// 2 Seconds
ASSERT_LE(i, 2 * 1.0 / .00505);
- } while (!superstructure_status_fetcher_.get()->zeroed());
+
+ // Since there is a delay when sending running, make sure we have a status
+ // before checking it.
+ } while (superstructure_status_fetcher_.get() == nullptr ||
+ !superstructure_status_fetcher_.get()->zeroed());
}
::std::unique_ptr<::aos::EventLoop> test_event_loop_;