Add should_fetch to lockless_queue so we can conditionally fetch
For logging, we want to fetch the next message only once it is time to
do so. This adds a callback which is handed all the relevant context at
the right point in the read so it can decide whether or not to actually
fetch. The next step is to expose it through EventLoop.
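
A minimal sketch of the callback shape (the deadline and where it comes
from are hypothetical; the real plumbing lands with the EventLoop
change):

  // Returning false makes the reader report Result::FILTERED and skip
  // copying the message data.
  std::function<bool(const Context &)> should_fetch =
      [deadline](const Context &context) {
        // Only fetch messages which are already due to be handled.
        return context.monotonic_event_time <= deadline;
      };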
Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7c0408d..0b8f1a6 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -13,7 +13,7 @@
namespace {
struct ThreadPlusCount {
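+  // Both fields are fixed 64-bit so the struct is exactly 16 bytes with no
+  // uninitialized padding; its raw bytes get sent as the 16-byte source UUID
+  // below.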
- int thread;
+ uint64_t thread;
uint64_t count;
};
@@ -47,7 +47,8 @@
Reset();
}
-void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
+ bool set_should_read, bool should_read_result) {
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
queue_.config().queue_size;
@@ -197,7 +198,10 @@
++started_writes_;
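+        // Encode the ThreadPlusCount in the source UUID so that CheckReads()
+        // can recover it from the header alone, even when the message data is
+        // filtered and never read.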
auto result =
sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
- aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+ aos::realtime_clock::min_time, 0xffffffff,
+ UUID::FromSpan(absl::Span<const uint8_t>(
+ reinterpret_cast<const uint8_t *>(&tpc),
+ sizeof(ThreadPlusCount))),
nullptr, nullptr, nullptr);
CHECK(std::find(expected_send_results_.begin(),
@@ -237,7 +241,8 @@
}
if (check_writes_and_reads_) {
- CheckReads(race_reads, write_wrap_count, &threads);
+ CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
+ should_read_result);
}
// Reap all the threads.
@@ -276,7 +281,8 @@
}
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
- ::std::vector<ThreadState> *threads) {
+ ::std::vector<ThreadState> *threads,
+ bool set_should_read, bool should_read_result) {
// Now read back the results to double check.
LocklessQueueReader reader(queue_);
const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
@@ -290,6 +296,15 @@
LocklessQueueSize(queue_.memory());
}
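+  // nop is left empty; an unset std::function exercises the "no callback
+  // provided" path, where every message is fetched.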
+ std::function<bool(const Context &)> nop;
+
+ Context fetched_context;
+ std::function<bool(const Context &)> should_read =
+ [&should_read_result, &fetched_context](const Context &context) {
+ fetched_context = context;
+ return should_read_result;
+ };
+
for (uint64_t i = initial_i;
i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
monotonic_clock::time_point monotonic_sent_time;
@@ -308,8 +323,17 @@
LocklessQueueReader::Result read_result = reader.Read(
wrapped_i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &source_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]),
+ set_should_read ? std::ref(should_read) : std::ref(nop));
+    // The code in lockless_queue.cc reads everything but the data, checks
+    // that the header hasn't changed, and then reads the data.  So, if the
+    // read succeeds and neither the header nor the data ends up corrupted,
+    // we've confirmed the whole sequence works.
+    //
+    // Exercise all the combinations: with and without should_read, and with
+    // should_read returning true or false.  Since the callback gets handed
+    // the header values mid-read, capturing them lets us verify the state in
+    // the middle of the process and make sure the boundaries are right.
if (race_reads) {
if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
@@ -322,22 +346,54 @@
continue;
}
}
- // Every message should be good.
- ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
+
+ if (!set_should_read) {
+ // Every message should be good.
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+ << ": i is " << i;
+ } else {
+ if (should_read_result) {
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+ << ": i is " << i;
+
+ ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
+ ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
+ ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
+ ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
+ ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
+ ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
+ ASSERT_EQ(length, fetched_context.size);
+
+ ASSERT_EQ(
+ absl::Span<const uint8_t>(
+ reinterpret_cast<const uint8_t *>(
+ read_data + LocklessQueueMessageDataSize(queue_.memory()) -
+ length),
+ length),
+ source_boot_uuid.span());
+ } else {
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
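+        // A filtered read doesn't fill in the output arguments, so populate
+        // them from the context captured by the callback; that way the
+        // monotonicity check below covers the filtered path too.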
+ monotonic_sent_time = fetched_context.monotonic_event_time;
+ realtime_sent_time = fetched_context.realtime_event_time;
+ monotonic_remote_time = fetched_context.monotonic_remote_time;
+ realtime_remote_time = fetched_context.realtime_remote_time;
+ source_boot_uuid = fetched_context.source_boot_uuid;
+ remote_queue_index = fetched_context.remote_queue_index;
+ length = fetched_context.size;
+ }
+ }
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
last_monotonic_sent_time = monotonic_sent_time;
- EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
- EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
- EXPECT_EQ(source_boot_uuid, UUID::Zero());
+ ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+ ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
ThreadPlusCount tpc;
- ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc,
- read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
- sizeof(ThreadPlusCount));
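+    // The ThreadPlusCount now travels in the source UUID, which is available
+    // even when the message body was never fetched.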
+ ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
+ memcpy(&tpc, source_boot_uuid.span().data(),
+ source_boot_uuid.span().size());
if (will_wrap) {
     // The queue won't change out from under us, so we should get some amount