Add remote timestamps and queue_index to context
This gives us the knobs to expose the remote timestamps and queue_index
to anything receiving the events. The first use case is the logger. It
can now log forwarded entries *without* having to make the
message_gateway responsible for logging this data.
Change-Id: Ie34dd040d270f4fa90ecd6e463069e1adca1818a
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 3fb506b..903150b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -552,16 +552,30 @@
return &message->data[0];
}
-void LocklessQueue::Sender::Send(const char *data, size_t length) {
+void LocklessQueue::Sender::Send(
+ const char *data, size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time,
+ aos::realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ aos::monotonic_clock::time_point *monotonic_sent_time,
+ aos::realtime_clock::time_point *realtime_sent_time,
+ uint32_t *queue_index) {
CHECK_LE(length, size());
// Flatbuffers write from the back of the buffer to the front. If we are
// going to write an explicit chunk of memory into the buffer, we need to
// adhere to this convention and place it at the end.
memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
- Send(length);
+ Send(length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
+ monotonic_sent_time, realtime_sent_time, queue_index);
}
-void LocklessQueue::Sender::Send(size_t length) {
+void LocklessQueue::Sender::Send(
+ size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
+ aos::realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ aos::monotonic_clock::time_point *monotonic_sent_time,
+ aos::realtime_clock::time_point *realtime_sent_time,
+ uint32_t *queue_index) {
const size_t queue_size = memory_->queue_size();
CHECK_LE(length, size());
@@ -572,6 +586,11 @@
Message *const message = memory_->GetMessage(scratch_index);
message->header.length = length;
+ // Pass these through. Any alternative behavior can be implemented one
+ // layer up.
+ message->header.remote_queue_index = remote_queue_index;
+ message->header.monotonic_remote_time = monotonic_remote_time;
+ message->header.realtime_remote_time = realtime_remote_time;
while (true) {
const QueueIndex actual_next_queue_index =
@@ -625,6 +644,15 @@
message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
message->header.realtime_sent_time = ::aos::realtime_clock::now();
+ if (monotonic_sent_time != nullptr) {
+ *monotonic_sent_time = message->header.monotonic_sent_time;
+ }
+ if (realtime_sent_time != nullptr) {
+ *realtime_sent_time = message->header.realtime_sent_time;
+ }
+ if (queue_index != nullptr) {
+ *queue_index = next_queue_index.index();
+ }
// Before we are fully done filling out the message, update the Sender state
// with the new index to write. This re-uses the barrier for the
@@ -676,8 +704,10 @@
LocklessQueue::ReadResult LocklessQueue::Read(
uint32_t uint32_queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
- ::aos::realtime_clock::time_point *realtime_sent_time, size_t *length,
- char *data) {
+ ::aos::realtime_clock::time_point *realtime_sent_time,
+ ::aos::monotonic_clock::time_point *monotonic_remote_time,
+ ::aos::realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, size_t *length, char *data) {
const size_t queue_size = memory_->queue_size();
// Build up the QueueIndex.
@@ -751,6 +781,13 @@
// make length be from either end.
*monotonic_sent_time = m->header.monotonic_sent_time;
*realtime_sent_time = m->header.realtime_sent_time;
+ if (m->header.remote_queue_index == 0xffffffffu) {
+ *remote_queue_index = queue_index.index();
+ } else {
+ *remote_queue_index = m->header.remote_queue_index;
+ }
+ *monotonic_remote_time = m->header.monotonic_remote_time;
+ *realtime_remote_time = m->header.realtime_remote_time;
memcpy(data, &m->data[0], message_data_size());
*length = m->header.length;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index d539386..976f758 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -65,6 +65,13 @@
// fails.
::aos::monotonic_clock::time_point monotonic_sent_time;
::aos::realtime_clock::time_point realtime_sent_time;
+ // Timestamps of the message from the remote node. These are transparently
+ // passed through.
+ ::aos::monotonic_clock::time_point monotonic_remote_time;
+ ::aos::realtime_clock::time_point realtime_remote_time;
+
+ // Queue index from the remote node.
+ uint32_t remote_queue_index;
size_t length;
} header;
@@ -146,7 +153,9 @@
ReadResult Read(uint32_t queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
::aos::realtime_clock::time_point *realtime_sent_time,
- size_t *length, char *data);
+ ::aos::monotonic_clock::time_point *monotonic_remote_time,
+ ::aos::realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, size_t *length, char *data);
// Returns the index to the latest queue message. Returns empty_queue_index()
// if there are no messages in the queue. Do note that this index wraps if
@@ -195,10 +204,26 @@
// Note: calls to Data() are expensive enough that you should cache it.
size_t size();
void *Data();
- void Send(size_t length);
+ void Send(size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
// Sends up to length data. Does not wakeup the target.
- void Send(const char *data, size_t length);
+ void Send(const char *data, size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
private:
friend class LocklessQueue;
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index e3d6c5e..213c9e4 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -584,12 +584,16 @@
while (true) {
::aos::monotonic_clock::time_point monotonic_sent_time;
::aos::realtime_clock::time_point realtime_sent_time;
+ ::aos::monotonic_clock::time_point monotonic_remote_time;
+ ::aos::realtime_clock::time_point realtime_remote_time;
+ uint32_t remote_queue_index;
char read_data[1024];
size_t length;
LocklessQueue::ReadResult read_result =
- queue.Read(i, &monotonic_sent_time, &realtime_sent_time, &length,
- &(read_data[0]));
+ queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
if (read_result != LocklessQueue::ReadResult::GOOD) {
break;
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index b9cb54d..109c2ea 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -260,6 +260,9 @@
// Read a result from 5 in the past.
::aos::monotonic_clock::time_point monotonic_sent_time;
::aos::realtime_clock::time_point realtime_sent_time;
+ ::aos::monotonic_clock::time_point monotonic_remote_time;
+ ::aos::realtime_clock::time_point realtime_remote_time;
+ uint32_t remote_queue_index;
char read_data[1024];
size_t length;
@@ -271,7 +274,8 @@
}
LocklessQueue::ReadResult read_result =
queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
- &length, &(read_data[0]));
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 350350c..9bb0a70 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -251,6 +251,9 @@
i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
::aos::monotonic_clock::time_point monotonic_sent_time;
::aos::realtime_clock::time_point realtime_sent_time;
+ ::aos::monotonic_clock::time_point monotonic_remote_time;
+ ::aos::realtime_clock::time_point realtime_remote_time;
+ uint32_t remote_queue_index;
size_t length;
char read_data[1024];
@@ -259,7 +262,8 @@
0xffffffffu, queue.QueueSize()));
LocklessQueue::ReadResult read_result =
queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &length, &(read_data[0]));
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
if (race_reads) {
if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
@@ -280,6 +284,9 @@
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
last_monotonic_sent_time = monotonic_sent_time;
+ EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+ EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
+
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
memcpy(&tpc, read_data + queue.message_data_size() - length,