Optimize simulated event loop log reading.

Currently, we make 2 copies of a message before it is sent in
simulation: first, from the source (file or lzma decoder) to a
flatbuffer vector, then to the simulated message. By aligning the
data part of a MessageHeader in the first copy from the source,
we don't need to copy it again. This is all tracked with
shared_ptrs so the lifetime management is easy.

Change-Id: I82c86ef3f9662d4c615dc57862fa89b1b9981ed4
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 73578e6..dd82418 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -43,9 +43,20 @@
             "last header as the actual header.");
 
 namespace aos::logger {
+namespace {
 
 namespace chrono = std::chrono;
 
+template <typename T>  // Streams *t, or the text "null" when t is empty.
+void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
+  if (!t.has_value()) {
+    *os << "null";
+  } else {
+    *os << *t;
+  }
+}
+}  // namespace
+
 DetachedBufferWriter::DetachedBufferWriter(
     std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
     : filename_(filename), encoder_(std::move(encoder)) {
@@ -499,23 +510,81 @@
           << FlatbufferToJson(log_file_header()->node());
 }
 
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-MessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
-    return std::nullopt;
+    return nullptr;  // Empty span from the reader: no message to return.
   }
 
-  SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
+  SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
+  CHECK(msg.Verify()) << ": Corrupted message from " << filename();
 
-  CHECK(result.Verify()) << ": Corrupted message from " << filename();
+  auto result = UnpackedMessageHeader::MakeMessage(msg.message());
 
-  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-      chrono::nanoseconds(result.message().monotonic_sent_time()));
+  const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
-  return std::move(result);
+  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
+  return result;  // Unpacked copy built by MakeMessage() above.
+}
+
+// Copies `message` into one malloc'd block: header fields + aligned payload.
+std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
+    const MessageHeader &message) {
+  const size_t data_size = message.has_data() ? message.data()->size() : 0;
+  CHECK(message.has_channel_index());  // Validate before allocating.
+  CHECK(message.has_monotonic_sent_time());
+  UnpackedMessageHeader *const unpacked_message =
+      reinterpret_cast<UnpackedMessageHeader *>(
+          malloc(sizeof(UnpackedMessageHeader) + data_size +
+                 kChannelDataAlignment - 1));
+  CHECK(unpacked_message != nullptr) << ": Failed to allocate message";
+
+  absl::Span<uint8_t> span;
+  if (data_size > 0) {
+    span =
+        absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
+                                &unpacked_message->actual_data[0], data_size)),
+                            data_size);
+  }
+  // Optional fields stay empty when the flatbuffer does not carry them.
+  std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+  if (message.has_monotonic_remote_time()) {
+    monotonic_remote_time =
+        std::chrono::nanoseconds(message.monotonic_remote_time());
+  }
+  std::optional<realtime_clock::time_point> realtime_remote_time;
+  if (message.has_realtime_remote_time()) {
+    realtime_remote_time = realtime_clock::time_point(
+        chrono::nanoseconds(message.realtime_remote_time()));
+  }
+
+  std::optional<uint32_t> remote_queue_index;
+  if (message.has_remote_queue_index()) {
+    remote_queue_index = message.remote_queue_index();
+  }
+  // Construct the header in place at the front of the allocation.
+  new (unpacked_message) UnpackedMessageHeader{
+      .channel_index = message.channel_index(),
+      .monotonic_sent_time = monotonic_clock::time_point(
+          chrono::nanoseconds(message.monotonic_sent_time())),
+      .realtime_sent_time = realtime_clock::time_point(
+          chrono::nanoseconds(message.realtime_sent_time())),
+      .queue_index = message.queue_index(),
+      .monotonic_remote_time = monotonic_remote_time,
+      .realtime_remote_time = realtime_remote_time,
+      .remote_queue_index = remote_queue_index,
+      .monotonic_timestamp_time = monotonic_clock::time_point(
+          std::chrono::nanoseconds(message.monotonic_timestamp_time())),
+      .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
+      .span = span};
+  // Copy the payload into the aligned region described by span.
+  if (data_size > 0) {
+    memcpy(span.data(), message.data()->data(), data_size);
+  }
+
+  return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
+                                                &DestroyAndFree);
+}
 
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
@@ -569,19 +638,19 @@
   }
 }
 
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-PartsMessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
   while (!done_) {
-    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
+    std::shared_ptr<UnpackedMessageHeader> message =
         message_reader_.ReadMessage();
     if (message) {
       newest_timestamp_ = message_reader_.newest_timestamp();
-      const monotonic_clock::time_point monotonic_sent_time(
-          chrono::nanoseconds(message->message().monotonic_sent_time()));
-      // TODO(austin): Does this work with startup?  Might need to use the start
-      // time.
-      // TODO(austin): Does this work with startup when we don't know the remote
-      // start time too?  Look at one of those logs to compare.
+      const monotonic_clock::time_point monotonic_sent_time =
+          message->monotonic_sent_time;
+
+      // TODO(austin): Does this work with startup?  Might need to use the
+      // start time.
+      // TODO(austin): Does this work with startup when we don't know the
+      // remote start time too?  Look at one of those logs to compare.
       if (monotonic_sent_time >
           parts_.monotonic_start_time + max_out_of_order_duration()) {
         after_start_ = true;
@@ -599,7 +668,7 @@
     NextLog();
   }
   newest_timestamp_ = monotonic_clock::max_time;
-  return std::nullopt;
+  return nullptr;
 }
 
 void PartsMessageReader::NextLog() {
@@ -645,13 +714,29 @@
          channel_index == m2.channel_index && queue_index == m2.queue_index;
 }
 
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
+  os << "{.channel_index=" << m.channel_index
+     << ", .monotonic_sent_time=" << m.monotonic_sent_time
+     << ", .realtime_sent_time=" << m.realtime_sent_time
+     << ", .queue_index=" << m.queue_index;
+  if (m.monotonic_remote_time) {  // Field is omitted entirely when unset.
+    os << ", .monotonic_remote_time=" << m.monotonic_remote_time->count();
+  }
+  os << ", .realtime_remote_time=";
+  PrintOptionalOrNull(&os, m.realtime_remote_time);
+  os << ", .remote_queue_index=";
+  PrintOptionalOrNull(&os, m.remote_queue_index);
+  if (m.has_monotonic_timestamp_time) {
+    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+  }
+  return os << "}";  // Balance the "{" written at the start.
+}
+
 std::ostream &operator<<(std::ostream &os, const Message &m) {
   os << "{.channel_index=" << m.channel_index
      << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
-  if (m.data.Verify()) {
-    os << ", .data="
-       << aos::FlatbufferToJson(m.data,
-                                {.multi_line = false, .max_vector_size = 1});
+  if (m.data != nullptr) {
+    os << ", .data=" << *m.data;  // Print the header; `<< m` would recurse.
   }
   os << "}";
   return os;
 }
@@ -674,10 +759,8 @@
   if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
     os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
   }
-  if (m.data.Verify()) {
-    os << ", .data="
-       << aos::FlatbufferToJson(m.data,
-                                {.multi_line = false, .max_vector_size = 1});
+  if (m.data != nullptr) {
+    os << ", .data=" << *m.data;
   }
   os << "}";
   return os;
@@ -700,7 +783,7 @@
         break;
       }
 
-      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+      std::shared_ptr<UnpackedMessageHeader> m =
           parts_message_reader_.ReadMessage();
       // No data left, sorted forever, work through what is left.
       if (!m) {
@@ -709,36 +792,32 @@
       }
 
       size_t monotonic_timestamp_boot = 0;
-      if (m.value().message().has_monotonic_timestamp_time()) {
+      if (m->has_monotonic_timestamp_time) {
         monotonic_timestamp_boot = parts().logger_boot_count;
       }
       size_t monotonic_remote_boot = 0xffffff;
 
-      if (m.value().message().has_monotonic_remote_time()) {
+      if (m->monotonic_remote_time.has_value()) {
         const Node *node = parts().config->nodes()->Get(
-            source_node_index_[m->message().channel_index()]);
+            source_node_index_[m->channel_index]);
 
         std::optional<size_t> boot = parts_message_reader_.boot_count(
-            source_node_index_[m->message().channel_index()]);
+            source_node_index_[m->channel_index]);
         CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                     << ", with index "
-                    << source_node_index_[m->message().channel_index()];
+                    << source_node_index_[m->channel_index];
         monotonic_remote_boot = *boot;
       }
 
-      messages_.insert(Message{
-          .channel_index = m.value().message().channel_index(),
-          .queue_index =
-              BootQueueIndex{.boot = parts().boot_count,
-                             .index = m.value().message().queue_index()},
-          .timestamp =
-              BootTimestamp{
-                  .boot = parts().boot_count,
-                  .time = monotonic_clock::time_point(std::chrono::nanoseconds(
-                      m.value().message().monotonic_sent_time()))},
-          .monotonic_remote_boot = monotonic_remote_boot,
-          .monotonic_timestamp_boot = monotonic_timestamp_boot,
-          .data = std::move(m.value())});
+      messages_.insert(
+          Message{.channel_index = m->channel_index,
+                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
+                                                .index = m->queue_index},
+                  .timestamp = BootTimestamp{.boot = parts().boot_count,
+                                             .time = m->monotonic_sent_time},
+                  .monotonic_remote_boot = monotonic_remote_boot,
+                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
+                  .data = std::move(m)});
 
       // Now, update sorted_until_ to match the new message.
       if (parts_message_reader_.newest_timestamp() >
@@ -878,17 +957,17 @@
       oldest = m;
       current_ = &parts_sorter;
     } else if (*m == *oldest) {
-      // Found a duplicate.  If there is a choice, we want the one which has the
-      // timestamp time.
-      if (!m->data.message().has_monotonic_timestamp_time()) {
+      // Found a duplicate.  If there is a choice, we want the one which has
+      // the timestamp time.
+      if (!m->data->has_monotonic_timestamp_time) {
         parts_sorter.PopFront();
-      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+      } else if (!oldest->data->has_monotonic_timestamp_time) {
         current_->PopFront();
         current_ = &parts_sorter;
         oldest = m;
       } else {
-        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
-                 oldest->data.message().monotonic_timestamp_time());
+        CHECK_EQ(m->data->monotonic_timestamp_time,
+                 oldest->data->monotonic_timestamp_time);
         parts_sorter.PopFront();
       }
     }
@@ -1017,7 +1096,7 @@
   CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
 
   NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
-  // Only set it if this node delivers to the peer timestamp_mapper.  Otherwise
+  // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
   // we could needlessly save data.
   if (node_data->any_delivered) {
     VLOG(1) << "Registering on node " << node() << " for peer node "
@@ -1035,8 +1114,7 @@
       .channel_index = m->channel_index,
       .queue_index = m->queue_index,
       .monotonic_event_time = m->timestamp,
-      .realtime_event_time = aos::realtime_clock::time_point(
-          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+      .realtime_event_time = m->data->realtime_sent_time,
       .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
@@ -1086,16 +1164,17 @@
     return true;
   }
 
-  // We need to only add messages to the list so they get processed for messages
-  // which are delivered.  Reuse the flow below which uses messages_ by just
-  // adding the new message to messages_ and continuing.
+  // We need to only add messages to the list so they get processed for
+  // messages which are delivered.  Reuse the flow below which uses messages_
+  // by just adding the new message to messages_ and continuing.
   if (messages_.empty()) {
     if (!Queue()) {
       // Found nothing to add, we are out of data!
       return false;
     }
 
-    // Now that it has been added (and cannibalized), forget about it upstream.
+    // Now that it has been added (and cannibalized), forget about it
+    // upstream.
     boot_merger_.PopFront();
   }
 
@@ -1110,7 +1189,8 @@
     timestamp_callback_(&matched_messages_.back());
     return true;
   } else {
-    // Got a timestamp, find the matching remote data, match it, and return it.
+    // Got a timestamp, find the matching remote data, match it, and return
+    // it.
     Message data = MatchingMessageFor(*m);
 
     // Return the data from the remote.  The local message only has timestamp
@@ -1119,21 +1199,16 @@
         .channel_index = m->channel_index,
         .queue_index = m->queue_index,
         .monotonic_event_time = m->timestamp,
-        .realtime_event_time = aos::realtime_clock::time_point(
-            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+        .realtime_event_time = m->data->realtime_sent_time,
         .remote_queue_index =
             BootQueueIndex{.boot = m->monotonic_remote_boot,
-                           .index = m->data.message().remote_queue_index()},
-        .monotonic_remote_time =
-            {m->monotonic_remote_boot,
-             monotonic_clock::time_point(std::chrono::nanoseconds(
-                 m->data.message().monotonic_remote_time()))},
-        .realtime_remote_time = realtime_clock::time_point(
-            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
-        .monotonic_timestamp_time =
-            {m->monotonic_timestamp_boot,
-             monotonic_clock::time_point(std::chrono::nanoseconds(
-                 m->data.message().monotonic_timestamp_time()))},
+                           .index = m->data->remote_queue_index.value()},
+        .monotonic_remote_time = {m->monotonic_remote_boot,
+                                  monotonic_clock::time_point(
+                                      m->data->monotonic_remote_time.value())},
+        .realtime_remote_time = m->data->realtime_remote_time.value(),
+        .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
+                                     m->data->monotonic_timestamp_time},
         .data = std::move(data.data)});
     CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
     last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1153,8 +1228,9 @@
 }
 
 void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
-  // Note: queueing for time doesn't really work well across boots.  So we just
-  // assume that if you are using this, you only care about the current boot.
+  // Note: queueing for time doesn't really work well across boots.  So we
+  // just assume that if you are using this, you only care about the current
+  // boot.
   //
   // TODO(austin): Is that the right concept?
   //
@@ -1191,36 +1267,37 @@
 
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
-  CHECK(message.data.message().has_remote_queue_index());
+  CHECK_NOTNULL(message.data);
+  CHECK(message.data->remote_queue_index.has_value());
   const BootQueueIndex remote_queue_index =
       BootQueueIndex{.boot = message.monotonic_remote_boot,
-                     .index = message.data.message().remote_queue_index()};
+                     .index = *message.data->remote_queue_index};
 
-  CHECK(message.data.message().has_monotonic_remote_time());
-  CHECK(message.data.message().has_realtime_remote_time());
+  CHECK(message.data->monotonic_remote_time.has_value());
+  CHECK(message.data->realtime_remote_time.has_value());
 
   const BootTimestamp monotonic_remote_time{
       .boot = message.monotonic_remote_boot,
-      .time = monotonic_clock::time_point(std::chrono::nanoseconds(
-          message.data.message().monotonic_remote_time()))};
-  const realtime_clock::time_point realtime_remote_time(
-      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+      .time = monotonic_clock::time_point(
+          message.data->monotonic_remote_time.value())};
+  const realtime_clock::time_point realtime_remote_time =
+      *message.data->realtime_remote_time;
 
-  TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
+  TimestampMapper *peer =
+      nodes_data_[source_node_[message.data->channel_index]].peer;
 
   // We only register the peers which we have data for.  So, if we are being
-  // asked to pull a timestamp from a peer which doesn't exist, return an empty
-  // message.
+  // asked to pull a timestamp from a peer which doesn't exist, return an
+  // empty message.
   if (peer == nullptr) {
     // TODO(austin): Make sure the tests hit all these paths with a boot count
     // of 1...
-    return Message{
-        .channel_index = message.channel_index,
-        .queue_index = remote_queue_index,
-        .timestamp = monotonic_remote_time,
-        .monotonic_remote_boot = 0xffffff,
-        .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+    return Message{.channel_index = message.channel_index,
+                   .queue_index = remote_queue_index,
+                   .timestamp = monotonic_remote_time,
+                   .monotonic_remote_boot = 0xffffff,
+                   .monotonic_timestamp_boot = 0xffffff,
+                   .data = nullptr};
   }
 
   // The queue which will have the matching data, if available.
@@ -1230,13 +1307,12 @@
   peer->QueueUnmatchedUntil(monotonic_remote_time);
 
   if (data_queue->empty()) {
-    return Message{
-        .channel_index = message.channel_index,
-        .queue_index = remote_queue_index,
-        .timestamp = monotonic_remote_time,
-        .monotonic_remote_boot = 0xffffff,
-        .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+    return Message{.channel_index = message.channel_index,
+                   .queue_index = remote_queue_index,
+                   .timestamp = monotonic_remote_time,
+                   .monotonic_remote_boot = 0xffffff,
+                   .monotonic_timestamp_boot = 0xffffff,
+                   .data = nullptr};
   }
 
   if (remote_queue_index < data_queue->front().queue_index ||
@@ -1247,7 +1323,7 @@
         .timestamp = monotonic_remote_time,
         .monotonic_remote_boot = 0xffffff,
         .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+        .data = nullptr};
   }
 
   // The algorithm below is constant time with some assumptions.  We need there
@@ -1267,8 +1343,7 @@
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
-                 result.data.message().realtime_sent_time())),
+    CHECK_EQ(result.data->realtime_sent_time,
              realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
@@ -1288,23 +1363,22 @@
                  m.timestamp.boot == remote_boot;
         });
     if (it == data_queue->end()) {
-      return Message{
-          .channel_index = message.channel_index,
-          .queue_index = remote_queue_index,
-          .timestamp = monotonic_remote_time,
-          .monotonic_remote_boot = 0xffffff,
-          .monotonic_timestamp_boot = 0xffffff,
-          .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+      return Message{.channel_index = message.channel_index,
+                     .queue_index = remote_queue_index,
+                     .timestamp = monotonic_remote_time,
+                     .monotonic_remote_boot = 0xffffff,
+                     .monotonic_timestamp_boot = 0xffffff,
+                     .data = nullptr};
     }
 
     Message result = std::move(*it);
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
-        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
-                 result.data.message().realtime_sent_time())),
-             realtime_remote_time)
-        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+        << ": Queue index matches, but timestamp doesn't.  Please "
+           "investigate!";
+    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+        << ": Queue index matches, but timestamp doesn't.  Please "
+           "investigate!";
 
     // TODO(austin): We still go in order, so we can erase from the beginning to
     // our iterator minus 1.  That'll keep 1 in the queue.
@@ -1330,7 +1404,8 @@
       return;
     }
 
-    // Now that it has been added (and cannibalized), forget about it upstream.
+    // Now that it has been added (and cannibalized), forget about it
+    // upstream.
     boot_merger_.PopFront();
   }
 }
@@ -1346,8 +1421,8 @@
     if (node_data.channels[m->channel_index].delivered) {
       // TODO(austin): This copies the data...  Probably not worth stressing
       // about yet.
-      // TODO(austin): Bound how big this can get.  We tend not to send massive
-      // data, so we can probably ignore this for a bit.
+      // TODO(austin): Bound how big this can get.  We tend not to send
+      // massive data, so we can probably ignore this for a bit.
       node_data.channels[m->channel_index].messages.emplace_back(*m);
     }
   }