Move log file manipulation logic out of LogReader

It really had 4 separate layers that should each have been a class.
Split them out in perparation for multi-file file logs.

1) Read chunks of data from a file
2) Read the header and messages from a file.
3) Sort those messages
4) And then send them over the event loop.

Change-Id: Ib885e6f0ed027851a4d7faea71b9391c1b60cf19
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index dc1801d..714796e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -8,8 +8,11 @@
 
 #include <vector>
 
+#include "aos/configuration.h"
 #include "aos/events/logging/logger_generated.h"
 #include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
 
 DEFINE_int32(flush_size, 1000000,
              "Number of outstanding bytes to allow before flushing to disk.");
@@ -17,6 +20,8 @@
 namespace aos {
 namespace logger {
 
+namespace chrono = std::chrono;
+
 DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
     : fd_(open(std::string(filename).c_str(),
                O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
@@ -116,5 +121,220 @@
   return message_header_builder.Finish();
 }
 
+SpanReader::SpanReader(std::string_view filename)
+    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+absl::Span<const uint8_t> SpanReader::ReadMessage() {
+  // Make sure we have enough for the size.
+  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // Now make sure we have enough for the message.
+  const size_t data_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  while (data_.size() < consumed_data_ + data_size) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // And return it, consuming the data.
+  const uint8_t *data_ptr = data_.data() + consumed_data_;
+
+  consumed_data_ += data_size;
+
+  return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+bool SpanReader::MessageAvailable() {
+  // Are we big enough to read the size?
+  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    return false;
+  }
+
+  // Then, are we big enough to read the full message?
+  const size_t data_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  if (data_.size() < consumed_data_ + data_size) {
+    return false;
+  }
+
+  return true;
+}
+
+bool SpanReader::ReadBlock() {
+  if (end_of_file_) {
+    return false;
+  }
+
+  // Appends 256k.  This is enough that the read call is efficient.  We don't
+  // want to spend too much time reading small chunks because the syscalls for
+  // that will be expensive.
+  constexpr size_t kReadSize = 256 * 1024;
+
+  // Strip off any unused data at the front.
+  if (consumed_data_ != 0) {
+    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    consumed_data_ = 0;
+  }
+
+  const size_t starting_size = data_.size();
+
+  // This should automatically grow the backing store.  It won't shrink if we
+  // get a small chunk later.  This reduces allocations when we want to append
+  // more data.
+  data_.resize(data_.size() + kReadSize);
+
+  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
+  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+  if (count == 0) {
+    end_of_file_ = true;
+    return false;
+  }
+  PCHECK(count > 0);
+
+  return true;
+}
+
+MessageReader::MessageReader(std::string_view filename)
+    : span_reader_(filename) {
+  // Make sure we have enough to read the size.
+  absl::Span<const uint8_t> config_data = span_reader_.ReadMessage();
+
+  // Make sure something was read.
+  CHECK(config_data != absl::Span<const uint8_t>());
+
+  // And copy the config so we have it forever.
+  configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
+
+  max_out_of_order_duration_ = std::chrono::nanoseconds(
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+          ->max_out_of_order_duration());
+}
+
+std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
+  if (msg_data == absl::Span<const uint8_t>()) {
+    return std::nullopt;
+  }
+
+  FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
+      msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};
+
+  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+      chrono::nanoseconds(result.message().monotonic_sent_time()));
+
+  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+  return result;
+}
+
+SortedMessageReader::SortedMessageReader(std::string_view filename)
+    : message_reader_(filename) {
+  channels_.resize(configuration()->channels()->size());
+
+  QueueMessages();
+}
+
+void SortedMessageReader::EmplaceDataBack(
+    FlatbufferVector<MessageHeader> &&new_data) {
+  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+      chrono::nanoseconds(new_data.message().monotonic_sent_time()));
+  const size_t channel_index = new_data.message().channel_index();
+  CHECK_LT(channel_index, channels_.size());
+
+  if (channels_[channel_index].data.size() == 0) {
+    channels_[channel_index].oldest_timestamp = timestamp;
+    PushChannelHeap(timestamp, channel_index);
+  }
+  channels_[channel_index].data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+bool ChannelHeapCompare(
+    const std::pair<monotonic_clock::time_point, int> first,
+    const std::pair<monotonic_clock::time_point, int> second) {
+  if (first.first > second.first) {
+    return true;
+  } else if (first.first == second.first) {
+    return first.second > second.second;
+  } else {
+    return false;
+  }
+}
+
+}  // namespace
+
+void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+                                          int channel_index) {
+  channel_heap_.push_back(std::make_pair(timestamp, channel_index));
+
+  // The default sort puts the newest message first.  Use a custom comparator to
+  // put the oldest message first.
+  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+                 ChannelHeapCompare);
+}
+
+void SortedMessageReader::QueueMessages() {
+  while (true) {
+    // Don't queue if we have enough data already.
+    // When a log file starts, there should be a message from each channel.
+    // Those messages might be very old. Make sure to read a chunk past the
+    // starting time.
+    if (channel_heap_.size() > 0 &&
+        message_reader_.newest_timestamp() >
+            std::max(oldest_message().first, monotonic_start_time()) +
+                message_reader_.max_out_of_order_duration()) {
+      break;
+    }
+
+    if (std::optional<FlatbufferVector<MessageHeader>> msg =
+            message_reader_.ReadMessage()) {
+      EmplaceDataBack(std::move(msg.value()));
+    } else {
+      break;
+    }
+  }
+}
+
+std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
+SortedMessageReader::PopOldestChannel() {
+  std::pair<monotonic_clock::time_point, int> oldest_channel_data =
+      channel_heap_.front();
+  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+                &ChannelHeapCompare);
+  channel_heap_.pop_back();
+
+  struct ChannelData &channel = channels_[oldest_channel_data.second];
+
+  FlatbufferVector<MessageHeader> front = std::move(channel.front());
+
+  channel.data.pop_front();
+
+  // Re-push it and update the oldest timestamp.
+  if (channel.data.size() != 0) {
+    const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+        chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+    PushChannelHeap(timestamp, oldest_channel_data.second);
+    channel.oldest_timestamp = timestamp;
+  } else {
+    channel.oldest_timestamp = monotonic_clock::min_time;
+  }
+
+  if (oldest_channel_data.first > message_reader_.queue_data_time()) {
+    QueueMessages();
+  }
+
+  return std::make_tuple(oldest_channel_data.first, oldest_channel_data.second,
+                         std::move(front));
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 0892290..285fe05 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -3,9 +3,12 @@
 
 #include <sys/uio.h>
 
+#include <deque>
+#include <optional>
 #include <string_view>
 #include <vector>
 
+#include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
 #include "flatbuffers/flatbuffers.h"
@@ -62,15 +65,232 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
-// TODO(austin): 3 objects:
-// 1) log chunk reader.  Returns span.
-// 2) Sorted message header reader.  Returns sorted messages.
-// 3) LogReader, which does all the registration.
-//
-// Then, we can do a multi-node sim which forwards data nicely, try logging it, and then try replaying it.
+// Class to read chunks out of a log file.
+class SpanReader {
+ public:
+  SpanReader(std::string_view filename);
 
-// Optimization:
-//   Allocate the 256k blocks like we do today.  But, refcount them with shared_ptr pointed to by the messageheader that is returned.  This avoids the copy.
+  ~SpanReader() { close(fd_); }
+
+  // Returns a span with the data for a message from the log file, excluding
+  // the size.
+  absl::Span<const uint8_t> ReadMessage();
+
+  // Returns true if there is a full message available in the buffer, or if we
+  // will have to read more data from disk.
+  bool MessageAvailable();
+
+ private:
+  // TODO(austin): Optimization:
+  //   Allocate the 256k blocks like we do today.  But, refcount them with
+  //   shared_ptr pointed to by the messageheader that is returned.  This avoids
+  //   the copy.  Need to do more benchmarking.
+
+  // Reads a chunk of data into data_.  Returns false if no data was read.
+  bool ReadBlock();
+
+  // File descriptor for the log file.
+  int fd_ = -1;
+
+  // Allocator which doesn't zero initialize memory.
+  template <typename T>
+  struct DefaultInitAllocator {
+    typedef T value_type;
+
+    template <typename U>
+    void construct(U *p) {
+      ::new (static_cast<void *>(p)) U;
+    }
+
+    template <typename U, typename... Args>
+    void construct(U *p, Args &&... args) {
+      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
+    }
+
+    T *allocate(std::size_t n) {
+      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
+    }
+
+    template <typename U>
+    void deallocate(U *p, std::size_t /*n*/) {
+      ::operator delete(static_cast<void *>(p));
+    }
+  };
+
+  // Vector to read into.  This uses an allocator which doesn't zero
+  // initialize the memory.
+  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+
+  // Amount of data consumed already in data_.
+  size_t consumed_data_ = 0;
+
+  // Cached bit for if we have reached the end of the file.  Otherwise we will
+  // hammer on the kernel asking for more data each time we send.
+  bool end_of_file_ = false;
+};
+
+// Class which handles reading the header and messages from the log file.  This
+// handles any per-file state left before merging below.
+class MessageReader {
+ public:
+  MessageReader(std::string_view filename);
+
+  // Returns the header from the log file.
+  const LogFileHeader *log_file_header() const {
+    return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
+        configuration_.data());
+  }
+
+  // Returns the minimum maount of data needed to queue up for sorting before
+  // ware guarenteed to not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration() const {
+    return max_out_of_order_duration_;
+  }
+
+  monotonic_clock::time_point newest_timestamp() const {
+    return newest_timestamp_;
+  }
+
+  // Returns the next message if there is one.
+  std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+
+  // The time at which we need to read another chunk from the logfile.
+  monotonic_clock::time_point queue_data_time() const {
+    return newest_timestamp() - max_out_of_order_duration();
+  }
+
+ private:
+  // Log chunk reader.
+  SpanReader span_reader_;
+
+  // Vector holding the data for the configuration.
+  std::vector<uint8_t> configuration_;
+
+  // Minimum amount of data to queue up for sorting before we are guarenteed
+  // to not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration_;
+
+  // Timestamp of the newest message in a channel queue.
+  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+};
+
+// We need to read a large chunk at a time, then kit it up into parts and
+// sort.
+//
+// We want to read 256 KB chunks at a time.  This is the fastest read size.
+// This leaves us with a fragmentation problem though.
+//
+// The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
+// chunks into single flatbuffer messages and manage them in a sorted queue.
+// Everything is copied three times (into 256 kb buffer, then into separate
+// buffer, then into sender), but none of it is all that expensive.  We can
+// optimize if it is slow later.
+//
+// As we place the elements in the sorted list of times, keep doing this
+// until we read a message that is newer than the threshold.
+//
+// Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
+// small state machine so we can resume), and keep pulling messages back out
+// and sending.
+//
+// For sorting, we want to use the fact that each channel is sorted, and
+// then merge sort the channels.  Have a vector of deques, and then hold a
+// sorted list of pointers to those.
+class SortedMessageReader {
+ public:
+  SortedMessageReader(std::string_view filename);
+
+  // Returns the header from the log file.
+  const LogFileHeader *log_file_header() const {
+    return message_reader_.log_file_header();
+  }
+
+  // Returns a pointer to the channel with the oldest message in it, and the
+  // timestamp.
+  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
+    return channel_heap_.front();
+  }
+
+  // Returns the number of channels with data still in them.
+  size_t active_channel_count() const { return channel_heap_.size(); }
+
+  // Returns the configuration from the log file header.
+  const Configuration *configuration() const {
+    return log_file_header()->configuration();
+  }
+
+  // Returns the start time on both the monotonic and realtime clocks.
+  monotonic_clock::time_point monotonic_start_time() {
+    return monotonic_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+  }
+  realtime_clock::time_point realtime_start_time() {
+    return realtime_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+  }
+
+  // Returns the node who's point of view this log file is from.  Make sure this
+  // is a pointer in the configuration() nodes list so it can be consumed
+  // elsewhere.
+  const Node *node() const {
+    if (configuration()->has_nodes()) {
+      CHECK(log_file_header()->has_node());
+      CHECK(log_file_header()->node()->has_name());
+      return configuration::GetNode(
+          configuration(), log_file_header()->node()->name()->string_view());
+    } else {
+      CHECK(!log_file_header()->has_node());
+      return nullptr;
+    }
+  }
+
+  // Pops a pointer to the channel with the oldest message in it, and the
+  // timestamp.
+  std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
+  PopOldestChannel();
+
+ private:
+  // Adds more messages to the sorted list.
+  void QueueMessages();
+
+  // Moves the message to the correct channel queue.
+  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
+
+  // Pushes a pointer to the channel for the given timestamp to the sorted
+  // channel list.
+  void PushChannelHeap(monotonic_clock::time_point timestamp,
+                       int channel_index);
+
+
+  // Datastructure to hold the list of messages, cached timestamp for the
+  // oldest message, and sender to send with.
+  struct ChannelData {
+    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
+    std::deque<FlatbufferVector<MessageHeader>> data;
+    std::unique_ptr<RawSender> raw_sender;
+
+    // Returns the oldest message.
+    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
+
+    // Returns the timestamp for the oldest message.
+    const monotonic_clock::time_point front_timestamp() {
+      return monotonic_clock::time_point(
+          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+    }
+  };
+
+  MessageReader message_reader_;
+
+  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
+  // Especially if we start compressing.
+
+  // List of channels and messages for them.
+  std::vector<ChannelData> channels_;
+
+  // Heap of channels so we can track which channel to send next.
+  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+
+};
 
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e7d0d3b..22c1a6e 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -7,7 +7,6 @@
 #include <sys/uio.h>
 #include <vector>
 
-#include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
@@ -220,172 +219,27 @@
   writer_->Flush();
 }
 
-LogReader::LogReader(absl::string_view filename)
-    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
-
-  // Make sure we have enough to read the size.
-  absl::Span<const uint8_t> config_data = ReadMessage();
-
-  // Make sure something was read.
-  CHECK(config_data != absl::Span<const uint8_t>());
-
-  // And copy the config so we have it forever.
-  configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
-
-  max_out_of_order_duration_ = std::chrono::nanoseconds(
-      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-          ->max_out_of_order_duration());
-
+LogReader::LogReader(std::string_view filename)
+    : sorted_message_reader_(filename) {
   channels_.resize(configuration()->channels()->size());
-
-  QueueMessages();
 }
 
 LogReader::~LogReader() {
-  CHECK(!event_loop_unique_ptr_) << "Did you remember to call Deregister?";
-}
-
-bool LogReader::ReadBlock() {
-  if (end_of_file_) {
-    return false;
-  }
-
-  // Appends 256k.  This is enough that the read call is efficient.  We don't
-  // want to spend too much time reading small chunks because the syscalls for
-  // that will be expensive.
-  constexpr size_t kReadSize = 256 * 1024;
-
-  // Strip off any unused data at the front.
-  if (consumed_data_ != 0) {
-    data_.erase(data_.begin(), data_.begin() + consumed_data_);
-    consumed_data_ = 0;
-  }
-
-  const size_t starting_size = data_.size();
-
-  // This should automatically grow the backing store.  It won't shrink if we
-  // get a small chunk later.  This reduces allocations when we want to append
-  // more data.
-  data_.resize(data_.size() + kReadSize);
-
-  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
-  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
-  if (count == 0) {
-    end_of_file_ = true;
-    return false;
-  }
-  PCHECK(count > 0);
-
-  return true;
-}
-
-bool LogReader::MessageAvailable() {
-  // Are we big enough to read the size?
-  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
-    return false;
-  }
-
-  // Then, are we big enough to read the full message?
-  const size_t data_size =
-      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
-      sizeof(flatbuffers::uoffset_t);
-  if (data_.size() < consumed_data_ + data_size) {
-    return false;
-  }
-
-  return true;
-}
-
-absl::Span<const uint8_t> LogReader::ReadMessage() {
-  // Make sure we have enough for the size.
-  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
-    if (!ReadBlock()) {
-      return absl::Span<const uint8_t>();
-    }
-  }
-
-  // Now make sure we have enough for the message.
-  const size_t data_size =
-      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
-      sizeof(flatbuffers::uoffset_t);
-  while (data_.size() < consumed_data_ + data_size) {
-    if (!ReadBlock()) {
-      return absl::Span<const uint8_t>();
-    }
-  }
-
-  // And return it, consuming the data.
-  const uint8_t *data_ptr = data_.data() + consumed_data_;
-
-  consumed_data_ += data_size;
-
-  return absl::Span<const uint8_t>(data_ptr, data_size);
-}
-
-void LogReader::QueueMessages() {
-  while (true) {
-    // Don't queue if we have enough data already.
-    // When a log file starts, there should be a message from each channel.
-    // Those messages might be very old. Make sure to read a chunk past the
-    // starting time.
-    if (channel_heap_.size() > 0 &&
-        newest_timestamp_ >
-            std::max(oldest_message().first, monotonic_start_time()) +
-                max_out_of_order_duration_) {
-      break;
-    }
-
-    absl::Span<const uint8_t> msg_data = ReadMessage();
-    if (msg_data == absl::Span<const uint8_t>()) {
-      break;
-    }
-
-    FlatbufferVector<MessageHeader> msg(std::vector<uint8_t>(
-        msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end()));
-
-    EmplaceDataBack(std::move(msg));
-  }
-
-  queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
+  CHECK(!event_loop_unique_ptr_) << ": Did you remember to call Deregister?";
 }
 
 const Configuration *LogReader::configuration() const {
-  return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-      ->configuration();
+  return sorted_message_reader_.configuration();
 }
 
-const Node *LogReader::node() const {
-  if (configuration()->has_nodes()) {
-    CHECK(flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-              ->has_node());
-    CHECK(flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-              ->node()
-              ->has_name());
-    return configuration::GetNode(
-        configuration(),
-        flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-            ->node()
-            ->name()
-            ->string_view());
-  } else {
-    CHECK(
-        !flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-             ->has_node());
-    return nullptr;
-  }
-}
+const Node *LogReader::node() const { return sorted_message_reader_.node(); }
 
 monotonic_clock::time_point LogReader::monotonic_start_time() {
-  return monotonic_clock::time_point(std::chrono::nanoseconds(
-      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-          ->monotonic_start_time()));
+  return sorted_message_reader_.monotonic_start_time();
 }
 
 realtime_clock::time_point LogReader::realtime_start_time() {
-  return realtime_clock::time_point(std::chrono::nanoseconds(
-      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
-          ->realtime_start_time()));
+  return sorted_message_reader_.realtime_start_time();
 }
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
@@ -413,88 +267,75 @@
     CHECK_EQ(configuration()->channels()->Get(i)->type(),
              event_loop_->configuration()->channels()->Get(i)->type());
 
-    channels_[i].raw_sender = event_loop_->MakeRawSender(
+    channels_[i] = event_loop_->MakeRawSender(
         event_loop_->configuration()->channels()->Get(i));
   }
 
   timer_handler_ = event_loop_->AddTimer([this]() {
-    std::pair<monotonic_clock::time_point, int> oldest_channel_index =
-        PopOldestChannel();
+    monotonic_clock::time_point channel_timestamp;
+    int channel_index;
+    FlatbufferVector<MessageHeader> channel_data =
+        FlatbufferVector<MessageHeader>::Empty();
+
+    std::tie(channel_timestamp, channel_index, channel_data) =
+        sorted_message_reader_.PopOldestChannel();
+
     const monotonic_clock::time_point monotonic_now =
         event_loop_->context().monotonic_event_time;
-    CHECK(monotonic_now == oldest_channel_index.first)
+    CHECK(monotonic_now == channel_timestamp)
         << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send "
-        << oldest_channel_index.first.time_since_epoch().count();
+        << " trying to send " << channel_timestamp.time_since_epoch().count();
 
-    struct LogReader::ChannelData &channel =
-        channels_[oldest_channel_index.second];
-
-    FlatbufferVector<MessageHeader> front = std::move(channel.front());
-
-    if (oldest_channel_index.first > monotonic_start_time() ||
+    if (channel_timestamp > monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
       if (!FLAGS_skip_missing_forwarding_entries ||
-          front.message().data() != nullptr) {
-        CHECK(front.message().data() != nullptr)
-            << ": Got a message without data.  Forwarding entry which was not "
+          channel_data.message().data() != nullptr) {
+        CHECK(channel_data.message().data() != nullptr)
+            << ": Got a message without data.  Forwarding entry which was "
+               "not "
                "matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
         if (event_loop_factory_ != nullptr) {
           event_loop_factory_->SetRealtimeOffset(
-              monotonic_clock::time_point(
-                  chrono::nanoseconds(front.message().monotonic_sent_time())),
-              realtime_clock::time_point(
-                  chrono::nanoseconds(front.message().realtime_sent_time())));
+              monotonic_clock::time_point(chrono::nanoseconds(
+                  channel_data.message().monotonic_sent_time())),
+              realtime_clock::time_point(chrono::nanoseconds(
+                  channel_data.message().realtime_sent_time())));
         }
 
-        channel.raw_sender->Send(
-            front.message().data()->Data(), front.message().data()->size(),
-            monotonic_clock::time_point(
-                chrono::nanoseconds(front.message().monotonic_remote_time())),
-            realtime_clock::time_point(
-                chrono::nanoseconds(front.message().realtime_remote_time())),
-            front.message().remote_queue_index());
+        channels_[channel_index]->Send(
+            channel_data.message().data()->Data(),
+            channel_data.message().data()->size(),
+            monotonic_clock::time_point(chrono::nanoseconds(
+                channel_data.message().monotonic_remote_time())),
+            realtime_clock::time_point(chrono::nanoseconds(
+                channel_data.message().realtime_remote_time())),
+            channel_data.message().remote_queue_index());
       }
     } else {
       LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << oldest_channel_index.first.time_since_epoch().count()
-                   << " start "
+                   << channel_timestamp.time_since_epoch().count() << " start "
                    << monotonic_start_time().time_since_epoch().count() << " "
-                   << FlatbufferToJson(front);
-    }
-    channel.data.pop_front();
-
-    // Re-push it and update the oldest timestamp.
-    if (channel.data.size() != 0) {
-      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-          chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
-      PushChannelHeap(timestamp, oldest_channel_index.second);
-      channel.oldest_timestamp = timestamp;
-    } else {
-      channel.oldest_timestamp = monotonic_clock::min_time;
+                   << FlatbufferToJson(channel_data);
     }
 
-    if (monotonic_now > queue_data_time_) {
-      QueueMessages();
-    }
-
-    if (channel_heap_.size() != 0) {
-      timer_handler_->Setup(oldest_message().first);
+    if (sorted_message_reader_.active_channel_count() > 0u) {
+      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
     }
   });
 
-  if (channel_heap_.size() > 0u) {
-    event_loop_->OnRun(
-        [this]() { timer_handler_->Setup(oldest_message().first); });
+  if (sorted_message_reader_.active_channel_count() > 0u) {
+    event_loop_->OnRun([this]() {
+      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+    });
   }
 }
 
 void LogReader::Deregister() {
   for (size_t i = 0; i < channels_.size(); ++i) {
-    channels_[i].raw_sender.reset();
+    channels_[i].reset();
   }
 
   event_loop_factory_ = nullptr;
@@ -502,52 +343,5 @@
   event_loop_ = nullptr;
 }
 
-void LogReader::EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data) {
-  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-      chrono::nanoseconds(new_data.message().monotonic_sent_time()));
-  const size_t channel_index = new_data.message().channel_index();
-  CHECK_LT(channel_index, channels_.size());
-  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  if (channels_[channel_index].data.size() == 0) {
-    channels_[channel_index].oldest_timestamp = timestamp;
-    PushChannelHeap(timestamp, channel_index);
-  }
-  channels_[channel_index].data.emplace_back(std::move(new_data));
-}
-
-namespace {
-
-bool ChannelHeapCompare(
-    const std::pair<monotonic_clock::time_point, int> first,
-    const std::pair<monotonic_clock::time_point, int> second) {
-  if (first.first > second.first) {
-    return true;
-  } else if (first.first == second.first) {
-    return first.second > second.second;
-  } else {
-    return false;
-  }
-}
-
-}  // namespace
-
-void LogReader::PushChannelHeap(monotonic_clock::time_point timestamp,
-                                int channel_index) {
-  channel_heap_.push_back(std::make_pair(timestamp, channel_index));
-
-  // The default sort puts the newest message first.  Use a custom comparator to
-  // put the oldest message first.
-  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
-                 ChannelHeapCompare);
-}
-
-std::pair<monotonic_clock::time_point, int> LogReader::PopOldestChannel() {
-  std::pair<monotonic_clock::time_point, int> result = channel_heap_.front();
-  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
-                &ChannelHeapCompare);
-  channel_heap_.pop_back();
-  return result;
-}
-
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e44f65f..a35b453 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -3,8 +3,8 @@
 
 #include <deque>
 #include <vector>
+#include <string_view>
 
-#include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logfile_utils.h"
@@ -59,7 +59,7 @@
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public:
-  LogReader(absl::string_view filename);
+  LogReader(std::string_view filename);
   ~LogReader();
 
   // Registers the timer and senders used to resend the messages from the log
@@ -89,143 +89,18 @@
   // happened.
 
  private:
-  // Reads a chunk of data into data_.  Returns false if no data was read.
-  bool ReadBlock();
-
-  // Returns true if there is a full message available in the buffer, or if we
-  // will have to read more data from disk.
-  bool MessageAvailable();
-
-  // Returns a span with the data for a message from the log file, excluding
-  // the size.
-  absl::Span<const uint8_t> ReadMessage();
-
   // Queues at least max_out_of_order_duration_ messages into channels_.
   void QueueMessages();
 
-  // We need to read a large chunk at a time, then kit it up into parts and
-  // sort.
-  //
-  // We want to read 256 KB chunks at a time.  This is the fastest read size.
-  // This leaves us with a fragmentation problem though.
-  //
-  // The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
-  // chunks into single flatbuffer messages and manage them in a sorted queue.
-  // Everything is copied three times (into 256 kb buffer, then into separate
-  // buffer, then into sender), but none of it is all that expensive.  We can
-  // optimize if it is slow later.
-  //
-  // As we place the elements in the sorted list of times, keep doing this
-  // until we read a message that is newer than the threshold.
-  //
-  // Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
-  // small state machine so we can resume), and keep pulling messages back out
-  // and sending.
-  //
-  // For sorting, we want to use the fact that each channel is sorted, and
-  // then merge sort the channels.  Have a vector of deques, and then hold a
-  // sorted list of pointers to those.
-  //
-  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
-  // Especially if we start compressing.
+  // Log chunk reader.
+  SortedMessageReader sorted_message_reader_;
 
-  // Allocator which doesn't zero initialize memory.
-  template <typename T>
-  struct DefaultInitAllocator {
-    typedef T value_type;
-
-    template <typename U>
-    void construct(U *p) {
-      ::new (static_cast<void *>(p)) U;
-    }
-
-    template <typename U, typename... Args>
-    void construct(U *p, Args &&... args) {
-      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
-    }
-
-    T *allocate(std::size_t n) {
-      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
-    }
-
-    template <typename U>
-    void deallocate(U *p, std::size_t /*n*/) {
-      ::operator delete(static_cast<void *>(p));
-    }
-  };
-
-  // Minimum amount of data to queue up for sorting before we are guarenteed
-  // to not see data out of order.
-  std::chrono::nanoseconds max_out_of_order_duration_;
-
-  // File descriptor for the log file.
-  int fd_ = -1;
+  std::vector<std::unique_ptr<RawSender>> channels_;
 
   SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
-
-  // Vector to read into.  This uses an allocator which doesn't zero
-  // initialize the memory.
-  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
-
-  // Amount of data consumed already in data_.
-  size_t consumed_data_ = 0;
-
-  // Vector holding the data for the configuration.
-  std::vector<uint8_t> configuration_;
-
-  // Moves the message to the correct channel queue.
-  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
-  // Pushes a pointer to the channel for the given timestamp to the sorted
-  // channel list.
-  void PushChannelHeap(monotonic_clock::time_point timestamp,
-                       int channel_index);
-
-  // Returns a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
-    return channel_heap_.front();
-  }
-
-  // Pops a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  std::pair<monotonic_clock::time_point, int> PopOldestChannel();
-
-  // Datastructure to hold the list of messages, cached timestamp for the
-  // oldest message, and sender to send with.
-  struct ChannelData {
-    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
-    std::deque<FlatbufferVector<MessageHeader>> data;
-    std::unique_ptr<RawSender> raw_sender;
-
-    // Returns the oldest message.
-    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
-    // Returns the timestamp for the oldest message.
-    const monotonic_clock::time_point front_timestamp() {
-      return monotonic_clock::time_point(
-          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
-    }
-  };
-
-  // List of channels and messages for them.
-  std::vector<ChannelData> channels_;
-
-  // Heap of channels so we can track which channel to send next.
-  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-
-  // Timestamp of the newest message in a channel queue.
-  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
-
-  // The time at which we need to read another chunk from the logfile.
-  monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
-
-  // Cached bit for if we have reached the end of the file.  Otherwise we will
-  // hammer on the kernel asking for more data each time we send.
-  bool end_of_file_ = false;
 };
 
 }  // namespace logger
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 630741f..4e4520c 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -171,6 +171,11 @@
     return *this;
   }
 
+  // Constructs an empty flatbuffer of type T.
+  static FlatbufferVector<T> Empty() {
+    return FlatbufferVector<T>(std::vector<uint8_t>{});
+  }
+
   virtual ~FlatbufferVector() override {}
 
   const uint8_t *data() const override { return data_.data(); }