merging in changes "queued" up for after Davis
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
index 08e8df8..fe8b2e0 100644
--- a/aos/linux_code/ipc_lib/ipc_lib.gyp
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -51,7 +51,7 @@
'target_name': 'raw_queue_test',
'type': 'executable',
'sources': [
- 'queue_test.cc',
+ 'raw_queue_test.cc',
],
'dependencies': [
'<(EXTERNALS):gtest',
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index b19ef2c..8103b6e 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -22,6 +22,7 @@
const bool kWriteDebug = false;
const bool kRefDebug = false;
const bool kFetchDebug = false;
+const bool kReadIndexDebug = false;
// The number of extra messages the pool associated with each queue will be able
// to hold (for readers who are slow about freeing them or who leak one when
@@ -62,6 +63,7 @@
// TODO(brians) maybe do this with atomic integer instructions so it doesn't
// have to lock/unlock pool_lock_
void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+ // TODO(brians): Test this function.
MutexLocker locker(&pool_lock_);
MessageHeader *header = MessageHeader::Get(msg);
--header->ref_count;
@@ -226,6 +228,7 @@
}
bool RawQueue::WriteMessage(void *msg, int options) {
+ // TODO(brians): Test this function.
if (kWriteDebug) {
printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
}
@@ -326,9 +329,10 @@
}
return true;
}
-void *RawQueue::ReadPeek(int options, int start) {
+void *RawQueue::ReadPeek(int options, int start) const {
void *ret;
if (options & kFromEnd) {
+ // TODO(brians): Test this block with ReadMessageIndex.
int pos = data_end_ - 1;
if (pos < 0) { // if it needs to wrap
pos = data_length_ - 1;
@@ -338,6 +342,7 @@
}
ret = data_[pos];
} else {
+ assert(start != -1);
if (kReadDebug) {
printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
}
@@ -351,6 +356,7 @@
return ret;
}
const void *RawQueue::ReadMessage(int options) {
+ // TODO(brians): Test this function.
if (kReadDebug) {
printf("queue: %p->ReadMessage(%x)\n", this, options);
}
@@ -424,21 +430,40 @@
// Where we're going to start reading.
int my_start;
- const int unread_messages = messages_ - *index;
- int current_messages = data_end_ - data_start_;
- if (current_messages < 0) current_messages += data_length_ - 1;
- // If we're behind the available messages.
- if (unread_messages > current_messages) {
- // Catch index up to the last available message.
- *index = messages_ - current_messages;
- // And that's the one we're going to read.
- my_start = data_start_;
+ if (options & kFromEnd) {
+ my_start = -1;
} else {
- // Just start reading at the first available message that we haven't yet
- // read.
- my_start = data_end_ - unread_messages;
- if (my_start < 0) {
- my_start += data_length_;
+ const int unread_messages = messages_ - *index;
+ assert(unread_messages > 0);
+ int current_messages = data_end_ - data_start_;
+ if (current_messages < 0) current_messages += data_length_;
+ if (kReadIndexDebug) {
+ printf("queue: %p start=%d end=%d current=%d\n",
+ this, data_start_, data_end_, current_messages);
+ }
+ assert(current_messages > 0);
+ // If we're behind the available messages.
+ if (unread_messages > current_messages) {
+ // Catch index up to the last available message.
+ *index = messages_ - current_messages;
+ // And that's the one we're going to read.
+ my_start = data_start_;
+ if (kReadIndexDebug) {
+ printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
+ this, *index, messages_, data_start_);
+ }
+ } else {
+ // Just start reading at the first available message that we haven't yet
+ // read.
+ my_start = data_end_ - unread_messages;
+ if (kReadIndexDebug) {
+ printf("queue: %p original read from %d\n", this, my_start);
+ }
+ if (data_start_ < data_end_) {
+ assert(my_start >= data_start_);
+ } else {
+ if (my_start < 0) my_start += data_length_;
+ }
}
}
@@ -446,10 +471,14 @@
msg = ReadPeek(options, my_start);
} else {
if (options & kFromEnd) {
+ // TODO(brians): Test this block.
if (kReadDebug) {
printf("queue: %p start of c1\n", this);
}
int pos = data_end_ - 1;
+ if (kReadIndexDebug) {
+ printf("queue: %p end pos start %d\n", this, pos);
+ }
if (pos < 0) { // If it wrapped.
pos = data_length_ - 1; // Unwrap it.
}
@@ -462,13 +491,12 @@
if (kReadDebug) {
printf("queue: %p reading from d1: %d\n", this, my_start);
}
-#if 0
- // TODO(brians): Do this check right? (make sure full queue works etc)
// This assert checks that we're either within both endpoints (duh) or
- // outside of both of them (if the queue is wrapped around).
+ // not between them (if the queue is wrapped around).
assert((my_start >= data_start_ && my_start < data_end_) ||
- (my_start > data_end_ && my_start <= data_start_));
-#endif
+ ((my_start >= data_start_) == (my_start > data_end_)));
+ // More sanity checking.
+ assert((my_start >= 0) && (my_start < data_length_));
msg = data_[my_start];
++(*index);
}
@@ -483,6 +511,7 @@
}
void *RawQueue::GetMessage() {
+ // TODO(brians): Test this function.
MutexLocker locker(&pool_lock_);
MessageHeader *header;
if (pool_length_ - messages_used_ > 0) {
diff --git a/aos/linux_code/ipc_lib/queue.h b/aos/linux_code/ipc_lib/queue.h
index 4d00cde..e7737e1 100644
--- a/aos/linux_code/ipc_lib/queue.h
+++ b/aos/linux_code/ipc_lib/queue.h
@@ -55,20 +55,22 @@
// Constants for passing to options arguments.
// The non-conflicting ones can be combined with bitwise-or.
- // Causes the returned message to be left in the queue.
+ // Doesn't update the currently read index (the read messages in the queue or
+ // the index). This means the returned message (and any others skipped with
+ // kFromEnd) will be left in the queue.
// For reading only.
static const int kPeek = 0x0001;
// Reads the last message in the queue instead of just the next one.
// NOTE: This removes all of the messages until the last one from the queue
- // (which means that nobody else will read them). However, PEEK means to not
- // remove any from the queue, including the ones that are skipped.
+ // (which means that nobody else will read them).
// For reading only.
static const int kFromEnd = 0x0002;
// Causes reads to return NULL and writes to fail instead of waiting.
// For reading and writing.
static const int kNonBlock = 0x0004;
// Causes things to block.
- // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay
+ // this way.
// For reading and writing.
static const int kBlock = 0x0000;
// Causes writes to overwrite the oldest message in the queue instead of
@@ -94,7 +96,7 @@
// same message twice with the same index argument. However, it may not
// return some messages that pass through the queue.
// *index should start as 0. index does not have to be in shared memory, but
- // it can be
+ // it can be.
const void *ReadMessageIndex(int options, int *index);
// Retrieves ("allocates") a message that can then be written to the queue.
@@ -111,6 +113,10 @@
if (msg != NULL) DecrementMessageReferenceCount(msg);
}
+ // Returns the number of messages from this queue that are currently used (in
+ // the queue and/or given out as references).
+ int messages_used() const { return messages_used_; }
+
private:
struct MessageHeader;
struct ReadData;
@@ -151,16 +157,17 @@
// Calls DoFreeMessage if appropriate.
void DecrementMessageReferenceCount(const void *msg);
- // Should be called with data_lock_ locked.
+ // Must be called with data_lock_ locked.
// *read_data will be initialized.
// Returns with a readable message in data_ or false.
bool ReadCommonStart(int options, int *index, ReadData *read_data);
// Deals with setting/unsetting readable_ and writable_.
- // Should be called after data_lock_ has been unlocked.
+ // Must be called after data_lock_ has been unlocked.
// read_data should be the same thing that was passed in to ReadCommonStart.
void ReadCommonEnd(ReadData *read_data);
// Handles reading with kPeek.
- void *ReadPeek(int options, int start);
+ // start can be -1 if options has kFromEnd set.
+ void *ReadPeek(int options, int start) const;
// Gets called by Fetch when necessary (with placement new).
RawQueue(const char *name, size_t length, int hash, int queue_length);
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
deleted file mode 100644
index d05b20a..0000000
--- a/aos/linux_code/ipc_lib/queue_test.cc
+++ /dev/null
@@ -1,465 +0,0 @@
-#include "aos/common/queue.h"
-
-#include <unistd.h>
-#include <sys/mman.h>
-#include <inttypes.h>
-
-#include <ostream>
-#include <memory>
-#include <map>
-
-#include "gtest/gtest.h"
-
-#include "aos/linux_code/ipc_lib/core_lib.h"
-#include "aos/common/type_traits.h"
-#include "aos/common/queue_testutils.h"
-#include "aos/common/time.h"
-#include "aos/common/logging/logging.h"
-#include "aos/common/die.h"
-
-using ::testing::AssertionResult;
-using ::testing::AssertionSuccess;
-using ::testing::AssertionFailure;
-using ::aos::common::testing::GlobalCoreInstance;
-
-namespace aos {
-namespace testing {
-
-class QueueTest : public ::testing::Test {
- protected:
- static const size_t kFailureSize = 400;
- static char *fatal_failure;
- private:
- enum class ResultType : uint8_t {
- NotCalled,
- Called,
- Returned,
- };
- const std::string ResultTypeString(volatile const ResultType &result) {
- switch (result) {
- case ResultType::Returned:
- return "Returned";
- case ResultType::Called:
- return "Called";
- case ResultType::NotCalled:
- return "NotCalled";
- default:
- return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
- }
- }
- static_assert(aos::shm_ok<ResultType>::value,
- "this will get put in shared memory");
- template<typename T>
- struct FunctionToCall {
- FunctionToCall() : result(ResultType::NotCalled) {
- started.Lock();
- }
-
- volatile ResultType result;
- bool expected;
- void (*function)(T*, char*);
- T *arg;
- volatile char failure[kFailureSize];
- Mutex started;
- };
- template<typename T>
- static void Hangs_(FunctionToCall<T> *const to_call) {
- to_call->started.Unlock();
- to_call->result = ResultType::Called;
- to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
- to_call->result = ResultType::Returned;
- }
-
- // How long until a function is considered to have hung.
- static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
- // How long to sleep after forking (for debugging).
- static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
-
- // Represents a process that has been forked off. The destructor kills the
- // process and wait(2)s for it.
- class ForkedProcess {
- public:
- ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
- ~ForkedProcess() {
- if (kill(pid_, SIGINT) == -1) {
- if (errno == ESRCH) {
- printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
- } else {
- fprintf(stderr, "kill(SIGKILL, %jd) failed with %d: %s\n",
- static_cast<intmax_t>(pid_), errno, strerror(errno));
- }
- return;
- }
- const pid_t ret = wait(NULL);
- if (ret == -1) {
- LOG(WARNING, "wait(NULL) failed."
- " child %jd might still be alive\n",
- static_cast<intmax_t>(pid_));
- } else if (ret == 0) {
- LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
- static_cast<intmax_t>(pid_));
- } else if (ret != pid_) {
- LOG(WARNING, "child %d is now confirmed dead"
- ", but child %jd might still be alive\n",
- ret, static_cast<intmax_t>(pid_));
- }
- }
-
- enum class JoinResult {
- Finished, Hung, Error
- };
- JoinResult Join(time::Time timeout = kHangTime) {
- timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
- switch (mutex_lock_timeout(lock_, &lock_timeout)) {
- case 2:
- return JoinResult::Hung;
- case 0:
- return JoinResult::Finished;
- default:
- return JoinResult::Error;
- }
- }
-
- private:
- const pid_t pid_;
- mutex *const lock_;
- } __attribute__((unused));
-
- // State for HangsFork and HangsCheck.
- typedef uint8_t ChildID;
- static void ReapExitHandler() {
- for (auto it = children_.begin(); it != children_.end(); ++it) {
- delete it->second;
- }
- }
- static std::map<ChildID, ForkedProcess *> children_;
- std::map<ChildID, FunctionToCall<void> *> to_calls_;
-
- void SetUp() override {
- ::testing::Test::SetUp();
-
- SetDieTestMode(true);
-
- fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
- static bool registered = false;
- if (!registered) {
- atexit(ReapExitHandler);
- registered = true;
- }
- }
-
- protected:
- // function gets called with arg in a forked process.
- // Leaks shared memory.
- template<typename T> __attribute__((warn_unused_result))
- std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
- mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
- sizeof(*lock), sizeof(int)));
- assert(mutex_lock(lock) == 0);
- const pid_t pid = fork();
- switch (pid) {
- case 0: // child
- if (kForkSleep != time::Time(0, 0)) {
- LOG(INFO, "pid %jd sleeping for %ds%dns\n",
- static_cast<intmax_t>(getpid()),
- kForkSleep.sec(), kForkSleep.nsec());
- time::SleepFor(kForkSleep);
- }
- ::aos::common::testing::PreventExit();
- function(arg);
- mutex_unlock(lock);
- exit(EXIT_SUCCESS);
- case -1: // parent failure
- LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
- return std::unique_ptr<ForkedProcess>();
- default: // parent
- return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
- }
- }
-
- // Checks whether or not the given function hangs.
- // expected is whether to return success or failure if the function hangs
- // NOTE: There are other reasons for it to return a failure than the function
- // doing the wrong thing.
- // Leaks shared memory.
- template<typename T>
- AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
- AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
- if (!fork_result) {
- return fork_result;
- }
- return HangsCheck(0);
- }
- // Starts the first part of Hangs.
- // Use HangsCheck to get the result.
- // Returns whether the fork succeeded or not, NOT whether or not the hang
- // check succeeded.
- template<typename T>
- AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
- bool expected, ChildID id) {
- static_assert(aos::shm_ok<FunctionToCall<T>>::value,
- "this is going into shared memory");
- FunctionToCall<T> *const to_call =
- static_cast<FunctionToCall<T> *>(
- shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
- new (to_call) FunctionToCall<T>();
- to_call->function = function;
- to_call->arg = arg;
- to_call->expected = expected;
- to_call->failure[0] = '\0';
- static_cast<char *>(fatal_failure)[0] = '\0';
- children_[id] = ForkExecute(Hangs_, to_call).release();
- if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
- to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
- to_call->started.Lock();
- return AssertionSuccess();
- }
- // Checks whether or not a function hung like it was supposed to.
- // Use HangsFork first.
- // NOTE: calls to HangsFork and HangsCheck with the same id argument will
- // correspond, but they do not nest. Also, id 0 is used by Hangs.
- // Return value is the same as Hangs.
- AssertionResult HangsCheck(ChildID id) {
- std::unique_ptr<ForkedProcess> child(children_[id]);
- children_.erase(id);
- const ForkedProcess::JoinResult result = child->Join();
- if (to_calls_[id]->failure[0] != '\0') {
- return AssertionFailure() << "function says: "
- << const_cast<char *>(to_calls_[id]->failure);
- }
- if (result == ForkedProcess::JoinResult::Finished) {
- return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
- << "something happened and the the test only got to "
- << ResultTypeString(to_calls_[id]->result));
- } else {
- if (to_calls_[id]->result == ResultType::Called) {
- return to_calls_[id]->expected ? AssertionSuccess() :
- AssertionFailure();
- } else if (result == ForkedProcess::JoinResult::Error) {
- return AssertionFailure() << "error joining child";
- } else {
- abort();
- return AssertionFailure() << "something weird happened";
- }
- }
- }
-#define EXPECT_HANGS(function, arg) \
- EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
-#define EXPECT_RETURNS(function, arg) \
- EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
-#define EXPECT_RETURNS_FAILS(function, arg) \
- EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
-#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
- cond(Hangs(function, arg, hangs)); \
- if (fatal_failure[0] != '\0') { \
- FAIL() << fatal_failure; \
- } \
-} while (false)
-
- struct TestMessage {
- // Some contents because we don't really want to test empty messages.
- int16_t data;
- };
- struct MessageArgs {
- RawQueue *const queue;
- int flags;
- int16_t data; // -1 means NULL expected
- };
- static void WriteTestMessage(MessageArgs *args, char *failure) {
- TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
- if (msg == NULL) {
- snprintf(fatal_failure, kFailureSize,
- "couldn't get_msg from %p", args->queue);
- return;
- }
- msg->data = args->data;
- if (!args->queue->WriteMessage(msg, args->flags)) {
- snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
- args->queue, msg, args->flags);
- }
- }
- static void ReadTestMessage(MessageArgs *args, char *failure) {
- const TestMessage *msg = static_cast<const TestMessage *>(
- args->queue->ReadMessage(args->flags));
- if (msg == NULL) {
- if (args->data != -1) {
- snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got NULL message",
- args->data);
- }
- } else {
- if (args->data != msg->data) {
- snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got %" PRId16 " instead",
- args->data, msg->data);
- }
- args->queue->FreeMessage(msg);
- }
- }
-
- private:
- GlobalCoreInstance my_core;
-};
-char *QueueTest::fatal_failure;
-std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
-constexpr time::Time QueueTest::kHangTime;
-constexpr time::Time QueueTest::kForkSleep;
-
-TEST_F(QueueTest, Reading) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, 0, -1};
-
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = 0;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.data = 254;
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = 0;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = 0;
- args.data = -1;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = 0;
- args.data = 971;
- EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
-}
-TEST_F(QueueTest, Writing) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, 0, 973};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_HANGS(WriteTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.data = 971;
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = 0;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = 0;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = 0;
- EXPECT_RETURNS(ReadTestMessage, &args);
-}
-
-TEST_F(QueueTest, MultiRead) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, 0, 1323};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
- ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
- EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
- // TODO(brians) finish this
-}
-
-// There used to be a bug where reading first without an index and then with an
-// index would crash. This test makes sure that's fixed.
-TEST_F(QueueTest, ReadIndexAndNot) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
-
- // Write a message, read it (with ReadMessage), and then write another
- // message (before freeing the read one so the queue allocates a distinct
- // message to use for it).
- TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
- ASSERT_NE(nullptr, msg);
- ASSERT_TRUE(queue->WriteMessage(msg, 0));
- const void *read_msg = queue->ReadMessage(0);
- EXPECT_NE(nullptr, read_msg);
- msg = static_cast<TestMessage *>(queue->GetMessage());
- queue->FreeMessage(read_msg);
- ASSERT_NE(nullptr, msg);
- ASSERT_TRUE(queue->WriteMessage(msg, 0));
-
- int index = 0;
- const void *second_read_msg = queue->ReadMessageIndex(0, &index);
- EXPECT_NE(nullptr, second_read_msg);
- EXPECT_NE(read_msg, second_read_msg)
- << "We already took that message out of the queue.";
-}
-
-TEST_F(QueueTest, Recycle) {
- // TODO(brians) basic test of recycle queue
- // include all of the ways a message can get into the recycle queue
- RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
- 1, 2, 2, 2, &recycle_queue);
- ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
- MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.data = 254;
- EXPECT_RETURNS(WriteTestMessage, &args);
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.data = 971;
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- recycle.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-
- EXPECT_HANGS(ReadTestMessage, &recycle);
-
- TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
- ASSERT_TRUE(msg != NULL);
- msg->data = 341;
- queue->FreeMessage(msg);
- recycle.data = 341;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-
- EXPECT_HANGS(ReadTestMessage, &recycle);
-
- args.data = 254;
- args.flags = RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.flags = RawQueue::kBlock;
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.data = 254;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-}
-
-// Makes sure that when a message doesn't get written with kNonBlock it does get
-// freed.
-TEST_F(QueueTest, NonBlockFailFree) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
-
- void *message1 = queue->GetMessage();
- void *message2 = queue->GetMessage();
- ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
- ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));
- EXPECT_EQ(message2, queue->GetMessage());
-}
-
-} // namespace testing
-} // namespace aos
diff --git a/aos/linux_code/ipc_lib/raw_queue_test.cc b/aos/linux_code/ipc_lib/raw_queue_test.cc
new file mode 100644
index 0000000..0c0308a
--- /dev/null
+++ b/aos/linux_code/ipc_lib/raw_queue_test.cc
@@ -0,0 +1,762 @@
+#include "aos/common/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/linux_code/ipc_lib/core_lib.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/time.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/die.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class RawQueueTest : public ::testing::Test {
+ protected:
+ static const size_t kFailureSize = 400;
+ static char *fatal_failure;
+ private:
+ enum class ResultType : uint8_t {
+ NotCalled,
+ Called,
+ Returned,
+ };
+  const std::string ResultTypeString(volatile const ResultType &result) {
+    switch (result) {
+      case ResultType::Returned:
+        return "Returned";
+      case ResultType::Called:
+        return "Called";
+      case ResultType::NotCalled:
+        return "NotCalled";
+      default:  // Not a named enumerator, so show the raw numeric value.
+        return "unknown(" + std::to_string(static_cast<int>(result)) + ")";
+    }
+  }
+ static_assert(aos::shm_ok<ResultType>::value,
+ "this will get put in shared memory");
+ template<typename T>
+ struct FunctionToCall {
+ FunctionToCall() : result(ResultType::NotCalled) {
+ started.Lock();
+ }
+
+ volatile ResultType result;
+ bool expected;
+ void (*function)(T*, char*);
+ T *arg;
+ volatile char failure[kFailureSize];
+ Mutex started;
+ };
+ template<typename T>
+ static void Hangs_(FunctionToCall<T> *const to_call) {
+ to_call->started.Unlock();
+ to_call->result = ResultType::Called;
+ to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+ to_call->result = ResultType::Returned;
+ }
+
+ // How long until a function is considered to have hung.
+ static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+ // How long to sleep after forking (for debugging).
+ static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
+
+ // Represents a process that has been forked off. The destructor kills the
+ // process and wait(2)s for it.
+ class ForkedProcess {
+ public:
+ ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
+    ~ForkedProcess() {
+      if (kill(pid_, SIGINT) == -1) {
+        if (errno == ESRCH) {
+          printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
+        } else {
+          fprintf(stderr, "kill(%jd, SIGINT) failed with %d: %s\n",
+                  static_cast<intmax_t>(pid_), errno, strerror(errno));
+        }
+        return;  // The signal was not delivered, so don't wait for an exit.
+      }
+      const pid_t ret = wait(NULL);  // NOTE: waits for any child, not pid_.
+      if (ret == -1) {
+        LOG(WARNING, "wait(NULL) failed."
+            " child %jd might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret == 0) {
+        LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret != pid_) {
+        LOG(WARNING, "child %d is now confirmed dead"
+            ", but child %jd might still be alive\n",
+            ret, static_cast<intmax_t>(pid_));
+      }
+    }
+
+ enum class JoinResult {
+ Finished, Hung, Error
+ };
+ JoinResult Join(time::Time timeout = kHangTime) {
+ timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
+ switch (mutex_lock_timeout(lock_, &lock_timeout)) {
+ case 2:
+ return JoinResult::Hung;
+ case 0:
+ return JoinResult::Finished;
+ default:
+ return JoinResult::Error;
+ }
+ }
+
+ private:
+ const pid_t pid_;
+ mutex *const lock_;
+ } __attribute__((unused));
+
+ // State for HangsFork and HangsCheck.
+ typedef uint8_t ChildID;
+ static void ReapExitHandler() {
+ for (auto it = children_.begin(); it != children_.end(); ++it) {
+ delete it->second;
+ }
+ }
+ static std::map<ChildID, ForkedProcess *> children_;
+ std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+  void SetUp() override {
+    ::testing::Test::SetUp();
+
+    SetDieTestMode(true);
+    // Writers snprintf up to kFailureSize bytes into this buffer.
+    fatal_failure = static_cast<char *>(shm_malloc(kFailureSize));
+    static bool registered = false;
+    if (!registered) {
+      atexit(ReapExitHandler);  // Registered only once per process.
+      registered = true;
+    }
+  }
+
+ protected:
+ // function gets called with arg in a forked process.
+ // Leaks shared memory.
+  template<typename T> __attribute__((warn_unused_result))
+  std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+    mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
+            sizeof(*lock), sizeof(int)));
+    if (mutex_lock(lock) != 0) return std::unique_ptr<ForkedProcess>();
+    const pid_t pid = fork();
+    switch (pid) {
+      case 0:  // child
+        if (kForkSleep != time::Time(0, 0)) {
+          LOG(INFO, "pid %jd sleeping for %ds%dns\n",
+              static_cast<intmax_t>(getpid()),
+              kForkSleep.sec(), kForkSleep.nsec());
+          time::SleepFor(kForkSleep);
+        }
+        ::aos::common::testing::PreventExit();
+        function(arg);
+        mutex_unlock(lock);  // Locked by the parent before fork().
+        exit(EXIT_SUCCESS);
+      case -1:  // parent failure
+        LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
+        return std::unique_ptr<ForkedProcess>();
+      default:  // parent
+        return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
+    }
+  }
+
+ // Checks whether or not the given function hangs.
+ // expected is whether to return success or failure if the function hangs
+ // NOTE: There are other reasons for it to return a failure than the function
+ // doing the wrong thing.
+ // Leaks shared memory.
+ template<typename T>
+ AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+ AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+ if (!fork_result) {
+ return fork_result;
+ }
+ return HangsCheck(0);
+ }
+ // Starts the first part of Hangs.
+ // Use HangsCheck to get the result.
+ // Returns whether the fork succeeded or not, NOT whether or not the hang
+ // check succeeded.
+ template<typename T>
+ AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+ bool expected, ChildID id) {
+ static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+ "this is going into shared memory");
+ FunctionToCall<T> *const to_call =
+ static_cast<FunctionToCall<T> *>(
+ shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+ new (to_call) FunctionToCall<T>();
+ to_call->function = function;
+ to_call->arg = arg;
+ to_call->expected = expected;
+ to_call->failure[0] = '\0';
+ static_cast<char *>(fatal_failure)[0] = '\0';
+ children_[id] = ForkExecute(Hangs_, to_call).release();
+ if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+ to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+ to_call->started.Lock();
+ return AssertionSuccess();
+ }
+ // Checks whether or not a function hung like it was supposed to.
+ // Use HangsFork first.
+ // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+ // correspond, but they do not nest. Also, id 0 is used by Hangs.
+ // Return value is the same as Hangs.
+ AssertionResult HangsCheck(ChildID id) {
+ std::unique_ptr<ForkedProcess> child(children_[id]);
+ children_.erase(id);
+ const ForkedProcess::JoinResult result = child->Join();
+ if (to_calls_[id]->failure[0] != '\0') {
+ return AssertionFailure() << "function says: "
+ << const_cast<char *>(to_calls_[id]->failure);
+ }
+ if (result == ForkedProcess::JoinResult::Finished) {
+ return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
+ << "something happened and the the test only got to "
+ << ResultTypeString(to_calls_[id]->result));
+ } else {
+ if (to_calls_[id]->result == ResultType::Called) {
+ return to_calls_[id]->expected ? AssertionSuccess() :
+ AssertionFailure();
+ } else if (result == ForkedProcess::JoinResult::Error) {
+ return AssertionFailure() << "error joining child";
+ } else {
+ abort();
+ return AssertionFailure() << "something weird happened";
+ }
+ }
+ }
+#define EXPECT_HANGS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+ cond(Hangs(function, arg, hangs)); \
+ if (fatal_failure[0] != '\0') { \
+ FAIL() << fatal_failure; \
+ } \
+} while (false)
+
+ struct TestMessage {
+ // Some contents because we don't really want to test empty messages.
+ int16_t data;
+ };
+ struct MessageArgs {
+ RawQueue *const queue;
+ int flags;
+ int16_t data; // -1 means NULL expected
+ };
+ static void WriteTestMessage(MessageArgs *args, char *failure) {
+ TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
+ if (msg == NULL) {
+ snprintf(fatal_failure, kFailureSize,
+ "couldn't get_msg from %p", args->queue);
+ return;
+ }
+ msg->data = args->data;
+ if (!args->queue->WriteMessage(msg, args->flags)) {
+ snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
+ args->queue, msg, args->flags);
+ }
+ }
+ static void ReadTestMessage(MessageArgs *args, char *failure) {
+ const TestMessage *msg = static_cast<const TestMessage *>(
+ args->queue->ReadMessage(args->flags));
+ if (msg == NULL) {
+ if (args->data != -1) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got NULL message",
+ args->data);
+ }
+ } else {
+ if (args->data != msg->data) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got %" PRId16 " instead",
+ args->data, msg->data);
+ }
+ args->queue->FreeMessage(msg);
+ }
+ }
+
+  // Writes a message containing data to queue with kOverride.
+  void PushMessage(RawQueue *queue, uint16_t data) {
+    TestMessage *message = static_cast<TestMessage *>(queue->GetMessage());
+    ASSERT_NE(nullptr, message);  // Fail the test instead of crashing on NULL.
+    message->data = data;
+    ASSERT_TRUE(queue->WriteMessage(message, RawQueue::kOverride));
+  }
+ private:
+ GlobalCoreInstance my_core;
+};
+char *RawQueueTest::fatal_failure;
+std::map<RawQueueTest::ChildID, RawQueueTest::ForkedProcess *>
+ RawQueueTest::children_;
+constexpr time::Time RawQueueTest::kHangTime;
+constexpr time::Time RawQueueTest::kForkSleep;
+
+TEST_F(RawQueueTest, Reading) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, -1};
+
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.data = 254;
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = -1;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = 971;
+ EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(RawQueueTest, Writing) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_HANGS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(RawQueueTest, MultiRead) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 1323};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+ EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
+ // TODO(brians) finish this
+}
+
+// There used to be a bug where reading first without an index and then with an
+// index would crash. This test makes sure that's fixed.
+TEST_F(RawQueueTest, ReadIndexAndNot) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+
+ // Write a message, read it (with ReadMessage), and then write another
+ // message (before freeing the read one so the queue allocates a distinct
+ // message to use for it).
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, 0));
+ const void *read_msg = queue->ReadMessage(0);
+ EXPECT_NE(nullptr, read_msg);
+ msg = static_cast<TestMessage *>(queue->GetMessage());
+ queue->FreeMessage(read_msg);
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, 0));
+
+ int index = 0;
+ const void *second_read_msg = queue->ReadMessageIndex(0, &index);
+ EXPECT_NE(nullptr, second_read_msg);
+ EXPECT_NE(read_msg, second_read_msg)
+ << "We already took that message out of the queue.";
+}
+
+TEST_F(RawQueueTest, Recycle) {
+ // TODO(brians) basic test of recycle queue
+ // include all of the ways a message can get into the recycle queue
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+ 1, 2, 2, 2, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
+ MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 254;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_TRUE(msg != NULL);
+ msg->data = 341;
+ queue->FreeMessage(msg);
+ recycle.data = 341;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ args.data = 254;
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.data = 254;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+// Makes sure that when a message doesn't get written with kNonBlock it does get
+// freed.
+TEST_F(RawQueueTest, NonBlockFailFree) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+
+ void *message1 = queue->GetMessage();
+ void *message2 = queue->GetMessage();
+ ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
+ ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));
+ EXPECT_EQ(message2, queue->GetMessage());
+}
+
+TEST_F(RawQueueTest, ReadIndexNotFull) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ EXPECT_EQ(0, queue->messages_used());
+ PushMessage(queue, 971);
+ EXPECT_EQ(1, queue->messages_used());
+
+ int index = 0;
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(971, message->data);
+ EXPECT_EQ(1, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ PushMessage(queue, 1768);
+ EXPECT_EQ(2, queue->messages_used());
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ PushMessage(queue, 254);
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ EXPECT_EQ(2, queue->messages_used());
+}
+
+TEST_F(RawQueueTest, ReadIndexNotBehind) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ EXPECT_EQ(0, queue->messages_used());
+ PushMessage(queue, 971);
+ EXPECT_EQ(1, queue->messages_used());
+ PushMessage(queue, 1768);
+ EXPECT_EQ(2, queue->messages_used());
+
+ int index = 0;
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(971, message->data);
+ EXPECT_EQ(1, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLittleBehindNotFull) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehind) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 254);
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1768, message->data);
+ EXPECT_EQ(2, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehindNotFull) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehind) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ {
+ const void *message1, *message2;
+ message1 = queue->ReadMessage(RawQueue::kNonBlock);
+ ASSERT_NE(nullptr, message1);
+ PushMessage(queue, 254);
+ message2 = queue->ReadMessage(RawQueue::kNonBlock);
+ ASSERT_NE(nullptr, message2);
+ PushMessage(queue, 973);
+ EXPECT_EQ(4, queue->messages_used());
+ queue->FreeMessage(message1);
+ EXPECT_EQ(3, queue->messages_used());
+ queue->FreeMessage(message2);
+ EXPECT_EQ(2, queue->messages_used());
+ }
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(254, message->data);
+ EXPECT_EQ(3, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehindNotFull) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehind) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 1114);
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(973, message->data);
+ EXPECT_EQ(4, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehindNotFull) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+ const TestMessage *message, *peek_message;
+
+ PushMessage(queue, 971);
+ PushMessage(queue, 1768);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 254);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 973);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+ PushMessage(queue, 1114);
+ ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+ int index = 0;
+
+ peek_message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+ message = static_cast<const TestMessage *>(
+ queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+ ASSERT_NE(nullptr, message);
+ EXPECT_EQ(message, peek_message);
+ EXPECT_EQ(1114, message->data);
+ EXPECT_EQ(5, index);
+ queue->FreeMessage(message);
+ queue->FreeMessage(peek_message);
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/linux_code/logging/binary_log_file.cc b/aos/linux_code/logging/binary_log_file.cc
index 007ed9b..896422c 100644
--- a/aos/linux_code/logging/binary_log_file.cc
+++ b/aos/linux_code/logging/binary_log_file.cc
@@ -6,6 +6,8 @@
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
+#include <signal.h>
+#include <setjmp.h>
namespace aos {
namespace logging {
@@ -111,11 +113,20 @@
if (current_ == MAP_FAILED) {
LOG(FATAL,
"mmap(NULL, %zd, PROT_READ [ | PROT_WRITE], MAP_SHARED, %d, %jd)"
- " failed with %d: %s. aborting\n",
+ " failed with %d: %s\n",
kPageSize, fd_, static_cast<intmax_t>(offset_), errno,
strerror(errno));
}
+  // Advice values are not bit flags, so each one needs its own madvise call.
+  if (madvise(current_, kPageSize, MADV_SEQUENTIAL) == -1 ||
+      madvise(current_, kPageSize, MADV_WILLNEED) == -1) {
+    LOG(WARNING, "madvise(%p, %zd, MADV_SEQUENTIAL or MADV_WILLNEED)"
+        " failed with %d: %s\n", current_, kPageSize, errno, strerror(errno));
+  }
offset_ += kPageSize;
+ if (!writable_) {
+ CheckCurrentPageReadable();
+ }
}
void LogFileAccessor::Unmap(void *location) {
@@ -126,6 +137,59 @@
is_last_page_ = 0;
}
+// This mess is because the only not completely hackish way to do this is to set
+// up a handler for SIGBUS/SIGSEGV that siglongjmps out to avoid either the
+// instruction being repeated infinitely (and more signals being delivered) or
+// (with SA_RESETHAND) the signal killing the process.
+namespace {
+
+void *volatile fault_address;
+sigjmp_buf jump_context;
+
+void CheckCurrentPageReadableHandler(int /*signal*/, siginfo_t *info, void *) {
+ fault_address = info->si_addr;
+
+ siglongjmp(jump_context, 1);
+}
+
+} // namespace
+void LogFileAccessor::CheckCurrentPageReadable() {
+  if (sigsetjmp(jump_context, 1) == 0) {  // Direct return: install and probe.
+    struct sigaction action;
+    action.sa_sigaction = CheckCurrentPageReadableHandler;
+    sigemptyset(&action.sa_mask);
+    action.sa_flags = SA_RESETHAND | SA_SIGINFO;
+    struct sigaction previous_bus, previous_segv;
+    if (sigaction(SIGBUS, &action, &previous_bus) == -1) {
+      LOG(FATAL, "sigaction(SIGBUS(=%d), %p, %p) failed with %d: %s\n",
+          SIGBUS, &action, &previous_bus, errno, strerror(errno));
+    }
+    if (sigaction(SIGSEGV, &action, &previous_segv) == -1) {
+      LOG(FATAL, "sigaction(SIGSEGV(=%d), %p, %p) failed with %d: %s\n",
+          SIGSEGV, &action, &previous_segv, errno, strerror(errno));
+    }
+    // volatile so the optimizer can't drop the read that triggers the fault.
+    volatile char __attribute__((unused)) c = current_[0];
+
+    if (sigaction(SIGBUS, &previous_bus, NULL) == -1) {
+      LOG(FATAL, "sigaction(SIGBUS(=%d), %p, NULL) failed with %d: %s\n",
+          SIGBUS, &previous_bus, errno, strerror(errno));
+    }
+    if (sigaction(SIGSEGV, &previous_segv, NULL) == -1) {
+      LOG(FATAL, "sigaction(SIGSEGV(=%d), %p, NULL) failed with %d: %s\n",
+          SIGSEGV, &previous_segv, errno, strerror(errno));
+    }
+  } else {  // Re-entered via siglongjmp from the handler: the probe faulted.
+    if (fault_address == current_) {
+      LOG(FATAL, "could not read 1 byte at offset 0x%jx into log file\n",
+          static_cast<uintmax_t>(offset_));
+    } else {
+      LOG(FATAL, "faulted at %p, not %p like we were (maybe) supposed to\n",
+          fault_address, current_);
+    }
+  }
+}
+
} // namespace linux_code
} // namespace logging
} // namespace aos
diff --git a/aos/linux_code/logging/binary_log_file.h b/aos/linux_code/logging/binary_log_file.h
index 0f8c3fb..72aac09 100644
--- a/aos/linux_code/logging/binary_log_file.h
+++ b/aos/linux_code/logging/binary_log_file.h
@@ -118,6 +118,9 @@
void MapNextPage();
void Unmap(void *location);
+ // Tries reading from the current page to see if it fails because the file
+ // isn't big enough.
+ void CheckCurrentPageReadable();
// Advances position to the next (aligned) location.
void AlignPosition() {
diff --git a/bbb_cape/src/cape/analog.c b/bbb_cape/src/cape/analog.c
index 855098c..297afd6 100644
--- a/bbb_cape/src/cape/analog.c
+++ b/bbb_cape/src/cape/analog.c
@@ -2,8 +2,6 @@
#include <string.h>
-#include <STM32F2XX.h>
-
#include "cape/util.h"
#include "cape/led.h"
@@ -20,27 +18,13 @@
#define NUM_CHANNELS 8
+// This file handles reading values from the MCP3008-I/SL ADC.
+
uint16_t analog_readings[NUM_CHANNELS] __attribute__((aligned(8)));
static volatile int current_channel;
static volatile int partial_reading;
static volatile int frame;
-
-static void start_read(int channel) {
- // This needs to wait 13 cycles between enabling the CSEL pin and starting to
- // send data.
- // (100ns+8ns)*120MHz = 12.96
-
- // Clear the CSEL pin to select it.
- for (int i = 0; i < 9; ++i) gpio_off(CSEL_GPIO, CSEL_NUM);
- current_channel = channel;
- partial_reading = 0;
- frame = 0;
- SPI->DR = 1; // start bit
- uint16_t data = (1 << 15) /* not differential */ |
- (channel << 12);
- while (!(SPI->SR & SPI_SR_TXE));
- SPI->DR = data;
-}
+static volatile int analog_errors;
void SPI_IRQHandler(void) {
uint32_t status = SPI->SR;
@@ -50,15 +34,15 @@
frame = 1;
partial_reading = value;
} else {
+ frame = 2;
// Masking off the high bits is important because there's nothing driving
// the MISO line during the time the MCU receives them.
- analog_readings[current_channel] = (partial_reading << 16 | value) & 0x3FF;
+ analog_readings[current_channel] =
+ (partial_reading << 16 | value) & 0x3FF;
for (int i = 0; i < 100; ++i) gpio_off(CSEL_GPIO, CSEL_NUM);
gpio_on(CSEL_GPIO, CSEL_NUM);
- TIM->CR1 = TIM_CR1_OPM;
- TIM->EGR = TIM_EGR_UG;
- TIM->CR1 |= TIM_CR1_CEN;
+ current_channel = (current_channel + 1) % NUM_CHANNELS;
}
}
}
@@ -66,7 +50,27 @@
void TIM_IRQHandler(void) {
TIM->SR = ~TIM_SR_CC1IF;
- start_read((current_channel + 1) % NUM_CHANNELS);
+ if (frame != 2) {
+ // We're not done with the previous reading yet, so we're going to reset and
+ // try again.
+ // 270ns*120MHz = 32.4
+ for (int i = 0; i < 33; ++i) gpio_on(CSEL_GPIO, CSEL_NUM);
+ ++analog_errors;
+ }
+
+ // This needs to wait 13 cycles between enabling the CSEL pin and starting to
+ // send data.
+ // (100ns+8ns)*120MHz = 12.96
+
+ // Clear the CSEL pin to select it.
+ for (int i = 0; i < 9; ++i) gpio_off(CSEL_GPIO, CSEL_NUM);
+ partial_reading = 0;
+ frame = 0;
+ SPI->DR = 1; // start bit
+ uint16_t data = (1 << 15) /* not differential */ |
+ (current_channel << 12);
+ while (!(SPI->SR & SPI_SR_TXE));
+ SPI->DR = data;
}
void analog_init(void) {
@@ -87,7 +91,7 @@
NVIC_SetPriority(TIM_IRQn, 6);
NVIC_EnableIRQ(TIM_IRQn);
- TIM->CR1 = TIM_CR1_OPM;
+ TIM->CR1 = 0;
TIM->DIER = TIM_DIER_CC1IE;
TIM->CCMR1 = 0;
// Make each tick take 1500ns.
@@ -104,5 +108,17 @@
SPI->CR2 = SPI_CR2_RXNEIE;
SPI->CR1 |= SPI_CR1_SPE; // enable it
- start_read(0);
+  current_channel = 0;
+  analog_errors = 0;
+  frame = 2;  // No read in flight yet, so the first tick isn't an error.
+ TIM->EGR = TIM_EGR_UG;
+ TIM->CR1 |= TIM_CR1_CEN;
+}
+
+int analog_get_errors(void) {
+ NVIC_DisableIRQ(TIM_IRQn);
+ int r = analog_errors;
+ analog_errors = 0;
+ NVIC_EnableIRQ(TIM_IRQn);
+ return r;
}
diff --git a/bbb_cape/src/cape/analog.h b/bbb_cape/src/cape/analog.h
index 50038d5..cd9ce82 100644
--- a/bbb_cape/src/cape/analog.h
+++ b/bbb_cape/src/cape/analog.h
@@ -3,6 +3,8 @@
#include <stdint.h>
+#include <STM32F2XX.h>
+
// Starts up constantly reading analog values and storing them in an array to
// be retrieved by analog_get.
void analog_init(void);
@@ -14,4 +16,9 @@
return analog_readings[num];
}
+// Returns the number of errors since last called.
+// Must be called from something with priority equal to or lower than our
+// timer's IRQ.
+int analog_get_errors(void);
+
#endif // CAPE_ANALOG_H_
diff --git a/bbb_cape/src/cape/data_struct.h b/bbb_cape/src/cape/data_struct.h
index 4aa2a10..943e8c2 100644
--- a/bbb_cape/src/cape/data_struct.h
+++ b/bbb_cape/src/cape/data_struct.h
@@ -38,6 +38,8 @@
// contents of flash for the main code (aka what's in the .hex file).
uint32_t flash_checksum;
+ uint8_t analog_errors;
+
struct {
// If the current gyro_angle has been not updated because of a bad
// reading from the sensor.
diff --git a/bbb_cape/src/cape/fill_packet.c b/bbb_cape/src/cape/fill_packet.c
index 97f9d8a..8d1f72a 100644
--- a/bbb_cape/src/cape/fill_packet.c
+++ b/bbb_cape/src/cape/fill_packet.c
@@ -40,6 +40,7 @@
packet->uninitialized_gyro = !gyro_output.initialized;
packet->zeroing_gyro = !gyro_output.zeroed;
packet->bad_gyro = gyro_output.gyro_bad;
+ packet->analog_errors = analog_get_errors();
robot_fill_packet(packet);
//counter_update_u64_u16(&timestamp, TIMESTAMP_TIM->CNT);
diff --git a/frc971/input/sensor_receiver.cc b/frc971/input/sensor_receiver.cc
index 8b04c4b..b44bb8d 100644
--- a/frc971/input/sensor_receiver.cc
+++ b/frc971/input/sensor_receiver.cc
@@ -162,6 +162,10 @@
.Send();
}
+ if (data->analog_errors != 0) {
+ LOG(WARNING, "%" PRIu8 " analog errors\n", data->analog_errors);
+ }
+
other_sensors.MakeWithBuilder()
.sonar_distance(sonar_translate(data->main.ultrasonic_pulse_length))
.Send();