Merge changes If2fc227f,Ib83228bd

* changes:
  Fix race in localizer_test
  Add logger and log reader classes.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 53b3b05..3377d67 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -209,3 +209,40 @@
         "@com_google_absl//absl/container:btree",
     ],
 )
+
+# C++ bindings for the log file schema in logger.fbs, generated with reflection
+# data enabled.  The schema includes aos/configuration.fbs, hence the extra
+# include target.
+flatbuffer_cc_library(
+    name = "logger_fbs",
+    srcs = ["logger.fbs"],
+    gen_reflections = 1,
+    includes = [
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+# Log writing (DetachedBufferWriter, Logger) and log replay (LogReader).
+cc_library(
+    name = "logger",
+    srcs = ["logger.cc"],
+    hdrs = ["logger.h"],
+    deps = [
+        ":event_loop",
+        ":logger_fbs",
+        "//aos:flatbuffer_merge",
+        "//aos/time",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_google_absl//absl/container:inlined_vector",
+        "@com_google_absl//absl/strings",
+    ],
+)
+
+# Logs a simulated ping/pong run to disk and replays it.  The test fixture
+# reads pingpong_config.json at runtime, so it is listed as data.
+cc_test(
+    name = "logger_test",
+    srcs = ["logger_test.cc"],
+    data = ["pingpong_config.json"],
+    deps = [
+        ":logger",
+        ":ping_lib",
+        ":pong_lib",
+        ":simulated_event_loop",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/events/logger.cc b/aos/events/logger.cc
new file mode 100644
index 0000000..6b659c0
--- /dev/null
+++ b/aos/events/logger.cc
@@ -0,0 +1,498 @@
+#include "aos/events/logger.h"
+
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <algorithm>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+DEFINE_int32(flush_size, 1000000,
+             "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+namespace chrono = std::chrono;
+
+// Creates the log file at |filename| and keeps its descriptor in fd_.  O_EXCL
+// makes the open fail if the file already exists; any failure to open is
+// fatal.
+DetachedBufferWriter::DetachedBufferWriter(absl::string_view filename) {
+  const std::string path(filename);
+  fd_ = open(path.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+// Writes out anything still queued, then closes the file.  A failed close is
+// logged rather than treated as fatal.
+DetachedBufferWriter::~DetachedBufferWriter() {
+  Flush();
+  const int close_result = close(fd_);
+  PLOG_IF(ERROR, close_result == -1) << " Failed to close logfile";
+}
+
+// Takes the finished buffer out of |fbb| and queues it for writing.
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::FlatBufferBuilder *fbb) {
+  flatbuffers::DetachedBuffer released = fbb->Release();
+  QueueSizedFlatbuffer(std::move(released));
+}
+
+// Appends |buffer| to the pending queue.  Once more than --flush_size bytes
+// are pending, everything queued is written out immediately.
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::DetachedBuffer &&buffer) {
+  // Account for the size before the buffer is moved from.
+  queued_size_ += buffer.size();
+  queue_.push_back(std::move(buffer));
+
+  const size_t flush_threshold = static_cast<size_t>(FLAGS_flush_size);
+  if (queued_size_ <= flush_threshold) {
+    return;
+  }
+  Flush();
+}
+
+// Writes every queued buffer to the file with writev() and empties the queue.
+// Does nothing when the queue is empty.  Any failed or short write is fatal.
+void DetachedBufferWriter::Flush() {
+  if (queue_.empty()) {
+    return;
+  }
+
+  // Build one iovec per queued buffer, cross-checking the byte accounting.
+  iovec_.clear();
+  iovec_.reserve(queue_.size());
+  size_t counted_size = 0;
+  for (flatbuffers::DetachedBuffer &buffer : queue_) {
+    struct iovec n;
+    n.iov_base = buffer.data();
+    n.iov_len = buffer.size();
+    counted_size += n.iov_len;
+    iovec_.push_back(n);
+  }
+  CHECK_EQ(counted_size, queued_size_);
+
+  // writev() fails with EINVAL when handed more than IOV_MAX entries, which
+  // many small messages can exceed well before --flush_size bytes are queued.
+  // Hand the queue to the kernel in batches of at most IOV_MAX entries.
+  for (size_t start = 0; start < iovec_.size();) {
+    const size_t batch_count =
+        std::min(iovec_.size() - start, static_cast<size_t>(IOV_MAX));
+    size_t batch_size = 0;
+    for (size_t i = start; i < start + batch_count; ++i) {
+      batch_size += iovec_[i].iov_len;
+    }
+
+    const ssize_t written =
+        writev(fd_, iovec_.data() + start, static_cast<int>(batch_count));
+
+    // TODO(austin): Handle partial writes in some way other than crashing...
+    PCHECK(written == static_cast<ssize_t>(batch_size))
+        << ": Wrote " << written << " expected " << batch_size;
+
+    start += batch_count;
+  }
+
+  queued_size_ = 0;
+  queue_.clear();
+}
+
+// Sets up logging of every channel in |event_loop|'s configuration to
+// |writer|.  Nothing is written here: the header is queued when the event
+// loop starts running, and messages are written by a timer which fires every
+// |polling_period| after that.
+Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+               std::chrono::milliseconds polling_period)
+    : event_loop_(event_loop),
+      writer_(writer),
+      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
+      polling_period_(polling_period) {
+  // One fetcher per channel, stored in configuration order.  DoLogData() uses
+  // the position in fetchers_ as the channel index written to the log.
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    FetcherStruct fs;
+    fs.fetcher = event_loop->MakeRawFetcher(channel);
+    fs.written = false;
+    fetchers_.emplace_back(std::move(fs));
+  }
+
+  // When things start, we want to log the header, then the most recent messages
+  // available on each fetcher to capture the previous state, then start
+  // polling.
+  event_loop_->OnRun([this, polling_period]() {
+    // Grab data from each channel right before we declare the log file started
+    // so we can capture the latest message on each channel.  This lets us have
+    // non periodic messages with configuration that now get logged.
+    //
+    // A channel with nothing to fetch is marked as written so that the first
+    // DoLogData() call goes straight to asking it for the next message.
+    for (FetcherStruct &f : fetchers_) {
+      f.written = !f.fetcher->Fetch();
+    }
+
+    // We need to pick a point in time to declare the log file "started".  This
+    // starts here.  It needs to be after everything is fetched so that the
+    // fetchers are all pointed at the most recent message before the start
+    // time.
+    const monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
+    last_synchronized_time_ = monotonic_now;
+
+    {
+      // Now write the header with this timestamp in it.
+      flatbuffers::FlatBufferBuilder fbb;
+      fbb.ForceDefaults(1);
+
+      // The configuration is copied into the builder before the header table
+      // is started.
+      flatbuffers::Offset<aos::Configuration> configuration_offset =
+          CopyFlatBuffer(event_loop_->configuration(), &fbb);
+
+      aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+      log_file_header_builder.add_configuration(configuration_offset);
+      // The worst case theoretical out of order is the polling period times 2.
+      // One message could get logged right after the boundary, but be for right
+      // before the next boundary.  And the reverse could happen for another
+      // message.  Report back 3x to be extra safe, and because the cost isn't
+      // huge on the read side.
+      log_file_header_builder.add_max_out_of_order_duration(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
+                                                               polling_period)
+              .count());
+
+      log_file_header_builder.add_monotonic_sent_time(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              monotonic_now.time_since_epoch())
+              .count());
+      log_file_header_builder.add_realtime_sent_time(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              realtime_now.time_since_epoch())
+              .count());
+
+      // Size prefixed, so the reader can split the file back into messages.
+      fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+      writer_->QueueSizedFlatbuffer(&fbb);
+    }
+
+    // First poll one period from now, then once per period.
+    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
+                          polling_period);
+  });
+}
+
+// Timer callback.  Writes every fetched message which is older than the
+// current sync point to the writer, advancing the sync point by at most one
+// polling period at a time until it has caught up with the current time, and
+// then flushes the writer.
+void Logger::DoLogData() {
+  // We want to guarantee that messages aren't out of order by more than
+  // max_out_of_order_duration.  To do this, we need sync points.  Every write
+  // cycle should be a sync point.
+  //
+  // Read the time from the event loop, exactly like the constructor does when
+  // it initializes last_synchronized_time_ and the log start time, so that
+  // all of the sync points are on the same clock.
+  const monotonic_clock::time_point monotonic_now =
+      event_loop_->monotonic_now();
+
+  do {
+    // Move the sync point up by at most polling_period.  This forces one sync
+    // per iteration, even if it is small.
+    last_synchronized_time_ =
+        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
+    size_t channel_index = 0;
+    // Write each channel to disk, one at a time.
+    for (FetcherStruct &f : fetchers_) {
+      while (true) {
+        // Move on to the next message once the one being held has been
+        // written.  The data check is purely defensive: a fetcher which holds
+        // no message is expected to already be marked as written.
+        if (f.written || f.fetcher->context().data == nullptr) {
+          if (!f.fetcher->FetchNext()) {
+            VLOG(1) << "No new data on "
+                    << FlatbufferToJson(f.fetcher->channel());
+            break;
+          }
+          f.written = false;
+        }
+
+        if (f.fetcher->context().monotonic_sent_time >=
+            last_synchronized_time_) {
+          // Too new for this sync point.  Hold on to it, unwritten, for a
+          // later one and move on to the next channel.  Without this exit the
+          // loop would never terminate.
+          break;
+        }
+
+        // Write!
+        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                           max_header_size_);
+        fbb.ForceDefaults(1);
+
+        flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+            fbb.CreateVector(static_cast<uint8_t *>(f.fetcher->context().data),
+                             f.fetcher->context().size);
+
+        VLOG(1) << "Writing data for channel "
+                << FlatbufferToJson(f.fetcher->channel());
+
+        MessageHeader::Builder message_header_builder(fbb);
+        message_header_builder.add_channel_index(channel_index);
+        message_header_builder.add_monotonic_sent_time(
+            f.fetcher->context()
+                .monotonic_sent_time.time_since_epoch()
+                .count());
+        message_header_builder.add_realtime_sent_time(
+            f.fetcher->context().realtime_sent_time.time_since_epoch().count());
+
+        message_header_builder.add_queue_index(
+            f.fetcher->context().queue_index);
+
+        message_header_builder.add_data(data_offset);
+
+        fbb.FinishSizePrefixed(message_header_builder.Finish());
+        // Remember the largest overhead seen so far so that later builders
+        // can be sized to avoid reallocating.
+        max_header_size_ = std::max(
+            max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+        writer_->QueueSizedFlatbuffer(&fbb);
+
+        f.written = true;
+      }
+
+      ++channel_index;
+    }
+
+    CHECK_EQ(channel_index, fetchers_.size());
+
+    // If we missed cycles, we could be pretty far behind.  Spin until we are
+    // caught up.
+  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+
+  writer_->Flush();
+}
+
+// Opens |filename| for reading, loads the LogFileHeader from the front of the
+// file and queues the first batch of messages.  Failing to open the file or to
+// read the header is fatal.
+LogReader::LogReader(absl::string_view filename) {
+  const std::string path(filename);
+  fd_ = open(path.c_str(), O_RDONLY | O_CLOEXEC);
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+
+  // The first size prefixed flatbuffer in the file is the header.  An empty
+  // span means that nothing could be read.
+  const absl::Span<const uint8_t> config_data = ReadMessage();
+  CHECK(config_data != absl::Span<const uint8_t>());
+
+  // config_data points into the read buffer, so keep a private copy.
+  configuration_.assign(config_data.begin(), config_data.end());
+
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  max_out_of_order_duration_ =
+      std::chrono::nanoseconds(header->max_out_of_order_duration());
+
+  // One message queue per channel in the logged configuration.
+  channels_.resize(header->configuration()->channels()->size());
+
+  QueueMessages();
+}
+
+// Appends up to one more chunk of the file to data_, after dropping the bytes
+// which have already been consumed.  Returns false once the end of the file
+// has been reached; a failed read is fatal.
+bool LogReader::ReadBlock() {
+  // Size of each read.  256k is enough that the read call is efficient; many
+  // small reads would spend too much time in syscalls.
+  constexpr size_t kReadSize = 256 * 1024;
+
+  if (end_of_file_) {
+    return false;
+  }
+
+  // Drop whatever has already been handed out from the front of the buffer.
+  if (consumed_data_ != 0) {
+    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    consumed_data_ = 0;
+  }
+
+  // Make room for a full chunk past the valid data.  The backing store grows
+  // as needed and is kept around afterwards, which reduces allocations on
+  // later reads.
+  const size_t valid_size = data_.size();
+  data_.resize(valid_size + kReadSize);
+
+  const ssize_t count = read(fd_, data_.data() + valid_size, kReadSize);
+
+  // Trim back down to what was actually read (nothing on error or EOF).
+  const size_t appended = count > 0 ? static_cast<size_t>(count) : 0;
+  data_.resize(valid_size + appended);
+
+  if (count == 0) {
+    end_of_file_ = true;
+    return false;
+  }
+  PCHECK(count > 0);
+
+  return true;
+}
+
+// Returns true if a complete message (size prefix plus payload) is already
+// buffered in data_ past the consumed region.  Never reads from the file.
+bool LogReader::MessageAvailable() {
+  constexpr size_t kPrefixSize = sizeof(flatbuffers::uoffset_t);
+
+  // Without the whole size prefix, we can't even tell how long the message is.
+  const size_t unconsumed = data_.size() - consumed_data_;
+  if (unconsumed < kPrefixSize) {
+    return false;
+  }
+
+  // The prefix holds the payload size, so the full message is that plus the
+  // prefix itself.
+  const size_t message_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      kPrefixSize;
+  return data_.size() >= consumed_data_ + message_size;
+}
+
+// Returns the next size prefixed message from the file and marks it as
+// consumed, reading more of the file as needed.  The returned span includes
+// the size prefix and points into data_, so it stops being valid the next time
+// more data is read.  Returns an empty span if the file ends before a complete
+// message is available.
+absl::Span<const uint8_t> LogReader::ReadMessage() {
+  // Make sure we have enough for the size.  A single read is allowed to return
+  // fewer bytes than were asked for, so keep reading until the whole prefix is
+  // buffered rather than trusting one block to be enough.
+  while (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // Now make sure we have enough for the message.
+  const size_t data_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  while (data_.size() < consumed_data_ + data_size) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // And return it, consuming the data.  The pointer is only computed here,
+  // after all the reads, because ReadBlock() moves and reallocates data_.
+  const uint8_t *data_ptr = data_.data() + consumed_data_;
+
+  consumed_data_ += data_size;
+
+  return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+// Reads messages from the file into the per channel queues until the newest
+// queued message is more than max_out_of_order_duration_ past both the oldest
+// queued message and the start of the log, or until the file runs out.
+void LogReader::QueueMessages() {
+  for (;;) {
+    // When a log file starts, there should be a message from each channel, and
+    // those messages might be very old.  Measuring from the later of the
+    // oldest message and the start time makes sure that we read a chunk past
+    // the start time before deciding that enough is queued.
+    const bool enough_queued =
+        channel_heap_.size() > 0 &&
+        newest_timestamp_ >
+            std::max(oldest_message().first, monotonic_start_time()) +
+                max_out_of_order_duration_;
+    if (enough_queued) {
+      break;
+    }
+
+    const absl::Span<const uint8_t> msg_data = ReadMessage();
+    if (msg_data == absl::Span<const uint8_t>()) {
+      // Nothing left in the file.
+      break;
+    }
+
+    // Copy the message, minus its size prefix, into a buffer of its own.
+    std::vector<uint8_t> message_bytes(
+        msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end());
+    EmplaceDataBack(FlatbufferVector<MessageHeader>(std::move(message_bytes)));
+  }
+
+  queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
+}
+
+// Returns the configuration stored in the log file header.
+const Configuration *LogReader::configuration() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  return header->configuration();
+}
+
+// Returns the monotonic clock time recorded in the log file header.
+monotonic_clock::time_point LogReader::monotonic_start_time() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  const std::chrono::nanoseconds since_epoch(header->monotonic_sent_time());
+  return monotonic_clock::time_point(since_epoch);
+}
+
+// Returns the realtime clock time recorded in the log file header.
+realtime_clock::time_point LogReader::realtime_start_time() {
+  const LogFileHeader *const header =
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data());
+  const std::chrono::nanoseconds since_epoch(header->realtime_sent_time());
+  return realtime_clock::time_point(since_epoch);
+}
+
+// Attaches the reader to |event_loop|.  Creates a raw sender for every logged
+// channel and a timer which replays the queued messages, oldest first, each at
+// the time it was originally sent.  The channels in |event_loop|'s
+// configuration must match the logged ones index for index.
+void LogReader::Register(EventLoop *event_loop) {
+  event_loop_ = event_loop;
+
+  // Channels are looked up by index below, so the event loop has to have at
+  // least as many of them as the log file does.
+  CHECK_GE(event_loop_->configuration()->channels()->size(), channels_.size());
+
+  for (size_t i = 0; i < channels_.size(); ++i) {
+    const Channel *const logged_channel = configuration()->channels()->Get(i);
+    const Channel *const live_channel =
+        event_loop_->configuration()->channels()->Get(i);
+
+    // Compare the contents of the strings.  The accessors return pointers into
+    // their own flatbuffer, so comparing them directly would only ever match
+    // when both configurations are the very same buffer.
+    CHECK_EQ(logged_channel->name()->str(), live_channel->name()->str());
+    CHECK_EQ(logged_channel->type()->str(), live_channel->type()->str());
+
+    channels_[i].raw_sender = event_loop_->MakeRawSender(live_channel);
+  }
+
+  // Each time the timer fires, it sends the oldest queued message and then
+  // reschedules itself for the next oldest one.
+  timer_handler_ = event_loop_->AddTimer([this]() {
+    std::pair<monotonic_clock::time_point, int> oldest_channel_index =
+        PopOldestChannel();
+    // The timer was scheduled for exactly this message's send time.
+    const monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    CHECK(monotonic_now == oldest_channel_index.first)
+        << ": Now " << monotonic_now.time_since_epoch().count()
+        << " trying to send "
+        << oldest_channel_index.first.time_since_epoch().count();
+
+    struct LogReader::ChannelData &channel =
+        channels_[oldest_channel_index.second];
+
+    FlatbufferVector<MessageHeader> front = std::move(channel.front());
+
+    CHECK(front.message().data() != nullptr);
+    // Only messages sent after the log file started are replayed.
+    if (oldest_channel_index.first > monotonic_start_time()) {
+      channel.raw_sender->Send(front.message().data()->Data(),
+                               front.message().data()->size());
+    } else {
+      LOG(WARNING) << "Not sending data from before the start of the log file. "
+                   << oldest_channel_index.first.time_since_epoch().count()
+                   << " start "
+                   << monotonic_start_time().time_since_epoch().count() << " "
+                   << FlatbufferToJson(front);
+    }
+    channel.data.pop_front();
+
+    // Re-push it and update the oldest timestamp.
+    if (channel.data.size() != 0) {
+      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+          chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+      PushChannelHeap(timestamp, oldest_channel_index.second);
+      channel.oldest_timestamp = timestamp;
+    } else {
+      channel.oldest_timestamp = monotonic_clock::min_time;
+    }
+
+    // Top the queues back up once we have replayed far enough into them.
+    if (monotonic_now > queue_data_time_) {
+      QueueMessages();
+    }
+
+    if (channel_heap_.size() != 0) {
+      timer_handler_->Setup(oldest_message().first);
+    }
+  });
+
+  // Kick off the replay at the oldest queued message once the loop starts.
+  if (channel_heap_.size() > 0u) {
+    event_loop_->OnRun(
+        [this]() { timer_handler_->Setup(oldest_message().first); });
+  }
+}
+
+// Destroys the senders created by Register().
+void LogReader::Deregister() {
+  for (ChannelData &channel : channels_) {
+    channel.raw_sender.reset();
+  }
+}
+
+// Appends |new_data| to the queue of the channel it was logged on.  A channel
+// whose queue was empty is also put (back) on the heap, keyed by this
+// message's send time.
+void LogReader::EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data) {
+  const MessageHeader &header = new_data.message();
+  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+      chrono::nanoseconds(header.monotonic_sent_time()));
+  const size_t channel_index = header.channel_index();
+  CHECK_LT(channel_index, channels_.size());
+
+  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+
+  ChannelData &channel = channels_[channel_index];
+  if (channel.data.empty()) {
+    channel.oldest_timestamp = timestamp;
+    PushChannelHeap(timestamp, channel_index);
+  }
+  channel.data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+// Heap ordering for (timestamp, channel index) pairs.  Returns true when
+// |first| is newer than |second|, with ties broken by the larger channel
+// index, which makes the std heap functions keep the oldest entry at the
+// front.
+bool ChannelHeapCompare(
+    const std::pair<monotonic_clock::time_point, int> first,
+    const std::pair<monotonic_clock::time_point, int> second) {
+  if (first.first != second.first) {
+    return first.first > second.first;
+  }
+  return first.second > second.second;
+}
+
+}  // namespace
+
+// Adds (timestamp, channel_index) to the heap of channels which have messages
+// queued.
+void LogReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+                                int channel_index) {
+  channel_heap_.emplace_back(timestamp, channel_index);
+
+  // The default ordering would put the newest message at the front of the
+  // heap.  The custom comparator puts the oldest one there instead.
+  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+                 &ChannelHeapCompare);
+}
+
+// Removes and returns the heap entry with the oldest timestamp.  The heap must
+// not be empty.
+std::pair<monotonic_clock::time_point, int> LogReader::PopOldestChannel() {
+  // pop_heap moves the front (oldest) entry to the back of the vector.
+  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+                &ChannelHeapCompare);
+  std::pair<monotonic_clock::time_point, int> oldest = channel_heap_.back();
+  channel_heap_.pop_back();
+  return oldest;
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logger.fbs b/aos/events/logger.fbs
new file mode 100644
index 0000000..36055a6
--- /dev/null
+++ b/aos/events/logger.fbs
@@ -0,0 +1,53 @@
+include "aos/configuration.fbs";
+
+namespace aos.logger;
+
+// A log file is a sequence of size prefixed flatbuffers.
+// The first flatbuffer will be the LogFileHeader, followed by an arbitrary
+// number of MessageHeaders.
+//
+// The log file starts at the time demarcated in the header on the monotonic
+// clock.  There will be any number of messages per channel logged before the
+// start time.  These messages are logged so that fetchers can retrieve the
+// state of the system at the start of the logfile for things like capturing
+// parameters.  In replay, they should be made available to fetchers, but not
+// trigger watchers.
+
+// Header written once, as the first flatbuffer in a log file.
+table LogFileHeader {
+  // Time this log file started on the monotonic clock in nanoseconds.
+  monotonic_sent_time:long;
+  // Time this log file started on the realtime clock in nanoseconds.
+  realtime_sent_time:long;
+
+  // Messages are not written in order to disk.  They will be out of order by
+  // at most this duration (in nanoseconds).  If the log reader buffers until
+  // it finds messages this much newer than its simulation time, it will never
+  // find a message out of order.
+  max_out_of_order_duration:long;
+
+  // The configuration of the channels.  MessageHeader.channel_index is an
+  // index into its list of channels.
+  configuration:aos.Configuration;
+
+  // TODO(austin): Node!
+}
+
+// Table holding a single logged message along with where and when it was sent.
+table MessageHeader {
+  // Index into the channel datastructure in the log file header.  This
+  // provides the data type.
+  channel_index:uint;
+  // Time this message was sent on the monotonic clock in nanoseconds.
+  monotonic_sent_time:long;
+  // Time this message was sent on the realtime clock in nanoseconds.
+  realtime_sent_time:long;
+  // Index into the ipc queue of this message.  This should start with 0 and
+  // always monotonically increment if no messages were ever lost.  It will
+  // wrap at a multiple of the queue size.
+  queue_index:uint;
+  // TODO(austin): Node.
+
+  // TODO(austin): Format?  Compressed?
+
+  // The nested flatbuffer: the raw bytes of the message as it was sent.
+  data:[ubyte];
+}
diff --git a/aos/events/logger.h b/aos/events/logger.h
new file mode 100644
index 0000000..7873a42
--- /dev/null
+++ b/aos/events/logger.h
@@ -0,0 +1,252 @@
+#ifndef AOS_EVENTS_LOGGER_H_
+#define AOS_EVENTS_LOGGER_H_
+
+#include <deque>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logger_generated.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// This class manages efficiently writing a sequence of detached buffers to a
+// file.  It queues them up and batches the write operation.
+class DetachedBufferWriter {
+ public:
+  // Creates |filename| for writing.  It is a fatal error if the file can't be
+  // created, including when it already exists.
+  DetachedBufferWriter(absl::string_view filename);
+  // Flushes anything still queued and closes the file.
+  ~DetachedBufferWriter();
+
+  // TODO(austin): Snappy compress the log file if it ends with .snappy!
+
+  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
+  // buffer from it.
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
+  // Queues up a detached buffer directly.  Flushes automatically once more
+  // than --flush_size bytes are queued.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
+
+  // Triggers data to be provided to the kernel and written.
+  void Flush();
+
+ private:
+  // File descriptor being written to.
+  int fd_ = -1;
+
+  // Size of all the data in the queue.
+  size_t queued_size_ = 0;
+
+  // List of buffers to flush.
+  std::vector<flatbuffers::DetachedBuffer> queue_;
+  // List of iovecs to use with writev.  This is a member variable to avoid
+  // churn.
+  std::vector<struct iovec> iovec_;
+};
+
+// Logs all channels available in the event loop to disk, polling for new
+// messages once per polling period (100 ms by default).
+// Start by logging one message per channel to capture any state and
+// configuration that is sent rarely on a channel and would affect execution.
+class Logger {
+ public:
+  // |writer| and |event_loop| are stored as raw pointers and used from the
+  // event loop's callbacks, so both have to outlive this object.
+  Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+         std::chrono::milliseconds polling_period =
+             std::chrono::milliseconds(100));
+
+ private:
+  // Timer callback which writes out the messages which are ready to be
+  // written and then flushes the writer.
+  void DoLogData();
+
+  EventLoop *event_loop_;
+  DetachedBufferWriter *writer_;
+
+  // Structure to track both a fetcher, and if the data fetched has been
+  // written.  We may want to delay writing data to disk so that we don't let
+  // data get too far out of order when written to disk so we can avoid making
+  // it too hard to sort when reading.
+  struct FetcherStruct {
+    std::unique_ptr<RawFetcher> fetcher;
+    bool written = false;
+  };
+
+  // One entry per channel, in configuration order.  The position in this list
+  // is the channel index written to the log.
+  std::vector<FetcherStruct> fetchers_;
+  // Timer which triggers DoLogData().
+  TimerHandler *timer_handler_;
+
+  // Period to poll the channels.
+  const std::chrono::milliseconds polling_period_;
+
+  // Last time that data was written for all channels to disk.  Set for the
+  // first time when the event loop starts running.
+  monotonic_clock::time_point last_synchronized_time_;
+
+  // Max size that the header has consumed.  This much extra data will be
+  // reserved in the builder to avoid reallocating.
+  size_t max_header_size_ = 0;
+};
+
+// Replays all the channels in the logfile to the event loop.
+class LogReader {
+ public:
+  // Opens |filename|, reads the header and queues the first batch of
+  // messages.  Failing to open the file or to read the header is fatal.
+  LogReader(absl::string_view filename);
+
+  // Registers the timer and senders used to resend the messages from the log
+  // file.
+  void Register(EventLoop *event_loop);
+  // Unregisters the senders.
+  void Deregister();
+
+  // TODO(austin): Remap channels?
+
+  // Returns the configuration from the log file.
+  const Configuration *configuration();
+
+  // Returns the starting timestamp for the log file.
+  monotonic_clock::time_point monotonic_start_time();
+  realtime_clock::time_point realtime_start_time();
+
+  // TODO(austin): Add the ability to re-publish the fetched messages.  Add 2
+  // options, one which publishes them *now*, and another which publishes them
+  // to the simulated event loop factory back in time where they actually
+  // happened.
+
+ private:
+  // Reads a chunk of data into data_.  Returns false if no data was read.
+  bool ReadBlock();
+
+  // Returns true if there is a full message (size prefix and payload)
+  // available in the buffer already, and false if we will have to read more
+  // data from disk first.
+  bool MessageAvailable();
+
+  // Returns a span with the data for the next message from the log file,
+  // including its size prefix.  The span points into data_.  Returns an empty
+  // span if the file ends before a full message could be read.
+  absl::Span<const uint8_t> ReadMessage();
+
+  // Queues messages into channels_ until at least max_out_of_order_duration_
+  // worth of them are buffered, or the file runs out.
+  void QueueMessages();
+
+  // We need to read a large chunk at a time, then kit it up into parts and
+  // sort.
+  //
+  // We want to read 256 KB chunks at a time.  This is the fastest read size.
+  // This leaves us with a fragmentation problem though.
+  //
+  // The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
+  // chunks into single flatbuffer messages and manage them in a sorted queue.
+  // Everything is copied three times (into 256 kb buffer, then into separate
+  // buffer, then into sender), but none of it is all that expensive.  We can
+  // optimize if it is slow later.
+  //
+  // As we place the elements in the sorted list of times, keep doing this until
+  // we read a message that is newer than the threshold.
+  //
+  // Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
+  // small state machine so we can resume), and keep pulling messages back out
+  // and sending.
+  //
+  // For sorting, we want to use the fact that each channel is sorted, and then
+  // merge sort the channels.  Have a vector of deques, and then hold a sorted
+  // list of pointers to those.
+  //
+  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
+  // Especially if we start compressing.
+
+  // Allocator which doesn't zero initialize memory.
+  template <typename T>
+  struct DefaultInitAllocator {
+    typedef T value_type;
+
+    // Default initializes, rather than value initializes, the new element.
+    template <typename U>
+    void construct(U *p) {
+      ::new (static_cast<void *>(p)) U;
+    }
+
+    template <typename U, typename... Args>
+    void construct(U *p, Args &&... args) {
+      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
+    }
+
+    T *allocate(std::size_t n) {
+      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
+    }
+
+    template <typename U>
+    void deallocate(U *p, std::size_t /*n*/) {
+      ::operator delete(static_cast<void *>(p));
+    }
+  };
+
+  // Minimum amount of data to queue up for sorting before we are guaranteed to
+  // not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration_;
+
+  // File descriptor for the log file.
+  // NOTE(review): nothing in this class closes it; confirm whether a
+  // destructor is needed.
+  int fd_ = -1;
+
+  // Event loop to replay on, and the timer which triggers each send.  Both
+  // are set by Register().
+  EventLoop *event_loop_;
+  TimerHandler *timer_handler_;
+
+  // Vector to read into.  This uses an allocator which doesn't zero initialize
+  // the memory.
+  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+
+  // Amount of data consumed already in data_.
+  size_t consumed_data_ = 0;
+
+  // Vector holding the data for the configuration.  This is the whole size
+  // prefixed LogFileHeader.
+  std::vector<uint8_t> configuration_;
+
+  // Moves the message to the correct channel queue.
+  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
+
+  // Pushes the index of the channel, along with the given timestamp, onto the
+  // sorted channel list.
+  void PushChannelHeap(monotonic_clock::time_point timestamp,
+                       int channel_index);
+
+  // Returns the timestamp and index of the channel with the oldest message in
+  // it.  The heap must not be empty.
+  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
+    return channel_heap_.front();
+  }
+
+  // Pops the timestamp and index of the channel with the oldest message in
+  // it.
+  std::pair<monotonic_clock::time_point, int> PopOldestChannel();
+
+  // Datastructure to hold the list of messages, cached timestamp for the oldest
+  // message, and sender to send with.
+  struct ChannelData {
+    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
+    std::deque<FlatbufferVector<MessageHeader>> data;
+    std::unique_ptr<RawSender> raw_sender;
+
+    // Returns the oldest message.
+    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
+
+    // Returns the timestamp for the oldest message.
+    const monotonic_clock::time_point front_timestamp() {
+      return monotonic_clock::time_point(
+          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+    }
+  };
+
+  // List of channels and messages for them.
+  std::vector<ChannelData> channels_;
+
+  // Heap of channels so we can track which channel to send next.
+  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+
+  // Timestamp of the newest message in a channel queue.
+  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+  // The time at which we need to read another chunk from the logfile.
+  monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
+
+  // Cached bit for if we have reached the end of the file.  Otherwise we will
+  // hammer on the kernel asking for more data each time we send.
+  bool end_of_file_ = false;
+};
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGER_H_
diff --git a/aos/events/logger_test.cc b/aos/events/logger_test.cc
new file mode 100644
index 0000000..71c640d
--- /dev/null
+++ b/aos/events/logger_test.cc
@@ -0,0 +1,111 @@
+#include "aos/events/logger.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
+#include "aos/events/simulated_event_loop.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace logger {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Fixture which runs the Ping and Pong applications against each other on
+// simulated event loops built from pingpong_config.json.
+class LoggerTest : public ::testing::Test {
+ public:
+  LoggerTest()
+      : config_(
+            aos::configuration::ReadConfig("aos/events/pingpong_config.json")),
+        event_loop_factory_(&config_.message()),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop()),
+        ping_(ping_event_loop_.get()),
+        pong_event_loop_(event_loop_factory_.MakeEventLoop()),
+        pong_(pong_event_loop_.get()) {}
+
+  // Config and factory.  Each member below is constructed from the ones
+  // declared before it, so the declaration order matters.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  SimulatedEventLoopFactory event_loop_factory_;
+
+  // Event loop and app for Ping
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+
+  // Event loop and app for Pong
+  std::unique_ptr<EventLoop> pong_event_loop_;
+  Pong pong_;
+};
+
+// Tests that we can startup at all.  This confirms that the channels are all in
+// the config.  Also replays the resulting log file and checks that the pings
+// and pongs come back out in order.
+TEST_F(LoggerTest, Starts) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile = tmpdir + "/logfile.bfbs";
+  // Remove it.  The writer refuses to open a file which already exists.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    DetachedBufferWriter writer(logfile);
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop();
+
+    // Let ping and pong run for a bit before the logger gets created.
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(&writer, logger_event_loop.get(),
+                  std::chrono::milliseconds(100));
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    // The writer flushes and closes the file when this scope ends.
+  }
+
+  LogReader reader(logfile);
+
+  LOG(INFO) << "Config " << FlatbufferToJson(reader.configuration());
+
+  // TODO(austin): Figure out what the API needs to look like.  How do we replay
+  // the data that was fetched before it all starts?  How do we set the starting
+  // time from the log file?  Probably need to let the reader do more if it
+  // knows about the factory.
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  std::unique_ptr<EventLoop> reader_event_loop =
+      log_reader_factory.MakeEventLoop();
+
+  reader.Register(reader_event_loop.get());
+
+  // Capture monotonic start time in OnRun and offset from there?  Let the user
+  // configure the factory if they want time to match?
+  /*
+  log_reader_factory.InitializeTime(log_reader.monotonic_start_time(),
+                                    log_reader.realtime_start_time());
+                                    */
+  std::unique_ptr<EventLoop> test_event_loop =
+      log_reader_factory.MakeEventLoop();
+
+  // Value of the last ping and pong seen so far.  The watchers below expect
+  // the first replayed values to be 11.
+  int ping_count = 10;
+  int pong_count = 10;
+
+  // Confirm that the ping value matches.
+  test_event_loop->MakeWatcher("/test",
+                               [&ping_count](const examples::Ping &ping) {
+                                 EXPECT_EQ(ping.value(), ping_count + 1);
+                                 ++ping_count;
+                               });
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  test_event_loop->MakeWatcher(
+      "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pong_count + 1);
+        ++pong_count;
+        EXPECT_EQ(ping_count, pong_count);
+      });
+
+  // Run for long enough to replay the whole log file.
+  log_reader_factory.RunFor(std::chrono::seconds(100));
+  EXPECT_EQ(ping_count, 2010);
+
+  reader.Deregister();
+}
+
+}  // namespace testing
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index ff2dced..1050417 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -151,6 +151,33 @@
   std::string data_;
 };
 
+// Flatbuffer<T> implementation whose bytes are owned by a std::vector.
+template <typename T>
+class FlatbufferVector : public Flatbuffer<T> {
+ public:
+  // Takes ownership of the bytes in data by moving the vector in (no copy).
+  FlatbufferVector(std::vector<uint8_t> &&data) : data_(std::move(data)) {}
+
+  // Builds a Flatbuffer holding its own copy of the other flatbuffer's bytes.
+  FlatbufferVector(const Flatbuffer<T> &other)
+      : data_(other.data(), other.data() + other.size()) {}
+
+  // Replaces the stored bytes with a copy of the other flatbuffer's bytes.
+  FlatbufferVector &operator=(const Flatbuffer<T> &other) {
+    data_ = std::vector<uint8_t>(other.data(), other.data() + other.size());
+    return *this;
+  }
+
+  virtual ~FlatbufferVector() override {}
+
+  const uint8_t *data() const override { return data_.data(); }
+  uint8_t *data() override { return data_.data(); }
+  size_t size() const override { return data_.size(); }
+
+ private:
+  std::vector<uint8_t> data_;  // Backing storage exposed via data()/size().
+};
+
 // This object associates the message type with the memory storing the
 // flatbuffer.  This only stores root tables.
 //
diff --git a/frc971/control_loops/drivetrain/splinedrivetrain.cc b/frc971/control_loops/drivetrain/splinedrivetrain.cc
index 1222ae0..c467e1f 100644
--- a/frc971/control_loops/drivetrain/splinedrivetrain.cc
+++ b/frc971/control_loops/drivetrain/splinedrivetrain.cc
@@ -170,6 +170,8 @@
       has_started_execution_ = false;
     }
     mutex_.Unlock();
+  } else {
+    VLOG(1) << "Failed to acquire trajectory lock.";
   }
 }
 
diff --git a/y2019/control_loops/drivetrain/localizer_test.cc b/y2019/control_loops/drivetrain/localizer_test.cc
index d9a55f6..b126c6c 100644
--- a/y2019/control_loops/drivetrain/localizer_test.cc
+++ b/y2019/control_loops/drivetrain/localizer_test.cc
@@ -141,6 +141,9 @@
   }
 
   void SetUp() {
+    // Turn on -v 1
+    FLAGS_v = std::max(FLAGS_v, 1);
+
     flatbuffers::DetachedBuffer goal_buffer;
     {
       flatbuffers::FlatBufferBuilder fbb;
@@ -185,10 +188,13 @@
     aos::FlatbufferDetachedBuffer<frc971::control_loops::drivetrain::Goal> goal(
         std::move(goal_buffer));
 
-    spline_drivetrain_.SetGoal(&goal.message());
-
     // Let the spline drivetrain compute the spline.
     while (true) {
+      // We need to keep sending the goal.  There are conditions when the
+      // trajectory lock isn't grabbed the first time, and we want to keep
+      // banging on it to keep trying.  Otherwise we deadlock.
+      spline_drivetrain_.SetGoal(&goal.message());
+
       ::std::this_thread::sleep_for(::std::chrono::milliseconds(5));
 
       flatbuffers::FlatBufferBuilder fbb;