Move log file manipulation logic out of LogReader

It really had 4 separate layers that should each have been a class.
Split them out in preparation for multi-file logs.

1) Read chunks of data from a file
2) Read the header and messages from a file.
3) Sort those messages
4) And then send them over the event loop.

Change-Id: Ib885e6f0ed027851a4d7faea71b9391c1b60cf19
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index dc1801d..714796e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -8,8 +8,11 @@
 
 #include <vector>
 
+#include "aos/configuration.h"
 #include "aos/events/logging/logger_generated.h"
 #include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
 
 DEFINE_int32(flush_size, 1000000,
              "Number of outstanding bytes to allow before flushing to disk.");
@@ -17,6 +20,8 @@
 namespace aos {
 namespace logger {
 
+namespace chrono = std::chrono;
+
 DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
     : fd_(open(std::string(filename).c_str(),
                O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
@@ -116,5 +121,220 @@
   return message_header_builder.Finish();
 }
 
+SpanReader::SpanReader(std::string_view filename)
+    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;  // Fatal on failure.
+}
+
+// Returns the next size-prefixed message (prefix included), or an empty span
+// at end of file.  The span aliases data_, so the next read invalidates it.
+absl::Span<const uint8_t> SpanReader::ReadMessage() {
+  // A single read may be shorter than the size prefix, so loop.
+  while (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // Now make sure we have enough for the message.
+  const size_t data_size =
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+  while (data_.size() < consumed_data_ + data_size) {
+    if (!ReadBlock()) {
+      return absl::Span<const uint8_t>();
+    }
+  }
+
+  // And return it, consuming the data.
+  const uint8_t *data_ptr = data_.data() + consumed_data_;
+  consumed_data_ += data_size;
+  return absl::Span<const uint8_t>(data_ptr, data_size);
+}
+
+// Reports whether a complete size-prefixed message is already buffered.
+// Never reads from the file.
+bool SpanReader::MessageAvailable() {
+  // Without the full size prefix the message length is still unknown.
+  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
+    return false;
+  }
+
+  // Total footprint = the prefix itself + the payload size stored in it.
+  const size_t message_bytes =
+      sizeof(flatbuffers::uoffset_t) +
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_);
+
+  // Available only when every byte of that footprint is in the buffer.
+  return data_.size() >= consumed_data_ + message_bytes;
+}
+
+// Appends up to one read-sized chunk from fd_ to data_, first dropping bytes
+// already consumed.  Returns false at (or after) end of file.
+bool SpanReader::ReadBlock() {
+  if (end_of_file_) {
+    return false;
+  }
+
+  // Read 256k at a time: large enough that syscall overhead stays small
+  // compared to reading many tiny chunks.
+  constexpr size_t kReadSize = 256 * 1024;
+
+  // Drop the already-consumed prefix so the buffer only holds pending bytes.
+  if (consumed_data_ != 0) {
+    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    consumed_data_ = 0;
+  }
+
+  // Grow by a full chunk so read() can write straight into the buffer.  The
+  // capacity is kept after the trim below, which avoids later reallocations.
+  const size_t starting_size = data_.size();
+  data_.resize(starting_size + kReadSize);
+
+  const ssize_t count = read(fd_, &data_[starting_size], kReadSize);
+
+  // Trim to what was actually read (nothing on EOF or error).
+  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+  if (count == 0) {
+    end_of_file_ = true;
+    return false;
+  }
+  PCHECK(count > 0);
+  return true;
+}
+
+MessageReader::MessageReader(std::string_view filename)
+    : span_reader_(filename) {
+  // The first message in the file is interpreted as the LogFileHeader below.
+  absl::Span<const uint8_t> config_data = span_reader_.ReadMessage();
+
+  // An empty span means the reader could not produce a complete message.
+  CHECK(config_data != absl::Span<const uint8_t>());
+
+  // Copy it (size prefix included): the span only aliases the reader's buffer.
+  configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
+
+  max_out_of_order_duration_ = std::chrono::nanoseconds(
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+          ->max_out_of_order_duration());
+}
+
+// Returns the next message with its size prefix stripped, or nullopt when the
+// underlying reader has nothing more.  Tracks the newest timestamp seen.
+std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+  const absl::Span<const uint8_t> raw = span_reader_.ReadMessage();
+  if (raw == absl::Span<const uint8_t>()) {
+    return std::nullopt;
+  }
+  // Copy everything after the size prefix into an owning buffer.
+  FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
+      raw.begin() + sizeof(flatbuffers::uoffset_t), raw.end())};
+  newest_timestamp_ = std::max(
+      newest_timestamp_, monotonic_clock::time_point(chrono::nanoseconds(
+                             result.message().monotonic_sent_time())));
+  return result;
+}
+
+SortedMessageReader::SortedMessageReader(std::string_view filename)
+    : message_reader_(filename) {
+  channels_.resize(configuration()->channels()->size());  // One per channel.
+  // Read an initial batch of messages into the per-channel queues.
+  QueueMessages();
+}
+
+void SortedMessageReader::EmplaceDataBack(
+    FlatbufferVector<MessageHeader> &&new_data) {
+  const size_t channel_index = new_data.message().channel_index();
+  CHECK_LT(channel_index, channels_.size());
+  auto &channel = channels_[channel_index];
+  if (channel.data.size() == 0) {  // Empty channels are not in the heap.
+    const monotonic_clock::time_point timestamp(
+        chrono::nanoseconds(new_data.message().monotonic_sent_time()));
+    channel.oldest_timestamp = timestamp;
+    PushChannelHeap(timestamp, channel_index);
+  }
+  channel.data.emplace_back(std::move(new_data));
+}
+
+namespace {
+
+// Orders (timestamp, channel index) heap entries "greater first" so that the
+// std heap algorithms, which keep the largest element in front, surface the
+// oldest timestamp; equal timestamps fall back to the lower channel index.
+bool ChannelHeapCompare(
+    const std::pair<monotonic_clock::time_point, int> first,
+    const std::pair<monotonic_clock::time_point, int> second) {
+  if (first.first != second.first) {
+    return first.first > second.first;
+  }
+  return first.second > second.second;
+}
+
+}  // namespace
+
+void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
+                                          int channel_index) {
+  // Append the entry, then sift it into place.  std::push_heap is a max-heap
+  // by default, so ChannelHeapCompare inverts the order to keep the oldest
+  // entry in front.
+  channel_heap_.emplace_back(timestamp, channel_index);
+  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
+                 &ChannelHeapCompare);
+}
+
+// Queues messages until the newest one read is more than the out-of-order
+// window past max(oldest queued, log start), or until the file runs out.
+void SortedMessageReader::QueueMessages() {
+  for (;;) {
+    // Logs may begin with a very old message on each channel, so the start
+    // time acts as a floor: always read a chunk past it.
+    if (!channel_heap_.empty() &&
+        message_reader_.newest_timestamp() >
+            std::max(oldest_message().first, monotonic_start_time()) +
+                message_reader_.max_out_of_order_duration()) {
+      return;
+    }
+
+    std::optional<FlatbufferVector<MessageHeader>> msg =
+        message_reader_.ReadMessage();
+    if (!msg) {
+      return;
+    }
+    EmplaceDataBack(std::move(msg.value()));
+  }
+}
+
+// Removes and returns the oldest queued message as (timestamp, channel index,
+// message).  Assumes the heap is non-empty; nothing here checks that.
+std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
+SortedMessageReader::PopOldestChannel() {
+  // The heap front is the channel holding the oldest message; take it out.
+  const std::pair<monotonic_clock::time_point, int> oldest =
+      channel_heap_.front();
+  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
+                &ChannelHeapCompare);
+  channel_heap_.pop_back();
+
+  struct ChannelData &channel = channels_[oldest.second];
+  FlatbufferVector<MessageHeader> front = std::move(channel.front());
+  channel.data.pop_front();
+
+  // Put the channel back in the heap, keyed on its next message, if any.
+  if (channel.data.size() == 0) {
+    channel.oldest_timestamp = monotonic_clock::min_time;
+  } else {
+    const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+        chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
+    PushChannelHeap(timestamp, oldest.second);
+    channel.oldest_timestamp = timestamp;
+  }
+
+  // Read more once the popped message is newer than queue_data_time().
+  if (oldest.first > message_reader_.queue_data_time()) {
+    QueueMessages();
+  }
+  return std::make_tuple(oldest.first, oldest.second, std::move(front));
+}
+
 }  // namespace logger
 }  // namespace aos