Move log file manipulation logic out of LogReader

It really had 4 separate layers that should each have been a class.
Split them out in preparation for multi-file logs.

1) Read chunks of data from a file.
2) Read the header and messages from a file.
3) Sort those messages.
4) Send them over the event loop.

Change-Id: Ib885e6f0ed027851a4d7faea71b9391c1b60cf19
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e44f65f..a35b453 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -3,8 +3,8 @@
 
 #include <deque>
 #include <vector>
+#include <string_view>
 
-#include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logfile_utils.h"
@@ -59,7 +59,7 @@
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public:
-  LogReader(absl::string_view filename);
+  LogReader(std::string_view filename);
   ~LogReader();
 
   // Registers the timer and senders used to resend the messages from the log
@@ -89,143 +89,18 @@
   // happened.
 
  private:
-  // Reads a chunk of data into data_.  Returns false if no data was read.
-  bool ReadBlock();
-
-  // Returns true if there is a full message available in the buffer, or if we
-  // will have to read more data from disk.
-  bool MessageAvailable();
-
-  // Returns a span with the data for a message from the log file, excluding
-  // the size.
-  absl::Span<const uint8_t> ReadMessage();
-
   // Queues at least max_out_of_order_duration_ messages into channels_.
   void QueueMessages();
 
-  // We need to read a large chunk at a time, then kit it up into parts and
-  // sort.
-  //
-  // We want to read 256 KB chunks at a time.  This is the fastest read size.
-  // This leaves us with a fragmentation problem though.
-  //
-  // The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
-  // chunks into single flatbuffer messages and manage them in a sorted queue.
-  // Everything is copied three times (into 256 kb buffer, then into separate
-  // buffer, then into sender), but none of it is all that expensive.  We can
-  // optimize if it is slow later.
-  //
-  // As we place the elements in the sorted list of times, keep doing this
-  // until we read a message that is newer than the threshold.
-  //
-  // Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
-  // small state machine so we can resume), and keep pulling messages back out
-  // and sending.
-  //
-  // For sorting, we want to use the fact that each channel is sorted, and
-  // then merge sort the channels.  Have a vector of deques, and then hold a
-  // sorted list of pointers to those.
-  //
-  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
-  // Especially if we start compressing.
+  // Log chunk reader.
+  SortedMessageReader sorted_message_reader_;
 
-  // Allocator which doesn't zero initialize memory.
-  template <typename T>
-  struct DefaultInitAllocator {
-    typedef T value_type;
-
-    template <typename U>
-    void construct(U *p) {
-      ::new (static_cast<void *>(p)) U;
-    }
-
-    template <typename U, typename... Args>
-    void construct(U *p, Args &&... args) {
-      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
-    }
-
-    T *allocate(std::size_t n) {
-      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
-    }
-
-    template <typename U>
-    void deallocate(U *p, std::size_t /*n*/) {
-      ::operator delete(static_cast<void *>(p));
-    }
-  };
-
-  // Minimum amount of data to queue up for sorting before we are guarenteed
-  // to not see data out of order.
-  std::chrono::nanoseconds max_out_of_order_duration_;
-
-  // File descriptor for the log file.
-  int fd_ = -1;
+  std::vector<std::unique_ptr<RawSender>> channels_;
 
   SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
   std::unique_ptr<EventLoop> event_loop_unique_ptr_;
   EventLoop *event_loop_ = nullptr;
   TimerHandler *timer_handler_;
-
-  // Vector to read into.  This uses an allocator which doesn't zero
-  // initialize the memory.
-  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
-
-  // Amount of data consumed already in data_.
-  size_t consumed_data_ = 0;
-
-  // Vector holding the data for the configuration.
-  std::vector<uint8_t> configuration_;
-
-  // Moves the message to the correct channel queue.
-  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
-  // Pushes a pointer to the channel for the given timestamp to the sorted
-  // channel list.
-  void PushChannelHeap(monotonic_clock::time_point timestamp,
-                       int channel_index);
-
-  // Returns a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
-    return channel_heap_.front();
-  }
-
-  // Pops a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  std::pair<monotonic_clock::time_point, int> PopOldestChannel();
-
-  // Datastructure to hold the list of messages, cached timestamp for the
-  // oldest message, and sender to send with.
-  struct ChannelData {
-    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
-    std::deque<FlatbufferVector<MessageHeader>> data;
-    std::unique_ptr<RawSender> raw_sender;
-
-    // Returns the oldest message.
-    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
-    // Returns the timestamp for the oldest message.
-    const monotonic_clock::time_point front_timestamp() {
-      return monotonic_clock::time_point(
-          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
-    }
-  };
-
-  // List of channels and messages for them.
-  std::vector<ChannelData> channels_;
-
-  // Heap of channels so we can track which channel to send next.
-  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-
-  // Timestamp of the newest message in a channel queue.
-  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
-
-  // The time at which we need to read another chunk from the logfile.
-  monotonic_clock::time_point queue_data_time_ = monotonic_clock::min_time;
-
-  // Cached bit for if we have reached the end of the file.  Otherwise we will
-  // hammer on the kernel asking for more data each time we send.
-  bool end_of_file_ = false;
 };
 
 }  // namespace logger