Expose a unique index for each event loop buffer

This can be helpful for indexing into other data structures based on
the messages, since it provides an identifier which stays unique for as
long as the message is pinned.

Change-Id: I49ce18fba25a796005e64b40e5d1d5c55ca15543
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 606849a..993011f 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,8 +1,9 @@
 #include "aos/events/event_loop_param_test.h"
 
 #include <chrono>
+#include <unordered_map>
+#include <unordered_set>
 
-#include "aos/events/test_message_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -14,6 +15,54 @@
 namespace chrono = ::std::chrono;
 }  // namespace
 
+::std::unique_ptr<EventLoop> AbstractEventLoopTest::Make(
+    std::string_view name) {  // Creates a loop via factory_; "" => auto name.
+  std::string name_copy(name);
+  if (name == "") {
+    name_copy = "loop";  // Empty name: generate "loop<count>".
+    name_copy += std::to_string(event_loop_count_);  // Current counter value.
+  }
+  ++event_loop_count_;  // Bumped on every call, named or not.
+  return factory_->Make(name_copy);
+}
+
+void AbstractEventLoopTest::VerifyBuffers(  // CHECKs buffer-index invariants.
+    int number_buffers,  // Exclusive upper bound on valid buffer indices.
+    std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+    std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders) {
+  // Buffer indices held by senders; each must be in range and unique.
+  std::unordered_set<int> in_sender;
+  for (const Sender<TestMessage> &sender : senders) {
+    const int this_buffer = sender.buffer_index();
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.insert(this_buffer).second) << ": " << this_buffer;
+  }
+
+  if (read_method() != ReadMethod::PIN) {
+    // Without PIN, nothing can be verified about which buffers the fetchers
+    // have, so only the sender checks above apply.
+    return;
+  }
+
+  // Fetchers can't hold a sender's buffer. Maps TestMessage::value -> buffer.
+  std::unordered_map<int, int> fetcher_values;
+  for (const Fetcher<TestMessage> &fetcher : fetchers) {
+    if (!fetcher.get()) {  // Skip fetchers whose get() is null.
+      continue;
+    }
+    const int this_buffer = fetcher.context().buffer_index;
+    CHECK_GE(this_buffer, 0);
+    CHECK_LT(this_buffer, number_buffers);
+    CHECK(in_sender.count(this_buffer) == 0) << ": " << this_buffer;
+    const auto insert_result = fetcher_values.insert(
+        std::make_pair(fetcher.get()->value(), this_buffer));
+    if (!insert_result.second) {  // Value already seen: buffers must agree.
+      CHECK_EQ(this_buffer, insert_result.first->second);
+    }
+  }
+}
+
 // Tests that watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, Basic) {
@@ -92,6 +141,7 @@
   loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
     EXPECT_GT(loop2->context().size, 0u);
     EXPECT_EQ(nullptr, loop2->context().data);
+    EXPECT_EQ(-1, loop2->context().buffer_index);
     this->Exit();
   });
 
@@ -222,6 +272,7 @@
   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
   EXPECT_EQ(fetcher.context().size, 0u);
   EXPECT_EQ(fetcher.context().data, nullptr);
+  EXPECT_EQ(fetcher.context().buffer_index, -1);
 
   aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
   TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -248,6 +299,13 @@
   EXPECT_EQ(fetcher.context().queue_index, 0x0u);
   EXPECT_EQ(fetcher.context().size, 20u);
   EXPECT_NE(fetcher.context().data, nullptr);
+  if (read_method() == ReadMethod::PIN) {
+    EXPECT_GE(fetcher.context().buffer_index, 0);
+    EXPECT_LT(fetcher.context().buffer_index,
+              loop2->NumberBuffers(fetcher.channel()));
+  } else {
+    EXPECT_EQ(fetcher.context().buffer_index, -1);
+  }
 }
 
 // Tests that watcher will receive all messages sent if they are sent after
@@ -603,17 +661,32 @@
       fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
     }
     send_message(1);
+    const auto verify_buffers = [&]() {  // VerifyBuffers on fetchers + sender.
+      std::vector<std::reference_wrapper<const Fetcher<TestMessage>>>
+          fetchers_copy;  // Holds references, not copies of the fetchers.
+      for (const auto &fetcher : fetchers) {
+        fetchers_copy.emplace_back(fetcher);
+      }
+      std::vector<std::reference_wrapper<const Sender<TestMessage>>>
+          senders_copy;  // Only the single `sender` is checked.
+      senders_copy.emplace_back(sender);
+      VerifyBuffers(send_loop->NumberBuffers(sender.channel()), fetchers_copy,
+                    senders_copy);
+    };
     for (auto &fetcher : fetchers) {
       ASSERT_TRUE(fetcher.Fetch());
+      verify_buffers();
       EXPECT_EQ(1, fetcher.get()->value());
     }
 
     for (int save = 1; save <= max_save; ++save) {
       SCOPED_TRACE("save=" + std::to_string(save));
       send_message(100 + save);
+      verify_buffers();
       for (size_t i = 0; i < fetchers.size() - save; ++i) {
         SCOPED_TRACE("fetcher=" + std::to_string(i));
         ASSERT_TRUE(fetchers[i].Fetch());
+        verify_buffers();
         EXPECT_EQ(100 + save, fetchers[i].get()->value());
       }
       for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
@@ -625,6 +698,7 @@
 
     for (int i = 0; i < 300; ++i) {
       send_message(200 + i);
+      verify_buffers();
     }
 
     for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
@@ -879,6 +953,7 @@
     EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
     EXPECT_EQ(loop->context().size, 0u);
     EXPECT_EQ(loop->context().data, nullptr);
+    EXPECT_EQ(loop->context().buffer_index, -1);
 
     expected_times.push_back(loop->context().monotonic_event_time);
     if (times.size() == kCount) {
@@ -1121,6 +1196,15 @@
     EXPECT_LT(&msg, reinterpret_cast<const void *>(
                         reinterpret_cast<const char *>(loop1->context().data) +
                         loop1->context().size));
+    if (read_method() == ReadMethod::PIN) {
+      EXPECT_GE(loop1->context().buffer_index, 0);
+      EXPECT_LT(loop1->context().buffer_index,
+                loop1->NumberBuffers(
+                    configuration::GetChannel(loop1->configuration(), "/test",
+                                              "aos.TestMessage", "", nullptr)));
+    } else {
+      EXPECT_EQ(-1, loop1->context().buffer_index);
+    }
     triggered = true;
   });
 
@@ -1293,6 +1377,7 @@
             EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
             EXPECT_EQ(loop1->context().size, 0u);
             EXPECT_EQ(loop1->context().data, nullptr);
+            EXPECT_EQ(loop1->context().buffer_index, -1);
 
             if (times.size() == kCount) {
               LOG(INFO) << "Exiting";