Add should_fetch to lockless_queue so we can conditionally fetch
For logging, we want to be able to only fetch the next message if it is
time to do so. This adds a callback which is provided all the context
information at the right point to decide if we actually fetch or not.
Next step is to actually expose it through EventLoop.
Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 08e3998..a65c71a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -96,6 +96,20 @@
)
cc_library(
+ name = "context",
+ hdrs = [
+ "context.h",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//aos:flatbuffers",
+ "//aos:uuid",
+ "//aos/time",
+ ],
+)
+
+cc_library(
name = "event_loop",
srcs = [
"event_loop.cc",
@@ -109,6 +123,7 @@
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
+ ":context",
":event_loop_fbs",
":timing_statistics",
"//aos:configuration",
diff --git a/aos/events/context.h b/aos/events/context.h
new file mode 100644
index 0000000..6648949
--- /dev/null
+++ b/aos/events/context.h
@@ -0,0 +1,70 @@
+#ifndef AOS_EVENTS_CONTEXT_H_
+#define AOS_EVENTS_CONTEXT_H_
+
+#include "aos/flatbuffers.h"
+#include "aos/time/time.h"
+#include "aos/uuid.h"
+
+namespace aos {
+
+// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
+// about the current message.
+struct Context {
+ // Time that the message was sent on this node, or the timer was triggered.
+ monotonic_clock::time_point monotonic_event_time;
+ // Realtime the message was sent on this node. This is set to min_time for
+ // Timers and PhasedLoops.
+ realtime_clock::time_point realtime_event_time;
+
+ // The rest are only valid for Watchers and Fetchers.
+
+ // For a single-node configuration, these two are identical to *_event_time.
+ // In a multinode configuration, these are the times that the message was
+ // sent on the original node.
+ monotonic_clock::time_point monotonic_remote_time;
+ realtime_clock::time_point realtime_remote_time;
+
+ // Index in the queue.
+ uint32_t queue_index;
+ // Index into the remote queue. Useful to determine if data was lost. In a
+ // single-node configuration, this will match queue_index.
+ uint32_t remote_queue_index;
+
+ // Size of the data sent.
+ size_t size;
+ // Pointer to the data.
+ const void *data;
+
+ // Index of the message buffer. This will be in [0, NumberBuffers) on
+ // read_method=PIN channels, and -1 for other channels.
+ //
+ // This only tells you about the underlying storage for this message, not
+ // anything about its position in the queue. This is only useful for advanced
+ // zero-copy use cases, on read_method=PIN channels.
+ //
+ // This will uniquely identify a message on this channel at a point in time.
+ // For senders, this point in time is while the sender has the message. With
+ // read_method==PIN, this point in time includes while the caller has access
+ // to this context. For other read_methods, this point in time may be before
+ // the caller has access to this context, which makes this pretty useless.
+ int buffer_index;
+
+ // UUID of the remote node which sent this message, or this node in the case
+ // of events which are local to this node.
+ UUID source_boot_uuid = UUID::Zero();
+
+ // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
+ // memory in the process. It is vital that T matches the type of the
+ // underlying flatbuffer.
+ template <typename T>
+ FlatbufferVector<T> CopyFlatBuffer() const {
+ ResizeableBuffer buffer;
+ buffer.resize(size);
+ memcpy(buffer.data(), data, size);
+ return FlatbufferVector<T>(std::move(buffer));
+ }
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_CONTEXT_H_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e53bc21..17e3e00 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -15,6 +15,7 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
#include "aos/events/channel_preallocated_allocator.h"
+#include "aos/events/context.h"
#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
@@ -34,64 +35,6 @@
class EventLoop;
class WatcherState;
-// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
-// about the current message.
-struct Context {
- // Time that the message was sent on this node, or the timer was triggered.
- monotonic_clock::time_point monotonic_event_time;
- // Realtime the message was sent on this node. This is set to min_time for
- // Timers and PhasedLoops.
- realtime_clock::time_point realtime_event_time;
-
- // The rest are only valid for Watchers and Fetchers.
-
- // For a single-node configuration, these two are identical to *_event_time.
- // In a multinode configuration, these are the times that the message was
- // sent on the original node.
- monotonic_clock::time_point monotonic_remote_time;
- realtime_clock::time_point realtime_remote_time;
-
- // Index in the queue.
- uint32_t queue_index;
- // Index into the remote queue. Useful to determine if data was lost. In a
- // single-node configuration, this will match queue_index.
- uint32_t remote_queue_index;
-
- // Size of the data sent.
- size_t size;
- // Pointer to the data.
- const void *data;
-
- // Index of the message buffer. This will be in [0, NumberBuffers) on
- // read_method=PIN channels, and -1 for other channels.
- //
- // This only tells you about the underlying storage for this message, not
- // anything about its position in the queue. This is only useful for advanced
- // zero-copy use cases, on read_method=PIN channels.
- //
- // This will uniquely identify a message on this channel at a point in time.
- // For senders, this point in time is while the sender has the message. With
- // read_method==PIN, this point in time includes while the caller has access
- // to this context. For other read_methods, this point in time may be before
- // the caller has access to this context, which makes this pretty useless.
- int buffer_index;
-
- // UUID of the remote node which sent this message, or this node in the case
- // of events which are local to this node.
- UUID source_boot_uuid = UUID::Zero();
-
- // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
- // memory in the process. It is vital that T matches the type of the
- // underlying flatbuffer.
- template <typename T>
- FlatbufferVector<T> CopyFlatBuffer() const {
- ResizeableBuffer buffer;
- buffer.resize(size);
- memcpy(buffer.data(), data, size);
- return FlatbufferVector<T>(std::move(buffer));
- }
-};
-
// Raw version of fetcher. Contains a local variable that the fetcher will
// update. This is used for reflection and as an interface to implement typed
// fetchers.
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 314e366..8ed6e0e 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -226,7 +226,8 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.source_boot_uuid, &context_.size, copy_buffer);
+ &context_.source_boot_uuid, &context_.size, copy_buffer,
+ std::ref(should_fetch_));
if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
@@ -320,6 +321,11 @@
std::optional<ipc_lib::LocklessQueuePinner> pinner_;
Context context_;
+
+ // Pre-allocated should_fetch function so we don't allocate.
+ std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
+ return true;
+ };
};
class ShmFetcher : public RawFetcher {
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index b6c2a89..147e0ba 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -209,6 +209,7 @@
"//aos:configuration",
"//aos:realtime",
"//aos:uuid",
+ "//aos/events:context",
"//aos/time",
"//aos/util:compiler_memory_barrier",
"@com_github_google_glog//:glog",
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 9a53eb0..01706ba 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -1269,7 +1269,7 @@
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
- char *data) const {
+ char *data, std::function<bool(const Context &)> should_read) const {
const size_t queue_size = memory_->queue_size();
// Build up the QueueIndex.
@@ -1343,34 +1343,80 @@
// Then read the data out. Copy it all out to be deterministic and so we can
// make length be from either end.
- *monotonic_sent_time = m->header.monotonic_sent_time;
- *realtime_sent_time = m->header.realtime_sent_time;
- if (m->header.remote_queue_index == 0xffffffffu) {
- *remote_queue_index = queue_index.index();
+ if (!should_read) {
+ *monotonic_sent_time = m->header.monotonic_sent_time;
+ *realtime_sent_time = m->header.realtime_sent_time;
+ if (m->header.remote_queue_index == 0xffffffffu) {
+ *remote_queue_index = queue_index.index();
+ } else {
+ *remote_queue_index = m->header.remote_queue_index;
+ }
+ *monotonic_remote_time = m->header.monotonic_remote_time;
+ *realtime_remote_time = m->header.realtime_remote_time;
+ *source_boot_uuid = m->header.source_boot_uuid;
+ *length = m->header.length;
} else {
- *remote_queue_index = m->header.remote_queue_index;
+ // Cache the header results so we don't modify the outputs unless the filter
+ // function says "go".
+ Context context;
+ context.monotonic_event_time = m->header.monotonic_sent_time;
+ context.realtime_event_time = m->header.realtime_sent_time;
+ context.monotonic_remote_time = m->header.monotonic_remote_time;
+ context.realtime_remote_time = m->header.realtime_remote_time;
+ context.queue_index = queue_index.index();
+ if (m->header.remote_queue_index == 0xffffffffu) {
+ context.remote_queue_index = context.queue_index;
+ } else {
+ context.remote_queue_index = m->header.remote_queue_index;
+ }
+ context.source_boot_uuid = m->header.source_boot_uuid;
+ context.size = m->header.length;
+ context.data = nullptr;
+ context.buffer_index = -1;
+
+ // And finally, confirm that the message *still* points to the queue index
+ // we want. This means it didn't change out from under us. If something
+ // changed out from under us, we were reading it much too late in its
+ // lifetime.
+ aos_compiler_memory_barrier();
+ const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+ if (final_queue_index != queue_index) {
+ VLOG(3) << "Changed out from under us. Reading " << std::hex
+ << queue_index.index() << ", finished with "
+ << final_queue_index.index() << ", delta: " << std::dec
+ << (final_queue_index.index() - queue_index.index());
+ return Result::OVERWROTE;
+ }
+
+ // We now know that the context is safe to use. See if we are supposed to
+ // take the message or not.
+ if (!should_read(context)) {
+ return Result::FILTERED;
+ }
+
+ // And now take it.
+ *monotonic_sent_time = context.monotonic_event_time;
+ *realtime_sent_time = context.realtime_event_time;
+ *remote_queue_index = context.remote_queue_index;
+ *monotonic_remote_time = context.monotonic_remote_time;
+ *realtime_remote_time = context.realtime_remote_time;
+ *source_boot_uuid = context.source_boot_uuid;
+ *length = context.size;
}
- *monotonic_remote_time = m->header.monotonic_remote_time;
- *realtime_remote_time = m->header.realtime_remote_time;
- *source_boot_uuid = m->header.source_boot_uuid;
if (data) {
memcpy(data, m->data(memory_->message_data_size()),
memory_->message_data_size());
- }
- *length = m->header.length;
- // And finally, confirm that the message *still* points to the queue index we
- // want. This means it didn't change out from under us.
- // If something changed out from under us, we were reading it much too late in
- // it's lifetime.
- aos_compiler_memory_barrier();
- const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
- if (final_queue_index != queue_index) {
- VLOG(3) << "Changed out from under us. Reading " << std::hex
- << queue_index.index() << ", finished with "
- << final_queue_index.index() << ", delta: " << std::dec
- << (final_queue_index.index() - queue_index.index());
- return Result::OVERWROTE;
+ // Check again since we touched the message again.
+ aos_compiler_memory_barrier();
+ const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
+ if (final_queue_index != queue_index) {
+ VLOG(3) << "Changed out from under us. Reading " << std::hex
+ << queue_index.index() << ", finished with "
+ << final_queue_index.index() << ", delta: " << std::dec
+ << (final_queue_index.index() - queue_index.index());
+ return Result::OVERWROTE;
+ }
}
return Result::GOOD;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index f83b558..2cafb48 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -10,6 +10,7 @@
#include "absl/types/span.h"
+#include "aos/events/context.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
@@ -406,7 +407,19 @@
class LocklessQueueReader {
public:
- enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+ enum class Result {
+ // Message we read was too old and no longer is in the queue.
+ TOO_OLD,
+ // Success!
+ GOOD,
+ // The message is in the future and we haven't written it yet.
+ NOTHING_NEW,
+ // There is a message, but should_read() returned false so we didn't fetch
+ // it.
+ FILTERED,
+ // The message got overwritten while we were reading it.
+ OVERWROTE,
+ };
LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
queue.Initialize();
@@ -416,7 +429,8 @@
// NOTHING_NEW until that gets overwritten with new data. If you ask for an
// element newer than QueueSize() from the current message, we consider it
// behind by a large amount and return TOO_OLD. If the message is modified
- // out from underneath us as we read it, return OVERWROTE.
+ // out from underneath us as we read it, return OVERWROTE. If we found a new
+ // message, but the filter function returned false, return FILTERED.
//
// data may be nullptr to indicate the data should not be copied.
Result Read(uint32_t queue_index,
@@ -425,7 +439,8 @@
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid,
- size_t *length, char *data) const;
+ size_t *length, char *data,
+ std::function<bool(const Context &context)> should_read) const;
// Returns the index to the latest queue message. Returns empty_queue_index()
// if there are no messages in the queue. Do note that this index wraps if
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 3ed3099..c37dc63 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -692,6 +692,11 @@
// increments.
char last_data = '0';
int i = 0;
+
+ std::function<bool(const Context &)> should_read = [](const Context &) {
+ return true;
+ };
+
while (true) {
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
@@ -702,10 +707,11 @@
char read_data[1024];
size_t length;
- LocklessQueueReader::Result read_result = reader.Read(
- i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &source_boot_uuid, &length,
+ &(read_data[0]), std::ref(should_read));
if (read_result != LocklessQueueReader::Result::GOOD) {
if (read_result == LocklessQueueReader::Result::TOO_OLD) {
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 34e2762..93ae2a3 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -234,9 +234,14 @@
LocklessQueueSender::Make(queue(), kChannelStorageDuration).value();
LocklessQueueReader reader(queue());
- time::PhasedLoop loop(std::chrono::microseconds(1), monotonic_clock::now());
+ time::PhasedLoop loop(kChannelStorageDuration / (config_.queue_size - 1),
+ monotonic_clock::now());
+ std::function<bool(const Context &)> should_read = [](const Context &) {
+ return true;
+ };
+
// Send enough messages to wrap.
- for (int i = 0; i < 20000; ++i) {
+ for (int i = 0; i < 2 * static_cast<int>(config_.queue_size); ++i) {
// Confirm that the queue index makes sense given the number of sends.
EXPECT_EQ(reader.LatestIndex().index(),
i == 0 ? QueueIndex::Invalid().index() : i - 1);
@@ -244,7 +249,7 @@
// Send a trivial piece of data.
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i);
- EXPECT_EQ(sender.Send(data, s, monotonic_clock::min_time,
+ ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
nullptr, nullptr, nullptr),
LocklessQueueSender::Result::GOOD);
@@ -272,12 +277,12 @@
LocklessQueueReader::Result read_result = reader.Read(
index.index(), &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &source_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
if (read_result != LocklessQueueReader::Result::GOOD) {
- EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
loop.SleepUntilNext();
@@ -291,6 +296,8 @@
::std::mt19937 generator(0);
::std::uniform_int_distribution<> write_wrap_count_distribution(0, 10);
::std::bernoulli_distribution race_reads_distribution;
+ ::std::bernoulli_distribution set_should_read_distribution;
+ ::std::bernoulli_distribution should_read_result_distribution;
::std::bernoulli_distribution wrap_writes_distribution;
const chrono::seconds print_frequency(FLAGS_print_rate);
@@ -304,12 +311,15 @@
monotonic_clock::time_point next_print_time = start_time + print_frequency;
uint64_t messages = 0;
for (int i = 0; i < FLAGS_min_iterations || monotonic_now < end_time; ++i) {
- bool race_reads = race_reads_distribution(generator);
+ const bool race_reads = race_reads_distribution(generator);
+ const bool set_should_read = set_should_read_distribution(generator);
+ const bool should_read_result = should_read_result_distribution(generator);
int write_wrap_count = write_wrap_count_distribution(generator);
if (!wrap_writes_distribution(generator)) {
write_wrap_count = 0;
}
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(race_reads, write_wrap_count))
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(
+ race_reads, write_wrap_count, set_should_read, should_read_result))
<< ": Running with race_reads: " << race_reads
<< ", and write_wrap_count " << write_wrap_count << " and on iteration "
<< i;
@@ -391,7 +401,7 @@
std::chrono::milliseconds(500),
false});
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, true, true));
}
// // Send enough messages to wrap the 32 bit send counter.
@@ -401,7 +411,7 @@
QueueRacer racer(queue(), 1, kNumMessages);
const monotonic_clock::time_point start_time = monotonic_clock::now();
- EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
+ EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0, false, true));
const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
double elapsed_seconds = chrono::duration_cast<chrono::duration<double>>(
monotonic_now - start_time)
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 7c0408d..0b8f1a6 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -13,7 +13,7 @@
namespace {
struct ThreadPlusCount {
- int thread;
+ uint64_t thread;
uint64_t count;
};
@@ -47,7 +47,8 @@
Reset();
}
-void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
+void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
+ bool set_should_read, bool should_read_result) {
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
queue_.config().queue_size;
@@ -197,7 +198,10 @@
++started_writes_;
auto result =
sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
- aos::realtime_clock::min_time, 0xffffffff, UUID::Zero(),
+ aos::realtime_clock::min_time, 0xffffffff,
+ UUID::FromSpan(absl::Span<const uint8_t>(
+ reinterpret_cast<const uint8_t *>(&tpc),
+ sizeof(ThreadPlusCount))),
nullptr, nullptr, nullptr);
CHECK(std::find(expected_send_results_.begin(),
@@ -237,7 +241,8 @@
}
if (check_writes_and_reads_) {
- CheckReads(race_reads, write_wrap_count, &threads);
+ CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
+ should_read_result);
}
// Reap all the threads.
@@ -276,7 +281,8 @@
}
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
- ::std::vector<ThreadState> *threads) {
+ ::std::vector<ThreadState> *threads,
+ bool set_should_read, bool should_read_result) {
// Now read back the results to double check.
LocklessQueueReader reader(queue_);
const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
@@ -290,6 +296,15 @@
LocklessQueueSize(queue_.memory());
}
+ std::function<bool(const Context &)> nop;
+
+ Context fetched_context;
+ std::function<bool(const Context &)> should_read =
+ [&should_read_result, &fetched_context](const Context &context) {
+ fetched_context = context;
+ return should_read_result;
+ };
+
for (uint64_t i = initial_i;
i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
monotonic_clock::time_point monotonic_sent_time;
@@ -308,8 +323,17 @@
LocklessQueueReader::Result read_result = reader.Read(
wrapped_i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &source_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]),
+ set_should_read ? std::ref(should_read) : std::ref(nop));
+ // The code in lockless_queue.cc reads everything but data, checks that the
+ // header hasn't changed, then reads the data. So, if we succeed and both
+ // end up not being corrupted, then we've confirmed everything works.
+ //
+ // Feed in both combos of should_read and whether or not to return true or
+ // false from should_read. By capturing the header values inside the
+ // callback, we can also verify the state in the middle of the process to
+ // make sure we have the right boundaries.
if (race_reads) {
if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
@@ -322,22 +346,54 @@
continue;
}
}
- // Every message should be good.
- ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
+
+ if (!set_should_read) {
+ // Every message should be good.
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+ << ": i is " << i;
+ } else {
+ if (should_read_result) {
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
+ << ": i is " << i;
+
+ ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
+ ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
+ ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
+ ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
+ ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
+ ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
+ ASSERT_EQ(length, fetched_context.size);
+
+ ASSERT_EQ(
+ absl::Span<const uint8_t>(
+ reinterpret_cast<const uint8_t *>(
+ read_data + LocklessQueueMessageDataSize(queue_.memory()) -
+ length),
+ length),
+ source_boot_uuid.span());
+ } else {
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
+ monotonic_sent_time = fetched_context.monotonic_event_time;
+ realtime_sent_time = fetched_context.realtime_event_time;
+ monotonic_remote_time = fetched_context.monotonic_remote_time;
+ realtime_remote_time = fetched_context.realtime_remote_time;
+ source_boot_uuid = fetched_context.source_boot_uuid;
+ remote_queue_index = fetched_context.remote_queue_index;
+ length = fetched_context.size;
+ }
+ }
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
last_monotonic_sent_time = monotonic_sent_time;
- EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
- EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
- EXPECT_EQ(source_boot_uuid, UUID::Zero());
+ ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
+ ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
ThreadPlusCount tpc;
- ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc,
- read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
- sizeof(ThreadPlusCount));
+ ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
+ memcpy(&tpc, source_boot_uuid.span().data(),
+ source_boot_uuid.span().size());
if (will_wrap) {
// The queue won't chang out from under us, so we should get some amount
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index 3e5ca94..87f2cce 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -48,7 +48,12 @@
// necesitates a loser check at the end.
//
// If both are set, run an even looser test.
- void RunIteration(bool race_reads, int write_wrap_count);
+ //
+ // set_should_read is used to determine if we should pass in a valid
+ // should_read function, and should_read_result is the return result of that
+ // function.
+ void RunIteration(bool race_reads, int write_wrap_count, bool set_should_read,
+ bool should_read_result);
size_t CurrentIndex() {
return LocklessQueueReader(queue_).LatestIndex().index();
@@ -64,7 +69,8 @@
// clean up all the threads. Otherwise we get an assert on the way out of
// RunIteration instead of getting all the way back to gtest.
void CheckReads(bool race_reads, int write_wrap_count,
- ::std::vector<ThreadState> *threads);
+ ::std::vector<ThreadState> *threads, bool set_should_read,
+ bool should_read_result);
LocklessQueue queue_;
const uint64_t num_threads_;
@@ -80,6 +86,14 @@
::std::atomic<uint64_t> started_writes_;
// Number of writes completed.
::std::atomic<uint64_t> finished_writes_;
+
+ std::function<bool(uint32_t, monotonic_clock::time_point,
+ realtime_clock::time_point, monotonic_clock::time_point,
+ realtime_clock::time_point, uint32_t, UUID, size_t)>
+ should_read_ = [](uint32_t, monotonic_clock::time_point,
+ realtime_clock::time_point, monotonic_clock::time_point,
+ realtime_clock::time_point, uint32_t, UUID,
+ size_t) { return true; };
};
} // namespace ipc_lib