Make Fetch, FetchNext, and watchers see messages at the same time.
We had a bug where a no-arg watcher would prod a Fetch to happen,
expecting it to return the message just delivered. It wasn't finding a
message at that point in time, so the state machine wasn't progressing.
This was happening because watchers use Read() with the next queue
index to see if a message has arrived, and Fetch() calls LatestIndex()
to get the newest message. There is a very tiny window between when the
message is put in the message pointer queue in shared memory, and when
the next pointer is updated to document that. Watchers also look for
messages any time any event happens because it is cheap and you don't
want to go backwards, so nothing else was preventing the watcher from
racing with the sender.
There are 2 potential answers here.
1: Check in with LatestIndex() and use the min
2: Repair the next message pointer if it is behind inside
LatestIndex()
Longer term, we want to move timestamping after the publish compare +
exchange. That means we will need to drive even more things off the
publish compare + exchange which makes the message visible. So, 2 sets
us up better to complete that.
To test this, we have code in the queue death tests which produce a
snapshot of memory after each write into memory in the queue. Use that
to then trigger both a read and LatestIndex() after each write and
confirm they agree deterministically.
Change-Id: If63bc7cab1521a5a6dad5431961871c25aecaf9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 471b75d..d67616f 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -233,6 +233,22 @@
],
)
+cc_library(
+ name = "lockless_queue_stepping",
+ testonly = True,
+ srcs = [
+ "lockless_queue_stepping.cc",
+ ],
+ hdrs = ["lockless_queue_stepping.h"],
+ deps = [
+ ":lockless_queue",
+ ":shm_observers",
+ "//aos/libc:aos_strsignal",
+ "//aos/testing:prevent_exit",
+ "@com_google_googletest//:gtest",
+ ],
+)
+
cc_test(
name = "lockless_queue_test",
timeout = "eternal",
@@ -242,11 +258,11 @@
deps = [
":event",
":lockless_queue",
+ ":lockless_queue_stepping",
":queue_racer",
":signalfd",
"//aos/events:epoll",
"//aos/testing:googletest",
- "//aos/testing:prevent_exit",
"//aos/util:phased_loop",
],
)
@@ -258,6 +274,7 @@
deps = [
":event",
":lockless_queue",
+ ":lockless_queue_stepping",
":queue_racer",
":shm_observers",
":signalfd",
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a1b0211..fd3a305 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -1283,19 +1283,20 @@
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
- char *data, std::function<bool(const Context &)> should_read) const {
- const size_t queue_size = memory_->queue_size();
+ char *data,
+ std::function<bool(const Context &)> should_read_callback) const {
+ const size_t queue_size = const_memory_->queue_size();
// Build up the QueueIndex.
const QueueIndex queue_index =
QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
// Read the message stored at the requested location.
- Index mi = memory_->LoadIndex(queue_index);
- const Message *m = memory_->GetMessage(mi);
+ Index mi = const_memory_->LoadIndex(queue_index);
+ const Message *m = const_memory_->GetMessage(mi);
while (true) {
- DCHECK(!CheckBothRedzones(memory_, m))
+ DCHECK(!CheckBothRedzones(const_memory_, m))
<< ": Invalid message found in shared memory";
// We need to confirm that the data doesn't change while we are reading it.
// Do that by first confirming that the message points to the queue index we
@@ -1313,7 +1314,7 @@
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- const Message *const new_m = memory_->GetMessage(queue_index);
+ const Message *const new_m = const_memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -1342,7 +1343,7 @@
// then they need to wait. They got ahead. Otherwise, they are
// asking for something crazy, like something before the beginning of
// the queue. Tell them that they are behind.
- if (uint32_queue_index < memory_->queue_size()) {
+ if (uint32_queue_index < const_memory_->queue_size()) {
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return Result::NOTHING_NEW;
} else {
@@ -1357,37 +1358,24 @@
// Then read the data out. Copy it all out to be deterministic and so we can
// make length be from either end.
- if (!should_read) {
- *monotonic_sent_time = m->header.monotonic_sent_time;
- *realtime_sent_time = m->header.realtime_sent_time;
- if (m->header.remote_queue_index == 0xffffffffu) {
- *remote_queue_index = queue_index.index();
- } else {
- *remote_queue_index = m->header.remote_queue_index;
- }
- *monotonic_remote_time = m->header.monotonic_remote_time;
- *realtime_remote_time = m->header.realtime_remote_time;
- *source_boot_uuid = m->header.source_boot_uuid;
- *length = m->header.length;
+ Context context;
+ context.monotonic_event_time = m->header.monotonic_sent_time;
+ context.realtime_event_time = m->header.realtime_sent_time;
+ context.monotonic_remote_time = m->header.monotonic_remote_time;
+ context.realtime_remote_time = m->header.realtime_remote_time;
+ context.queue_index = queue_index.index();
+ if (m->header.remote_queue_index == 0xffffffffu) {
+ context.remote_queue_index = context.queue_index;
} else {
- // Cache the header results so we don't modify the outputs unless the filter
- // function says "go".
- Context context;
- context.monotonic_event_time = m->header.monotonic_sent_time;
- context.realtime_event_time = m->header.realtime_sent_time;
- context.monotonic_remote_time = m->header.monotonic_remote_time;
- context.realtime_remote_time = m->header.realtime_remote_time;
- context.queue_index = queue_index.index();
- if (m->header.remote_queue_index == 0xffffffffu) {
- context.remote_queue_index = context.queue_index;
- } else {
- context.remote_queue_index = m->header.remote_queue_index;
- }
- context.source_boot_uuid = m->header.source_boot_uuid;
- context.size = m->header.length;
- context.data = nullptr;
- context.buffer_index = -1;
+ context.remote_queue_index = m->header.remote_queue_index;
+ }
+ context.source_boot_uuid = m->header.source_boot_uuid;
+ context.size = m->header.length;
+ context.data = nullptr;
+ context.buffer_index = -1;
+ // If the callback is provided, use it.
+ if (should_read_callback) {
// And finally, confirm that the message *still* points to the queue index
// we want. This means it didn't change out from under us. If something
// changed out from under us, we were reading it much too late in its
@@ -1404,24 +1392,22 @@
// We now know that the context is safe to use. See if we are supposed to
// take the message or not.
- if (!should_read(context)) {
+ if (!should_read_callback(context)) {
return Result::FILTERED;
}
-
- // And now take it.
- *monotonic_sent_time = context.monotonic_event_time;
- *realtime_sent_time = context.realtime_event_time;
- *remote_queue_index = context.remote_queue_index;
- *monotonic_remote_time = context.monotonic_remote_time;
- *realtime_remote_time = context.realtime_remote_time;
- *source_boot_uuid = context.source_boot_uuid;
- *length = context.size;
}
- if (data) {
- memcpy(data, m->data(memory_->message_data_size()),
- memory_->message_data_size());
- // Check again since we touched the message again.
+ // Read the data if requested.
+ if (data) {
+ memcpy(data, m->data(const_memory_->message_data_size()),
+ const_memory_->message_data_size());
+ }
+
+ // Now, we need to confirm that nothing has changed by re-reading the queue
+ // index from the header since we've read all the body. We only need to do it
+ // if we have read anything new after the previous check up above, which
+ // happens if we read the data, or if we didn't check for the filtered case.
+ if (data || !should_read_callback) {
aos_compiler_memory_barrier();
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
@@ -1433,18 +1419,75 @@
}
}
+ // And now take it and make it visible to the user. By doing it here, we will
+ // never present partial or corrupted state to the user in the output
+ // pointers.
+ *monotonic_sent_time = context.monotonic_event_time;
+ *realtime_sent_time = context.realtime_event_time;
+ *remote_queue_index = context.remote_queue_index;
+ *monotonic_remote_time = context.monotonic_remote_time;
+ *realtime_remote_time = context.realtime_remote_time;
+ *source_boot_uuid = context.source_boot_uuid;
+ *length = context.size;
+
return Result::GOOD;
}
QueueIndex LocklessQueueReader::LatestIndex() const {
- const size_t queue_size = memory_->queue_size();
+ const size_t queue_size = const_memory_->queue_size();
- // There is only one interesting case. We need to know if the queue is empty.
- // That is done with a sentinel value. At worst, this will be off by one.
- const QueueIndex next_queue_index =
- memory_->next_queue_index.Load(queue_size);
- if (next_queue_index.valid()) {
- const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
+ // There are 2 main cases. Either the next queue index is right, or it is
+ // behind by 1 and wrong. If nothing has been published, the next queue index
+ // will be the reserved "Invalid" value, otherwise it will point to the next
+ // place to write. We need to figure out if it is right or wrong, and it if
+ // is wrong, fix it. If we don't, Read() can find the next message before
+ // LatestIndex() sees it if someone is hammering on Read() until it returns
+ // nothing new is left, which mean watchers and fetchers may disagree on when
+ // a message is published.
+ QueueIndex actual_next_queue_index =
+ const_memory_->next_queue_index.Load(queue_size);
+
+ // Handle the "nothing has been published" case by making next_queue_index
+ // point to the 0th index.
+ const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);
+
+ // This needs to synchronize with whoever the previous writer at this
+ // location was. Read what is there to see if the message has been published
+ // and next_queue_index is just behind.
+ Index to_replace = const_memory_->LoadIndex(next_queue_index);
+
+ // See if next_queue_index is consistent with the state of the queue. If it
+ // is not, try to atomically update next_queue_index in case the previous
+ // writer failed and retry.
+ if (to_replace.IsPlausible(next_queue_index)) {
+ // If next_queue_index ends up pointing to a message with a matching index,
+ // this is what next_queue_index needs to be updated to
+ const QueueIndex incremented_queue_index = next_queue_index.Increment();
+
+ // We don't care about the result. It will either succeed, or we got
+ // beat in fixing it. The way the Send logic works, the pointer can never
+ // get more than 1 behind or the next send will repair it. So, if we fail,
+ // that means that someone else got there first and fixed it up (and
+ // potentially someone further continued to send).
+ //
+ // Both require no further action from us. Worst case, our Next pointer
+ // will not be the latest message, but there will always be a point after
+ // which the index can change. We just need a consistent snapshot where
+ // there is nothing in the queue that isn't accounted for by
+ // next_queue_index.
+ memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
+ incremented_queue_index);
+
+ VLOG(3) << "next_queue_index is lagging, fixed it. Found " << std::hex
+ << to_replace.get() << ", expected "
+ << next_queue_index.DecrementBy(queue_size).index();
+
+ actual_next_queue_index = incremented_queue_index;
+ }
+
+ if (actual_next_queue_index.valid()) {
+ const QueueIndex current_queue_index =
+ actual_next_queue_index.DecrementBy(1u);
return current_queue_index;
}
return QueueIndex::Invalid();
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 2cafb48..01f2b7c 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -414,14 +414,15 @@
GOOD,
// The message is in the future and we haven't written it yet.
NOTHING_NEW,
- // There is a message, but should_read() returned false so we didn't fetch
- // it.
+ // There is a message, but should_read_callback() returned false so we
+ // didn't fetch it.
FILTERED,
// The message got overwritten while we were reading it.
OVERWROTE,
};
- LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+ LocklessQueueReader(LocklessQueue queue)
+ : memory_(queue.memory()), const_memory_(queue.const_memory()) {
queue.Initialize();
}
@@ -433,14 +434,14 @@
// message, but the filter function returned false, return FILTERED.
//
// data may be nullptr to indicate the data should not be copied.
- Result Read(uint32_t queue_index,
- monotonic_clock::time_point *monotonic_sent_time,
- realtime_clock::time_point *realtime_sent_time,
- monotonic_clock::time_point *monotonic_remote_time,
- realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, UUID *source_boot_uuid,
- size_t *length, char *data,
- std::function<bool(const Context &context)> should_read) const;
+ Result Read(
+ uint32_t queue_index, monotonic_clock::time_point *monotonic_sent_time,
+ realtime_clock::time_point *realtime_sent_time,
+ monotonic_clock::time_point *monotonic_remote_time,
+ realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
+ char *data,
+ std::function<bool(const Context &context)> should_read_callback) const;
// Returns the index to the latest queue message. Returns empty_queue_index()
// if there are no messages in the queue. Do note that this index wraps if
@@ -448,7 +449,8 @@
QueueIndex LatestIndex() const;
private:
- const LocklessQueueMemory *const memory_;
+ LocklessQueueMemory *const memory_;
+ const LocklessQueueMemory *const_memory_;
};
// Returns the number of messages which are logically in the queue at a time.
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index c37dc63..2a95b31 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -22,10 +22,9 @@
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/lockless_queue.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/ipc_lib/lockless_queue_stepping.h"
#include "aos/ipc_lib/shm_observers.h"
-#include "aos/libc/aos_strsignal.h"
#include "aos/realtime.h"
-#include "aos/testing/prevent_exit.h"
#include "aos/testing/test_logging.h"
namespace aos {
@@ -34,529 +33,9 @@
namespace chrono = ::std::chrono;
-#if ((defined(AOS_SANITIZER_address) || defined(AOS_SANITIZER_thread)) && \
- !defined(__clang__) && __GNUC__ <= 4 && __GNUC_MINOR__ <= 8) || \
- defined(__ARM_EABI__)
-// There are various reasons why we might not actually be able to do this
-// testing, but we still want to run the functions to test anything they test
-// other than shm robustness.
-//
-// GCC 4.8 has too old of a version of asan to allow SIGSEGV handler
-// chaining.
-//
-// GCC 4.8's tsan doesn't work with the code for calling the real sigaction to
-// work arounds its weirdness of the SIGTRAP handler.
-//
-// ARM doesn't have a nice way to do single-stepping, and I don't feel like
-// dealing with editing the next instruction to be a breakpoint and then
-// changing it back.
-#else
-
-// This code currently supports amd64 only, but it
-// shouldn't be hard to port to i386 (it should just be using the name for
-// only the smaller part of the flags register), but that's not tested, and
-// porting it to any other architecture is more work.
-// Currently, we skip doing anything exciting on arm (just run the code without
-// any robustness testing) and refuse to compile anywhere else.
-
-#define SIMPLE_ASSERT(condition, message) \
- do { \
- if (!(condition)) { \
- static const char kMessage[] = message "\n"; \
- if (write(STDERR_FILENO, kMessage, sizeof(kMessage) - 1) != \
- (sizeof(kMessage) - 1)) { \
- static const char kFailureMessage[] = "writing failed\n"; \
- __attribute__((unused)) int ignore = write( \
- STDERR_FILENO, kFailureMessage, sizeof(kFailureMessage) - 1); \
- } \
- abort(); \
- } \
- } while (false)
-
-// Array to track writes to memory, and make sure they happen in the right
-// order.
-class WritesArray {
- public:
- uintptr_t At(size_t location) const {
- SIMPLE_ASSERT(location < size_, "too far into writes array");
- return writes_[location];
- }
- void Add(uintptr_t pointer) {
- SIMPLE_ASSERT(size_ < kSize, "too many writes");
- writes_[size_++] = pointer;
- }
-
- size_t size() const { return size_; }
-
- private:
- static const size_t kSize = 20000;
-
- uintptr_t writes_[kSize];
- size_t size_ = 0;
-};
-
-enum class DieAtState {
- // No SEGVs should be happening.
- kDisabled,
- // SEGVs are fine. Normal operation.
- kRunning,
- // We are manipulating a mutex. No SEGVs should be happening.
- kWriting,
-};
-
-// What we exit with when we're exiting in the middle.
-const int kExitEarlyValue = 123;
-
-// We have to keep track of everything in a global variable because there's no
-// other way for the signal handlers to find it.
-struct GlobalState {
- // Pointer to the queue memory, and its size.
- void *lockless_queue_memory;
- size_t lockless_queue_memory_size;
-
- // Pointer to a second block of memory the same size. This (on purpose) has
- // the same size as lockless_queue_memory so we can point the robust mutexes
- // here.
- void *lockless_queue_memory_lock_backup;
-
- // Expected writes.
- const WritesArray *writes_in;
- // Actual writes.
- WritesArray *writes_out;
- // Location to die at, and how far we have gotten.
- size_t die_at, current_location;
- // State.
- DieAtState state;
-};
-::std::atomic<GlobalState *> global_state;
-
-// Returns true if the address is in the queue memory chunk.
-bool IsInLocklessQueueMemory(void *address) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- void *lockless_queue_memory = my_global_state->lockless_queue_memory;
- if (address < lockless_queue_memory) {
- return false;
- if (reinterpret_cast<uintptr_t>(address) >
- reinterpret_cast<uintptr_t>(lockless_queue_memory) +
- my_global_state->lockless_queue_memory_size)
- return false;
- }
- return true;
-}
-
-// Calls mprotect(2) for the entire shared memory region with the given prot.
-void ShmProtectOrDie(int prot) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- PCHECK(mprotect(my_global_state->lockless_queue_memory,
- my_global_state->lockless_queue_memory_size, prot) != -1)
- << ": mprotect(" << my_global_state->lockless_queue_memory << ", "
- << my_global_state->lockless_queue_memory_size << ", 0x" << std::hex
- << prot << ") failed";
-}
-
-// Checks a write into the queue and conditionally dies. Tracks the write.
-void HandleWrite(void *address) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- uintptr_t address_offset =
- reinterpret_cast<uintptr_t>(address) -
- reinterpret_cast<uintptr_t>(my_global_state->lockless_queue_memory);
- if (my_global_state->writes_in != nullptr) {
- SIMPLE_ASSERT(my_global_state->writes_in->At(
- my_global_state->current_location) == address_offset,
- "wrong write order");
- }
- if (my_global_state->writes_out != nullptr) {
- my_global_state->writes_out->Add(address_offset);
- }
- if (my_global_state->die_at != 0) {
- if (my_global_state->die_at == my_global_state->current_location) {
- _exit(kExitEarlyValue);
- }
- }
- ++my_global_state->current_location;
-}
-
-struct sigaction old_segv_handler, old_trap_handler;
-
-// Calls the original signal handler.
-bool CallChainedAction(const struct sigaction &action, int signal,
- siginfo_t *siginfo, void *context) {
- if (action.sa_handler == SIG_IGN || action.sa_handler == SIG_DFL) {
- return false;
- }
- if (action.sa_flags & SA_SIGINFO) {
- action.sa_sigaction(signal, siginfo, context);
- } else {
- action.sa_handler(signal);
- }
- return true;
-}
-
-void segv_handler(int signal, siginfo_t *siginfo, void *context_void) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- const int saved_errno = errno;
- SIMPLE_ASSERT(signal == SIGSEGV, "wrong signal for SIGSEGV handler");
-
- // Only process memory addresses in our shared memory block.
- if (!IsInLocklessQueueMemory(siginfo->si_addr)) {
- if (CallChainedAction(old_segv_handler, signal, siginfo, context_void)) {
- errno = saved_errno;
- return;
- } else {
- SIMPLE_ASSERT(false, "actual SIGSEGV");
- }
- }
- SIMPLE_ASSERT(my_global_state->state == DieAtState::kRunning,
- "bad state for SIGSEGV");
-
- HandleWrite(siginfo->si_addr);
-
- ShmProtectOrDie(PROT_READ | PROT_WRITE);
- my_global_state->state = DieAtState::kWriting;
- errno = saved_errno;
-
-#if defined(__x86_64__)
- __asm__ __volatile__("int $3" ::: "memory", "cc");
-#elif defined(__aarch64__)
- __asm__ __volatile__("brk #0" ::: "memory", "cc");
-#else
-#error Unhandled architecture
-#endif
-}
-
-// A mutex lock is about to happen. Mark the memory rw, and check to see if we
-// should die.
-void futex_before(void *address, bool) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- if (IsInLocklessQueueMemory(address)) {
- assert(my_global_state->state == DieAtState::kRunning);
- HandleWrite(address);
- ShmProtectOrDie(PROT_READ | PROT_WRITE);
- my_global_state->state = DieAtState::kWriting;
- }
-}
-
-// The SEGV handler has set a breakpoint 1 instruction in the future. This
-// clears it, marks memory readonly, and continues.
-void trap_handler(int signal, siginfo_t *, void * /*context*/) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- const int saved_errno = errno;
- SIMPLE_ASSERT(signal == SIGTRAP, "wrong signal for SIGTRAP handler");
-
- my_global_state->state = DieAtState::kWriting;
- SIMPLE_ASSERT(my_global_state->state == DieAtState::kWriting,
- "bad state for SIGTRAP");
- ShmProtectOrDie(PROT_READ);
- my_global_state->state = DieAtState::kRunning;
- errno = saved_errno;
-}
-
-// We have a manual trap for mutexes. Check to see if we were supposed to die
-// on this write (the compare/exchange for the mutex), and mark the memory ro
-// again.
-void futex_after(void *address, bool) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- if (IsInLocklessQueueMemory(address)) {
- assert(my_global_state->state == DieAtState::kWriting);
- ShmProtectOrDie(PROT_READ);
- my_global_state->state = DieAtState::kRunning;
- }
-}
-
-// Installs the signal handler.
-void InstallHandler(int signal, void (*handler)(int, siginfo_t *, void *),
- struct sigaction *old_action) {
- struct sigaction action;
- memset(&action, 0, sizeof(action));
- action.sa_sigaction = handler;
- // We don't do a full normal signal handler exit with ptrace, so SA_NODEFER is
- // necessary to keep our signal handler active.
- action.sa_flags = SA_RESTART | SA_SIGINFO | SA_NODEFER;
-#ifdef AOS_SANITIZER_thread
- // Tsan messes with signal handlers to check for race conditions, and it
- // causes problems, so we have to work around it for SIGTRAP.
- if (signal == SIGTRAP) {
- typedef int (*SigactionType)(int, const struct sigaction *,
- struct sigaction *);
- SigactionType real_sigaction =
- reinterpret_cast<SigactionType>(dlsym(RTLD_NEXT, "sigaction"));
- if (sigaction == real_sigaction) {
- LOG(WARNING) << "failed to work around tsan signal handling weirdness";
- }
- PCHECK(real_sigaction(signal, &action, old_action) == 0);
- return;
- }
-#endif
- PCHECK(sigaction(signal, &action, old_action) == 0);
-}
-
-// gtest only allows creating fatal failures in functions returning void...
-// status is from wait(2).
-void DetectFatalFailures(int status) {
- if (WIFEXITED(status)) {
- FAIL() << " child returned status "
- << ::std::to_string(WEXITSTATUS(status));
- } else if (WIFSIGNALED(status)) {
- FAIL() << " child exited because of signal "
- << aos_strsignal(WTERMSIG(status));
- } else {
- FAIL() << " child exited with status " << ::std::hex << status;
- }
-}
-
-// Returns true if it runs all the way through.
-bool RunFunctionDieAt(::std::function<void(void *)> prepare,
- ::std::function<void(void *)> function,
- bool *test_failure, size_t die_at, bool prepare_in_child,
- uintptr_t writable_offset, const WritesArray *writes_in,
- WritesArray *writes_out) {
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- my_global_state->writes_in = writes_in;
- my_global_state->writes_out = writes_out;
- my_global_state->die_at = die_at;
- my_global_state->current_location = 0;
- my_global_state->state = DieAtState::kDisabled;
-
- if (!prepare_in_child) prepare(my_global_state->lockless_queue_memory);
-
- const pid_t pid = fork();
- PCHECK(pid != -1) << ": fork() failed";
- if (pid == 0) {
- // Run the test.
- ::aos::testing::PreventExit();
-
- if (prepare_in_child) prepare(my_global_state->lockless_queue_memory);
-
- // Update the robust list offset.
- linux_code::ipc_lib::SetRobustListOffset(writable_offset);
- // Install a segv handler (to detect writes to the memory block), and a trap
- // handler so we can single step.
- InstallHandler(SIGSEGV, segv_handler, &old_segv_handler);
- InstallHandler(SIGTRAP, trap_handler, &old_trap_handler);
- CHECK_EQ(old_trap_handler.sa_handler, SIG_DFL);
- linux_code::ipc_lib::SetShmAccessorObservers(futex_before, futex_after);
-
- PCHECK(ptrace(PTRACE_TRACEME, 0, 0, 0) == 0);
- ShmProtectOrDie(PROT_READ);
- my_global_state->state = DieAtState::kRunning;
-
- function(my_global_state->lockless_queue_memory);
- my_global_state->state = DieAtState::kDisabled;
- ShmProtectOrDie(PROT_READ | PROT_WRITE);
- _exit(0);
- } else {
- // Annoying wrapper type because elf_gregset_t is an array, which C++
- // handles poorly.
- struct RestoreState {
- RestoreState(elf_gregset_t regs_in) {
- memcpy(regs, regs_in, sizeof(regs));
- }
- elf_gregset_t regs;
- };
- std::optional<RestoreState> restore_regs;
- bool pass_trap = false;
- // Wait until the child process dies.
- while (true) {
- int status;
- pid_t waited_on = waitpid(pid, &status, 0);
- if (waited_on == -1) {
- if (errno == EINTR) continue;
- PCHECK(false) << ": waitpid(" << pid << ", " << &status
- << ", 0) failed";
- }
- CHECK_EQ(waited_on, pid)
- << ": waitpid got child " << waited_on << " instead of " << pid;
- if (WIFSTOPPED(status)) {
- // The child was stopped via ptrace.
- const int stop_signal = WSTOPSIG(status);
- elf_gregset_t regs;
- {
- struct iovec iov;
- iov.iov_base = ®s;
- iov.iov_len = sizeof(regs);
- PCHECK(ptrace(PTRACE_GETREGSET, pid, NT_PRSTATUS, &iov) == 0);
- CHECK_EQ(iov.iov_len, sizeof(regs))
- << ": ptrace regset is the wrong size";
- }
- if (stop_signal == SIGSEGV) {
- // It's a SEGV, hopefully due to writing to the shared memory which is
- // marked read-only. We record the instruction that faulted so we can
- // look for it while single-stepping, then deliver the signal so the
- // child can mark it read-write and then poke us to single-step that
- // instruction.
-
- CHECK(!restore_regs)
- << ": Traced child got a SEGV while single-stepping";
- // Save all the registers to resume execution at the current location
- // in the child.
- restore_regs = RestoreState(regs);
- PCHECK(ptrace(PTRACE_CONT, pid, nullptr, SIGSEGV) == 0);
- continue;
- }
- if (stop_signal == SIGTRAP) {
- if (pass_trap) {
- // This is the new SIGTRAP we generated, which we just want to pass
- // through so the child's signal handler can restore the memory to
- // read-only
- PCHECK(ptrace(PTRACE_CONT, pid, nullptr, SIGTRAP) == 0);
- pass_trap = false;
- continue;
- }
- if (restore_regs) {
- // Restore the state we saved before delivering the SEGV, and then
- // single-step that one instruction.
- struct iovec iov;
- iov.iov_base = &restore_regs->regs;
- iov.iov_len = sizeof(restore_regs->regs);
- PCHECK(ptrace(PTRACE_SETREGSET, pid, NT_PRSTATUS, &iov) == 0);
- restore_regs = std::nullopt;
- PCHECK(ptrace(PTRACE_SINGLESTEP, pid, nullptr, nullptr) == 0);
- continue;
- }
- // We executed the single instruction that originally faulted, so
- // now deliver a SIGTRAP to the child so it can mark the memory
- // read-only again.
- pass_trap = true;
- PCHECK(kill(pid, SIGTRAP) == 0);
- PCHECK(ptrace(PTRACE_CONT, pid, nullptr, nullptr) == 0);
- continue;
- }
- LOG(FATAL) << "Traced child was stopped with unexpected signal: "
- << static_cast<int>(WSTOPSIG(status));
- }
- if (WIFEXITED(status)) {
- if (WEXITSTATUS(status) == 0) return true;
- if (WEXITSTATUS(status) == kExitEarlyValue) return false;
- }
- DetectFatalFailures(status);
- if (test_failure) *test_failure = true;
- return false;
- }
- }
-}
-
-bool RunFunctionDieAtAndCheck(const LocklessQueueConfiguration &config,
- ::std::function<void(void *)> prepare,
- ::std::function<void(void *)> function,
- ::std::function<void(void *)> check,
- bool *test_failure, size_t die_at,
- bool prepare_in_child,
- const WritesArray *writes_in,
- WritesArray *writes_out) {
- // Allocate shared memory.
- GlobalState *my_global_state = global_state.load(::std::memory_order_relaxed);
- my_global_state->lockless_queue_memory_size = LocklessQueueMemorySize(config);
- my_global_state->lockless_queue_memory = static_cast<void *>(
- mmap(nullptr, my_global_state->lockless_queue_memory_size,
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
- CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory);
-
- // And the backup used to point the robust list at.
- my_global_state->lockless_queue_memory_lock_backup = static_cast<void *>(
- mmap(nullptr, my_global_state->lockless_queue_memory_size,
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
- CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory_lock_backup);
-
- // The writable offset tells us how to convert from a pointer in the queue to
- // a pointer that is safe to write. This is so robust futexes don't spin the
- // kernel when the user maps a page PROT_READ, and the kernel tries to clear
- // the futex there.
- const uintptr_t writable_offset =
- reinterpret_cast<uintptr_t>(
- my_global_state->lockless_queue_memory_lock_backup) -
- reinterpret_cast<uintptr_t>(my_global_state->lockless_queue_memory);
-
- bool r;
- // Do the actual test in a new thread so any locked mutexes will be cleaned up
- // nicely with owner-died at the end.
- ::std::thread test_thread([&prepare, &function, &check, test_failure, die_at,
- prepare_in_child, writes_in, writes_out,
- writable_offset, &r]() {
- r = RunFunctionDieAt(prepare, function, test_failure, die_at,
- prepare_in_child, writable_offset, writes_in,
- writes_out);
- if (::testing::Test::HasFailure()) {
- r = false;
- if (test_failure) *test_failure = true;
- return;
- }
-
- check(
- global_state.load(::std::memory_order_relaxed)->lockless_queue_memory);
- });
- test_thread.join();
- return r;
-}
-
-// Tests function to make sure it handles dying after each store it makes to
-// shared memory. check should make sure function behaved correctly.
-// This will repeatedly create a new TestSharedMemory, run prepare, run
-// function, and then
-// run check, killing the process function is running in at various points. It
-// will stop if anything reports a fatal gtest failure.
-//
-// prepare_in_child being true means the prepare function will be run in the
-// child instead of the parent which doesn't die. This means that reference
-// counts on any objects it allocates won't be duplicated, but it also means
-// that any variables it sets will not be visible in check etc.
-void TestShmRobustness(const LocklessQueueConfiguration &config,
- ::std::function<void(void *)> prepare,
- ::std::function<void(void *)> function,
- ::std::function<void(void *)> check,
- bool prepare_in_child) {
- // Map the global state and memory for the Writes array so it exists across
- // the process boundary.
- void *shared_allocations = static_cast<GlobalState *>(
- mmap(nullptr, sizeof(GlobalState) + sizeof(WritesArray),
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
- CHECK_NE(MAP_FAILED, shared_allocations);
-
- global_state.store(static_cast<GlobalState *>(shared_allocations));
- shared_allocations = static_cast<void *>(
- static_cast<uint8_t *>(shared_allocations) + sizeof(GlobalState));
- WritesArray *expected_writes = static_cast<WritesArray *>(shared_allocations);
- new (expected_writes) WritesArray();
-
- bool test_failed = false;
- ASSERT_TRUE(RunFunctionDieAtAndCheck(config, prepare, function, check,
- &test_failed, 0, prepare_in_child,
- nullptr, expected_writes));
- if (test_failed) {
- ADD_FAILURE();
- return;
- }
-
- size_t die_at = 1;
- while (true) {
- SCOPED_TRACE("dying at " + ::std::to_string(die_at) + "/" +
- ::std::to_string(expected_writes->size()));
- if (RunFunctionDieAtAndCheck(config, prepare, function, check, &test_failed,
- die_at, prepare_in_child, expected_writes,
- nullptr)) {
- LOG(INFO) << "Tested " << die_at << " death points";
- return;
- }
- if (test_failed) {
- ADD_FAILURE();
- }
- if (::testing::Test::HasFailure()) return;
- ++die_at;
- }
-}
+#ifdef SUPPORTS_SHM_ROBUSTNESS_TEST
namespace {
-pid_t gettid() { return syscall(SYS_gettid); }
-
-// Sets FUTEX_OWNER_DIED if the owner was tid. This fakes what the kernel does
-// with a robust mutex.
-bool PretendOwnerDied(aos_mutex *mutex, pid_t tid) {
- if ((mutex->futex & FUTEX_TID_MASK) == tid) {
- mutex->futex = FUTEX_OWNER_DIED;
- return true;
- }
- return false;
-}
-
static int kPinnedMessageIndex = 0;
constexpr monotonic_clock::duration kChannelStorageDuration =
@@ -567,12 +46,8 @@
// Tests that death during sends is recovered from correctly.
TEST(LocklessQueueTest, Death) {
::aos::testing::EnableTestLogging();
- // Capture the tid in the child so we can tell if it died. Use mmap so it
- // works across the process boundary.
- pid_t *tid =
- static_cast<pid_t *>(mmap(nullptr, sizeof(pid_t), PROT_READ | PROT_WRITE,
- MAP_SHARED | MAP_ANONYMOUS, -1, 0));
- CHECK_NE(MAP_FAILED, tid);
+
+ SharedTid tid;
// Make a small queue so it is easier to debug.
LocklessQueueConfiguration config;
@@ -584,14 +59,14 @@
TestShmRobustness(
config,
- [config, tid](void *memory) {
+ [config, &tid](void *memory) {
// Initialize the queue and grab the tid.
LocklessQueue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
config)
.Initialize();
- *tid = gettid();
+ tid.Set();
},
[config](void *memory) {
LocklessQueue queue(
@@ -617,7 +92,7 @@
}
}
},
- [config, tid](void *raw_memory) {
+ [config, &tid](void *raw_memory) {
::aos::ipc_lib::LocklessQueueMemory *const memory =
reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
// Confirm that we can create 2 senders (the number in the queue), and
@@ -629,10 +104,10 @@
// TestShmRobustness doesn't handle robust futexes. It is happy to just
// not crash with them. We know what they are, and what the tid of the
// holder is. So go pretend to be the kernel and fix it for it.
- PretendOwnerDied(&memory->queue_setup_lock, *tid);
+ PretendOwnerDied(&memory->queue_setup_lock, tid.Get());
for (size_t i = 0; i < config.num_senders; ++i) {
- if (PretendOwnerDied(&memory->GetSender(i)->tid, *tid)) {
+ if (PretendOwnerDied(&memory->GetSender(i)->tid, tid.Get())) {
// Print out before and after results if a sender died. That is the
// more fun case.
print = true;
@@ -735,8 +210,7 @@
// Confirm our message got through.
EXPECT_EQ(last_data, '9') << ": Got through " << i;
- },
- /* prepare_in_child = true */ true);
+ });
}
#endif
diff --git a/aos/ipc_lib/lockless_queue_stepping.cc b/aos/ipc_lib/lockless_queue_stepping.cc
new file mode 100644
index 0000000..0ec5214
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_stepping.cc
@@ -0,0 +1,473 @@
+#include "aos/ipc_lib/lockless_queue_stepping.h"
+
+#include <dlfcn.h>
+#include <elf.h>
+#include <linux/futex.h>
+#include <sys/mman.h>
+#include <sys/procfs.h>
+#include <sys/ptrace.h>
+#include <sys/syscall.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <wait.h>
+
+#include <memory>
+#include <thread>
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/ipc_lib/shm_observers.h"
+#include "aos/libc/aos_strsignal.h"
+#include "aos/testing/prevent_exit.h"
+
+#ifdef SUPPORTS_SHM_ROBUSTNESS_TEST
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+namespace {
+pid_t gettid() { return syscall(SYS_gettid); }
+
+::std::atomic<GlobalState *> global_state;
+
+struct sigaction old_segv_handler, old_trap_handler;
+
+// Calls the original signal handler.
+bool CallChainedAction(const struct sigaction &action, int signal,
+ siginfo_t *siginfo, void *context) {
+ if (action.sa_handler == SIG_IGN || action.sa_handler == SIG_DFL) {
+ return false;
+ }
+ if (action.sa_flags & SA_SIGINFO) {
+ action.sa_sigaction(signal, siginfo, context);
+ } else {
+ action.sa_handler(signal);
+ }
+ return true;
+}
+
+void segv_handler(int signal, siginfo_t *siginfo, void *context_void) {
+ GlobalState *my_global_state = GlobalState::Get();
+ const int saved_errno = errno;
+ SIMPLE_ASSERT(signal == SIGSEGV, "wrong signal for SIGSEGV handler");
+
+ // Only process memory addresses in our shared memory block.
+ if (!my_global_state->IsInLocklessQueueMemory(siginfo->si_addr)) {
+ if (CallChainedAction(old_segv_handler, signal, siginfo, context_void)) {
+ errno = saved_errno;
+ return;
+ } else {
+ SIMPLE_ASSERT(false, "actual SIGSEGV");
+ }
+ }
+ SIMPLE_ASSERT(my_global_state->state == DieAtState::kRunning,
+ "bad state for SIGSEGV");
+
+ my_global_state->HandleWrite(siginfo->si_addr);
+
+ my_global_state->ShmProtectOrDie(PROT_READ | PROT_WRITE);
+ my_global_state->state = DieAtState::kWriting;
+ errno = saved_errno;
+
+#if defined(__x86_64__)
+ __asm__ __volatile__("int $3" ::: "memory", "cc");
+#elif defined(__aarch64__)
+ __asm__ __volatile__("brk #0" ::: "memory", "cc");
+#else
+#error Unhandled architecture
+#endif
+}
+
+// The SEGV handler has set a breakpoint 1 instruction in the future. This
+// clears it, marks memory readonly, and continues.
+void trap_handler(int signal, siginfo_t *, void * /*context*/) {
+ GlobalState *my_global_state = GlobalState::Get();
+ const int saved_errno = errno;
+ SIMPLE_ASSERT(signal == SIGTRAP, "wrong signal for SIGTRAP handler");
+
+ my_global_state->state = DieAtState::kWriting;
+ SIMPLE_ASSERT(my_global_state->state == DieAtState::kWriting,
+ "bad state for SIGTRAP");
+ my_global_state->ShmProtectOrDie(PROT_READ);
+ my_global_state->state = DieAtState::kRunning;
+ errno = saved_errno;
+}
+
+// Installs the signal handler.
+void InstallHandler(int signal, void (*handler)(int, siginfo_t *, void *),
+ struct sigaction *old_action) {
+ struct sigaction action;
+ memset(&action, 0, sizeof(action));
+ action.sa_sigaction = handler;
+ // We don't do a full normal signal handler exit with ptrace, so SA_NODEFER is
+ // necessary to keep our signal handler active.
+ action.sa_flags = SA_RESTART | SA_SIGINFO | SA_NODEFER;
+#ifdef AOS_SANITIZER_thread
+ // Tsan messes with signal handlers to check for race conditions, and it
+ // causes problems, so we have to work around it for SIGTRAP.
+ if (signal == SIGTRAP) {
+ typedef int (*SigactionType)(int, const struct sigaction *,
+ struct sigaction *);
+ SigactionType real_sigaction =
+ reinterpret_cast<SigactionType>(dlsym(RTLD_NEXT, "sigaction"));
+ if (sigaction == real_sigaction) {
+ LOG(WARNING) << "failed to work around tsan signal handling weirdness";
+ }
+ PCHECK(real_sigaction(signal, &action, old_action) == 0);
+ return;
+ }
+#endif
+ PCHECK(sigaction(signal, &action, old_action) == 0);
+}
+
+// A mutex lock is about to happen. Mark the memory rw, and check to see if we
+// should die.
+void futex_before(void *address, bool) {
+ GlobalState *my_global_state = GlobalState::Get();
+ if (my_global_state->IsInLocklessQueueMemory(address)) {
+ assert(my_global_state->state == DieAtState::kRunning);
+ my_global_state->HandleWrite(address);
+ my_global_state->ShmProtectOrDie(PROT_READ | PROT_WRITE);
+ my_global_state->state = DieAtState::kWriting;
+ }
+}
+
+// We have a manual trap for mutexes. Check to see if we were supposed to die
+// on this write (the compare/exchange for the mutex), and mark the memory ro
+// again.
+void futex_after(void *address, bool) {
+ GlobalState *my_global_state = GlobalState::Get();
+ if (my_global_state->IsInLocklessQueueMemory(address)) {
+ assert(my_global_state->state == DieAtState::kWriting);
+ my_global_state->ShmProtectOrDie(PROT_READ);
+ my_global_state->state = DieAtState::kRunning;
+ }
+}
+
+} // namespace
+
+void GlobalState::HandleWrite(void *address) {
+ uintptr_t address_offset = reinterpret_cast<uintptr_t>(address) -
+ reinterpret_cast<uintptr_t>(lockless_queue_memory);
+ if (writes_in != nullptr) {
+ SIMPLE_ASSERT(writes_in->At(current_location) == address_offset,
+ "wrong write order");
+ }
+ if (writes_out != nullptr) {
+ writes_out->Add(address_offset);
+ }
+ if (die_at != 0) {
+ if (die_at == current_location) {
+ _exit(kExitEarlyValue);
+ }
+ }
+ ++current_location;
+}
+
+GlobalState *GlobalState::Get() {
+ return global_state.load(::std::memory_order_relaxed);
+}
+
+std::tuple<GlobalState *, WritesArray *> GlobalState::MakeGlobalState() {
+ // Map the global state and memory for the Writes array so it exists across
+ // the process boundary.
+ void *shared_allocations = static_cast<GlobalState *>(
+ mmap(nullptr, sizeof(GlobalState) + sizeof(WritesArray),
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+ CHECK_NE(MAP_FAILED, shared_allocations);
+
+ global_state.store(static_cast<GlobalState *>(shared_allocations));
+ void *expected_writes_shared_allocations = static_cast<void *>(
+ static_cast<uint8_t *>(shared_allocations) + sizeof(GlobalState));
+ WritesArray *expected_writes =
+ static_cast<WritesArray *>(expected_writes_shared_allocations);
+ new (expected_writes) WritesArray();
+ return std::make_pair(static_cast<GlobalState *>(shared_allocations),
+ expected_writes);
+}
+
+bool GlobalState::IsInLocklessQueueMemory(void *address) {
+ void *read_lockless_queue_memory = lockless_queue_memory;
+ if (address < read_lockless_queue_memory) {
+ return false;
+ if (reinterpret_cast<uintptr_t>(address) >
+ reinterpret_cast<uintptr_t>(read_lockless_queue_memory) +
+ lockless_queue_memory_size)
+ return false;
+ }
+ return true;
+}
+
+void GlobalState::ShmProtectOrDie(int prot) {
+ PCHECK(mprotect(lockless_queue_memory, lockless_queue_memory_size, prot) !=
+ -1)
+ << ": mprotect(" << lockless_queue_memory << ", "
+ << lockless_queue_memory_size << ", 0x" << std::hex << prot << ") failed";
+}
+
+void GlobalState::RegisterSegvAndTrapHandlers() {
+ InstallHandler(SIGSEGV, segv_handler, &old_segv_handler);
+ InstallHandler(SIGTRAP, trap_handler, &old_trap_handler);
+ CHECK_EQ(old_trap_handler.sa_handler, SIG_DFL);
+ linux_code::ipc_lib::SetShmAccessorObservers(futex_before, futex_after);
+}
+
+// gtest only allows creating fatal failures in functions returning void...
+// status is from wait(2).
+void DetectFatalFailures(int status) {
+ if (WIFEXITED(status)) {
+ FAIL() << " child returned status "
+ << ::std::to_string(WEXITSTATUS(status));
+ } else if (WIFSIGNALED(status)) {
+ FAIL() << " child exited because of signal "
+ << aos_strsignal(WTERMSIG(status));
+ } else {
+ FAIL() << " child exited with status " << ::std::hex << status;
+ }
+}
+
+// Returns true if it runs all the way through.
+bool RunFunctionDieAt(::std::function<void(void *)> prepare,
+ ::std::function<void(void *)> function,
+ bool *test_failure, size_t die_at,
+ uintptr_t writable_offset, const WritesArray *writes_in,
+ WritesArray *writes_out) {
+ GlobalState *my_global_state = GlobalState::Get();
+ my_global_state->writes_in = writes_in;
+ my_global_state->writes_out = writes_out;
+ my_global_state->die_at = die_at;
+ my_global_state->current_location = 0;
+ my_global_state->state = DieAtState::kDisabled;
+
+ const pid_t pid = fork();
+ PCHECK(pid != -1) << ": fork() failed";
+ if (pid == 0) {
+ // Run the test.
+ ::aos::testing::PreventExit();
+
+ prepare(my_global_state->lockless_queue_memory);
+
+ // Update the robust list offset.
+ linux_code::ipc_lib::SetRobustListOffset(writable_offset);
+ // Install a segv handler (to detect writes to the memory block), and a trap
+ // handler so we can single step.
+ my_global_state->RegisterSegvAndTrapHandlers();
+
+ PCHECK(ptrace(PTRACE_TRACEME, 0, 0, 0) == 0);
+ my_global_state->ShmProtectOrDie(PROT_READ);
+ my_global_state->state = DieAtState::kRunning;
+
+ function(my_global_state->lockless_queue_memory);
+ my_global_state->state = DieAtState::kDisabled;
+ my_global_state->ShmProtectOrDie(PROT_READ | PROT_WRITE);
+ _exit(0);
+ } else {
+ // Annoying wrapper type because elf_gregset_t is an array, which C++
+ // handles poorly.
+ struct RestoreState {
+ RestoreState(elf_gregset_t regs_in) {
+ memcpy(regs, regs_in, sizeof(regs));
+ }
+ elf_gregset_t regs;
+ };
+ std::optional<RestoreState> restore_regs;
+ bool pass_trap = false;
+ // Wait until the child process dies.
+ while (true) {
+ int status;
+ pid_t waited_on = waitpid(pid, &status, 0);
+ if (waited_on == -1) {
+ if (errno == EINTR) continue;
+ PCHECK(false) << ": waitpid(" << pid << ", " << &status
+ << ", 0) failed";
+ }
+ CHECK_EQ(waited_on, pid)
+ << ": waitpid got child " << waited_on << " instead of " << pid;
+ if (WIFSTOPPED(status)) {
+ // The child was stopped via ptrace.
+ const int stop_signal = WSTOPSIG(status);
+ elf_gregset_t regs;
+ {
+ struct iovec iov;
+ iov.iov_base = ®s;
+ iov.iov_len = sizeof(regs);
+ PCHECK(ptrace(PTRACE_GETREGSET, pid, NT_PRSTATUS, &iov) == 0);
+ CHECK_EQ(iov.iov_len, sizeof(regs))
+ << ": ptrace regset is the wrong size";
+ }
+ if (stop_signal == SIGSEGV) {
+ // It's a SEGV, hopefully due to writing to the shared memory which is
+ // marked read-only. We record the instruction that faulted so we can
+ // look for it while single-stepping, then deliver the signal so the
+ // child can mark it read-write and then poke us to single-step that
+ // instruction.
+
+ CHECK(!restore_regs)
+ << ": Traced child got a SEGV while single-stepping";
+ // Save all the registers to resume execution at the current location
+ // in the child.
+ restore_regs = RestoreState(regs);
+ PCHECK(ptrace(PTRACE_CONT, pid, nullptr, SIGSEGV) == 0);
+ continue;
+ }
+ if (stop_signal == SIGTRAP) {
+ if (pass_trap) {
+ // This is the new SIGTRAP we generated, which we just want to pass
+ // through so the child's signal handler can restore the memory to
+ // read-only
+ PCHECK(ptrace(PTRACE_CONT, pid, nullptr, SIGTRAP) == 0);
+ pass_trap = false;
+ continue;
+ }
+ if (restore_regs) {
+ // Restore the state we saved before delivering the SEGV, and then
+ // single-step that one instruction.
+ struct iovec iov;
+ iov.iov_base = &restore_regs->regs;
+ iov.iov_len = sizeof(restore_regs->regs);
+ PCHECK(ptrace(PTRACE_SETREGSET, pid, NT_PRSTATUS, &iov) == 0);
+ restore_regs = std::nullopt;
+ PCHECK(ptrace(PTRACE_SINGLESTEP, pid, nullptr, nullptr) == 0);
+ continue;
+ }
+ // We executed the single instruction that originally faulted, so
+ // now deliver a SIGTRAP to the child so it can mark the memory
+ // read-only again.
+ pass_trap = true;
+ PCHECK(kill(pid, SIGTRAP) == 0);
+ PCHECK(ptrace(PTRACE_CONT, pid, nullptr, nullptr) == 0);
+ continue;
+ }
+ LOG(FATAL) << "Traced child was stopped with unexpected signal: "
+ << static_cast<int>(WSTOPSIG(status));
+ }
+ if (WIFEXITED(status)) {
+ if (WEXITSTATUS(status) == 0) return true;
+ if (WEXITSTATUS(status) == kExitEarlyValue) return false;
+ }
+ DetectFatalFailures(status);
+ if (test_failure) *test_failure = true;
+ return false;
+ }
+ }
+}
+
+bool RunFunctionDieAtAndCheck(const LocklessQueueConfiguration &config,
+ ::std::function<void(void *)> prepare,
+ ::std::function<void(void *)> function,
+ ::std::function<void(void *)> check,
+ bool *test_failure, size_t die_at,
+ const WritesArray *writes_in,
+ WritesArray *writes_out) {
+ // Allocate shared memory.
+ GlobalState *my_global_state = GlobalState::Get();
+ my_global_state->lockless_queue_memory_size = LocklessQueueMemorySize(config);
+ my_global_state->lockless_queue_memory = static_cast<void *>(
+ mmap(nullptr, my_global_state->lockless_queue_memory_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+ CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory);
+
+ // And the backup used to point the robust list at.
+ my_global_state->lockless_queue_memory_lock_backup = static_cast<void *>(
+ mmap(nullptr, my_global_state->lockless_queue_memory_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+ CHECK_NE(MAP_FAILED, my_global_state->lockless_queue_memory_lock_backup);
+
+ // The writable offset tells us how to convert from a pointer in the queue to
+ // a pointer that is safe to write. This is so robust futexes don't spin the
+ // kernel when the user maps a page PROT_READ, and the kernel tries to clear
+ // the futex there.
+ const uintptr_t writable_offset =
+ reinterpret_cast<uintptr_t>(
+ my_global_state->lockless_queue_memory_lock_backup) -
+ reinterpret_cast<uintptr_t>(my_global_state->lockless_queue_memory);
+
+ bool r;
+ // Do the actual test in a new thread so any locked mutexes will be cleaned up
+ // nicely with owner-died at the end.
+ ::std::thread test_thread([&prepare, &function, &check, test_failure, die_at,
+ writes_in, writes_out, writable_offset, &r]() {
+ r = RunFunctionDieAt(prepare, function, test_failure, die_at,
+ writable_offset, writes_in, writes_out);
+ if (::testing::Test::HasFailure()) {
+ r = false;
+ if (test_failure) *test_failure = true;
+ return;
+ }
+
+ check(GlobalState::Get()->lockless_queue_memory);
+ });
+ test_thread.join();
+ return r;
+}
+
+// Tests function to make sure it handles dying after each store it makes to
+// shared memory. check should make sure function behaved correctly.
+// This will repeatedly create a new TestSharedMemory, run prepare, run
+// function, and then
+// run check, killing the process function is running in at various points. It
+// will stop if anything reports a fatal gtest failure.
+void TestShmRobustness(const LocklessQueueConfiguration &config,
+ ::std::function<void(void *)> prepare,
+ ::std::function<void(void *)> function,
+ ::std::function<void(void *)> check) {
+ auto [my_global_state, expected_writes] = GlobalState::MakeGlobalState();
+
+ bool test_failed = false;
+ ASSERT_TRUE(RunFunctionDieAtAndCheck(config, prepare, function, check,
+ &test_failed, 0, nullptr,
+ expected_writes));
+ if (test_failed) {
+ ADD_FAILURE();
+ return;
+ }
+
+ size_t die_at = 1;
+ while (true) {
+ SCOPED_TRACE("dying at " + ::std::to_string(die_at) + "/" +
+ ::std::to_string(expected_writes->size()));
+ if (RunFunctionDieAtAndCheck(config, prepare, function, check, &test_failed,
+ die_at, expected_writes, nullptr)) {
+ LOG(INFO) << "Tested " << die_at << " death points";
+ return;
+ }
+ if (test_failed) {
+ ADD_FAILURE();
+ }
+ if (::testing::Test::HasFailure()) return;
+ ++die_at;
+ }
+}
+
+SharedTid::SharedTid() {
+ // Capture the tid in the child so we can tell if it died. Use mmap so it
+ // works across the process boundary.
+ tid_ =
+ static_cast<pid_t *>(mmap(nullptr, sizeof(pid_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_ANONYMOUS, -1, 0));
+ CHECK_NE(MAP_FAILED, tid_);
+}
+
+SharedTid::~SharedTid() { CHECK_EQ(munmap(tid_, sizeof(pid_t)), 0); }
+
+void SharedTid::Set() { *tid_ = gettid(); }
+
+pid_t SharedTid::Get() { return *tid_; }
+
+bool PretendOwnerDied(aos_mutex *mutex, pid_t tid) {
+ if ((mutex->futex & FUTEX_TID_MASK) == tid) {
+ mutex->futex = FUTEX_OWNER_DIED;
+ return true;
+ }
+ return false;
+}
+
+} // namespace testing
+} // namespace ipc_lib
+} // namespace aos
+
+#endif // SUPPORTS_SHM_ROBSTNESS_TEST
diff --git a/aos/ipc_lib/lockless_queue_stepping.h b/aos/ipc_lib/lockless_queue_stepping.h
new file mode 100644
index 0000000..b3eb6e3
--- /dev/null
+++ b/aos/ipc_lib/lockless_queue_stepping.h
@@ -0,0 +1,153 @@
+#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_STEPPING_H_
+#define AOS_IPC_LIB_LOCKLESS_QUEUE_STEPPING_H_
+
+#include <cinttypes>
+#include <functional>
+
+#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+
+namespace aos {
+namespace ipc_lib {
+namespace testing {
+
+#if defined(__ARM_EABI__)
+// There are various reasons why we might not actually be able to do this
+// testing, but we still want to run the functions to test anything they test
+// other than shm robustness.
+//
+// ARM doesn't have a nice way to do single-stepping, and I don't feel like
+// dealing with editing the next instruction to be a breakpoint and then
+// changing it back.
+#else
+
+#define SUPPORTS_SHM_ROBUSTNESS_TEST
+
+// This code currently supports amd64 only, but it
+// shouldn't be hard to port to i386 (it should just be using the name for
+// only the smaller part of the flags register), but that's not tested, and
+// porting it to any other architecture is more work.
+// Currently, we skip doing anything exciting on arm (just run the code without
+// any robustness testing) and refuse to compile anywhere else.
+
+#define SIMPLE_ASSERT(condition, message) \
+ do { \
+ if (!(condition)) { \
+ static const char kMessage[] = message "\n"; \
+ if (write(STDERR_FILENO, kMessage, sizeof(kMessage) - 1) != \
+ (sizeof(kMessage) - 1)) { \
+ static const char kFailureMessage[] = "writing failed\n"; \
+ __attribute__((unused)) int ignore = write( \
+ STDERR_FILENO, kFailureMessage, sizeof(kFailureMessage) - 1); \
+ } \
+ abort(); \
+ } \
+ } while (false)
+
+// Array to track writes to memory, and make sure they happen in the right
+// order.
+class WritesArray {
+ public:
+ uintptr_t At(size_t location) const {
+ SIMPLE_ASSERT(location < size_, "too far into writes array");
+ return writes_[location];
+ }
+ void Add(uintptr_t pointer) {
+ SIMPLE_ASSERT(size_ < kSize, "too many writes");
+ writes_[size_++] = pointer;
+ }
+
+ size_t size() const { return size_; }
+
+ private:
+ static const size_t kSize = 20000;
+
+ uintptr_t writes_[kSize];
+ size_t size_ = 0;
+};
+
+enum class DieAtState {
+ // No SEGVs should be happening.
+ kDisabled,
+ // SEGVs are fine. Normal operation.
+ kRunning,
+ // We are manipulating a mutex. No SEGVs should be happening.
+ kWriting,
+};
+
+// What we exit with when we're exiting in the middle.
+const int kExitEarlyValue = 123;
+
+// We have to keep track of everything in a global variable because there's no
+// other way for the signal handlers to find it.
+struct GlobalState {
+ // Constructs the singleton GlobalState.
+ static std::tuple<GlobalState *, WritesArray *> MakeGlobalState();
+
+ // Returns the global state. Atomic and safe from signal handlers.
+ static GlobalState *Get();
+
+ // Pointer to the queue memory, and its size.
+ void *lockless_queue_memory;
+ size_t lockless_queue_memory_size;
+
+ // Pointer to a second block of memory the same size. This (on purpose) has
+ // the same size as lockless_queue_memory so we can point the robust mutexes
+ // here.
+ void *lockless_queue_memory_lock_backup;
+
+ // Expected writes.
+ const WritesArray *writes_in;
+ // Actual writes.
+ WritesArray *writes_out;
+ // Location to die at, and how far we have gotten.
+ size_t die_at, current_location;
+ // State.
+ DieAtState state;
+
+ // Returns true if the address is in the queue memory chunk.
+ bool IsInLocklessQueueMemory(void *address);
+
+ // Calls mprotect(2) for the entire shared memory region with the given prot.
+ void ShmProtectOrDie(int prot);
+
+ // Checks a write into the queue and conditionally dies. Tracks the write.
+ void HandleWrite(void *address);
+
+ // Registers the handlers required to trap writes.
+ void RegisterSegvAndTrapHandlers();
+};
+
+void TestShmRobustness(const LocklessQueueConfiguration &config,
+ ::std::function<void(void *)> prepare,
+ ::std::function<void(void *)> function,
+ ::std::function<void(void *)> check);
+
+// Capture the tid in the child so we can tell if it died. Uses mmap so it
+// works across the process boundary.
+class SharedTid {
+ public:
+ SharedTid();
+ ~SharedTid();
+
+ // Captures the tid.
+ void Set();
+
+ // Returns the captured tid.
+ pid_t Get();
+
+ private:
+ pid_t *tid_;
+};
+
+// Sets FUTEX_OWNER_DIED if the owner was tid. This fakes what the kernel does
+// with a robust mutex.
+bool PretendOwnerDied(aos_mutex *mutex, pid_t tid);
+
+#endif
+
+} // namespace testing
+} // namespace ipc_lib
+} // namespace aos
+
+#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_STEPPING_H_
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 93ae2a3..58e093f 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -1,5 +1,6 @@
#include "aos/ipc_lib/lockless_queue.h"
+#include <sys/mman.h>
#include <unistd.h>
#include <wait.h>
@@ -16,6 +17,8 @@
#include "aos/events/epoll.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/event.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/ipc_lib/lockless_queue_stepping.h"
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
@@ -421,6 +424,148 @@
static_cast<double>(kNumMessages) / elapsed_seconds);
}
+#if defined(SUPPORTS_SHM_ROBUSTNESS_TEST)
+
+// Verifies that LatestIndex points to the same message as the logic from
+// "FetchNext", which increments the index until it gets "NOTHING_NEW" back.
+// This is so we can confirm fetchers and watchers all see the same message at
+// the same point in time.
+int VerifyMessages(LocklessQueue *queue, LocklessQueueMemory *memory) {
+ LocklessQueueReader reader(*queue);
+
+ const ipc_lib::QueueIndex queue_index = reader.LatestIndex();
+ if (!queue_index.valid()) {
+ return 0;
+ }
+
+ // Now loop through the queue and make sure the number in the snprintf
+ // increments.
+ char last_data = '0';
+ int i = 0;
+
+ // Callback which isn't set so we don't exercise the conditional reading code.
+ std::function<bool(const Context &)> should_read_callback;
+
+ // Now, read as far as we can until we get NOTHING_NEW. This simulates
+ // FetchNext.
+ while (true) {
+ monotonic_clock::time_point monotonic_sent_time;
+ realtime_clock::time_point realtime_sent_time;
+ monotonic_clock::time_point monotonic_remote_time;
+ realtime_clock::time_point realtime_remote_time;
+ uint32_t remote_queue_index;
+ UUID source_boot_uuid;
+ char read_data[1024];
+ size_t length;
+
+ LocklessQueueReader::Result read_result = reader.Read(
+ i, &monotonic_sent_time, &realtime_sent_time, &monotonic_remote_time,
+ &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
+ &(read_data[0]), std::ref(should_read_callback));
+
+ if (read_result != LocklessQueueReader::Result::GOOD) {
+ if (read_result == LocklessQueueReader::Result::TOO_OLD) {
+ ++i;
+ continue;
+ }
+ CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
+ << ": " << static_cast<int>(read_result);
+ break;
+ }
+
+ EXPECT_GT(read_data[LocklessQueueMessageDataSize(memory) - length + 6],
+ last_data)
+ << ": Got " << read_data << " for " << i;
+ last_data = read_data[LocklessQueueMessageDataSize(memory) - length + 6];
+
+ ++i;
+ }
+
+ // The latest queue index should match the fetched queue index.
+ if (i == 0) {
+ EXPECT_FALSE(queue_index.valid());
+ } else {
+ EXPECT_EQ(queue_index.index(), i - 1);
+ }
+ return i;
+}
+
+// Tests that at all points in the publish step, fetch == fetch next. This
+// means that there is an atomic point at which the message is viewed as visible
+// to consumers. Do this by killing the writer after each change to shared
+// memory, and confirming fetch == fetch next each time.
+TEST_F(LocklessQueueTest, FetchEqFetchNext) {
+ SharedTid tid;
+
+ // Make a small queue so it is easier to debug.
+ LocklessQueueConfiguration config;
+ config.num_watchers = 1;
+ config.num_senders = 2;
+ config.num_pinners = 0;
+ config.queue_size = 3;
+ config.message_data_size = 32;
+
+ TestShmRobustness(
+ config,
+ [config, &tid](void *memory) {
+ // Initialize the queue.
+ LocklessQueue(
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ config)
+ .Initialize();
+ tid.Set();
+ },
+ [config](void *memory) {
+ LocklessQueue queue(
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ config);
+ // Now try to write some messages. We will get killed a bunch as this
+ // tries to happen.
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
+ for (int i = 0; i < 5; ++i) {
+ char data[100];
+ size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
+ ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffl,
+ UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
+ }
+ },
+ [config, &tid](void *raw_memory) {
+ ::aos::ipc_lib::LocklessQueueMemory *const memory =
+ reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+ LocklessQueue queue(memory, memory, config);
+ PretendOwnerDied(&memory->queue_setup_lock, tid.Get());
+
+ if (VLOG_IS_ON(1)) {
+ PrintLocklessQueueMemory(memory);
+ }
+
+ const int i = VerifyMessages(&queue, memory);
+
+ LocklessQueueSender sender =
+ LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
+ {
+ char data[100];
+ size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
+ ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffl,
+ UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
+ }
+
+ // Now, make sure we can send 1 message and receive it to confirm we
+ // haven't corrupted next_queue_index irrevocably.
+ const int newi = VerifyMessages(&queue, memory);
+ EXPECT_EQ(newi, i + 1);
+ });
+}
+
+#endif
+
} // namespace testing
} // namespace ipc_lib
} // namespace aos
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index 87f2cce..f0a2684 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -62,7 +62,8 @@
private:
// Wipes the queue memory out so we get a clean start.
void Reset() {
- memset(queue_.memory(), 0, LocklessQueueMemorySize(queue_.config()));
+ memset(reinterpret_cast<void *>(queue_.memory()), 0,
+ LocklessQueueMemorySize(queue_.config()));
}
// This is a separate method so that when all the ASSERT_* methods, we still