Removed linux_code

Change-Id: I7327828d2c9efdf03172d1b90f49d5c51fbba86e
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
new file mode 100644
index 0000000..ab98b1f
--- /dev/null
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -0,0 +1,1046 @@
+#include "aos/ipc_lib/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <chrono>
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/ipc_lib/core_lib.h"
+#include "aos/type_traits/type_traits.h"
+#include "aos/testing/test_shm.h"
+#include "aos/time/time.h"
+#include "aos/logging/logging.h"
+#include "aos/die.h"
+#include "aos/util/thread.h"
+#include "aos/util/options.h"
+#include "aos/util/death_test_log_implementation.h"
+#include "aos/testing/prevent_exit.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+
+namespace aos {
+namespace testing {
+
+namespace chrono = ::std::chrono;
+namespace this_thread = ::std::this_thread;
+
+// The same constant from queue.cc. This will have to be updated if that one is.
+const int kExtraMessages = 20;
+
+class RawQueueTest : public ::testing::Test {
+ protected:
+  static const size_t kFailureSize = 400;
+  static char *fatal_failure;
+ private:
+  enum class ResultType : uint8_t {
+    NotCalled,
+    Called,
+    Returned,
+  };
+  const std::string ResultTypeString(volatile const ResultType &result) {
+    switch (result) {
+      case ResultType::Returned:
+        return "Returned";
+      case ResultType::Called:
+        return "Called";
+      case ResultType::NotCalled:
+        return "NotCalled";
+      default:
+        return std::string("unknown(") +
+               ::std::to_string(static_cast<uint8_t>(result)) + ")";
+    }
+  }
+  static_assert(aos::shm_ok<ResultType>::value,
+                "this will get put in shared memory");
+  template<typename T>
+  struct FunctionToCall {
+    FunctionToCall() : result(ResultType::NotCalled), started() {
+    }
+
+    volatile ResultType result;
+    bool expected;
+    void (*function)(T*, char*);
+    T *arg;
+    volatile char failure[kFailureSize];
+    aos_futex started;
+  };
+  template<typename T>
+  static void Hangs_(FunctionToCall<T> *const to_call) {
+    this_thread::sleep_for(chrono::milliseconds(10));
+    ASSERT_EQ(1, futex_set(&to_call->started));
+    to_call->result = ResultType::Called;
+    to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+    to_call->result = ResultType::Returned;
+  }
+
+  // How long until a function is considered to have hung.
+  static constexpr chrono::nanoseconds kHangTime = chrono::milliseconds(90);
+  // How long to sleep after forking (for debugging).
+  static constexpr chrono::nanoseconds kForkSleep = chrono::milliseconds(0);
+
+  // Represents a process that has been forked off. The destructor kills the
+  // process and wait(2)s for it.
+  class ForkedProcess {
+   public:
+    ForkedProcess(pid_t pid, aos_futex *done)
+        : pid_(pid), done_(done), exiting_(false) {};
+    ~ForkedProcess() {
+      if (!exiting_) {
+        if (kill(pid_, SIGTERM) == -1) {
+          if (errno == ESRCH) {
+            printf("process %jd was already dead\n",
+                   static_cast<intmax_t>(pid_));
+          } else {
+            PLOG(FATAL, "kill(SIGKILL, %jd) failed",
+                 static_cast<intmax_t>(pid_));
+          }
+        }
+      }
+      const pid_t ret = wait(NULL);
+      if (ret == -1) {
+        LOG(WARNING, "wait(NULL) failed."
+            " child %jd might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret == 0) {
+        LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+            static_cast<intmax_t>(pid_));
+      } else if (ret != pid_) {
+        LOG(WARNING, "child %d is now confirmed dead"
+            ", but child %jd might still be alive\n",
+            ret, static_cast<intmax_t>(pid_));
+      }
+    }
+
+    enum class JoinResult {
+      Finished, Hung, Error
+    };
+    JoinResult Join(chrono::nanoseconds timeout = kHangTime) {
+      struct timespec done_timeout;
+      {
+        auto full_timeout = kForkSleep + timeout;
+        ::std::chrono::seconds sec =
+            ::std::chrono::duration_cast<::std::chrono::seconds>(full_timeout);
+        ::std::chrono::nanoseconds nsec =
+            ::std::chrono::duration_cast<::std::chrono::nanoseconds>(
+                full_timeout - sec);
+        done_timeout.tv_sec = sec.count();
+        done_timeout.tv_nsec = nsec.count();
+      }
+
+      switch (futex_wait_timeout(done_, &done_timeout)) {
+        case 2:
+          return JoinResult::Hung;
+        case 0:
+          exiting_ = true;
+          return JoinResult::Finished;
+        default:
+          return JoinResult::Error;
+      }
+    }
+
+   private:
+    const pid_t pid_;
+    aos_futex *const done_;
+    // True iff we know that the process is already exiting.
+    bool exiting_;
+  } __attribute__((unused));
+
+  // State for HangsFork and HangsCheck.
+  typedef uint8_t ChildID;
+  static void ReapExitHandler() {
+    for (auto it = children_.begin(); it != children_.end(); ++it) {
+      delete it->second;
+    }
+  }
+  static std::map<ChildID, ForkedProcess *> children_;
+  std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+  void SetUp() override {
+    ::testing::Test::SetUp();
+
+    SetDieTestMode(true);
+
+    fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+    static bool registered = false;
+    if (!registered) {
+      atexit(ReapExitHandler);
+      registered = true;
+    }
+  }
+
+ protected:
+  // function gets called with arg in a forked process.
+  // Leaks shared memory.
+  template<typename T> __attribute__((warn_unused_result))
+  std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+    aos_futex *done = static_cast<aos_futex *>(shm_malloc_aligned(
+            sizeof(*done), alignof(aos_futex)));
+    *done = 0;
+    const pid_t pid = fork();
+    switch (pid) {
+      case 0:  // child
+        if (kForkSleep != chrono::milliseconds(0)) {
+          LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
+              static_cast<intmax_t>(getpid()), kForkSleep.count());
+          this_thread::sleep_for(kForkSleep);
+        }
+        ::aos::testing::PreventExit();
+        function(arg);
+        CHECK_NE(-1, futex_set(done));
+        exit(EXIT_SUCCESS);
+      case -1:  // parent failure
+        PLOG(ERROR, "fork() failed");
+        return std::unique_ptr<ForkedProcess>();
+      default:  // parent
+        return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, done));
+    }
+  }
+
+  // Checks whether or not the given function hangs.
+  // expected is whether to return success or failure if the function hangs
+  // NOTE: There are other reasons for it to return a failure than the function
+  // doing the wrong thing.
+  // Leaks shared memory.
+  template<typename T>
+  AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+    AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+    if (!fork_result) {
+      return fork_result;
+    }
+    return HangsCheck(0);
+  }
+  // Starts the first part of Hangs.
+  // Use HangsCheck to get the result.
+  // Returns whether the fork succeeded or not, NOT whether or not the hang
+  // check succeeded.
+  template<typename T>
+  AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+                            bool expected, ChildID id) {
+    static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+                  "this is going into shared memory");
+    FunctionToCall<T> *const to_call =
+        static_cast<FunctionToCall<T> *>(
+            shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+    new (to_call) FunctionToCall<T>();
+    to_call->function = function;
+    to_call->arg = arg;
+    to_call->expected = expected;
+    to_call->failure[0] = '\0';
+    static_cast<char *>(fatal_failure)[0] = '\0';
+    children_[id] = ForkExecute(Hangs_, to_call).release();
+    if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+    CHECK_EQ(0, futex_wait(&to_call->started));
+    to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+    return AssertionSuccess();
+  }
+  // Checks whether or not a function hung like it was supposed to.
+  // Use HangsFork first.
+  // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+  // correspond, but they do not nest. Also, id 0 is used by Hangs.
+  // Return value is the same as Hangs.
+  AssertionResult HangsCheck(ChildID id) {
+    std::unique_ptr<ForkedProcess> child(children_[id]);
+    children_.erase(id);
+    const ForkedProcess::JoinResult result = child->Join();
+    if (to_calls_[id]->failure[0] != '\0') {
+      return AssertionFailure() << "function says: "
+          << const_cast<char *>(to_calls_[id]->failure);
+    }
+    if (result == ForkedProcess::JoinResult::Finished) {
+      return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
+          << "something happened and the the test only got to "
+          << ResultTypeString(to_calls_[id]->result));
+    } else {
+      if (to_calls_[id]->result == ResultType::Called) {
+        return to_calls_[id]->expected ? AssertionSuccess() :
+            AssertionFailure();
+      } else if (result == ForkedProcess::JoinResult::Error) {
+        return AssertionFailure() << "error joining child";
+      } else {
+        if (to_calls_[id]->result == ResultType::NotCalled) {
+          return AssertionFailure() << "stuff took too long getting started";
+        }
+        return AssertionFailure() << "something weird happened";
+      }
+    }
+  }
+#define EXPECT_HANGS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+  EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+  cond(Hangs(function, arg, hangs)); \
+  if (fatal_failure[0] != '\0') { \
+    FAIL() << fatal_failure; \
+  } \
+} while (false)
+
+  struct TestMessage {
+    // Some contents because we don't really want to test empty messages.
+    int16_t data;
+  };
+  struct MessageArgs {
+    RawQueue *const queue;
+    Options<RawQueue> flags;
+    int16_t data;  // -1 means NULL expected
+  };
+  static void WriteTestMessage(MessageArgs *args, char *failure) {
+    TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
+    if (msg == NULL) {
+      snprintf(fatal_failure, kFailureSize,
+               "couldn't get_msg from %p", args->queue);
+      return;
+    }
+    msg->data = args->data;
+    if (!args->queue->WriteMessage(msg, args->flags)) {
+      snprintf(failure, kFailureSize, "%p->WriteMessage(%p, %x) failed",
+               args->queue, msg, args->flags.printable());
+    }
+  }
+  static void ReadTestMessage(MessageArgs *args, char *failure) {
+    const TestMessage *msg = static_cast<const TestMessage *>(
+        args->queue->ReadMessage(args->flags));
+    if (msg == NULL) {
+      if (args->data != -1) {
+        snprintf(failure, kFailureSize,
+                 "expected data of %" PRId16 " but got NULL message",
+                 args->data);
+      }
+    } else {
+      if (args->data != msg->data) {
+        snprintf(failure, kFailureSize,
+                 "expected data of %" PRId16 " but got %" PRId16 " instead",
+                 args->data, msg->data);
+      }
+      args->queue->FreeMessage(msg);
+    }
+  }
+
+  void PushMessage(RawQueue *queue, uint16_t data) {
+    TestMessage *message = static_cast<TestMessage *>(queue->GetMessage());
+    message->data = data;
+    ASSERT_TRUE(queue->WriteMessage(message, RawQueue::kOverride));
+  }
+
+ private:
+  ::aos::testing::TestSharedMemory my_shm_;
+};
+
+char *RawQueueTest::fatal_failure;
+std::map<RawQueueTest::ChildID, RawQueueTest::ForkedProcess *>
+    RawQueueTest::children_;
+constexpr chrono::nanoseconds RawQueueTest::kHangTime;
+constexpr chrono::nanoseconds RawQueueTest::kForkSleep;
+
+typedef RawQueueTest RawQueueDeathTest;
+
+TEST_F(RawQueueTest, Reading) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, -1};
+
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &args);
+  args.data = 254;
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  args.data = -1;
+  EXPECT_HANGS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  args.data = 971;
+  EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(RawQueueTest, Writing) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, 973};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_HANGS(WriteTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.data = 971;
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kNonBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(RawQueueTest, MultiRead) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  MessageArgs args{queue, RawQueue::kBlock, 1323};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  args.flags = RawQueue::kBlock;
+  ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+  ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+  AssertionResult one = HangsCheck(1);
+  AssertionResult two = HangsCheck(2);
+  EXPECT_TRUE(one != two) << "'" <<
+      one.failure_message() << "' vs '" << two.failure_message() << "'";
+  // TODO(brians) finish this
+}
+
+// There used to be a bug where reading first without an index and then with an
+// index would crash. This test makes sure that's fixed.
+TEST_F(RawQueueTest, ReadIndexAndNot) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+
+  // Write a message, read it (with ReadMessage), and then write another
+  // message (before freeing the read one so the queue allocates a distinct
+  // message to use for it).
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_NE(nullptr, msg);
+  ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+  const void *read_msg = queue->ReadMessage(RawQueue::kBlock);
+  EXPECT_NE(nullptr, read_msg);
+  msg = static_cast<TestMessage *>(queue->GetMessage());
+  queue->FreeMessage(read_msg);
+  ASSERT_NE(nullptr, msg);
+  ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
+
+  int index = 0;
+  const void *second_read_msg =
+      queue->ReadMessageIndex(RawQueue::kBlock, &index);
+  EXPECT_NE(nullptr, second_read_msg);
+  EXPECT_NE(read_msg, second_read_msg)
+      << "We already took that message out of the queue.";
+}
+
+TEST_F(RawQueueTest, Recycle) {
+  // TODO(brians) basic test of recycle queue
+  // include all of the ways a message can get into the recycle queue
+  RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+                                          1, 2, 2, 2, &recycle_queue);
+  ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
+  MessageArgs args{queue, RawQueue::kBlock, 973},
+      recycle{recycle_queue, RawQueue::kBlock, 973};
+
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+  args.data = 254;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+  args.data = 971;
+  args.flags = RawQueue::kOverride;
+  EXPECT_RETURNS(WriteTestMessage, &args);
+  recycle.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+
+  TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_TRUE(msg != NULL);
+  msg->data = 341;
+  queue->FreeMessage(msg);
+  recycle.data = 341;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+
+  args.data = 254;
+  args.flags = RawQueue::kPeek | RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  recycle.flags = RawQueue::kBlock;
+  EXPECT_HANGS(ReadTestMessage, &recycle);
+  args.flags = RawQueue::kBlock;
+  EXPECT_RETURNS(ReadTestMessage, &args);
+  recycle.data = 254;
+  EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+// Makes sure that when a message doesn't get written with kNonBlock it does get
+// freed.
+TEST_F(RawQueueTest, NonBlockFailFree) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+
+  void *message1 = queue->GetMessage();
+  void *message2 = queue->GetMessage();
+  ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
+  ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));
+  EXPECT_EQ(message2, queue->GetMessage());
+}
+
+// All of the tests from here down are designed to test every branch to
+// make sure it does what it's supposed to. They are generally pretty repetitive
+// and boring, and some of them may duplicate other tests above, but these ones
+// make it a lot easier to figure out what's wrong with bugs not related to race
+// conditions.
+
+TEST_F(RawQueueTest, ReadIndexNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+  PushMessage(queue, 971);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+
+  int index = 0;
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(971, message->data);
+  EXPECT_EQ(1, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  PushMessage(queue, 1768);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  PushMessage(queue, 254);
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+}
+
+TEST_F(RawQueueTest, ReadIndexNotBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+  PushMessage(queue, 971);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  PushMessage(queue, 1768);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+
+  int index = 0;
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(971, message->data);
+  EXPECT_EQ(1, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLittleBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1768, message->data);
+  EXPECT_EQ(2, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexMoreBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  {
+    const void *message1, *message2;
+    message1 = queue->ReadMessage(RawQueue::kNonBlock);
+    ASSERT_NE(nullptr, message1);
+    PushMessage(queue, 254);
+    message2 = queue->ReadMessage(RawQueue::kNonBlock);
+    ASSERT_NE(nullptr, message2);
+    PushMessage(queue, 973);
+    EXPECT_EQ(4, kExtraMessages + 2 - queue->FreeMessages());
+    queue->FreeMessage(message1);
+    EXPECT_EQ(3, kExtraMessages + 2 - queue->FreeMessages());
+    queue->FreeMessage(message2);
+    EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  }
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(254, message->data);
+  EXPECT_EQ(3, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexLotBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehind) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 1114);
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(973, message->data);
+  EXPECT_EQ(4, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, ReadIndexEvenMoreBehindNotFull) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const TestMessage *message, *peek_message;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 254);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 973);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+  PushMessage(queue, 1114);
+  ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
+
+  int index = 0;
+
+  peek_message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
+  message = static_cast<const TestMessage *>(
+      queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+  queue->FreeMessage(peek_message);
+
+  index = 0;
+  peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
+      RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
+  message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
+      RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
+  ASSERT_NE(nullptr, message);
+  EXPECT_EQ(message, peek_message);
+  EXPECT_EQ(1114, message->data);
+  EXPECT_EQ(5, index);
+  queue->FreeMessage(message);
+}
+
+TEST_F(RawQueueTest, MessageReferenceCounts) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  const void *message1, *message2;
+
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+  message1 = queue->GetMessage();
+  EXPECT_NE(nullptr, message1);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  message2 = queue->GetMessage();
+  EXPECT_NE(nullptr, message2);
+  EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
+  queue->FreeMessage(message1);
+  EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
+  queue->FreeMessage(message2);
+  EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
+}
+
+// Tests that writing with kNonBlock fails and frees the message.
+TEST_F(RawQueueTest, WriteDontBlock) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  void *message;
+
+  PushMessage(queue, 971);
+  int free_before = queue->FreeMessages();
+  message = queue->GetMessage();
+  ASSERT_NE(nullptr, message);
+  EXPECT_NE(free_before, queue->FreeMessages());
+  EXPECT_FALSE(queue->WriteMessage(message, RawQueue::kNonBlock));
+  EXPECT_EQ(free_before, queue->FreeMessages());
+}
+
+// Tests that writing with kOverride pushes the last message out of the queue.
+TEST_F(RawQueueTest, WriteOverride) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+  TestMessage *message1;
+
+  PushMessage(queue, 971);
+  PushMessage(queue, 1768);
+  int free_before = queue->FreeMessages();
+  message1 = static_cast<TestMessage *>(queue->GetMessage());
+  ASSERT_NE(nullptr, message1);
+  EXPECT_NE(free_before, queue->FreeMessages());
+  message1->data = 254;
+  EXPECT_TRUE(queue->WriteMessage(message1, RawQueue::kOverride));
+  EXPECT_EQ(free_before, queue->FreeMessages());
+
+  const TestMessage *message2;
+  message2 =
+      static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+  EXPECT_EQ(1768, message2->data);
+  queue->FreeMessage(message2);
+  EXPECT_EQ(free_before + 1, queue->FreeMessages());
+  message2 =
+      static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
+  EXPECT_EQ(254, message2->data);
+  queue->FreeMessage(message2);
+  EXPECT_EQ(free_before + 2, queue->FreeMessages());
+}
+
+// Makes sure that ThreadSanitizer doesn't catch any issues freeing from
+// multiple threads at once.
+TEST_F(RawQueueTest, MultiThreadedFree) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  PushMessage(queue, 971);
+  int free_before = queue->FreeMessages();
+
+  const void *const message1 =
+      queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
+  const void *const message2 =
+      queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
+  ASSERT_NE(nullptr, message1);
+  ASSERT_NE(nullptr, message2);
+  EXPECT_EQ(free_before, queue->FreeMessages());
+  util::FunctionThread t1([message1, queue](util::Thread *) {
+    queue->FreeMessage(message1);
+  });
+  util::FunctionThread t2([message2, queue](util::Thread *) {
+    queue->FreeMessage(message2);
+  });
+  t1.Start();
+  t2.Start();
+  t1.WaitUntilDone();
+  t2.WaitUntilDone();
+  EXPECT_EQ(free_before, queue->FreeMessages());
+}
+
+TEST_F(RawQueueDeathTest, OptionsValidation) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", 1, 1, 1);
+
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kPeek);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kFromEnd);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kPeek | RawQueue::kFromEnd);
+      },
+      ".*illegal write option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->WriteMessage(nullptr, RawQueue::kNonBlock | RawQueue::kBlock);
+      },
+      ".*invalid write option.*");
+
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(
+            RawQueue::kBlock | RawQueue::kFromEnd | RawQueue::kPeek, nullptr);
+      },
+      ".*ReadMessageIndex.*is not allowed.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(RawQueue::kOverride, nullptr);
+      },
+      ".*illegal read option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessageIndex(RawQueue::kOverride | RawQueue::kBlock,
+                                nullptr);
+      },
+      ".*illegal read option.*");
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        queue->ReadMessage(RawQueue::kNonBlock | RawQueue::kBlock);
+      },
+      ".*invalid read option.*");
+}
+
+}  // namespace testing
+}  // namespace aos