Make Fetch, FetchNext, and watchers see messages at the same time.
We had a bug where a no-arg watcher would prod a Fetch to happen,
expecting it to return the message just delivered. The Fetch wasn't
finding a message at that point in time, so the state machine wasn't
progressing.
This was happening because watchers use Read() with the next queue
index to see if a message has arrived, while Fetch() calls
LatestIndex() to get the newest message. There is a very small window
between when the message is put in the message pointer queue in shared
memory and when the next pointer is updated to record that. Watchers
also check for messages whenever any event happens, because it is
cheap and you don't want to go backwards, so nothing else was
preventing the watcher from racing with the sender.
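To make the window concrete, here is a rough sketch of the publish
ordering using a toy queue with plain atomics. FakeQueue, Publish(),
and friends are made-up names for illustration, not the real
LocklessQueue types:

  #include <atomic>
  #include <cstdint>
  #include <optional>

  // Toy stand-in for the shared memory queue; not the real aos code.
  struct FakeQueue {
    static constexpr uint32_t kQueueSize = 8;
    static constexpr uint32_t kEmpty = 0xffffffffu;

    // Stand-in for the message pointer queue; each slot holds the
    // queue index of the message published there, or kEmpty.
    std::atomic<uint32_t> slots[kQueueSize];
    // Stand-in for next_queue_index in shared memory.
    std::atomic<uint32_t> next_queue_index{0};

    FakeQueue() {
      for (auto &slot : slots) slot.store(kEmpty);
    }

    void Publish(uint32_t index) {
      // (1) The message becomes visible to Read() here...
      slots[index % kQueueSize].store(index, std::memory_order_release);
      // ...but in this window LatestIndex() still reports the old value.
      // (2) Only now does LatestIndex() catch up.
      next_queue_index.store(index + 1, std::memory_order_release);
    }

    // What a watcher effectively asks: has message `index` arrived yet?
    bool Read(uint32_t index) const {
      return slots[index % kQueueSize].load(std::memory_order_acquire) ==
             index;
    }

    // What a fetcher asks: what is the newest message?
    std::optional<uint32_t> LatestIndex() const {
      const uint32_t next = next_queue_index.load(std::memory_order_acquire);
      if (next == 0) return std::nullopt;
      return next - 1;
    }
  };

A watcher that calls Read(n) between steps (1) and (2) already sees
message n, while a fetcher calling LatestIndex() at the same moment
still gets n - 1.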
There are two potential answers here:
1: Check in with LatestIndex() and use the min.
2: Repair the next message pointer inside LatestIndex() if it is
   behind (sketched below).
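Option 2 is the shape this change takes. Against the toy queue above
it looks roughly like this (still a sketch; the real code goes through
Index/QueueIndex and IsPlausible()):

  // If the slot that next_queue_index points at already holds a
  // published message, the sender lost the race between steps (1) and
  // (2), so repair next_queue_index with a compare-and-exchange before
  // answering.
  std::optional<uint32_t> LatestIndexWithRepair(FakeQueue *queue) {
    uint32_t next = queue->next_queue_index.load(std::memory_order_acquire);
    const uint32_t slot =
        queue->slots[next % FakeQueue::kQueueSize].load(
            std::memory_order_acquire);
    if (slot == next) {
      uint32_t expected = next;
      const bool fixed_it = queue->next_queue_index.compare_exchange_strong(
          expected, next + 1);
      // Losing the CAS just means someone else repaired it (or kept
      // sending) first; `expected` then holds the newer value.
      next = fixed_it ? next + 1 : expected;
    }
    if (next == 0) return std::nullopt;
    return next - 1;
  }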
Longer term, we want to move timestamping after the publish compare +
exchange. That means we will need to drive even more things off the
publish compare + exchange which makes the message visible. So,
option 2 sets us up better to complete that.
To test this, we have code in the queue death tests which produces a
snapshot of memory after each write to the queue's shared memory. Use
that to trigger both a Read() and a LatestIndex() after each write and
confirm they agree deterministically.
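In toy terms, the property each snapshot gets checked against is
roughly the following (illustrative only, building on the FakeQueue
sketch above, not the real death test code):

  #include <cassert>

  // If Read() can already see message `index` in this memory snapshot,
  // then LatestIndex() must report at least `index` from the same
  // snapshot.
  void CheckReadAndLatestAgree(FakeQueue *queue, uint32_t index) {
    if (queue->Read(index)) {
      const std::optional<uint32_t> latest = LatestIndexWithRepair(queue);
      assert(latest.has_value() && *latest >= index);
    }
  }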
Change-Id: If63bc7cab1521a5a6dad5431961871c25aecaf9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a1b0211..fd3a305 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -1283,19 +1283,20 @@
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
- char *data, std::function<bool(const Context &)> should_read) const {
- const size_t queue_size = memory_->queue_size();
+ char *data,
+ std::function<bool(const Context &)> should_read_callback) const {
+ const size_t queue_size = const_memory_->queue_size();
// Build up the QueueIndex.
const QueueIndex queue_index =
QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
// Read the message stored at the requested location.
- Index mi = memory_->LoadIndex(queue_index);
- const Message *m = memory_->GetMessage(mi);
+ Index mi = const_memory_->LoadIndex(queue_index);
+ const Message *m = const_memory_->GetMessage(mi);
while (true) {
- DCHECK(!CheckBothRedzones(memory_, m))
+ DCHECK(!CheckBothRedzones(const_memory_, m))
<< ": Invalid message found in shared memory";
// We need to confirm that the data doesn't change while we are reading it.
// Do that by first confirming that the message points to the queue index we
@@ -1313,7 +1314,7 @@
// Someone has re-used this message between when we pulled it out of the
// queue and when we grabbed its index. It is pretty hard to deduce
// what happened. Just try again.
- const Message *const new_m = memory_->GetMessage(queue_index);
+ const Message *const new_m = const_memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
VLOG(3) << "Retrying, m doesn't match";
@@ -1342,7 +1343,7 @@
// then they need to wait. They got ahead. Otherwise, they are
// asking for something crazy, like something before the beginning of
// the queue. Tell them that they are behind.
- if (uint32_queue_index < memory_->queue_size()) {
+ if (uint32_queue_index < const_memory_->queue_size()) {
VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return Result::NOTHING_NEW;
} else {
@@ -1357,37 +1358,24 @@
// Then read the data out. Copy it all out to be deterministic and so we can
// make length be from either end.
- if (!should_read) {
- *monotonic_sent_time = m->header.monotonic_sent_time;
- *realtime_sent_time = m->header.realtime_sent_time;
- if (m->header.remote_queue_index == 0xffffffffu) {
- *remote_queue_index = queue_index.index();
- } else {
- *remote_queue_index = m->header.remote_queue_index;
- }
- *monotonic_remote_time = m->header.monotonic_remote_time;
- *realtime_remote_time = m->header.realtime_remote_time;
- *source_boot_uuid = m->header.source_boot_uuid;
- *length = m->header.length;
+ Context context;
+ context.monotonic_event_time = m->header.monotonic_sent_time;
+ context.realtime_event_time = m->header.realtime_sent_time;
+ context.monotonic_remote_time = m->header.monotonic_remote_time;
+ context.realtime_remote_time = m->header.realtime_remote_time;
+ context.queue_index = queue_index.index();
+ if (m->header.remote_queue_index == 0xffffffffu) {
+ context.remote_queue_index = context.queue_index;
} else {
- // Cache the header results so we don't modify the outputs unless the filter
- // function says "go".
- Context context;
- context.monotonic_event_time = m->header.monotonic_sent_time;
- context.realtime_event_time = m->header.realtime_sent_time;
- context.monotonic_remote_time = m->header.monotonic_remote_time;
- context.realtime_remote_time = m->header.realtime_remote_time;
- context.queue_index = queue_index.index();
- if (m->header.remote_queue_index == 0xffffffffu) {
- context.remote_queue_index = context.queue_index;
- } else {
- context.remote_queue_index = m->header.remote_queue_index;
- }
- context.source_boot_uuid = m->header.source_boot_uuid;
- context.size = m->header.length;
- context.data = nullptr;
- context.buffer_index = -1;
+ context.remote_queue_index = m->header.remote_queue_index;
+ }
+ context.source_boot_uuid = m->header.source_boot_uuid;
+ context.size = m->header.length;
+ context.data = nullptr;
+ context.buffer_index = -1;
+ // If the callback is provided, use it.
+ if (should_read_callback) {
// And finally, confirm that the message *still* points to the queue index
// we want. This means it didn't change out from under us. If something
// changed out from under us, we were reading it much too late in its
@@ -1404,24 +1392,22 @@
// We now know that the context is safe to use. See if we are supposed to
// take the message or not.
- if (!should_read(context)) {
+ if (!should_read_callback(context)) {
return Result::FILTERED;
}
-
- // And now take it.
- *monotonic_sent_time = context.monotonic_event_time;
- *realtime_sent_time = context.realtime_event_time;
- *remote_queue_index = context.remote_queue_index;
- *monotonic_remote_time = context.monotonic_remote_time;
- *realtime_remote_time = context.realtime_remote_time;
- *source_boot_uuid = context.source_boot_uuid;
- *length = context.size;
}
- if (data) {
- memcpy(data, m->data(memory_->message_data_size()),
- memory_->message_data_size());
- // Check again since we touched the message again.
+ // Read the data if requested.
+ if (data) {
+ memcpy(data, m->data(const_memory_->message_data_size()),
+ const_memory_->message_data_size());
+ }
+
+  // Now, we need to confirm that nothing has changed by re-reading the queue
+  // index from the header now that we have read the whole body. We only need
+  // to do it if we read anything new after the previous check up above, which
+  // happens if we read the data, or if we didn't check for the filtered case.
+ if (data || !should_read_callback) {
aos_compiler_memory_barrier();
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
@@ -1433,18 +1419,75 @@
}
}
+ // And now take it and make it visible to the user. By doing it here, we will
+ // never present partial or corrupted state to the user in the output
+ // pointers.
+ *monotonic_sent_time = context.monotonic_event_time;
+ *realtime_sent_time = context.realtime_event_time;
+ *remote_queue_index = context.remote_queue_index;
+ *monotonic_remote_time = context.monotonic_remote_time;
+ *realtime_remote_time = context.realtime_remote_time;
+ *source_boot_uuid = context.source_boot_uuid;
+ *length = context.size;
+
return Result::GOOD;
}
QueueIndex LocklessQueueReader::LatestIndex() const {
- const size_t queue_size = memory_->queue_size();
+ const size_t queue_size = const_memory_->queue_size();
- // There is only one interesting case. We need to know if the queue is empty.
- // That is done with a sentinel value. At worst, this will be off by one.
- const QueueIndex next_queue_index =
- memory_->next_queue_index.Load(queue_size);
- if (next_queue_index.valid()) {
- const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
+ // There are 2 main cases. Either the next queue index is right, or it is
+ // behind by 1 and wrong. If nothing has been published, the next queue index
+ // will be the reserved "Invalid" value, otherwise it will point to the next
+  // place to write. We need to figure out if it is right or wrong, and if it
+ // is wrong, fix it. If we don't, Read() can find the next message before
+ // LatestIndex() sees it if someone is hammering on Read() until it returns
+  // that nothing new is left, which means watchers and fetchers may disagree
+  // on when a message is published.
+ QueueIndex actual_next_queue_index =
+ const_memory_->next_queue_index.Load(queue_size);
+
+ // Handle the "nothing has been published" case by making next_queue_index
+ // point to the 0th index.
+ const QueueIndex next_queue_index = ZeroOrValid(actual_next_queue_index);
+
+ // This needs to synchronize with whoever the previous writer at this
+ // location was. Read what is there to see if the message has been published
+ // and next_queue_index is just behind.
+ Index to_replace = const_memory_->LoadIndex(next_queue_index);
+
+ // See if next_queue_index is consistent with the state of the queue. If it
+ // is not, try to atomically update next_queue_index in case the previous
+ // writer failed and retry.
+ if (to_replace.IsPlausible(next_queue_index)) {
+ // If next_queue_index ends up pointing to a message with a matching index,
+    // this is what next_queue_index needs to be updated to.
+ const QueueIndex incremented_queue_index = next_queue_index.Increment();
+
+ // We don't care about the result. It will either succeed, or we got
+ // beat in fixing it. The way the Send logic works, the pointer can never
+ // get more than 1 behind or the next send will repair it. So, if we fail,
+ // that means that someone else got there first and fixed it up (and
+ // potentially someone further continued to send).
+ //
+ // Both require no further action from us. Worst case, our Next pointer
+ // will not be the latest message, but there will always be a point after
+ // which the index can change. We just need a consistent snapshot where
+ // there is nothing in the queue that isn't accounted for by
+ // next_queue_index.
+ memory_->next_queue_index.CompareAndExchangeStrong(actual_next_queue_index,
+ incremented_queue_index);
+
+ VLOG(3) << "next_queue_index is lagging, fixed it. Found " << std::hex
+ << to_replace.get() << ", expected "
+ << next_queue_index.DecrementBy(queue_size).index();
+
+ actual_next_queue_index = incremented_queue_index;
+ }
+
+ if (actual_next_queue_index.valid()) {
+ const QueueIndex current_queue_index =
+ actual_next_queue_index.DecrementBy(1u);
return current_queue_index;
}
return QueueIndex::Invalid();