Make Fetch, FetchNext, and watchers see messages at the same time.

We had a bug where a no-arg watcher would trigger a Fetch(), expecting
it to return the message that had just been delivered. Fetch() wasn't
finding a message at that point in time, so the state machine wasn't
progressing.

This was happening because watchers use Read() with the next queue
index to see if a message has arrived, while Fetch() calls
LatestIndex() to get the newest message. There is a very small window
between when the message is put into the message pointer queue in
shared memory and when the next pointer is updated to record that.
Watchers also check for messages whenever any event happens, because
the check is cheap and you don't want to go backwards. So nothing else
was preventing the watcher from racing with the sender.
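
A minimal, single-threaded sketch of that window follows. ToyQueue,
PublishSlot(), AdvanceNext(), WatcherSeesMessage() and this
LatestIndex() are hypothetical stand-ins, not the AOS lockless queue
API. The model is a ring of slots that each record the queue index they
hold, plus a separate "next" pointer. A send here consists of two
separate writes.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical, heavily simplified stand-in for the shared-memory queue:
// each slot records which queue index it currently holds, and
// next_queue_index documents how far the queue has advanced.
struct ToyQueue {
  static constexpr uint32_t kEmpty = UINT32_MAX;
  static constexpr std::size_t kSlots = 4;
  std::array<std::atomic<uint32_t>, kSlots> slots;
  std::atomic<uint32_t> next_queue_index{0};

  ToyQueue() {
    for (auto &slot : slots) slot.store(kEmpty);
  }
};

// Sender write 1: make the message visible in its slot.
uint32_t PublishSlot(ToyQueue &q) {
  uint32_t index = q.next_queue_index.load();
  q.slots[index % ToyQueue::kSlots].store(index);
  return index;
}

// Sender write 2: advance the next pointer to document the publish.
void AdvanceNext(ToyQueue &q) { q.next_queue_index.fetch_add(1); }

// Watcher-style check: read the slot for the index we expect next.
bool WatcherSeesMessage(const ToyQueue &q, uint32_t expected_index) {
  return q.slots[expected_index % ToyQueue::kSlots].load() == expected_index;
}

// Fetch-style check: newest message according to the next pointer only.
std::optional<uint32_t> LatestIndex(const ToyQueue &q) {
  uint32_t next = q.next_queue_index.load();
  if (next == 0) return std::nullopt;
  return next - 1;
}
```

Between PublishSlot() and AdvanceNext() in this model, the watcher
already sees message 0. LatestIndex() still reports no message at that
point, which is the state in which the watcher's Fetch() comes back
empty.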

There are 2 potential fixes here:
1: Also check LatestIndex() and use the minimum of the two answers.
2: Inside LatestIndex(), repair the next message pointer if it is
   behind.

Longer term, we want to move timestamping to after the publish
compare + exchange. That means even more things will need to be driven
off the publish compare + exchange that makes the message visible, so
option 2 sets us up better to complete that work.
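
Option 2 can be sketched as a "helping" LatestIndex(), assuming the
same kind of hypothetical toy model: a ring of slots tagged with their
queue index plus a next pointer. None of these names are the real AOS
API. This model assumes a single sender that publishes with a plain
store, whereas the real queue publishes with a compare + exchange.
Because a reader may now advance the pointer, the sender's own advance
must also be a compare + exchange, so that the pointer never moves
twice for one message.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical toy layout (not the AOS queue): slots record the queue
// index they hold; next_queue_index documents how far the queue advanced.
struct ToyQueue {
  static constexpr uint32_t kEmpty = UINT32_MAX;
  static constexpr std::size_t kSlots = 4;
  std::array<std::atomic<uint32_t>, kSlots> slots;
  std::atomic<uint32_t> next_queue_index{0};

  ToyQueue() {
    for (auto &slot : slots) slot.store(kEmpty);
  }
};

// Sender write 1: publish the message into its slot (single sender assumed).
uint32_t PublishSlot(ToyQueue &q) {
  uint32_t index = q.next_queue_index.load();
  q.slots[index % ToyQueue::kSlots].store(index);
  return index;
}

// Sender write 2 as a compare + exchange: if a reader already repaired the
// pointer, the exchange fails and there is nothing left to do.
void AdvanceNext(ToyQueue &q, uint32_t index) {
  uint32_t expected = index;
  q.next_queue_index.compare_exchange_strong(expected, index + 1);
}

// LatestIndex() that repairs a lagging next pointer before answering.
std::optional<uint32_t> LatestIndex(ToyQueue &q) {
  uint32_t next = q.next_queue_index.load();
  while (q.slots[next % ToyQueue::kSlots].load() == next) {
    // The message at `next` is already visible; try to move the pointer
    // past it. On failure, compare_exchange_strong reloads `next` with the
    // pointer's current value and the loop re-checks that slot.
    if (q.next_queue_index.compare_exchange_strong(next, next + 1)) {
      ++next;
    }
  }
  if (next == 0) return std::nullopt;
  return next - 1;
}
```

In this sketch, whichever of the sender or a reader advances the
pointer first wins, and the other's compare + exchange fails
harmlessly. A reader therefore never reports a message as missing once
its slot write is visible.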

To test this, we reuse the code in the queue death tests that produces
a snapshot of the queue's memory after each write to it. After every
write, we trigger both a Read() and a LatestIndex() on that snapshot
and deterministically confirm that they agree.

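
The shape of that check can be sketched on a toy model. QueueMemory,
SendWrites(), Read(), LatestIndex() and
ReadAndLatestAgreeAfterEveryWrite() are hypothetical names for
illustration, not the death-test or lockless_queue_stepping code. The
memory is plain values, so a "snapshot" is simply a copy. Each send is
split into its individual writes. After every write, the check
compares the watcher-style and Fetch-style answers on the snapshot.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical model of the queue's shared memory using plain values.
struct QueueMemory {
  static constexpr uint32_t kEmpty = UINT32_MAX;
  std::array<uint32_t, 4> slots{kEmpty, kEmpty, kEmpty, kEmpty};
  uint32_t next_queue_index = 0;
};

using WriteStep = std::function<void(QueueMemory &)>;

// The individual memory writes one send performs, in order.
std::vector<WriteStep> SendWrites(uint32_t index) {
  return {
      // Write 1: publish the message into its slot.
      [index](QueueMemory &m) { m.slots[index % m.slots.size()] = index; },
      // Write 2: advance the next pointer unless it was already repaired.
      [index](QueueMemory &m) {
        if (m.next_queue_index == index) m.next_queue_index = index + 1;
      },
  };
}

// Watcher-style check: is the message at `index` in its slot?
bool Read(const QueueMemory &m, uint32_t index) {
  return m.slots[index % m.slots.size()] == index;
}

// Fetch-style check, optionally repairing a lagging next pointer first.
std::optional<uint32_t> LatestIndex(QueueMemory &m, bool repair) {
  if (repair) {
    while (m.slots[m.next_queue_index % m.slots.size()] == m.next_queue_index) {
      ++m.next_queue_index;
    }
  }
  if (m.next_queue_index == 0) return std::nullopt;
  return m.next_queue_index - 1;
}

// Applies every write of `messages` sends one at a time. After each write,
// checks on a snapshot that Read() and LatestIndex() agree on whether the
// message being sent is visible.
bool ReadAndLatestAgreeAfterEveryWrite(uint32_t messages, bool repair) {
  QueueMemory memory;
  for (uint32_t index = 0; index < messages; ++index) {
    for (const WriteStep &step : SendWrites(index)) {
      step(memory);
      QueueMemory snapshot = memory;  // stands in for a per-write snapshot
      const bool watcher_sees = Read(snapshot, index);
      const std::optional<uint32_t> latest = LatestIndex(snapshot, repair);
      const bool fetch_sees = latest.has_value() && *latest >= index;
      if (watcher_sees != fetch_sees) return false;
    }
  }
  return true;
}
```

With `repair` set to false, this model fails right after the first
slot write, which is the disagreement described above. With `repair`
set to true, it agrees after every write, including once the ring
wraps. Running LatestIndex() on the copy keeps its repair from changing
what the sender's next write sees.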
Change-Id: If63bc7cab1521a5a6dad5431961871c25aecaf9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 471b75d..d67616f 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -233,6 +233,22 @@
],
)
+cc_library(
+ name = "lockless_queue_stepping",
+ testonly = True,
+ srcs = [
+ "lockless_queue_stepping.cc",
+ ],
+ hdrs = ["lockless_queue_stepping.h"],
+ deps = [
+ ":lockless_queue",
+ ":shm_observers",
+ "//aos/libc:aos_strsignal",
+ "//aos/testing:prevent_exit",
+ "@com_google_googletest//:gtest",
+ ],
+)
+
cc_test(
name = "lockless_queue_test",
timeout = "eternal",
@@ -242,11 +258,11 @@
deps = [
":event",
":lockless_queue",
+ ":lockless_queue_stepping",
":queue_racer",
":signalfd",
"//aos/events:epoll",
"//aos/testing:googletest",
- "//aos/testing:prevent_exit",
"//aos/util:phased_loop",
],
)
@@ -258,6 +274,7 @@
deps = [
":event",
":lockless_queue",
+ ":lockless_queue_stepping",
":queue_racer",
":shm_observers",
":signalfd",