Add remote timestamps and queue_index to context

This gives us the knobs to expose the remote timestamps and queue_index
to anything receiving the events.  The first use case is the logger.  It
can now log forwarded entries *without* having to make the
message_gateway responsible for logging this data.

Change-Id: Ie34dd040d270f4fa90ecd6e463069e1adca1818a
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 3fb506b..903150b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -552,16 +552,30 @@
   return &message->data[0];
 }
 
-void LocklessQueue::Sender::Send(const char *data, size_t length) {
+void LocklessQueue::Sender::Send(
+    const char *data, size_t length,
+    aos::monotonic_clock::time_point monotonic_remote_time,
+    aos::realtime_clock::time_point realtime_remote_time,
+    uint32_t remote_queue_index,
+    aos::monotonic_clock::time_point *monotonic_sent_time,
+    aos::realtime_clock::time_point *realtime_sent_time,
+    uint32_t *queue_index) {
   CHECK_LE(length, size());
   // Flatbuffers write from the back of the buffer to the front.  If we are
   // going to write an explicit chunk of memory into the buffer, we need to
   // adhere to this convention and place it at the end.
   memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
-  Send(length);
+  Send(length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+       monotonic_sent_time, realtime_sent_time, queue_index);
 }
 
-void LocklessQueue::Sender::Send(size_t length) {
+void LocklessQueue::Sender::Send(
+    size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
+    aos::realtime_clock::time_point realtime_remote_time,
+    uint32_t remote_queue_index,
+    aos::monotonic_clock::time_point *monotonic_sent_time,
+    aos::realtime_clock::time_point *realtime_sent_time,
+    uint32_t *queue_index) {
   const size_t queue_size = memory_->queue_size();
   CHECK_LE(length, size());
 
@@ -572,6 +586,11 @@
   Message *const message = memory_->GetMessage(scratch_index);
 
   message->header.length = length;
+  // Pass these through.  Any alternative behavior can be implemented in an
+  // outer layer.
+  message->header.remote_queue_index = remote_queue_index;
+  message->header.monotonic_remote_time = monotonic_remote_time;
+  message->header.realtime_remote_time = realtime_remote_time;
 
   while (true) {
     const QueueIndex actual_next_queue_index =
@@ -625,6 +644,15 @@
 
     message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
     message->header.realtime_sent_time = ::aos::realtime_clock::now();
+    if (monotonic_sent_time != nullptr) {
+      *monotonic_sent_time = message->header.monotonic_sent_time;
+    }
+    if (realtime_sent_time != nullptr) {
+      *realtime_sent_time = message->header.realtime_sent_time;
+    }
+    if (queue_index != nullptr) {
+      *queue_index = next_queue_index.index();
+    }
 
     // Before we are fully done filling out the message, update the Sender state
     // with the new index to write.  This re-uses the barrier for the
@@ -676,8 +704,10 @@
 LocklessQueue::ReadResult LocklessQueue::Read(
     uint32_t uint32_queue_index,
     ::aos::monotonic_clock::time_point *monotonic_sent_time,
-    ::aos::realtime_clock::time_point *realtime_sent_time, size_t *length,
-    char *data) {
+    ::aos::realtime_clock::time_point *realtime_sent_time,
+    ::aos::monotonic_clock::time_point *monotonic_remote_time,
+    ::aos::realtime_clock::time_point *realtime_remote_time,
+    uint32_t *remote_queue_index, size_t *length, char *data) {
   const size_t queue_size = memory_->queue_size();
 
   // Build up the QueueIndex.
@@ -751,6 +781,13 @@
   // make length be from either end.
   *monotonic_sent_time = m->header.monotonic_sent_time;
   *realtime_sent_time = m->header.realtime_sent_time;
+  if (m->header.remote_queue_index == 0xffffffffu) {
+    *remote_queue_index = queue_index.index();
+  } else {
+    *remote_queue_index = m->header.remote_queue_index;
+  }
+  *monotonic_remote_time = m->header.monotonic_remote_time;
+  *realtime_remote_time = m->header.realtime_remote_time;
   memcpy(data, &m->data[0], message_data_size());
   *length = m->header.length;
 
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index d539386..976f758 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -65,6 +65,13 @@
     // fails.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
     ::aos::realtime_clock::time_point realtime_sent_time;
+    // Timestamps of the message from the remote node.  These are transparently
+    // passed through.
+    ::aos::monotonic_clock::time_point monotonic_remote_time;
+    ::aos::realtime_clock::time_point realtime_remote_time;
+
+    // Queue index from the remote node.
+    uint32_t remote_queue_index;
 
     size_t length;
   } header;
@@ -146,7 +153,9 @@
   ReadResult Read(uint32_t queue_index,
                   ::aos::monotonic_clock::time_point *monotonic_sent_time,
                   ::aos::realtime_clock::time_point *realtime_sent_time,
-                  size_t *length, char *data);
+                  ::aos::monotonic_clock::time_point *monotonic_remote_time,
+                  ::aos::realtime_clock::time_point *realtime_remote_time,
+                  uint32_t *remote_queue_index, size_t *length, char *data);
 
   // Returns the index to the latest queue message.  Returns empty_queue_index()
   // if there are no messages in the queue.  Do note that this index wraps if
@@ -195,10 +204,26 @@
     // Note: calls to Data() are expensive enough that you should cache it.
     size_t size();
     void *Data();
-    void Send(size_t length);
+    void Send(size_t length,
+              aos::monotonic_clock::time_point monotonic_remote_time =
+                  aos::monotonic_clock::min_time,
+              aos::realtime_clock::time_point realtime_remote_time =
+                  aos::realtime_clock::min_time,
+              uint32_t remote_queue_index = 0xffffffff,
+              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+              uint32_t *queue_index = nullptr);
 
     // Sends up to length data.  Does not wakeup the target.
-    void Send(const char *data, size_t length);
+    void Send(const char *data, size_t length,
+              aos::monotonic_clock::time_point monotonic_remote_time =
+                  aos::monotonic_clock::min_time,
+              aos::realtime_clock::time_point realtime_remote_time =
+                  aos::realtime_clock::min_time,
+              uint32_t remote_queue_index = 0xffffffff,
+              aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+              aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+              uint32_t *queue_index = nullptr);
 
    private:
     friend class LocklessQueue;
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index e3d6c5e..213c9e4 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -584,12 +584,16 @@
         while (true) {
           ::aos::monotonic_clock::time_point monotonic_sent_time;
           ::aos::realtime_clock::time_point realtime_sent_time;
+          ::aos::monotonic_clock::time_point monotonic_remote_time;
+          ::aos::realtime_clock::time_point realtime_remote_time;
+          uint32_t remote_queue_index;
           char read_data[1024];
           size_t length;
 
           LocklessQueue::ReadResult read_result =
-              queue.Read(i, &monotonic_sent_time, &realtime_sent_time, &length,
-                         &(read_data[0]));
+              queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
+                         &monotonic_remote_time, &realtime_remote_time,
+                         &remote_queue_index, &length, &(read_data[0]));
 
           if (read_result != LocklessQueue::ReadResult::GOOD) {
             break;
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index b9cb54d..109c2ea 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -260,6 +260,9 @@
     // Read a result from 5 in the past.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
     ::aos::realtime_clock::time_point realtime_sent_time;
+    ::aos::monotonic_clock::time_point monotonic_remote_time;
+    ::aos::realtime_clock::time_point realtime_remote_time;
+    uint32_t remote_queue_index;
     char read_data[1024];
     size_t length;
 
@@ -271,7 +274,8 @@
     }
     LocklessQueue::ReadResult read_result =
         queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
-                   &length, &(read_data[0]));
+                   &monotonic_remote_time, &realtime_remote_time,
+                   &remote_queue_index, &length, &(read_data[0]));
 
     // This should either return GOOD, or TOO_OLD if it is before the start of
     // the queue.
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 350350c..9bb0a70 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -251,6 +251,9 @@
        i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
     ::aos::monotonic_clock::time_point monotonic_sent_time;
     ::aos::realtime_clock::time_point realtime_sent_time;
+    ::aos::monotonic_clock::time_point monotonic_remote_time;
+    ::aos::realtime_clock::time_point realtime_remote_time;
+    uint32_t remote_queue_index;
     size_t length;
     char read_data[1024];
 
@@ -259,7 +262,8 @@
                                        0xffffffffu, queue.QueueSize()));
     LocklessQueue::ReadResult read_result =
         queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
-                   &length, &(read_data[0]));
+                   &monotonic_remote_time, &realtime_remote_time,
+                   &remote_queue_index, &length, &(read_data[0]));
 
     if (race_reads) {
       if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
@@ -280,6 +284,9 @@
     ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
     last_monotonic_sent_time = monotonic_sent_time;
 
+    EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+    EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
+
     ThreadPlusCount tpc;
     ASSERT_EQ(length, sizeof(ThreadPlusCount));
     memcpy(&tpc, read_data + queue.message_data_size() - length,