#include "aos/ipc_lib/queue_racer.h"
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <cstring>
#include <functional>
#include <limits>
#include <optional>
#include <ostream>
#include <thread>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include "aos/events/context.h"
#include "aos/ipc_lib/event.h"
namespace aos::ipc_lib {
namespace {
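// Payload written into every message: which thread sent it and that thread's
// running message count. The same bytes are also packed into the source boot
// UUID so CheckReads can recover them later.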
struct ThreadPlusCount {
uint64_t thread;
uint64_t count;
};
} // namespace
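// Per writer-thread state: the thread itself, an event used to signal that it
// is ready to send, and the next message count expected from it while reads
// are being verified.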
struct ThreadState {
::std::thread thread;
Event ready;
uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};
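// Convenience constructor: every send is expected to return GOOD, and both
// writes and reads are verified.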
QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
uint64_t num_messages)
: queue_(queue),
num_threads_(num_threads),
num_messages_(num_messages),
channel_storage_duration_(std::chrono::nanoseconds(1)),
expected_send_results_({LocklessQueueSender::Result::GOOD}),
check_writes_and_reads_(true) {
Reset();
}
QueueRacer::QueueRacer(LocklessQueue queue,
const QueueRacerConfiguration &config)
: queue_(queue),
num_threads_(config.num_threads),
num_messages_(config.num_messages),
channel_storage_duration_(config.channel_storage_duration),
expected_send_results_(config.expected_send_results),
check_writes_and_reads_(config.check_writes_and_reads) {
Reset();
}
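// Runs one racing iteration: spawns the writer threads plus a thread which
// hammers LatestIndex(), optionally races reads against the writers, and then
// verifies the write counts and (optionally) the contents read back.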
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count,
bool set_should_read, bool should_read_result) {
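  // The writers will overflow the queue if they send more messages than it
  // can hold.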
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
queue_.config().queue_size;
// Clear out shmem.
Reset();
started_writes_ = 0;
finished_writes_ = 0;
// Event used to start all the threads processing at once.
Event run;
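  // Flag polled by the index racer thread; cleared once the writers are done
  // so it knows to exit.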
::std::atomic<bool> poll_index{true};
// List of threads.
::std::vector<ThreadState> threads(num_threads_);
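  // Thread which repeatedly reads the latest queue index while the writers
  // run and checks it against the started/finished write counts.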
::std::thread queue_index_racer([this, &poll_index]() {
LocklessQueueReader reader(queue_);
// Track the number of times we wrap, and cache the modulo.
uint64_t wrap_count = 0;
uint32_t last_queue_index = 0;
const uint32_t max_queue_index =
QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
while (poll_index) {
// We want to read everything backwards. This will give us conservative
// bounds. And with enough time and randomness, we will see all the cases
// we care to see.
// These 3 numbers look at the same thing, but at different points of time
// in the process. The process (essentially) looks like:
//
// ++started_writes;
// ++latest_queue_index;
// ++finished_writes;
//
// We want to check that latest_queue_index is bounded by the number of
// writes started and finished. Basically, we can say that
// finished_writes < latest_queue_index always. And
// latest_queue_index < started_writes. And everything always increases.
// So, if we let more time elapse between sampling finished_writes and
// latest_queue_index, we will only be relaxing our bounds, not
// invalidating the check. The same goes for started_writes.
//
// So, grab them in order.
const uint64_t finished_writes = finished_writes_.load();
const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
const uint64_t started_writes = started_writes_.load();
const uint32_t latest_queue_index_uint32_t =
latest_queue_index_queue_index.index();
uint64_t latest_queue_index = latest_queue_index_uint32_t;
if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// If we got smaller, we wrapped.
if (latest_queue_index_uint32_t < last_queue_index) {
++wrap_count;
}
// And apply it.
latest_queue_index +=
static_cast<uint64_t>(max_queue_index) * wrap_count;
last_queue_index = latest_queue_index_uint32_t;
}
      // For grins, check that we have always started at least as many writes
      // as we have finished. Should never fail.
EXPECT_GE(started_writes, finished_writes);
// If we are at the beginning, the queue needs to always return empty.
if (started_writes == 0) {
EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
EXPECT_EQ(finished_writes, 0);
} else {
if (finished_writes == 0) {
// Plausible to be at the beginning, in which case we don't have
// anything to check.
if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// Otherwise, we have started. The queue can't have any more
// entries than this.
EXPECT_GE(started_writes, latest_queue_index + 1);
}
} else {
EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
if (check_writes_and_reads_) {
EXPECT_GE(latest_queue_index + 1, finished_writes);
}
}
}
}
});
// Build up each thread and kick it off.
int thread_index = 0;
for (ThreadState &t : threads) {
if (will_wrap) {
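      // max() is a sentinel meaning "no messages from this thread seen yet";
      // CheckReads replaces it with the first count it observes.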
t.event_count = ::std::numeric_limits<uint64_t>::max();
} else {
t.event_count = 0;
}
t.thread = ::std::thread([this, &t, thread_index, &run,
write_wrap_count]() {
LocklessQueueSender sender =
LocklessQueueSender::Make(queue_, channel_storage_duration_).value();
CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
// Signal that we are ready to start sending.
t.ready.Set();
// Wait until signaled to start running.
run.Wait();
// Gogogo!
for (uint64_t i = 0;
i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
++i) {
char *const data = static_cast<char *>(sender.Data()) + sender.size() -
sizeof(ThreadPlusCount);
const char fill = (i + 55) & 0xFF;
memset(data, fill, sizeof(ThreadPlusCount));
        {
          // Confirm the memset stuck. If it did not, some other sender is
          // writing to the scratch buffer we were just handed.
          bool found_corruption = false;
          for (size_t j = 0; j < sizeof(ThreadPlusCount); ++j) {
            if (data[j] != fill) {
              found_corruption = true;
            }
          }
          CHECK(!found_corruption)
              << ": Somebody else is writing to our buffer";
        }
ThreadPlusCount tpc;
tpc.thread = thread_index;
tpc.count = i;
memcpy(data, &tpc, sizeof(ThreadPlusCount));
if (i % 0x800000 == 0x100000) {
fprintf(
stderr, "Sent %" PRIu64 ", %f %%\n", i,
static_cast<double>(i) /
static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
100.0);
}
++started_writes_;
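        // Send the message. Any result listed in expected_send_results_ is
        // acceptable; anything else fails the test.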
auto result =
sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
aos::realtime_clock::min_time,
aos::monotonic_clock::min_time, 0xffffffff,
UUID::FromSpan(absl::Span<const uint8_t>(
reinterpret_cast<const uint8_t *>(&tpc),
sizeof(ThreadPlusCount))),
nullptr, nullptr, nullptr);
CHECK(std::find(expected_send_results_.begin(),
expected_send_results_.end(),
result) != expected_send_results_.end())
<< "Unexpected send result: " << result;
// Blank out the new scratch buffer, to catch other people using it.
{
char *const new_data = static_cast<char *>(sender.Data()) +
sender.size() - sizeof(ThreadPlusCount);
const char new_fill = ~fill;
memset(new_data, new_fill, sizeof(ThreadPlusCount));
}
++finished_writes_;
}
});
++thread_index;
}
// Wait until all the threads are ready.
for (ThreadState &t : threads) {
t.ready.Wait();
}
// And start them racing.
run.Set();
  // If we are not racing reads, let all the writer threads finish before we
  // start reading anything back.
if (!race_reads) {
for (ThreadState &t : threads) {
t.thread.join();
}
poll_index = false;
queue_index_racer.join();
}
if (check_writes_and_reads_) {
CheckReads(race_reads, write_wrap_count, &threads, set_should_read,
should_read_result);
}
// Reap all the threads.
if (race_reads) {
for (ThreadState &t : threads) {
t.thread.join();
}
poll_index = false;
queue_index_racer.join();
}
if (check_writes_and_reads_) {
// Confirm that the number of writes matches the expected number of writes.
ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
started_writes_);
ASSERT_EQ(num_threads_ * num_messages_ * (1 + write_wrap_count),
finished_writes_);
// And that every thread sent the right number of messages.
for (ThreadState &t : threads) {
if (will_wrap) {
if (!race_reads) {
// If we are wrapping, there is a possibility that a thread writes
// everything *before* we can read any of it, and it all gets
// overwritten.
ASSERT_TRUE(t.event_count == ::std::numeric_limits<uint64_t>::max() ||
t.event_count == (1 + write_wrap_count) * num_messages_)
<< ": Got " << t.event_count << " events, expected "
<< (1 + write_wrap_count) * num_messages_;
}
} else {
ASSERT_EQ(t.event_count, num_messages_);
}
}
}
}
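// Reads every surviving message back out of the queue and verifies that the
// metadata, payload, and per-thread ordering are what the writers produced.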
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads,
bool set_should_read, bool should_read_result) {
// Now read back the results to double check.
LocklessQueueReader reader(queue_);
const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
LocklessQueueSize(queue_.memory());
monotonic_clock::time_point last_monotonic_sent_time =
monotonic_clock::epoch();
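  // Index of the first message which can still be in the queue. If the queue
  // wrapped, everything before this has been overwritten.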
uint64_t initial_i = 0;
if (will_wrap) {
initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
LocklessQueueSize(queue_.memory());
}
std::function<bool(const Context &)> nop;
Context fetched_context;
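  // Filter callback handed to Read(): it captures the context it is shown and
  // then accepts or rejects the message based on should_read_result.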
std::function<bool(const Context &)> should_read =
[&should_read_result, &fetched_context](const Context &context) {
fetched_context = context;
return should_read_result;
};
for (uint64_t i = initial_i;
i < (1 + write_wrap_count) * num_messages_ * num_threads_; ++i) {
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
monotonic_clock::time_point monotonic_remote_transmit_time;
realtime_clock::time_point realtime_remote_time;
UUID source_boot_uuid;
uint32_t remote_queue_index;
size_t length;
char read_data[1024];
// Handle overflowing the message count for the wrap test.
const uint32_t wrapped_i =
i % static_cast<size_t>(QueueIndex::MaxIndex(
0xffffffffu, LocklessQueueSize(queue_.memory())));
LocklessQueueReader::Result read_result =
set_should_read
? reader.Read(
wrapped_i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &monotonic_remote_transmit_time,
&realtime_remote_time, &remote_queue_index, &source_boot_uuid,
&length, &(read_data[0]), std::ref(should_read))
: reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time,
&monotonic_remote_transmit_time,
&realtime_remote_time, &remote_queue_index,
&source_boot_uuid, &length, &(read_data[0]), nop);
// The code in lockless_queue.cc reads everything but data, checks that the
// header hasn't changed, then reads the data. So, if we succeed and both
// end up not being corrupted, then we've confirmed everything works.
//
// Feed in both combos of should_read and whether or not to return true or
// false from should_read. By capturing the header values inside the
// callback, we can also verify the state in the middle of the process to
// make sure we have the right boundaries.
if (race_reads) {
if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
continue;
}
}
if (race_reads && will_wrap) {
if (read_result == LocklessQueueReader::Result::TOO_OLD) {
continue;
}
}
if (!set_should_read) {
// Every message should be good.
ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
<< ": i is " << i;
} else {
if (should_read_result) {
ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD)
<< ": i is " << i;
ASSERT_EQ(monotonic_sent_time, fetched_context.monotonic_event_time);
ASSERT_EQ(realtime_sent_time, fetched_context.realtime_event_time);
ASSERT_EQ(monotonic_remote_time, fetched_context.monotonic_remote_time);
ASSERT_EQ(realtime_remote_time, fetched_context.realtime_remote_time);
ASSERT_EQ(source_boot_uuid, fetched_context.source_boot_uuid);
ASSERT_EQ(remote_queue_index, fetched_context.remote_queue_index);
ASSERT_EQ(length, fetched_context.size);
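        // The payload lives at the tail of the message data, and should match
        // the ThreadPlusCount bytes that were also packed into the UUID.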
ASSERT_EQ(
absl::Span<const uint8_t>(
reinterpret_cast<const uint8_t *>(
read_data + LocklessQueueMessageDataSize(queue_.memory()) -
length),
length),
source_boot_uuid.span());
} else {
ASSERT_EQ(read_result, LocklessQueueReader::Result::FILTERED);
monotonic_sent_time = fetched_context.monotonic_event_time;
realtime_sent_time = fetched_context.realtime_event_time;
monotonic_remote_time = fetched_context.monotonic_remote_time;
realtime_remote_time = fetched_context.realtime_remote_time;
source_boot_uuid = fetched_context.source_boot_uuid;
remote_queue_index = fetched_context.remote_queue_index;
length = fetched_context.size;
}
}
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
last_monotonic_sent_time = monotonic_sent_time;
ASSERT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
ASSERT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
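    // Recover which thread sent this message, and its per-thread count, from
    // the UUID bytes.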
ThreadPlusCount tpc;
ASSERT_EQ(source_boot_uuid.span().size(), sizeof(ThreadPlusCount));
memcpy(&tpc, source_boot_uuid.span().data(),
source_boot_uuid.span().size());
if (will_wrap) {
      // The queue won't change out from under us, so we should get some amount
      // of the tail end of the messages from a thread.
// Confirm that once we get our first message, they all show up.
if ((*threads)[tpc.thread].event_count ==
::std::numeric_limits<uint64_t>::max()) {
(*threads)[tpc.thread].event_count = tpc.count;
}
if (race_reads) {
// Make sure nothing goes backwards. Really not much we can do here.
ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
<< ": Thread " << tpc.thread;
(*threads)[tpc.thread].event_count = tpc.count;
} else {
        // Without racing reads, every message after the first one we saw from
        // this thread must show up exactly once and in order.
ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
<< ": Thread " << tpc.thread;
}
} else {
// Confirm that we see every message counter from every thread.
ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
<< ": Thread " << tpc.thread;
}
++(*threads)[tpc.thread].event_count;
}
}
} // namespace aos::ipc_lib