Convert aos over to flatbuffers

Everything builds, and all the tests pass.  I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.

There is no logging or live introspection of queue messages.

Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
new file mode 100644
index 0000000..5a88717
--- /dev/null
+++ b/aos/events/shm_event_loop.cc
@@ -0,0 +1,611 @@
+#include "aos/events/shm_event_loop.h"
+
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/timerfd.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <stdexcept>
+#include <thread>
+
+#include "aos/events/epoll.h"
+#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
+#include "glog/logging.h"
+
+// Command line flags controlling where the queue backing files live and the
+// permissions used when creating those files and their directories.
+DEFINE_string(shm_base, "/dev/shm/aos",
+              "Directory to place queue backing mmaped files in.");
+DEFINE_uint32(permissions, 0770,
+              "Permissions to make shared memory files and folders.");
+
+namespace aos {
+
+// Returns the directory under FLAGS_shm_base which backs the given channel.
+// The channel name must be present and absolute (start with '/').
+std::string ShmFolder(const Channel *channel) {
+  CHECK(channel->has_name());
+  const std::string name = channel->name()->str();
+  CHECK_EQ(name[0], '/');
+  return FLAGS_shm_base + name + "/";
+}
+// Returns the full path of the (versioned) queue backing file for a channel.
+// The channel type must be present.
+std::string ShmPath(const Channel *channel) {
+  CHECK(channel->has_type());
+  const std::string type = channel->type()->str();
+  return ShmFolder(channel) + type + ".v0";
+}
+
+// Owns the mmapped shared-memory backing file for one channel's lockless
+// queue.  The file lives under FLAGS_shm_base and, once created, is never
+// deleted.  Multiple processes map the same file to share the queue.
+class MMapedQueue {
+ public:
+  MMapedQueue(const Channel *channel) {
+    std::string path = ShmPath(channel);
+
+    // TODO(austin): Pull these out into the config if there is a need.
+    config_.num_watchers = 10;
+    config_.num_senders = 10;
+    // Size the queue to hold 2 * frequency() messages.
+    config_.queue_size = 2 * channel->frequency();
+    config_.message_data_size = channel->max_size();
+
+    size_ = ipc_lib::LocklessQueueMemorySize(config_);
+
+    MkdirP(path);
+
+    // There are 2 cases.  Either the file already exists, or it does not
+    // already exist and we need to create it.  Start by trying to create it
+    // exclusively.  If that fails, the file has already been created and we
+    // can open it normally.  Once the file has been created it will never be
+    // deleted.
+    //
+    // Note: O_CLOEXEC is an open() flag, not a permission bit, so it goes in
+    // the flags argument; the mode argument carries only the permissions.
+    fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC,
+               FLAGS_permissions);
+    if (fd_ == -1 && errno == EEXIST) {
+      VLOG(1) << path << " already created.";
+      // File already exists.
+      fd_ = open(path.c_str(), O_RDWR | O_CLOEXEC);
+      PCHECK(fd_ != -1) << ": Failed to open " << path;
+      // The creating process ftruncates the file to its full size after
+      // creating it, so spin until the size becomes nonzero to know the
+      // file is fully set up.
+      while (true) {
+        struct stat st;
+        PCHECK(fstat(fd_, &st) == 0);
+        if (st.st_size != 0) {
+          CHECK_EQ(static_cast<size_t>(st.st_size), size_)
+              << ": Size of " << path
+              << " doesn't match expected size of backing queue file.  Did the "
+                 "queue definition change?";
+          break;
+        } else {
+          // The creating process didn't get around to it yet.  Give it a bit.
+          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          VLOG(1) << path << " is zero size, waiting";
+        }
+      }
+    } else {
+      VLOG(1) << "Created " << path;
+      PCHECK(fd_ != -1) << ": Failed to open " << path;
+      PCHECK(ftruncate(fd_, size_) == 0);
+    }
+
+    data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
+    PCHECK(data_ != MAP_FAILED);
+
+    // NOTE(review): multiple processes can race to get here; confirm
+    // InitializeLocklessQueueMemory is idempotent / safe to call
+    // concurrently on the same memory.
+    ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
+  }
+
+  // Unmaps and closes the backing file; the file itself is left in place.
+  ~MMapedQueue() {
+    PCHECK(munmap(data_, size_) == 0);
+    PCHECK(close(fd_) == 0);
+  }
+
+  // Returns the shared queue memory, valid for this object's lifetime.
+  ipc_lib::LocklessQueueMemory *memory() const {
+    return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
+  }
+
+  // Returns the geometry used to size and initialize the queue.
+  const ipc_lib::LocklessQueueConfiguration &config() const {
+    return config_;
+  }
+
+ private:
+  // Recursively creates the parent directories of path (like mkdir -p).
+  // The base of the path (FLAGS_shm_base's parent) must already exist.
+  void MkdirP(absl::string_view path) {
+    struct stat st;
+    auto last_slash_pos = path.find_last_of("/");
+
+    std::string folder(last_slash_pos == absl::string_view::npos
+                           ? absl::string_view("")
+                           : path.substr(0, last_slash_pos));
+    if (stat(folder.c_str(), &st) == -1) {
+      // Only "does not exist" is recoverable; any other stat failure is
+      // fatal.
+      PCHECK(errno == ENOENT);
+      CHECK_NE(folder, "") << ": Base path doesn't exist";
+      MkdirP(folder);
+      VLOG(1) << "Creating " << folder;
+      PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0);
+    }
+  }
+
+  // Queue geometry used to size and initialize the shared memory.
+  ipc_lib::LocklessQueueConfiguration config_;
+
+  // File descriptor of the open backing file.
+  int fd_;
+
+  // Size of the mapping in bytes.
+  size_t size_;
+  // Base address of the MAP_SHARED mapping.
+  void *data_;
+};
+
+// Returns the portion of the path after the last /.  If there is no /, the
+// whole path is returned unchanged.
+absl::string_view Filename(absl::string_view path) {
+  const auto slash = path.rfind('/');
+  if (slash == absl::string_view::npos) {
+    return path;
+  }
+  return path.substr(slash + 1);
+}
+
+// program_invocation_name is glibc's copy of argv[0]; default the event loop
+// name to its basename.
+ShmEventLoop::ShmEventLoop(const Configuration *configuration)
+    : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
+
+namespace {
+
+namespace chrono = ::std::chrono;
+
+// Fetcher which reads messages out of a channel's shared-memory lockless
+// queue, copying each message into locally owned aligned storage.
+class ShmFetcher : public RawFetcher {
+ public:
+  explicit ShmFetcher(const Channel *channel)
+      : lockless_queue_memory_(channel),
+        lockless_queue_(lockless_queue_memory_.memory(),
+                        lockless_queue_memory_.config()),
+        // NOTE(review): aligned_alloc requires the size to be a multiple of
+        // the alignment; confirm channel->max_size() always satisfies that.
+        data_storage_(static_cast<AlignedChar *>(aligned_alloc(
+                          alignof(AlignedChar), channel->max_size())),
+                      &free) {
+    context_.data = nullptr;
+    // Point the queue index at the next index to read starting now.  This
+    // makes it such that FetchNext will read the next message sent after
+    // the fetcher is created.
+    PointAtNextQueueIndex();
+  }
+
+  // Clear the inherited data pointer so it can't dangle once data_storage_
+  // is freed.
+  ~ShmFetcher() { data_ = nullptr; }
+
+  // Points the next message to fetch at the queue index which will be
+  // populated next.
+  void PointAtNextQueueIndex() {
+    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+    if (!actual_queue_index_.valid()) {
+      // Nothing in the queue.  The next element will show up at the 0th
+      // index in the queue.
+      actual_queue_index_ =
+          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+    } else {
+      actual_queue_index_ = actual_queue_index_.Increment();
+    }
+  }
+
+  // Reads the message at actual_queue_index_, advancing one message at a
+  // time.  Returns true if a new message was copied out; dies if we fell so
+  // far behind that the message was overwritten or expired.
+  bool FetchNext() override {
+    // TODO(austin): Write a test which starts with nothing in the queue,
+    // and then calls FetchNext() after something is sent.
+    // TODO(austin): Get behind and make sure it dies both here and with
+    // Fetch.
+    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+        actual_queue_index_.index(), &context_.monotonic_sent_time,
+        &context_.realtime_sent_time, &context_.size,
+        reinterpret_cast<char *>(data_storage_.get()));
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      context_.queue_index = actual_queue_index_.index();
+      // The valid bytes are the *last* context_.size bytes of the
+      // fixed-size slot, so point data_ at them.
+      data_ = reinterpret_cast<char *>(data_storage_.get()) +
+              lockless_queue_.message_data_size() - context_.size;
+      context_.data = data_;
+      actual_queue_index_ = actual_queue_index_.Increment();
+    }
+
+    // Make sure the data wasn't modified while we were reading it.  This
+    // can only happen if you are reading the last message *while* it is
+    // being written to, which means you are pretty far behind.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+        << ": Got behind while reading and the last message was modified "
+           "out "
+           "from under us while we were reading it.  Don't get so far "
+           "behind.";
+
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+        << ": The next message is no longer available.";
+    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+  }
+
+  // Reads the latest message in the queue, skipping anything in between.
+  // Returns true if a message newer than the last one fetched was copied
+  // out.
+  bool Fetch() override {
+    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+    // actual_queue_index_ is only meaningful if it was set by Fetch or
+    // FetchNext.  This happens when valid_data_ has been set.  So, only
+    // skip checking if valid_data_ is true.
+    //
+    // Also, if the latest queue index is invalid, we are empty.  So there
+    // is nothing to fetch.
+    if ((data_ != nullptr &&
+         queue_index == actual_queue_index_.DecrementBy(1u)) ||
+        !queue_index.valid()) {
+      return false;
+    }
+
+    ipc_lib::LocklessQueue::ReadResult read_result =
+        lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
+                             &context_.realtime_sent_time, &context_.size,
+                             reinterpret_cast<char *>(data_storage_.get()));
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      context_.queue_index = queue_index.index();
+      // As in FetchNext, the valid bytes sit at the end of the slot.
+      data_ = reinterpret_cast<char *>(data_storage_.get()) +
+              lockless_queue_.message_data_size() - context_.size;
+      context_.data = data_;
+      actual_queue_index_ = queue_index.Increment();
+    }
+
+    // Make sure the data wasn't modified while we were reading it.  This
+    // can only happen if you are reading the last message *while* it is
+    // being written to, which means you are pretty far behind.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+        << ": Got behind while reading and the last message was modified "
+           "out "
+           "from under us while we were reading it.  Don't get so far "
+           "behind.";
+
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+        << ": Queue index went backwards.  This should never happen.";
+
+    // We fell behind between when we read the index and read the value.
+    // This isn't worth recovering from since this means we went to sleep
+    // for a long time in the middle of this function.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+        << ": The next message is no longer available.";
+    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+  }
+
+  // Registers the wakeup signal with the underlying queue at the given
+  // priority.  Used by watchers; plain fetchers don't call this.
+  bool RegisterWakeup(int priority) {
+    return lockless_queue_.RegisterWakeup(priority);
+  }
+
+  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+
+ private:
+  MMapedQueue lockless_queue_memory_;
+  ipc_lib::LocklessQueue lockless_queue_;
+
+  // Index of the next message to read with FetchNext().
+  ipc_lib::QueueIndex actual_queue_index_ =
+      ipc_lib::LocklessQueue::empty_queue_index();
+
+  // 32-byte-aligned element type for the local copy buffer.
+  // NOTE(review): confirm 32 matches the alignment the queue expects.
+  struct AlignedChar {
+    alignas(32) char data;
+  };
+
+  // Owned buffer of channel->max_size() bytes; freed with free() to match
+  // aligned_alloc.
+  std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+};
+
+// Sender which writes messages into a channel's shared-memory lockless
+// queue and wakes up registered watchers after each send.
+class ShmSender : public RawSender {
+ public:
+  explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop)
+      : RawSender(),
+        shm_event_loop_(shm_event_loop),
+        name_(channel->name()->str()),
+        lockless_queue_memory_(channel),
+        lockless_queue_(lockless_queue_memory_.memory(),
+                        lockless_queue_memory_.config()),
+        lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+
+  // Buffer to construct a message into in place (see
+  // ipc_lib::LocklessQueue::Sender), and its capacity.
+  void *data() override { return lockless_queue_sender_.Data(); }
+  size_t size() override { return lockless_queue_sender_.size(); }
+  // Publishes the first `size` bytes already written into data().
+  bool Send(size_t size) override {
+    lockless_queue_sender_.Send(size);
+    // Signal watchers at the event loop's priority so they get scheduled.
+    lockless_queue_.Wakeup(shm_event_loop_->priority());
+    return true;
+  }
+
+  // Copies `length` bytes from msg into the queue and publishes them.
+  bool Send(void *msg, size_t length) override {
+    lockless_queue_sender_.Send(reinterpret_cast<char *>(msg), length);
+    lockless_queue_.Wakeup(shm_event_loop_->priority());
+    // TODO(austin): Return an error if we send too fast.
+    return true;
+  }
+
+  const absl::string_view name() const override { return name_; }
+
+ private:
+  const ShmEventLoop *shm_event_loop_;  // Not owned; used for priority().
+  std::string name_;
+  MMapedQueue lockless_queue_memory_;
+  ipc_lib::LocklessQueue lockless_queue_;
+  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+};
+
+}  // namespace
+
+namespace internal {
+
+// Class to manage the state for a Watcher: a fetcher plus a one-deep cache
+// of the most recently fetched, not-yet-consumed message.
+class WatcherState {
+ public:
+  WatcherState(
+      const Channel *channel,
+      std::function<void(const Context &context, const void *message)> watcher)
+      : shm_fetcher_(channel), watcher_(watcher) {}
+
+  ~WatcherState() {}
+
+  // Points the next message to fetch at the queue index which will be
+  // populated next.
+  void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); }
+
+  // Returns true if there is new data available.  Fetches at most one
+  // message, which stays cached until CallCallback() consumes it.
+  bool HasNewData() {
+    if (has_new_data_) {
+      return true;
+    }
+    has_new_data_ = shm_fetcher_.FetchNext();
+    return has_new_data_;
+  }
+
+  // Returns the time of the current data sample.
+  aos::monotonic_clock::time_point event_time() const {
+    return shm_fetcher_.context().monotonic_sent_time;
+  }
+
+  // Consumes the cached data by calling the callback.
+  void CallCallback() {
+    CHECK(has_new_data_);
+    watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data());
+    has_new_data_ = false;
+  }
+
+  // Registers the wakeup signal with the queue at the given priority.
+  bool RegisterWakeup(int priority) {
+    return shm_fetcher_.RegisterWakeup(priority);
+  }
+
+  void UnregisterWakeup() { shm_fetcher_.UnregisterWakeup(); }
+
+ private:
+  // True while a fetched-but-unconsumed sample is cached.
+  bool has_new_data_ = false;
+
+  ShmFetcher shm_fetcher_;
+
+  std::function<void(const Context &context, const void *message)> watcher_;
+};
+
+// Adapter class to adapt a timerfd to a TimerHandler.
+// The part of the API which is accessed by the TimerHandler interface needs to
+// be threadsafe.  This means Setup and Disable.
+class TimerHandlerState : public TimerHandler {
+ public:
+  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+      : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+    // Run the callback from the event loop thread whenever the timerfd
+    // expires; Read() consumes the expiration so the fd stops being
+    // readable.
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      timerfd_.Read();
+      fn_();
+    });
+  }
+
+  // Remove the fd from epoll before the timerfd member is destroyed.
+  ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+  void Setup(monotonic_clock::time_point base,
+             monotonic_clock::duration repeat_offset) override {
+    // SetTime is threadsafe already.
+    timerfd_.SetTime(base, repeat_offset);
+  }
+
+  void Disable() override {
+    // Disable is also threadsafe already.
+    timerfd_.Disable();
+  }
+
+ private:
+  ShmEventLoop *shm_event_loop_;  // Not owned.
+
+  TimerFd timerfd_;
+
+  // Function to be run on the thread
+  ::std::function<void()> fn_;
+};
+
+// Adapter class to the timerfd and PhasedLoop.
+// The part of the API which is accessed by the PhasedLoopHandler interface
+// needs to be threadsafe.  This means set_interval_and_offset
+class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+ public:
+  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
+                    const monotonic_clock::duration interval,
+                    const monotonic_clock::duration offset)
+      : shm_event_loop_(shm_event_loop),
+        phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
+        fn_(::std::move(fn)) {
+    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+      timerfd_.Read();
+      // Call the function with the cycle count from the last Iterate(),
+      // then reschedule for the next phase-locked wakeup.
+      fn_(cycles_elapsed_);
+      Reschedule();
+    });
+  }
+
+  // Remove the fd from epoll before the timerfd member is destroyed.
+  ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+  void set_interval_and_offset(
+      const monotonic_clock::duration interval,
+      const monotonic_clock::duration offset) override {
+    phased_loop_.set_interval_and_offset(interval, offset);
+  }
+
+  // Resets the phased loop to the current time and schedules the first
+  // wakeup.  Called from Run() once the loop is running.
+  void Startup() {
+    phased_loop_.Reset(shm_event_loop_->monotonic_now());
+    Reschedule();
+  }
+
+ private:
+  // Reschedules the timer for the next iteration of the phased loop,
+  // recording the cycle count Iterate() reports for the callback.
+  void Reschedule() {
+    cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
+    timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+  }
+
+  ShmEventLoop *shm_event_loop_;  // Not owned.
+
+  TimerFd timerfd_;
+  time::PhasedLoop phased_loop_;
+
+  // Cycle count from the last Iterate(), passed to fn_ on the next wakeup.
+  int cycles_elapsed_ = 1;
+
+  // Function to be run
+  const ::std::function<void(int)> fn_;
+};
+}  // namespace internal
+
+// Creates a fetcher for the channel.  Fetchers do not claim exclusive
+// ownership, so there is no Take() here.
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+    const Channel *channel) {
+  ::std::unique_ptr<RawFetcher> fetcher(new ShmFetcher(channel));
+  return fetcher;
+}
+
+// Creates a sender for the channel, claiming exclusive use of it first.
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+    const Channel *channel) {
+  Take(channel);
+  ::std::unique_ptr<RawSender> sender(new ShmSender(channel, this));
+  return sender;
+}
+
+// Registers a watcher callback on the channel, claiming exclusive use of
+// the channel first.  Dispatch happens from Run().
+void ShmEventLoop::MakeRawWatcher(
+    const Channel *channel,
+    std::function<void(const Context &context, const void *message)> watcher) {
+  Take(channel);
+
+  ::std::unique_ptr<internal::WatcherState> state(
+      new internal::WatcherState(channel, std::move(watcher)));
+  watchers_.push_back(::std::move(state));
+}
+
+// Creates a timer owned by this event loop which runs `callback` when it
+// fires.  Returns a non-owning handle for Setup()/Disable().
+TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+  ::std::unique_ptr<internal::TimerHandlerState> timer_state(
+      new internal::TimerHandlerState(this, ::std::move(callback)));
+  TimerHandler *const result = timer_state.get();
+  timers_.push_back(::std::move(timer_state));
+  return result;
+}
+
+// Creates a phased loop owned by this event loop which runs `callback`
+// every `interval`, offset by `offset`.  Returns a non-owning handle.
+PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
+    ::std::function<void(int)> callback,
+    const monotonic_clock::duration interval,
+    const monotonic_clock::duration offset) {
+  ::std::unique_ptr<internal::PhasedLoopHandler> handler(
+      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
+                                      offset));
+  PhasedLoopHandler *const result = handler.get();
+  phased_loops_.push_back(::std::move(handler));
+  return result;
+}
+
+// Queues a handler to run once Run() has started (after going realtime).
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  on_run_.emplace_back(::std::move(on_run));
+}
+
+// Runs the event loop until Exit() is called: sets up watcher wakeups,
+// optionally goes realtime, runs OnRun handlers and phased loops, then
+// blocks in epoll dispatching timers and watcher callbacks.
+void ShmEventLoop::Run() {
+  // signalfd turns the queue wakeup signal into a pollable fd so watcher
+  // wakeups can be multiplexed with timers in the same epoll loop.
+  std::unique_ptr<ipc_lib::SignalFd> signalfd;
+
+  if (watchers_.size() > 0) {
+    signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
+
+    epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
+      signalfd_siginfo result = signalfd_ptr->Read();
+      CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
+
+      // TODO(austin): We should really be checking *everything*, not just
+      // watchers, and calling the oldest thing first.  That will improve
+      // determinism a lot.
+
+      // Drain all pending watcher data, dispatching the oldest message
+      // first each time around.
+      while (true) {
+        // Call the handlers in time order of their messages.
+        aos::monotonic_clock::time_point min_event_time =
+            aos::monotonic_clock::max_time;
+        // Sentinel (wraps to SIZE_MAX); only read after the
+        // min_event_time check below proves a watcher was found.
+        size_t min_watcher_index = -1;
+        size_t watcher_index = 0;
+        for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+          if (watcher->HasNewData()) {
+            if (watcher->event_time() < min_event_time) {
+              min_watcher_index = watcher_index;
+              min_event_time = watcher->event_time();
+            }
+          }
+          ++watcher_index;
+        }
+
+        // Nothing pending on any watcher; wait for the next signal.
+        if (min_event_time == aos::monotonic_clock::max_time) {
+          break;
+        }
+
+        watchers_[min_watcher_index]->CallCallback();
+      }
+    });
+  }
+
+  // Now, all the threads are up.  Lock everything into memory and go RT.
+  if (priority_ != 0) {
+    ::aos::InitRT();
+
+    LOG(INFO) << "Setting priority to " << priority_;
+    ::aos::SetCurrentThreadRealtimePriority(priority_);
+  }
+
+  set_is_running(true);
+
+  // Now that we are realtime (but before the OnRun handlers run), snap the
+  // queue index.
+  for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+    watcher->PointAtNextQueueIndex();
+    CHECK(watcher->RegisterWakeup(priority_));
+  }
+
+  // Now that we are RT, run all the OnRun handlers.
+  for (const auto &run : on_run_) {
+    run();
+  }
+
+  // Start up all the phased loops.
+  for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
+       phased_loops_) {
+    phased_loop->Startup();
+  }
+
+  // And start our main event loop which runs all the timers and handles Quit.
+  epoll_.Run();
+
+  // Once epoll exits, there is no useful nonrt work left to do.
+  set_is_running(false);
+
+  // Nothing time or synchronization critical needs to happen after this point.
+  // Drop RT priority.
+  ::aos::UnsetCurrentThreadRealtimePriority();
+
+  // Stop the queues from signaling us before tearing down the signalfd.
+  for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+    watcher->UnregisterWakeup();
+  }
+
+  if (watchers_.size() > 0) {
+    epoll_.DeleteFd(signalfd->fd());
+    signalfd.reset();
+  }
+}
+
+// Requests that Run() return by quitting its epoll loop.
+void ShmEventLoop::Exit() { epoll_.Quit(); }
+
+ShmEventLoop::~ShmEventLoop() {
+  // Destroying the loop while Run() is executing would tear state out from
+  // under the epoll handlers.
+  CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
+}
+
+// Claims exclusive use of a channel for a sender or watcher, dying if it is
+// already claimed.  Only legal before Run().
+void ShmEventLoop::Take(const Channel *channel) {
+  CHECK(!is_running()) << ": Cannot add new objects while running.";
+
+  // Cheat aggressively.  Use the shared memory path as a proxy for a unique
+  // identifier for the channel.
+  const std::string path = ShmPath(channel);
+
+  // A linear scan is fine here; this only runs during setup.
+  const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+  CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
+
+  taken_.emplace_back(path);
+}
+
+// Sets the realtime priority Run() will switch to.  Must be called before
+// Run(); changing it while running would race the RT setup there.
+void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+  if (is_running()) {
+    LOG(FATAL) << "Cannot set realtime priority while running.";
+  }
+  priority_ = priority;
+}
+
+}  // namespace aos