Optimize simulated event loop log reading.

Currenly, we make 2 copies of a message before it is sent in
simulation: First, from the source (file or lzma decoder) to a
flatbuffer vector, then to the simulated message. By aligning the
data part of a MessageHeader in the first copy from the source,
we don't need to copy it again. This is all tracked with
shared_ptr's so the lifetime management is easy.

Change-Id: I82c86ef3f9662d4c615dc57862fa89b1b9981ed4
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 0be64bb..a923cf6 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,9 +1,8 @@
-#include "aos/events/logging/log_reader.h"
-
 #include <sys/stat.h>
 
 #include "absl/strings/str_format.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/log_writer.h"
 #include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
@@ -849,19 +848,18 @@
 // type, count) for every message matching matcher()
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsMatching(
     std::shared_ptr<const aos::Configuration> config, std::string_view filename,
-    std::function<bool(const MessageHeader *)> matcher) {
+    std::function<bool(const UnpackedMessageHeader *)> matcher) {
   MessageReader message_reader(filename);
   std::vector<int> counts(config->channels()->size(), 0);
 
   while (true) {
-    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
-        message_reader.ReadMessage();
+    std::shared_ptr<UnpackedMessageHeader> msg = message_reader.ReadMessage();
     if (!msg) {
       break;
     }
 
-    if (matcher(&msg.value().message())) {
-      counts[msg.value().message().channel_index()]++;
+    if (matcher(msg.get())) {
+      counts[msg->channel_index]++;
     }
   }
 
@@ -883,30 +881,32 @@
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsData(
     std::shared_ptr<const aos::Configuration> config,
     std::string_view filename) {
-  return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
-    if (msg->has_data()) {
-      CHECK(!msg->has_monotonic_remote_time());
-      CHECK(!msg->has_realtime_remote_time());
-      CHECK(!msg->has_remote_queue_index());
-      return true;
-    }
-    return false;
-  });
+  return CountChannelsMatching(
+      config, filename, [](const UnpackedMessageHeader *msg) {
+        if (msg->span.data() != nullptr) {
+          CHECK(!msg->monotonic_remote_time.has_value());
+          CHECK(!msg->realtime_remote_time.has_value());
+          CHECK(!msg->remote_queue_index.has_value());
+          return true;
+        }
+        return false;
+      });
 }
 
 // Counts the number of messages (channel, count) for all timestamp messages.
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsTimestamp(
     std::shared_ptr<const aos::Configuration> config,
     std::string_view filename) {
-  return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
-    if (!msg->has_data()) {
-      CHECK(msg->has_monotonic_remote_time());
-      CHECK(msg->has_realtime_remote_time());
-      CHECK(msg->has_remote_queue_index());
-      return true;
-    }
-    return false;
-  });
+  return CountChannelsMatching(
+      config, filename, [](const UnpackedMessageHeader *msg) {
+        if (msg->span.data() == nullptr) {
+          CHECK(msg->monotonic_remote_time.has_value());
+          CHECK(msg->realtime_remote_time.has_value());
+          CHECK(msg->remote_queue_index.has_value());
+          return true;
+        }
+        return false;
+      });
 }
 
 // Tests that we can write and read simple multi-node log files.