Make Fetch, FetchNext, and watchers see messages at the same time.

We had a bug where a no-arg watcher would trigger a Fetch, expecting it
to return the message just delivered.  The Fetch did not find a message
at that point, so the state machine stopped progressing.

This happened because watchers use Read() with the next queue index to
see if a message has arrived, while Fetch() calls LatestIndex() to get
the newest message.  There is a tiny window between when the message is
put in the message pointer queue in shared memory and when the next
pointer is updated to reflect that.  Watchers also look for messages
whenever any event happens, because it is cheap and you don't want to
go backwards, so nothing else prevented the watcher from racing with
the sender.
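
The window can be illustrated with a toy model.  This is only a sketch:
ToyQueue, PublishSlot(), AdvanceNext(), ReadSeesMessage(), and
StaleNext() are hypothetical names, not the real lockless queue layout
or API.  It assumes a send is two separate stores, and shows the
slot-based check and the pointer-based check disagreeing between them.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Toy model only: hypothetical names, not the aos::ipc_lib types.
struct ToyQueue {
  static constexpr uint32_t kSize = 4;
  static constexpr uint32_t kEmpty = 0xffffffffu;
  // Slot i holds the queue index of the message stored there, or kEmpty.
  std::atomic<uint32_t> slots[kSize];
  // Queue index that the next message will be published at.
  std::atomic<uint32_t> next_queue_index{0};

  ToyQueue() {
    for (auto &slot : slots) slot.store(kEmpty);
  }

  // Step 1 of a send: the message becomes visible in its slot.
  void PublishSlot(uint32_t index) {
    assert(index != kEmpty);
    slots[index % kSize].store(index);
  }
  // Step 2 of a send: the next pointer is advanced to reflect step 1.
  void AdvanceNext(uint32_t index) { next_queue_index.store(index + 1); }

  // Watcher-style check: has message `index` arrived in its slot?
  bool ReadSeesMessage(uint32_t index) const {
    return slots[index % kSize].load() == index;
  }
  // Fetch-style check before the fix: trust only the next pointer.
  // Returns one past the newest message (0 means the queue looks empty).
  uint32_t StaleNext() const { return next_queue_index.load(); }
};
```

Between PublishSlot(0) and AdvanceNext(0), ReadSeesMessage(0) is true
while StaleNext() still reports an empty queue.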

There are two potential fixes:
  1: Check in with LatestIndex() and use the min
  2: Repair the next message pointer if it is behind inside
     LatestIndex()

Longer term, we want to move timestamping after the publish compare +
exchange.  That means we will need to drive even more things off the
publish compare + exchange, which is what makes the message visible.
So option 2 sets us up better for that.
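
As a rough sketch of the two options on a toy model (all names here are
hypothetical, and ReadClamped() is only one reading of "use the min"):
option 1 makes the read side refuse anything at or past the next
pointer, while option 2 lets the LatestIndex() side advance a lagging
pointer itself with a compare + exchange.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Toy model only: hypothetical names, not the aos::ipc_lib types.
struct ToyQueue {
  static constexpr uint32_t kSize = 4;
  static constexpr uint32_t kEmpty = 0xffffffffu;
  std::atomic<uint32_t> slots[kSize];
  std::atomic<uint32_t> next_queue_index{0};

  ToyQueue() {
    for (auto &slot : slots) slot.store(kEmpty);
  }

  // Sender step 1 only: fill the slot without advancing the next pointer.
  void PublishSlot(uint32_t index) {
    assert(index != kEmpty);
    slots[index % kSize].store(index);
  }

  // Option 1: a read never reports a message at or past the next pointer,
  // even if its slot is already filled.
  bool ReadClamped(uint32_t index) const {
    return index < next_queue_index.load() &&
           slots[index % kSize].load() == index;
  }

  // Option 2: if the slot for `next` already holds message `next`, the
  // sender published it but has not advanced the pointer yet, so advance
  // it here.  Returns one past the newest visible message (0 means empty).
  uint32_t RepairedNext() {
    uint32_t next = next_queue_index.load();
    while (slots[next % kSize].load() == next) {
      // On failure compare_exchange_strong reloads `next` with the current
      // value, and the loop re-checks from there.
      if (next_queue_index.compare_exchange_strong(next, next + 1)) ++next;
    }
    return next;
  }
};
```

Repairing writes to the queue, which would be consistent with the
reader in the diff below holding a non-const memory_ pointer.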

To test this, we have code in the queue death tests which produces a
snapshot of memory after each write into the queue's memory.  Use that
to trigger both a read and LatestIndex() after each write and confirm
they agree deterministically.
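
The shape of that check, sketched on a toy model rather than the real
death test harness (ToyQueue and CountDisagreements() are hypothetical
names, and stepping through the writes by hand stands in for the memory
snapshots):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Toy model only: hypothetical names, not the aos::ipc_lib types.
struct ToyQueue {
  static constexpr uint32_t kSize = 4;
  static constexpr uint32_t kEmpty = 0xffffffffu;
  std::atomic<uint32_t> slots[kSize];
  std::atomic<uint32_t> next_queue_index{0};

  ToyQueue() {
    for (auto &slot : slots) slot.store(kEmpty);
  }

  void PublishSlot(uint32_t index) {
    assert(index != kEmpty);
    slots[index % kSize].store(index);
  }

  bool ReadSees(uint32_t index) const {
    return slots[index % kSize].load() == index;
  }

  // Returns one past the newest message.  With repair == true, a lagging
  // next pointer is advanced first.
  uint32_t Next(bool repair) {
    uint32_t next = next_queue_index.load();
    while (repair && slots[next % kSize].load() == next) {
      if (next_queue_index.compare_exchange_strong(next, next + 1)) ++next;
    }
    return next;
  }
};

// Sends `message_count` messages one write at a time and, after every
// write, counts how often the read view and the latest-index view disagree
// about whether the current message exists.
int CountDisagreements(uint32_t message_count, bool repair) {
  ToyQueue queue;
  int disagreements = 0;
  auto check = [&](uint32_t index) {
    const bool read_sees = queue.ReadSees(index);
    const bool latest_sees = queue.Next(repair) > index;
    if (read_sees != latest_sees) ++disagreements;
  };
  for (uint32_t index = 0; index < message_count; ++index) {
    check(index);
    queue.PublishSlot(index);  // Write 1: the message becomes visible.
    check(index);
    // Write 2: the sender advances the pointer, unless a reader already did.
    uint32_t expected = index;
    queue.next_queue_index.compare_exchange_strong(expected, index + 1);
    check(index);
  }
  return disagreements;
}
```

In this model, every message produces one disagreement when the pointer
is not repaired, and none when it is.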

Change-Id: If63bc7cab1521a5a6dad5431961871c25aecaf9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 2cafb48..01f2b7c 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -414,14 +414,15 @@
     GOOD,
     // The message is in the future and we haven't written it yet.
     NOTHING_NEW,
-    // There is a message, but should_read() returned false so we didn't fetch
-    // it.
+    // There is a message, but should_read_callback() returned false so we
+    // didn't fetch it.
     FILTERED,
     // The message got overwritten while we were reading it.
     OVERWROTE,
   };
 
-  LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+  LocklessQueueReader(LocklessQueue queue)
+      : memory_(queue.memory()), const_memory_(queue.const_memory()) {
     queue.Initialize();
   }
 
@@ -433,14 +434,14 @@
   // message, but the filter function returned false, return FILTERED.
   //
   // data may be nullptr to indicate the data should not be copied.
-  Result Read(uint32_t queue_index,
-              monotonic_clock::time_point *monotonic_sent_time,
-              realtime_clock::time_point *realtime_sent_time,
-              monotonic_clock::time_point *monotonic_remote_time,
-              realtime_clock::time_point *realtime_remote_time,
-              uint32_t *remote_queue_index, UUID *source_boot_uuid,
-              size_t *length, char *data,
-              std::function<bool(const Context &context)> should_read) const;
+  Result Read(
+      uint32_t queue_index, monotonic_clock::time_point *monotonic_sent_time,
+      realtime_clock::time_point *realtime_sent_time,
+      monotonic_clock::time_point *monotonic_remote_time,
+      realtime_clock::time_point *realtime_remote_time,
+      uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
+      char *data,
+      std::function<bool(const Context &context)> should_read_callback) const;
 
   // Returns the index to the latest queue message.  Returns empty_queue_index()
   // if there are no messages in the queue.  Do note that this index wraps if
@@ -448,7 +449,8 @@
   QueueIndex LatestIndex() const;
 
  private:
-  const LocklessQueueMemory *const memory_;
+  LocklessQueueMemory *const memory_;
+  const LocklessQueueMemory *const_memory_;
 };
 
 // Returns the number of messages which are logically in the queue at a time.