Add remote_boot_uuid to Context

This lets us track which boot a message came from, and finally lets us
fix the logger's reliance on ServerStatistics containing all of the
information needed to build the logfile header.

Change-Id: I17fc4c5718d5d69c7a1e154afdd83b1ccb388a8f
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 22f7c6f..cc5a356 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -75,6 +75,10 @@
   // the caller has access to this context, which makes this pretty useless.
   int buffer_index;
 
+  // Boot UUID of the remote node which sent this message, or of this node in
+  // the case of events which are local to this node.
+  UUID remote_boot_uuid = UUID::Zero();
+
   // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
   // memory in the process.  It is vital that T matches the type of the
   // underlying flatbuffer.
@@ -150,7 +154,7 @@
   bool Send(size_t size);
   bool Send(size_t size, monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index);
+            uint32_t remote_queue_index, const UUID &remote_boot_uuid);
 
   // Sends a single block of data by copying it.
   // The remote arguments have the same meaning as in Send above.
@@ -158,7 +162,7 @@
   bool Send(const void *data, size_t size,
             monotonic_clock::time_point monotonic_remote_time,
             realtime_clock::time_point realtime_remote_time,
-            uint32_t remote_queue_index);
+            uint32_t remote_queue_index, const UUID &remote_boot_uuid);
 
   const Channel *channel() const { return channel_; }
 
@@ -195,13 +199,15 @@
   friend class EventLoop;
 
   virtual bool DoSend(const void *data, size_t size,
-                      aos::monotonic_clock::time_point monotonic_remote_time,
-                      aos::realtime_clock::time_point realtime_remote_time,
-                      uint32_t remote_queue_index) = 0;
+                      monotonic_clock::time_point monotonic_remote_time,
+                      realtime_clock::time_point realtime_remote_time,
+                      uint32_t remote_queue_index,
+                      const UUID &remote_boot_uuid) = 0;
   virtual bool DoSend(size_t size,
-                      aos::monotonic_clock::time_point monotonic_remote_time,
-                      aos::realtime_clock::time_point realtime_remote_time,
-                      uint32_t remote_queue_index) = 0;
+                      monotonic_clock::time_point monotonic_remote_time,
+                      realtime_clock::time_point realtime_remote_time,
+                      uint32_t remote_queue_index,
+                      const UUID &remote_boot_uuid) = 0;
 
   EventLoop *const event_loop_;
   const Channel *const channel_;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index e0c6024..ee77b8a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -274,6 +274,7 @@
   EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
   EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
   EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+  EXPECT_EQ(fetcher.context().remote_boot_uuid, UUID::Zero());
   EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
   EXPECT_EQ(fetcher.context().size, 0u);
   EXPECT_EQ(fetcher.context().data, nullptr);
@@ -301,6 +302,7 @@
   EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
   EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
   EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop2->boot_uuid());
   EXPECT_EQ(fetcher.context().queue_index, 0x0u);
   EXPECT_EQ(fetcher.context().size, 20u);
   EXPECT_NE(fetcher.context().data, nullptr);
@@ -955,6 +957,7 @@
     EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
     EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
     EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().remote_boot_uuid, loop->boot_uuid());
     EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
     EXPECT_EQ(loop->context().size, 0u);
     EXPECT_EQ(loop->context().data, nullptr);
@@ -1249,6 +1252,7 @@
               loop1->context().monotonic_event_time);
     EXPECT_EQ(loop1->context().realtime_remote_time,
               loop1->context().realtime_event_time);
+    EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
 
     const aos::monotonic_clock::time_point monotonic_now =
         loop1->monotonic_now();
@@ -1296,6 +1300,7 @@
             fetcher.context().realtime_remote_time);
   EXPECT_EQ(fetcher.context().monotonic_event_time,
             fetcher.context().monotonic_remote_time);
+  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
 
   EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
       << ": Got "
@@ -1348,6 +1353,7 @@
               loop1->context().monotonic_event_time);
     EXPECT_EQ(loop1->context().realtime_remote_time,
               loop1->context().realtime_event_time);
+    EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
 
     const aos::monotonic_clock::time_point monotonic_now =
         loop1->monotonic_now();
@@ -1381,6 +1387,7 @@
             fetcher.context().realtime_remote_time);
   EXPECT_EQ(fetcher.context().monotonic_event_time,
             fetcher.context().monotonic_remote_time);
+  EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
 
   EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
       << ": Got "
@@ -1438,6 +1445,7 @@
 
             EXPECT_EQ(loop1->context().monotonic_remote_time,
                       monotonic_clock::min_time);
+            EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
             EXPECT_EQ(loop1->context().realtime_event_time,
                       realtime_clock::min_time);
             EXPECT_EQ(loop1->context().realtime_remote_time,
@@ -1582,13 +1590,13 @@
     FlatbufferDetachedBuffer<timing::Report> primary_report =
         FlatbufferDetachedBuffer<timing::Report>::Empty();
     while (report_fetcher.FetchNext()) {
-      LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+      VLOG(1) << "Report " << FlatbufferToJson(report_fetcher.get());
       if (report_fetcher->name()->string_view() == "primary") {
         primary_report = CopyFlatBuffer(report_fetcher.get());
       }
     }
 
-    LOG(INFO) << FlatbufferToJson(primary_report, {.multi_line = true});
+    VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
 
     EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
 
@@ -1849,6 +1857,7 @@
   const aos::realtime_clock::time_point realtime_remote_time =
       aos::realtime_clock::time_point(chrono::seconds(3132));
   const uint32_t remote_queue_index = 0x254971;
+  const UUID remote_boot_uuid = UUID::Random();
 
   std::unique_ptr<aos::RawSender> sender =
       loop1->MakeRawSender(configuration::GetChannel(
@@ -1860,19 +1869,21 @@
 
   loop2->OnRun([&]() {
     EXPECT_TRUE(sender->Send(kData.data(), kData.size(), monotonic_remote_time,
-                             realtime_remote_time, remote_queue_index));
+                             realtime_remote_time, remote_queue_index,
+                             remote_boot_uuid));
   });
 
   bool happened = false;
   loop2->MakeRawWatcher(
       configuration::GetChannel(loop2->configuration(), "/test",
                                 "aos.TestMessage", "", nullptr),
-      [this, monotonic_remote_time, realtime_remote_time,
+      [this, monotonic_remote_time, realtime_remote_time, remote_boot_uuid,
        remote_queue_index, &fetcher,
        &happened](const Context &context, const void * /*message*/) {
         happened = true;
         EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time);
         EXPECT_EQ(realtime_remote_time, context.realtime_remote_time);
+        EXPECT_EQ(remote_boot_uuid, context.remote_boot_uuid);
         EXPECT_EQ(remote_queue_index, context.remote_queue_index);
 
         ASSERT_TRUE(fetcher->Fetch());
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 74d3493..d6922de 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -131,15 +131,15 @@
 
 inline bool RawSender::Send(size_t size) {
   return Send(size, monotonic_clock::min_time, realtime_clock::min_time,
-              0xffffffffu);
+              0xffffffffu, event_loop_->boot_uuid());
 }
 
 inline bool RawSender::Send(
     size_t size, aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index) {
+    uint32_t remote_queue_index, const UUID &uuid) {
   if (DoSend(size, monotonic_remote_time, realtime_remote_time,
-             remote_queue_index)) {
+             remote_queue_index, uuid)) {
     timing_.size.Add(size);
     timing_.sender->mutate_count(timing_.sender->count() + 1);
     ftrace_.FormatMessage(
@@ -154,16 +154,16 @@
 
 inline bool RawSender::Send(const void *data, size_t size) {
   return Send(data, size, monotonic_clock::min_time, realtime_clock::min_time,
-              0xffffffffu);
+              0xffffffffu, event_loop_->boot_uuid());
 }
 
 inline bool RawSender::Send(
     const void *data, size_t size,
     aos::monotonic_clock::time_point monotonic_remote_time,
     aos::realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index) {
+    uint32_t remote_queue_index, const UUID &uuid) {
   if (DoSend(data, size, monotonic_remote_time, realtime_remote_time,
-             remote_queue_index)) {
+             remote_queue_index, uuid)) {
     timing_.size.Add(size);
     timing_.sender->mutate_count(timing_.sender->count() + 1);
     ftrace_.FormatMessage(
@@ -190,6 +190,7 @@
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
   event_loop_->context_.buffer_index = -1;
+  event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
 
   ftrace_.FormatMessage(
       "timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -235,6 +236,7 @@
   event_loop_->context_.size = 0;
   event_loop_->context_.data = nullptr;
   event_loop_->context_.buffer_index = -1;
+  event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
 
   // Compute how many cycles elapsed and schedule the next wakeup.
   Reschedule(schedule, monotonic_start_time);
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 07b423f..098b78c 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -315,7 +315,8 @@
       for (size_t i = 1; i < filtered_parts.size(); ++i) {
         CHECK_EQ(filtered_parts[i].source_boot_uuid,
                  filtered_parts[0].source_boot_uuid)
-            << ": Found parts from different boots "
+            << ": Found parts from different boots for node "
+            << node->name()->string_view() << " "
             << LogFileVectorToString(log_files_);
       }
       if (!filtered_parts[0].source_boot_uuid.empty()) {
@@ -1125,7 +1126,12 @@
       timestamped_message.data.message().data()->Data(),
       timestamped_message.data.message().data()->size(),
       timestamped_message.monotonic_remote_time,
-      timestamped_message.realtime_remote_time, remote_queue_index);
+      timestamped_message.realtime_remote_time, remote_queue_index,
+      (channel_source_state_[timestamped_message.channel_index] != nullptr
+           ? CHECK_NOTNULL(
+                 channel_source_state_[timestamped_message.channel_index])
+                 ->event_loop_->boot_uuid()
+           : event_loop_->boot_uuid()));
   if (!sent) return false;
 
   if (queue_index_map_[timestamped_message.channel_index]) {
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 61a4149..63e1cb9 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -63,7 +63,7 @@
 }
 std::string ShmPath(std::string_view shm_base, const Channel *channel) {
   CHECK(channel->has_type());
-  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v3";
+  return ShmFolder(shm_base, channel) + channel->type()->str() + ".v4";
 }
 
 void PageFaultDataWrite(char *data, size_t size) {
@@ -372,7 +372,7 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, copy_buffer);
+        &context_.remote_boot_uuid, &context_.size, copy_buffer);
 
     if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
@@ -535,13 +535,15 @@
   bool DoSend(size_t length,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) override {
+              uint32_t remote_queue_index,
+              const UUID &remote_boot_uuid) override {
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
-    CHECK(lockless_queue_sender_.Send(
-        length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
-        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
+    CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
+                                      realtime_remote_time, remote_queue_index,
+                                      remote_boot_uuid, &monotonic_sent_time_,
+                                      &realtime_sent_time_, &sent_queue_index_))
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
 
@@ -552,14 +554,15 @@
   bool DoSend(const void *msg, size_t length,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) override {
+              uint32_t remote_queue_index,
+              const UUID &remote_boot_uuid) override {
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
     CHECK(lockless_queue_sender_.Send(
         reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
-        realtime_remote_time, remote_queue_index, &monotonic_sent_time_,
-        &realtime_sent_time_, &sent_queue_index_))
+        realtime_remote_time, remote_queue_index, remote_boot_uuid,
+        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
         << ": Somebody wrote outside the buffer of their message on channel "
         << configuration::CleanedChannelToString(channel());
     wake_upper_.Wakeup(event_loop()->priority());
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index cd91ab3..2a2b706 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -26,12 +26,12 @@
     }
 
     // Clean up anything left there before.
-    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v3").c_str());
-    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v3").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
-    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v3").c_str());
-    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v3").c_str());
+    unlink((FLAGS_shm_base + "/test/aos.TestMessage.v4").c_str());
+    unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v4").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
+    unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v4").c_str());
+    unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v4").c_str());
   }
 
   ~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index b38829e..9d431b7 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -294,12 +294,14 @@
   bool DoSend(size_t length,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) override;
+              uint32_t remote_queue_index,
+              const UUID &remote_boot_uuid) override;
 
   bool DoSend(const void *msg, size_t size,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) override;
+              uint32_t remote_queue_index,
+              const UUID &remote_boot_uuid) override;
 
   int buffer_index() override {
     // First, ensure message_ is allocated.
@@ -834,10 +836,11 @@
   simulated_channel_->CountSenderDestroyed();
 }
 
-bool SimulatedSender::DoSend(
-    size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
-    aos::realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index) {
+bool SimulatedSender::DoSend(size_t length,
+                             monotonic_clock::time_point monotonic_remote_time,
+                             realtime_clock::time_point realtime_remote_time,
+                             uint32_t remote_queue_index,
+                             const UUID &remote_boot_uuid) {
   // The allocations in here are due to infrastructure and don't count in the
   // no mallocs in RT code.
   ScopedNotRealtime nrt;
@@ -847,6 +850,7 @@
   message_->context.remote_queue_index = remote_queue_index;
   message_->context.realtime_event_time = event_loop_->realtime_now();
   message_->context.realtime_remote_time = realtime_remote_time;
+  message_->context.remote_boot_uuid = remote_boot_uuid;
   CHECK_LE(length, message_->context.size);
   message_->context.size = length;
 
@@ -862,11 +866,11 @@
   return true;
 }
 
-bool SimulatedSender::DoSend(
-    const void *msg, size_t size,
-    aos::monotonic_clock::time_point monotonic_remote_time,
-    aos::realtime_clock::time_point realtime_remote_time,
-    uint32_t remote_queue_index) {
+bool SimulatedSender::DoSend(const void *msg, size_t size,
+                             monotonic_clock::time_point monotonic_remote_time,
+                             realtime_clock::time_point realtime_remote_time,
+                             uint32_t remote_queue_index,
+                             const UUID &remote_boot_uuid) {
   CHECK_LE(size, this->size())
       << ": Attempting to send too big a message on "
       << configuration::CleanedChannelToString(simulated_channel_->channel());
@@ -883,7 +887,7 @@
          msg, size);
 
   return DoSend(size, monotonic_remote_time, realtime_remote_time,
-                remote_queue_index);
+                remote_queue_index, remote_boot_uuid);
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 60731c1..22d4028 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1485,18 +1485,28 @@
 
   std::unique_ptr<EventLoop> pi1_remote_timestamp =
       simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
-  std::string expected_boot_uuid(
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
-          ->boot_uuid()
-          .ToString());
+  UUID expected_boot_uuid =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
 
   int timestamp_count = 0;
   pi1_remote_timestamp->MakeWatcher(
+      "/pi2/aos", [&expected_boot_uuid,
+                   &pi1_remote_timestamp](const message_bridge::Timestamp &) {
+        EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+                  expected_boot_uuid);
+      });
+  pi1_remote_timestamp->MakeWatcher(
+      "/test",
+      [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
+        EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+                  expected_boot_uuid);
+      });
+  pi1_remote_timestamp->MakeWatcher(
       shared() ? "/pi1/aos/remote_timestamps/pi2"
                : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
       [&timestamp_count, &expected_boot_uuid](const RemoteMessage &header) {
         EXPECT_TRUE(header.has_boot_uuid());
-        EXPECT_EQ(header.boot_uuid()->string_view(), expected_boot_uuid);
+        EXPECT_EQ(UUID::FromString(header.boot_uuid()), expected_boot_uuid);
         VLOG(1) << aos::FlatbufferToJson(&header);
         ++timestamp_count;
       });
@@ -1511,7 +1521,7 @@
           EXPECT_TRUE(connection->has_boot_uuid());
           if (connection->node()->name()->string_view() == "pi2") {
             EXPECT_EQ(expected_boot_uuid,
-                      connection->boot_uuid()->string_view())
+                      UUID::FromString(connection->boot_uuid()))
                 << " : Got " << aos::FlatbufferToJson(&stats);
             ++pi1_server_statistics_count;
           }
@@ -1527,15 +1537,12 @@
   // Confirm that reboot changes the UUID.
   simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
 
-  EXPECT_NE(expected_boot_uuid,
-            simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
-                ->boot_uuid()
-                .ToString());
+  EXPECT_NE(
+      expected_boot_uuid,
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
 
   expected_boot_uuid =
-      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
-          ->boot_uuid()
-          .ToString();
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
   timestamp_count = 0;
   pi1_server_statistics_count = 0;
 
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index e5eb4cb..eb7f243 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -136,7 +136,8 @@
     sender_->Send(fetcher_->context().data, fetcher_->context().size,
                   fetcher_->context().monotonic_event_time,
                   fetcher_->context().realtime_event_time,
-                  fetcher_->context().queue_index);
+                  fetcher_->context().queue_index,
+                  fetcher_->context().remote_boot_uuid);
 
     // And simulate message_bridge's offset recovery.
     client_status_->SampleFilter(client_index_,