// blob: 1c07c496bc5824689d7cf711bc398d8174a93eea [file] [log] [blame]
#ifndef AOS_IPC_LIB_QUEUE_RACER_H_
#define AOS_IPC_LIB_QUEUE_RACER_H_
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
#include <vector>
#include "aos/ipc_lib/index.h"
#include "aos/ipc_lib/lockless_queue.h"
#include "aos/time/time.h"
#include "aos/uuid.h"
namespace aos::ipc_lib {
// Forward declaration only; this header uses it solely as the element type of
// the std::vector pointer passed to QueueRacer::CheckReads().
struct ThreadState;
// Bundle of settings accepted by the QueueRacer(queue, config) constructor.
struct QueueRacerConfiguration {
  // How many sender threads take part in the race.
  const int num_threads;

  // How many messages each of those threads sends.
  const uint64_t num_messages;

  // Send() results QueueRacer should expect; listing several lets it check for
  // more than one possible return value.  Defaults to just GOOD.
  const std::vector<LocklessQueueSender::Result> expected_send_results{
      LocklessQueueSender::Result::GOOD};

  // Channel storage duration of the queue QueueRacer runs against.  Defaults
  // to one nanosecond.
  const monotonic_clock::duration channel_storage_duration{
      std::chrono::nanoseconds(1)};

  // Set to true when every write and read is expected to succeed.  Having
  // this switch is what allows QueueRacer to be used for failure scenarios.
  const bool check_writes_and_reads;
};
// Class to test the queue by spinning up a bunch of writing threads and racing
// them together to all write at once.
class QueueRacer {
 public:
  // Builds a racer over `queue` that uses num_threads sender threads, each of
  // which sends num_messages messages.
  QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
  // Builds a racer over `queue` using the settings bundled in `config`.
  QueueRacer(LocklessQueue queue, const QueueRacerConfiguration &config);

  // Runs an iteration of the race.
  //
  // This spins up num_threads, each of which sends num_messages.  These must
  // both be able to fit in the queue without wrapping.
  //
  // Then, this reads back all the messages and confirms that all were received
  // in order, and none were missed.
  //
  // If race_reads is set, start reading (and retry if data isn't ready yet)
  // while writes are still happening.
  //
  // If write_wrap_count is nonzero, write enough to overwrite old data.  This
  // necessitates a looser check at the end.
  //
  // If both are set, run an even looser test.
  //
  // set_should_read is used to determine if we should pass in a valid
  // should_read function, and should_read_result is the return result of that
  // function.
  void RunIteration(bool race_reads, int write_wrap_count, bool set_should_read,
                    bool should_read_result);

  // Returns the index reported by LatestIndex() on a reader freshly
  // constructed over this racer's queue.
  size_t CurrentIndex() {
    return LocklessQueueReader(queue_).LatestIndex().index();
  }

 private:
  // Wipes the queue memory out so we get a clean start.
  void Reset() {
    // <cstring> only guarantees the std::-qualified name, so use it.
    std::memset(reinterpret_cast<void *>(queue_.memory()), 0,
                LocklessQueueMemorySize(queue_.config()));
  }

  // This is a separate method so that when any of the ASSERT_* checks bail
  // out early, we still clean up all the threads.  Otherwise we get an assert
  // on the way out of RunIteration instead of getting all the way back to
  // gtest.
  void CheckReads(bool race_reads, int write_wrap_count,
                  ::std::vector<ThreadState> *threads, bool set_should_read,
                  bool should_read_result);

  LocklessQueue queue_;
  const uint64_t num_threads_;
  const uint64_t num_messages_;
  const monotonic_clock::duration channel_storage_duration_;
  // Allows QueueRacer to check for multiple returns from calling Send()
  const std::vector<LocklessQueueSender::Result> expected_send_results_;
  const bool check_writes_and_reads_;

  // The overall number of writes executed will always be between the two of
  // these.  We can't atomically count writes, so we have to bound them.
  //
  // Both counters are explicitly zero-initialized: before C++20 a
  // default-constructed std::atomic holds an indeterminate value.
  //
  // Number of writes about to be started.
  ::std::atomic<uint64_t> started_writes_{0};
  // Number of writes completed.
  ::std::atomic<uint64_t> finished_writes_{0};

  // Callable with the should_read signature.  The default ignores all of its
  // arguments and returns true.
  // NOTE(review): presumably this is what RunIteration hands to the reader
  // when set_should_read is true -- confirm against the implementation.
  std::function<bool(uint32_t, monotonic_clock::time_point,
                     realtime_clock::time_point, monotonic_clock::time_point,
                     realtime_clock::time_point, uint32_t, UUID, size_t)>
      should_read_ = [](uint32_t, monotonic_clock::time_point,
                        realtime_clock::time_point, monotonic_clock::time_point,
                        realtime_clock::time_point, uint32_t, UUID,
                        size_t) { return true; };
};
} // namespace aos::ipc_lib
#endif // AOS_IPC_LIB_QUEUE_RACER_H_