Add test and fix watcher startup behavior.
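The desired behavior for a watcher is that it triggers for every message
that arrives *after* the watcher is created. It must not receive
messages sent before it was constructed, and must not miss any sent
afterwards. To achieve this, the starting queue index is now captured in
the watcher's constructor rather than at the beginning of Run().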
Fix the behavior and add tests to confirm.
Change-Id: I81717078f18835d7d15f6e5461f2d66ac58bcfb7
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index c9ff6fd..f55939c 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -97,24 +97,25 @@
std::function<void(const aos::Message *message)> watcher)
: thread_state_(std::move(thread_state)),
queue_(queue),
- watcher_(std::move(watcher)) {}
+ index_(0),
+ watcher_(std::move(watcher)) {
+ static constexpr Options<RawQueue> kOptions =
+ RawQueue::kFromEnd | RawQueue::kNonBlock;
+ const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
+ if (msg) {
+ queue_->FreeMessage(msg);
+ }
+ }
void Run() {
thread_state_->WaitForStart();
if (!thread_state_->is_running()) return;
- int32_t index = 0;
-
- static constexpr Options<RawQueue> kOptions =
- RawQueue::kFromEnd | RawQueue::kNonBlock;
- const void *msg = queue_->ReadMessageIndex(kOptions, &index);
-
+ const void *msg = nullptr;
while (true) {
- if (msg == nullptr) {
- msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
- assert(msg != nullptr);
- }
+ msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
+ assert(msg != nullptr);
{
MutexLocker locker(&thread_state_->mutex_);
@@ -125,7 +126,6 @@
if (!thread_state_->is_running()) break;
}
queue_->FreeMessage(msg);
- msg = nullptr;
}
queue_->FreeMessage(msg);
@@ -134,6 +134,8 @@
private:
std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
RawQueue *queue_;
+ int32_t index_;
+
std::function<void(const Message *message)> watcher_;
};