Test pinning in lockless_queue_death_test

Change-Id: I1bb851c027e4a2546e148cc3ef5e42965128298f
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 3d8812d..b4bb66a 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -487,6 +487,9 @@
   }
   return false;
 }
+
+static int kPinnedMessageIndex = 0;
+
 }  // namespace
 
 // Tests that death during sends is recovered from correctly.
@@ -504,7 +507,7 @@
   config.num_watchers = 2;
   config.num_senders = 2;
   config.num_pinners = 1;
-  config.queue_size = 4;
+  config.queue_size = 2;
   config.message_data_size = 32;
 
   TestShmRobustness(
@@ -523,13 +526,19 @@
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
             config);
-        // Now try to write 2 messages.  We will get killed a bunch as this
+        // Now try to write some messages.  We will get killed a bunch as this
         // tries to happen.
         LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
-        for (int i = 0; i < 2; ++i) {
+        LocklessQueuePinner pinner = LocklessQueuePinner::Make(queue).value();
+        for (int i = 0; i < 5; ++i) {
           char data[100];
           size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
           sender.Send(data, s + 1);
+          // Pin a message, so when we keep writing we will exercise the pinning
+          // logic.
+          if (i == 1) {
+            CHECK_EQ(pinner.PinIndex(1), kPinnedMessageIndex);
+          }
         }
       },
       [config, tid](void *raw_memory) {
@@ -560,15 +569,31 @@
         }
 
         // Building and destroying a sender will clean up the queue.
-        {
-          LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
-        }
+        LocklessQueueSender::Make(queue).value();
 
         if (print) {
           printf("Cleaned up version:\n");
           PrintLocklessQueueMemory(memory);
         }
 
+        LocklessQueueReader reader(queue);
+
+        // Verify that the pinned message still has its contents. Note that we
+        // need to do this _before_ sending more messages, because the pinner
+        // has been cleaned up.
+        {
+          const Message *const message =
+              memory->GetMessage(Index(1, kPinnedMessageIndex));
+          const auto queue_index =
+              message->header.queue_index.Load(memory->queue_size());
+          if (queue_index.valid()) {
+            const char *const data = message->data(memory->message_data_size());
+            EXPECT_EQ(data[LocklessQueueMessageDataSize(memory) -
+                           message->header.length + 6],
+                      '2');
+          }
+        }
+
         {
           LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
           {
@@ -583,8 +608,6 @@
           sender.Send(data, s + 1);
         }
 
-        LocklessQueueReader reader(queue);
-
         // Now loop through the queue and make sure the number in the snprintf
         // increments.
         char last_data = '0';
@@ -604,6 +627,12 @@
                           &remote_queue_index, &length, &(read_data[0]));
 
           if (read_result != LocklessQueueReader::Result::GOOD) {
+            if (read_result == LocklessQueueReader::Result::TOO_OLD) {
+              ++i;
+              continue;
+            }
+            CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
+                << ": " << static_cast<int>(read_result);
             break;
           }
 
@@ -618,7 +647,7 @@
         }
 
         // Confirm our message got through.
-        EXPECT_EQ(last_data, '9');
+        EXPECT_EQ(last_data, '9') << ": Got through " << i;
       },
       /* prepare_in_child = true */ true);
 }