Fix log sorting for good

Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.

When a message is sent from node A -> node B, add support for sending
its delivery timestamp back from node B to node A for logging.

    {
      "name": "/pi1/aos",
      "type": "aos.message_bridge.Timestamp",
      "source_node": "pi1",
      "frequency": 10,
      "max_size": 200,
      "destination_nodes": [
        {
          "name": "pi2",
          "priority": 1,
          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
          "timestamp_logger_nodes": ["pi1"]
        }
      ]
    },

This gives us a way to log enough information on node A such that
everything is self-contained.  We know all the messages we sent to B,
and when they got there, so we can recreate the time offset and replay
the node.

This data is then published on the channel:
   { "name": "/aos/remote_timestamps/pi2", "type": "aos.logger.MessageHeader"}

The logger then treats that channel specially and logs its contents
directly, as though the messages had been received on the remote
node.

This (among other things) exposes log sorting problems.  Use our fancy
new infinite-precision noncausal filter to estimate time precisely
enough to actually order events.  This gets us down to 2-3 ns of error
due to integer precision.

Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 2b08b59..d1fbeb2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -41,13 +41,20 @@
 class DetachedBufferWriter {
  public:
   DetachedBufferWriter(std::string_view filename);
+  DetachedBufferWriter(DetachedBufferWriter &&other);
+  DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+
   ~DetachedBufferWriter();
 
-  DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+  DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
   DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
 
   std::string_view filename() const { return filename_; }
 
+  // Rewrites a location in a file (relative to the start) to have new data in
+  // it.  The main use case is updating start times after a log file starts.
+  void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+
   // TODO(austin): Snappy compress the log file if it ends with .snappy!
 
   // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
@@ -68,7 +75,7 @@
   size_t total_size() const { return written_size_ + queued_size_; }
 
  private:
-  const std::string filename_;
+  std::string filename_;
 
   int fd_ = -1;
 
@@ -236,15 +243,15 @@
   void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
                           const Node *target_node);
 
-  // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
-  // max_time if there is nothing in the channel.
+  // Returns the (timestamp, queue_index, message_header) for the oldest message
+  // in a channel, or max_time if there is nothing in the channel.
   std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
   oldest_message(int channel) {
     return channels_[channel].data.front_timestamp();
   }
 
-  // Returns the (timestamp, queue_index) for the oldest delivery time in a
-  // channel, or max_time if there is nothing in the channel.
+  // Returns the (timestamp, queue_index, message_header) for the oldest
+  // delivery time in a channel, or max_time if there is nothing in the channel.
   std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
   oldest_message(int channel, int destination_node) {
     return channels_[channel].timestamps[destination_node].front_timestamp();
@@ -260,7 +267,7 @@
   // a channel delivered to a node.  Requeues data as needed.
   std::tuple<monotonic_clock::time_point, uint32_t,
              FlatbufferVector<MessageHeader>>
-  PopOldest(int channel, int node_index);
+  PopOldestTimestamp(int channel, int node_index);
 
   // Returns the header for the log files.
   const LogFileHeader *log_file_header() const {
@@ -367,7 +374,7 @@
     bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
 
     // Drops the front message.  Invalidates the front() reference.
-    void pop_front();
+    void PopFront();
 
     // The size of the queue.
     size_t size() { return data_.size(); }
@@ -375,15 +382,16 @@
     // Returns a debug string with info about each message in the queue.
     std::string DebugString() const;
 
-    // Returns the (timestamp, queue_index) for the oldest message.
+    // Returns the (timestamp, queue_index, message_header) for the oldest
+    // message.
     const std::tuple<monotonic_clock::time_point, uint32_t,
                      const MessageHeader *>
     front_timestamp() {
-      CHECK_GT(data_.size(), 0u);
+      const MessageHeader &message = front().message();
       return std::make_tuple(
-          monotonic_clock::time_point(std::chrono::nanoseconds(
-              front().message().monotonic_sent_time())),
-          front().message().queue_index(), &front().message());
+          monotonic_clock::time_point(
+              std::chrono::nanoseconds(message.monotonic_sent_time())),
+          message.queue_index(), &message);
     }
 
     // Pointer to the timestamp merger for this queue if available.
@@ -471,9 +479,6 @@
   // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
-  // Returns the oldest forwarding timestamp.
-  DeliveryTimestamp OldestTimestamp() const;
-
   // Tracks if the channel merger has pushed this onto it's heap or not.
   bool pushed() { return pushed_; }
   // Sets if this has been pushed to the channel merger heap.  Should only be
@@ -490,6 +495,14 @@
   // called by a SplitMessageReader.
   void NoticeAtEnd();
 
+  aos::monotonic_clock::time_point channel_merger_time() {
+    if (has_timestamps_) {
+      return std::get<0>(timestamp_heap_[0]);
+    } else {
+      return std::get<0>(message_heap_[0]);
+    }
+  }
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
@@ -576,12 +589,6 @@
              FlatbufferVector<MessageHeader>>
   PopOldest();
 
-  // Returns the oldest timestamp in the timestamp heap.
-  TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
-  // Returns the oldest timestamp in the timestamp heap for a specific channel.
-  TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
-      int channel) const;
-
   // Returns the config for this set of log files.
   const Configuration *configuration() const {
     return log_file_header()->configuration();
@@ -628,6 +635,9 @@
   void PushChannelHeap(monotonic_clock::time_point timestamp,
                        int channel_index);
 
+  // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
+  void VerifyHeaps();
+
   // All the message readers.
   std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
 
@@ -641,6 +651,7 @@
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
   // A heap of just the timestamp channel readers and timestamps for the oldest
   // data in each.
+  // TODO(austin): I think this is no longer used and can be removed (!)
   std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
 
   // Configured node.
@@ -650,6 +661,9 @@
 
   // Cached copy of the list of nodes.
   std::vector<const Node *> nodes_;
+
+  // Last time popped.  Used to detect events being returned out of order.
+  monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
 };
 
 // Returns the node name with a trailing space, or an empty string if we are on