Make Fetch, FetchNext, and watchers see messages at the same time.

We had a bug where a no-arg watcher would prod a Fetch to happen,
expecting it to return the message just delivered.  The Fetch wasn't
finding a message at that point in time, so the state machine wasn't
progressing.

This was happening because watchers use Read() with the next queue
index to see if a message has arrived, while Fetch() calls LatestIndex()
to get the newest message.  There is a tiny window between when the
message is put in the message pointer queue in shared memory and when
the next queue index is updated to reflect that.  Watchers also look for
messages any time any event happens, because it is cheap and you don't
want to go backwards, so nothing else was preventing the watcher from
racing with the sender.

There are two potential fixes:
  1: Also check in with LatestIndex() when reading, and use the minimum
     of the two.
  2: Repair the next message pointer inside LatestIndex() if it is
     behind.

Longer term, we want to move timestamping after the publish
compare-and-exchange.  That means even more things will need to be
driven off the publish compare-and-exchange, since it is what makes the
message visible.  Option 2 sets us up better to complete that work.

To test this, we use the code in the queue death tests which produces a
snapshot of memory after each write into the queue's memory.  For each
snapshot, trigger both a read and a LatestIndex() call and confirm they
agree deterministically.

Change-Id: If63bc7cab1521a5a6dad5431961871c25aecaf9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 93ae2a3..58e093f 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -1,5 +1,6 @@
 #include "aos/ipc_lib/lockless_queue.h"
 
+#include <sys/mman.h>
 #include <unistd.h>
 #include <wait.h>
 
@@ -16,6 +17,8 @@
 #include "aos/events/epoll.h"
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/event.h"
+#include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/ipc_lib/lockless_queue_stepping.h"
 #include "aos/ipc_lib/queue_racer.h"
 #include "aos/ipc_lib/signalfd.h"
 #include "aos/realtime.h"
@@ -421,6 +424,148 @@
          static_cast<double>(kNumMessages) / elapsed_seconds);
 }
 
+#if defined(SUPPORTS_SHM_ROBUSTNESS_TEST)
+
+// Verifies that LatestIndex points to the same message as the logic from
+// "FetchNext", which increments the index until it gets "NOTHING_NEW" back.
+// This is so we can confirm fetchers and watchers all see the same message at
+// the same point in time.  Returns the number of queue indices walked.
+int VerifyMessages(LocklessQueue *queue, LocklessQueueMemory *memory) {
+  LocklessQueueReader reader(*queue);
+
+  // No early return: an invalid LatestIndex() while Read() sees data is a bug.
+  const ipc_lib::QueueIndex queue_index = reader.LatestIndex();
+
+  // Payloads end at offset data_size in read_data; make sure that is in bounds.
+  const size_t data_size = LocklessQueueMessageDataSize(memory);
+  char read_data[1024];
+  CHECK_LE(data_size, sizeof(read_data));
+
+  // Payloads are "foobar<N>", so the digit at offset 6 must keep increasing.
+  char last_data = '0';
+  int i = 0;
+
+  // Callback which isn't set so we don't exercise the conditional reading code.
+  std::function<bool(const Context &)> should_read_callback;
+
+  // Now, read as far as we can until we get NOTHING_NEW.  This simulates
+  // FetchNext.
+  while (true) {
+    monotonic_clock::time_point monotonic_sent_time;
+    realtime_clock::time_point realtime_sent_time;
+    monotonic_clock::time_point monotonic_remote_time;
+    realtime_clock::time_point realtime_remote_time;
+    uint32_t remote_queue_index;
+    UUID source_boot_uuid;
+    size_t length;
+
+    const LocklessQueueReader::Result read_result = reader.Read(
+        i, &monotonic_sent_time, &realtime_sent_time, &monotonic_remote_time,
+        &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
+        &(read_data[0]), std::ref(should_read_callback));
+
+    if (read_result == LocklessQueueReader::Result::TOO_OLD) {
+      ++i;
+      continue;
+    }
+    if (read_result != LocklessQueueReader::Result::GOOD) {
+      CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
+          << ": " << static_cast<int>(read_result);
+      break;
+    }
+
+    // Senders include the trailing NUL, so payload prints as a C string.
+    CHECK_GE(length, 7u) << ": Message " << i << " is too short";
+    const char *const payload = read_data + data_size - length;
+    EXPECT_GT(payload[6], last_data) << ": Got " << payload << " for " << i;
+    last_data = payload[6];
+    ++i;
+  }
+
+  // The latest queue index should match the fetched queue index.
+  EXPECT_EQ(queue_index.valid(), i != 0) << ": " << i << " readable indices";
+  if (queue_index.valid() && i != 0) {
+    EXPECT_EQ(queue_index.index(), static_cast<uint32_t>(i - 1));
+  }
+  return i;
+}
+
+// Tests that at all points in the publish step, fetch == fetch next.  This
+// means that there is an atomic point at which the message is viewed as visible
+// to consumers.  Do this by killing the writer after each change to shared
+// memory, and confirming fetch == fetch next each time.
+TEST_F(LocklessQueueTest, FetchEqFetchNext) {
+  SharedTid tid;
+
+  // Make a small queue so it is easier to debug.
+  LocklessQueueConfiguration config;
+  config.num_watchers = 1;
+  config.num_senders = 2;
+  config.num_pinners = 0;
+  config.queue_size = 3;
+  config.message_data_size = 32;
+
+  TestShmRobustness(
+      config,
+      [config, &tid](void *memory) {
+        // Initialize the queue.
+        auto *const queue_memory = static_cast<LocklessQueueMemory *>(memory);
+        LocklessQueue(queue_memory, queue_memory, config).Initialize();
+        tid.Set();
+      },
+      [config](void *memory) {
+        auto *const queue_memory = static_cast<LocklessQueueMemory *>(memory);
+        LocklessQueue queue(queue_memory, queue_memory, config);
+        // Now try to write some messages.  We will get killed a bunch as this
+        // tries to happen.
+        LocklessQueueSender sender =
+            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
+        for (int i = 0; i < 5; ++i) {
+          char data[100];
+          // snprintf returns a negative int on error; never send that length.
+          const int s = snprintf(data, sizeof(data), "foobar%d", i + 1);
+          ASSERT_GT(s, 0);
+          ASSERT_EQ(sender.Send(data, static_cast<size_t>(s) + 1,
+                                monotonic_clock::min_time,
+                                realtime_clock::min_time, 0xffffffffl,
+                                UUID::Zero(), nullptr, nullptr, nullptr),
+                    LocklessQueueSender::Result::GOOD);
+        }
+      },
+      [config, &tid](void *raw_memory) {
+        // Runs on the memory left behind each time the writer is killed.
+        auto *const memory = static_cast<LocklessQueueMemory *>(raw_memory);
+        LocklessQueue queue(memory, memory, config);
+        PretendOwnerDied(&memory->queue_setup_lock, tid.Get());
+
+        if (VLOG_IS_ON(1)) {
+          PrintLocklessQueueMemory(memory);
+        }
+
+        const int i = VerifyMessages(&queue, memory);
+
+        // Now, make sure we can send 1 message and receive it to confirm we
+        // haven't corrupted next_queue_index irrevocably.
+        LocklessQueueSender sender =
+            LocklessQueueSender::Make(queue, chrono::nanoseconds(1)).value();
+        {
+          char data[100];
+          const int s = snprintf(data, sizeof(data), "foobar%d", i + 1);
+          ASSERT_GT(s, 0);
+          ASSERT_EQ(sender.Send(data, static_cast<size_t>(s) + 1,
+                                monotonic_clock::min_time,
+                                realtime_clock::min_time, 0xffffffffl,
+                                UUID::Zero(), nullptr, nullptr, nullptr),
+                    LocklessQueueSender::Result::GOOD);
+        }
+
+        const int newi = VerifyMessages(&queue, memory);
+        EXPECT_EQ(newi, i + 1);
+      });
+}
+
+#endif
+
 }  // namespace testing
 }  // namespace ipc_lib
 }  // namespace aos