Change LogReader API to be able to replace messages

The mutation API in LogReader could not express dropping messages or
growing them.  This change enables more aggressive mutation.

Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index c01026f..30ed3d7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1579,14 +1579,14 @@
   os << "{.channel_index=" << msg.channel_index
      << ", .queue_index=" << msg.queue_index
      << ", .timestamp=" << msg.timestamp;
-  if (msg.data != nullptr) {
-    if (msg.data->remote_queue_index.has_value()) {
-      os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
+  if (msg.header != nullptr) {
+    if (msg.header->remote_queue_index.has_value()) {
+      os << ", .remote_queue_index=" << *msg.header->remote_queue_index;
     }
-    if (msg.data->monotonic_remote_time.has_value()) {
-      os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
+    if (msg.header->monotonic_remote_time.has_value()) {
+      os << ", .monotonic_remote_time=" << *msg.header->monotonic_remote_time;
     }
-    os << ", .data=" << msg.data;
+    os << ", .header=" << msg.header;
   }
   os << "}";
   return os;
@@ -1614,7 +1614,7 @@
     os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
   }
   if (msg.data != nullptr) {
-    os << ", .data=" << *msg.data;
+    os << ", .data=" << msg.data.get();
   } else {
     os << ", .data=nullptr";
   }
@@ -1666,15 +1666,20 @@
         monotonic_remote_boot = *boot;
       }
 
-      messages_.insert(
-          Message{.channel_index = msg->channel_index,
-                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
-                                                .index = msg->queue_index},
-                  .timestamp = BootTimestamp{.boot = parts().boot_count,
-                                             .time = msg->monotonic_sent_time},
-                  .monotonic_remote_boot = monotonic_remote_boot,
-                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
-                  .data = std::move(msg)});
+      std::shared_ptr<SharedSpan> data =
+          std::make_shared<SharedSpan>(msg, &msg->span);
+
+      messages_.insert(Message{
+          .channel_index = msg->channel_index,
+          .queue_index = BootQueueIndex{.boot = parts().boot_count,
+                                        .index = msg->queue_index},
+          .timestamp = BootTimestamp{.boot = parts().boot_count,
+                                     .time = msg->monotonic_sent_time},
+          .monotonic_remote_boot = monotonic_remote_boot,
+          .monotonic_timestamp_boot = monotonic_timestamp_boot,
+          .header = std::move(msg),
+          .data = std::move(data),
+      });
 
       // Now, update sorted_until_ to match the new message.
       if (parts_message_reader_.newest_timestamp() >
@@ -1827,15 +1832,15 @@
     } else if (*msg == *oldest) {
       // Found a duplicate.  If there is a choice, we want the one which has
       // the timestamp time.
-      if (!msg->data->has_monotonic_timestamp_time) {
+      if (!msg->header->has_monotonic_timestamp_time) {
         message_sorter.PopFront();
-      } else if (!oldest->data->has_monotonic_timestamp_time) {
+      } else if (!oldest->header->has_monotonic_timestamp_time) {
         current_->PopFront();
         current_ = &message_sorter;
         oldest = msg;
       } else {
-        CHECK_EQ(msg->data->monotonic_timestamp_time,
-                 oldest->data->monotonic_timestamp_time);
+        CHECK_EQ(msg->header->monotonic_timestamp_time,
+                 oldest->header->monotonic_timestamp_time);
         message_sorter.PopFront();
       }
     }
@@ -2037,26 +2042,30 @@
     }
     CHECK_LT(msg->channel_index, source_node.size());
     if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
-      timestamp_messages_.emplace_back(TimestampedMessage{
+      TimestampedMessage timestamped_message{
           .channel_index = msg->channel_index,
           .queue_index = msg->queue_index,
           .monotonic_event_time = msg->timestamp,
-          .realtime_event_time = msg->data->realtime_sent_time,
+          .realtime_event_time = msg->header->realtime_sent_time,
           .remote_queue_index =
               BootQueueIndex{.boot = msg->monotonic_remote_boot,
-                             .index = msg->data->remote_queue_index.value()},
+                             .index = msg->header->remote_queue_index.value()},
           .monotonic_remote_time = {msg->monotonic_remote_boot,
-                                    msg->data->monotonic_remote_time.value()},
-          .realtime_remote_time = msg->data->realtime_remote_time.value(),
+                                    msg->header->monotonic_remote_time.value()},
+          .realtime_remote_time = msg->header->realtime_remote_time.value(),
           .monotonic_remote_transmit_time =
               {msg->monotonic_remote_boot,
-               msg->data->monotonic_remote_transmit_time},
+               msg->header->monotonic_remote_transmit_time},
           .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
-                                       msg->data->monotonic_timestamp_time},
-          .data = std::move(msg->data)});
+                                       msg->header->monotonic_timestamp_time},
+          .data = msg->data,
+      };
 
-      VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
-      fn(&timestamp_messages_.back());
+      fn(&timestamped_message);
+
+      VLOG(2) << this << " Queued timestamp of " << timestamped_message;
+
+      timestamp_messages_.emplace_back(std::move(*msg));
     } else {
       VLOG(2) << this << " Dropped data";
     }
@@ -2100,25 +2109,12 @@
     CHECK(queue_timestamps_ran_);
   }
 
-  // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
-  // to return a Message.  We need to convert the first message in the list
-  // before returning it (and comparing, honestly).  Fill next_timestamp_ in if
-  // it is empty so the rest of the logic here can just look at next_timestamp_
-  // and use that instead.
-  if (!next_timestamp_ && !timestamp_messages_.empty()) {
-    auto &front = timestamp_messages_.front();
-    next_timestamp_ = Message{
-        .channel_index = front.channel_index,
-        .queue_index = front.queue_index,
-        .timestamp = front.monotonic_event_time,
-        .monotonic_remote_boot = front.remote_queue_index.boot,
-        .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
-        .data = std::move(front.data),
-    };
-    timestamp_messages_.pop_front();
+  const Message *timestamp_messages_front = nullptr;
+  if (!timestamp_messages_.empty()) {
+    timestamp_messages_front = &timestamp_messages_.front();
   }
 
-  if (!next_timestamp_) {
+  if (!timestamp_messages_front) {
     message_source_ = MessageSource::kBootMerger;
     if (boot_merger_front != nullptr) {
       VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
@@ -2134,15 +2130,15 @@
     message_source_ = MessageSource::kTimestampMessage;
 
     VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
-            << next_timestamp_.value();
-    return &next_timestamp_.value();
+            << *timestamp_messages_front;
+    return timestamp_messages_front;
   }
 
-  if (*boot_merger_front <= next_timestamp_.value()) {
-    if (*boot_merger_front == next_timestamp_.value()) {
+  if (*boot_merger_front <= *timestamp_messages_front) {
+    if (*boot_merger_front == *timestamp_messages_front) {
       VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
               << " Dropping duplicate timestamp.";
-      next_timestamp_.reset();
+      timestamp_messages_.pop_front();
     }
     message_source_ = MessageSource::kBootMerger;
     if (boot_merger_front != nullptr) {
@@ -2156,16 +2152,16 @@
   } else {
     message_source_ = MessageSource::kTimestampMessage;
     VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
-            << next_timestamp_.value();
-    return &next_timestamp_.value();
+            << *timestamp_messages_front;
+    return timestamp_messages_front;
   }
 }
 
 void SplitTimestampBootMerger::PopFront() {
   switch (message_source_) {
     case MessageSource::kTimestampMessage:
-      CHECK(next_timestamp_.has_value());
-      next_timestamp_.reset();
+      CHECK(!timestamp_messages_.empty());
+      timestamp_messages_.pop_front();
       break;
     case MessageSource::kBootMerger:
       boot_merger_.PopFront();
@@ -2246,7 +2242,7 @@
       .channel_index = msg->channel_index,
       .queue_index = msg->queue_index,
       .monotonic_event_time = msg->timestamp,
-      .realtime_event_time = msg->data->realtime_sent_time,
+      .realtime_event_time = msg->header->realtime_sent_time,
       .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
@@ -2368,18 +2364,18 @@
         .channel_index = msg->channel_index,
         .queue_index = msg->queue_index,
         .monotonic_event_time = msg->timestamp,
-        .realtime_event_time = msg->data->realtime_sent_time,
+        .realtime_event_time = msg->header->realtime_sent_time,
         .remote_queue_index =
             BootQueueIndex{.boot = msg->monotonic_remote_boot,
-                           .index = msg->data->remote_queue_index.value()},
+                           .index = msg->header->remote_queue_index.value()},
         .monotonic_remote_time = {msg->monotonic_remote_boot,
-                                  msg->data->monotonic_remote_time.value()},
-        .realtime_remote_time = msg->data->realtime_remote_time.value(),
+                                  msg->header->monotonic_remote_time.value()},
+        .realtime_remote_time = msg->header->realtime_remote_time.value(),
         .monotonic_remote_transmit_time =
             {msg->monotonic_remote_boot,
-             msg->data->monotonic_remote_transmit_time},
+             msg->header->monotonic_remote_transmit_time},
         .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
-                                     msg->data->monotonic_timestamp_time},
+                                     msg->header->monotonic_timestamp_time},
         .data = std::move(data.data)});
     VLOG(1) << node_name() << " Inserted timestamp "
             << matched_messages_.back();
@@ -2446,23 +2442,23 @@
 
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
-  CHECK_NOTNULL(message.data);
-  CHECK(message.data->remote_queue_index.has_value());
+  CHECK_NOTNULL(message.header);
+  CHECK(message.header->remote_queue_index.has_value());
   const BootQueueIndex remote_queue_index =
       BootQueueIndex{.boot = message.monotonic_remote_boot,
-                     .index = *message.data->remote_queue_index};
+                     .index = *message.header->remote_queue_index};
 
-  CHECK(message.data->monotonic_remote_time.has_value());
-  CHECK(message.data->realtime_remote_time.has_value());
+  CHECK(message.header->monotonic_remote_time.has_value());
+  CHECK(message.header->realtime_remote_time.has_value());
 
   const BootTimestamp monotonic_remote_time{
       .boot = message.monotonic_remote_boot,
-      .time = message.data->monotonic_remote_time.value()};
+      .time = message.header->monotonic_remote_time.value()};
   const realtime_clock::time_point realtime_remote_time =
-      *message.data->realtime_remote_time;
+      *message.header->realtime_remote_time;
 
   TimestampMapper *peer =
-      nodes_data_[source_node_[message.data->channel_index]].peer;
+      nodes_data_[source_node_[message.header->channel_index]].peer;
 
   // We only register the peers which we have data for.  So, if we are being
   // asked to pull a timestamp from a peer which doesn't exist, return an
@@ -2475,6 +2471,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2490,6 +2487,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2500,6 +2498,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2520,7 +2519,7 @@
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+    CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
     // are done.  And all the data is in order.
@@ -2544,6 +2543,7 @@
                      .timestamp = monotonic_remote_time,
                      .monotonic_remote_boot = 0xffffff,
                      .monotonic_timestamp_boot = 0xffffff,
+                     .header = nullptr,
                      .data = nullptr};
     }
 
@@ -2552,7 +2552,7 @@
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please "
            "investigate!";
-    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+    CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please "
            "investigate!";