Track which types of data are stored in which files

By recording which types of messages each log file contains, this sets
us up to pre-load timestamps, but not data, when sorting log files.  The
goal is to load all timestamps and remote timestamps completely into
RAM up front when they are stored in separate files, and then shrink
our data buffers to something small.  By efficiently loading all
timestamps into RAM at the beginning, we should be able to solve the
entire class of frozen bugs.

Change-Id: Ib21f9d3b39356abad6b912dd5108f971a9ed12ce
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 2516e6c..1125a0e 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -389,6 +389,8 @@
         ":logfile_utils",
         ":logger_fbs",
         "//aos:uuid",
+        "//aos/containers:error_list",
+        "//aos/containers:sized_array",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
     ],
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index af7ff03..40a331a 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -10,6 +10,8 @@
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
 
+#include "aos/containers/error_list.h"
+#include "aos/containers/sized_array.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
@@ -24,7 +26,8 @@
                              const Node *logger_node,
                              std::function<void(NewDataWriter *)> reopen,
                              std::function<void(NewDataWriter *)> close,
-                             size_t max_message_size)
+                             size_t max_message_size,
+                             std::initializer_list<StoredDataType> types)
     : node_(node),
       node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
       logger_node_index_(
@@ -34,8 +37,14 @@
       close_(std::move(close)),
       max_message_size_(max_message_size),
       max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
+  allowed_data_types_.fill(false);
+
   state_.resize(configuration::NodesCount(log_namer->configuration_));
   CHECK_LT(node_index_, state_.size());
+  for (StoredDataType type : types) {
+    CHECK_LT(static_cast<size_t>(type), allowed_data_types_.size());
+    allowed_data_types_[static_cast<size_t>(type)] = true;
+  }
 }
 
 NewDataWriter::~NewDataWriter() {
@@ -190,6 +199,34 @@
   }
 }
 
+void NewDataWriter::CopyDataMessage(
+    DataEncoder::Copier *copier, const UUID &source_node_boot_uuid,
+    monotonic_clock::time_point now, monotonic_clock::time_point message_time) {
+  // Header's data_stored lists allowed_data_types_; don't write other types.
+  CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::DATA)])
+      << ": Tried to write data on non-data writer.";
+  CopyMessage(copier, source_node_boot_uuid, now, message_time);
+}
+
+void NewDataWriter::CopyTimestampMessage(
+    DataEncoder::Copier *copier, const UUID &source_node_boot_uuid,
+    monotonic_clock::time_point now, monotonic_clock::time_point message_time) {
+  // Header's data_stored lists allowed_data_types_; don't write other types.
+  CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::TIMESTAMPS)])
+      << ": Tried to write timestamps on non-timestamp writer.";
+  CopyMessage(copier, source_node_boot_uuid, now, message_time);
+}
+
+void NewDataWriter::CopyRemoteTimestampMessage(
+    DataEncoder::Copier *copier, const UUID &source_node_boot_uuid,
+    monotonic_clock::time_point now, monotonic_clock::time_point message_time) {
+  // Header's data_stored lists allowed_data_types_; don't write other types.
+  CHECK(allowed_data_types_[static_cast<size_t>(
+      StoredDataType::REMOTE_TIMESTAMPS)])
+      << ": Tried to write remote timestamps on non-remote timestamp writer.";
+  CopyMessage(copier, source_node_boot_uuid, now, message_time);
+}
+
 void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
                                 const UUID &source_node_boot_uuid,
                                 aos::monotonic_clock::time_point now,
@@ -285,7 +322,8 @@
     CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
   }
   return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
-                                max_out_of_order_duration_);
+                                max_out_of_order_duration_,
+                                allowed_data_types_);
 }
 
 void NewDataWriter::QueueHeader(
@@ -342,11 +380,13 @@
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
     size_t node_index, const std::vector<NewDataWriter::State> &state,
     const UUID &parts_uuid, int parts_index,
-    std::chrono::nanoseconds max_out_of_order_duration) {
+    std::chrono::nanoseconds max_out_of_order_duration,
+    const std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
+        &allowed_data_types) {
   const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
   const Node *const source_node =
       configuration::GetNode(configuration_, node_index);
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 34u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 35u);
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(true);
 
@@ -524,6 +564,18 @@
       flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
       boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
 
+  aos::ErrorList<StoredDataType> allowed_data_types_vector;
+  for (size_t type = static_cast<size_t>(StoredDataType::MIN);
+       type <= static_cast<size_t>(StoredDataType::MAX); ++type) {
+    if (allowed_data_types[type]) {
+      allowed_data_types_vector.Set(static_cast<StoredDataType>(type));
+    }
+  }
+
+  flatbuffers::Offset<flatbuffers::Vector<StoredDataType>> data_stored_offset =
+      fbb.CreateVector(allowed_data_types_vector.data(),
+                       allowed_data_types_vector.size());
+
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
   log_file_header_builder.add_name(name_offset);
@@ -617,6 +669,8 @@
   log_file_header_builder
       .add_oldest_logger_local_unreliable_monotonic_timestamps(
           oldest_logger_local_unreliable_monotonic_timestamps_offset);
+
+  log_file_header_builder.add_data_stored(data_stored_offset);
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
       fbb.Release());
@@ -710,7 +764,7 @@
       MakeDataWriter();
     }
     data_writer_->UpdateMaxMessageSize(
-        PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
+        PackMessageSize(LogType::kLogMessage, channel->max_size()));
     return data_writer_.get();
   }
 
@@ -743,9 +797,8 @@
         OpenWriter(channel, data_writer);
       },
       [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
-      0);
-  data_writer.UpdateMaxMessageSize(
-      PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
+      PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
+      {StoredDataType::DATA});
 
   data_writers_.emplace(channel, std::move(data_writer));
   node_data_writers_.emplace(source_node, &data_writers_.find(channel)->second);
@@ -778,7 +831,7 @@
         OpenForwardedTimestampWriter(channel, data_writer);
       },
       [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
-      PackRemoteMessageSize());
+      PackRemoteMessageSize(), {StoredDataType::REMOTE_TIMESTAMPS});
 
   data_writers_.emplace(channel, std::move(data_writer));
   node_timestamp_writers_.emplace(node, &data_writers_.find(channel)->second);
@@ -871,7 +924,9 @@
         },
         // Default size is 0 so it will be obvious if something doesn't fix it
         // afterwards.
-        0);
+        0,
+        std::initializer_list<StoredDataType>{StoredDataType::DATA,
+                                              StoredDataType::TIMESTAMPS});
   }
 }
 
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 34484c4..6a18997 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -41,7 +41,8 @@
   NewDataWriter(LogNamer *log_namer, const Node *node, const Node *logger_node,
                 std::function<void(NewDataWriter *)> reopen,
                 std::function<void(NewDataWriter *)> close,
-                size_t max_message_size);
+                size_t max_message_size,
+                std::initializer_list<StoredDataType> types);
 
   void UpdateMaxMessageSize(size_t new_size) {
     if (new_size > max_message_size_) {
@@ -74,11 +75,20 @@
                     bool reliable,
                     monotonic_clock::time_point monotonic_timestamp_time =
                         monotonic_clock::min_time);
+
   // Coppies a message with the provided boot UUID.
-  void CopyMessage(DataEncoder::Copier *coppier,
-                   const UUID &source_node_boot_uuid,
-                   aos::monotonic_clock::time_point now,
-                   aos::monotonic_clock::time_point message_time);
+  void CopyDataMessage(DataEncoder::Copier *copier,
+                       const UUID &source_node_boot_uuid,
+                       aos::monotonic_clock::time_point now,
+                       aos::monotonic_clock::time_point message_time);
+  void CopyTimestampMessage(DataEncoder::Copier *copier,
+                            const UUID &source_node_boot_uuid,
+                            aos::monotonic_clock::time_point now,
+                            aos::monotonic_clock::time_point message_time);
+  void CopyRemoteTimestampMessage(
+      DataEncoder::Copier *copier, const UUID &source_node_boot_uuid,
+      aos::monotonic_clock::time_point now,
+      aos::monotonic_clock::time_point message_time);
 
   // Updates the current boot for the source node.  This is useful when you want
   // to queue a message that may trigger a reboot rotation, but then need to
@@ -144,6 +154,11 @@
   // Signals that a node has rebooted.
   void Reboot(const UUID &source_node_boot_uuid);
 
+  void CopyMessage(DataEncoder::Copier *copier,
+                   const UUID &source_node_boot_uuid,
+                   aos::monotonic_clock::time_point now,
+                   aos::monotonic_clock::time_point message_time);
+
   void QueueHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header);
 
@@ -182,6 +197,11 @@
   // Since the messages can be logged out of order, this helps determine if
   // max out of order duration was violated.
   monotonic_clock::time_point newest_message_time_ = monotonic_clock::min_time;
+
+  // An array with a bool for each value of StoredDataType representing if that
+  // data type is allowed to be logged by this object.
+  std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
+      allowed_data_types_;
 };
 
 // Interface describing how to name, track, and add headers to log file parts.
@@ -302,7 +322,9 @@
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
       size_t node_index, const std::vector<NewDataWriter::State> &state,
       const UUID &parts_uuid, int parts_index,
-      std::chrono::nanoseconds max_out_of_order_duration);
+      std::chrono::nanoseconds max_out_of_order_duration,
+      const std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
+          &allowed_data_types);
 
   EventLoop *event_loop_;
   const Configuration *const configuration_;
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 1e0f7c8..69b0542 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -708,7 +708,8 @@
         static_cast<int>(node_index_) != f.data_node_index
             ? f.fetcher->context().monotonic_remote_time
             : f.fetcher->context().monotonic_event_time;
-    writer->CopyMessage(&coppier, source_node_boot_uuid, start, message_time);
+    writer->CopyDataMessage(&coppier, source_node_boot_uuid, start,
+                            message_time);
     RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
@@ -733,8 +734,9 @@
     ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
                               LogType::kLogDeliveryTimeOnly, event_loop_);
 
-    timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start,
-                                  f.fetcher->context().monotonic_event_time);
+    timestamp_writer->CopyTimestampMessage(
+        &coppier, event_loop_->boot_uuid(), start,
+        f.fetcher->context().monotonic_event_time);
     RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
@@ -784,7 +786,7 @@
     RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
                                 event_loop_);
 
-    contents_writer->CopyMessage(
+    contents_writer->CopyRemoteTimestampMessage(
         &coppier, UUID::FromVector(msg->boot_uuid()), start,
         monotonic_clock::time_point(
             chrono::nanoseconds(msg->monotonic_sent_time())));
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index d27c301..ac204c4 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -48,7 +48,7 @@
 }
 
 bool ConfigOnly(const LogFileHeader *header) {
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 34u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 35u);
   if (header->has_monotonic_start_time()) return false;
   if (header->has_realtime_start_time()) return false;
   if (header->has_max_out_of_order_duration()) return false;
@@ -80,6 +80,7 @@
     return false;
   if (header->has_logger_sha1()) return false;
   if (header->has_logger_version()) return false;
+  if (header->has_data_stored()) return false;
 
   return header->has_configuration();
 }
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 5bda225..5733fa0 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -2,6 +2,15 @@
 
 namespace aos.logger;
 
+enum StoredDataType : uint8 {  // Persisted in log headers; never renumber.
+  // Message payloads (the data itself) are stored in this file.
+  DATA = 0,
+  // Delivery timestamps (no payloads) of delivered data are in this file.
+  TIMESTAMPS = 1,
+  // Remote timestamps are stored in this file.
+  REMOTE_TIMESTAMPS = 2,
+}
+
 // A log file is a sequence of size prefixed flatbuffers.
 // The first flatbuffer will be the LogFileHeader, followed by an arbitrary
 // number of MessageHeaders.
@@ -209,6 +218,10 @@
   // Logger textual version.  This is normally the release name stamped into
   // the binary.
   logger_version:string (id:33);
+
+  // The types of data stored in this log file.  This can be used to find logs
+  // with only timestamps in them to pre-solve the time problem.
+  data_stored:[StoredDataType] (id:34);
 }
 
 // Table holding a message.
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 2073a5a..f6e5447 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -124,6 +124,49 @@
 
     EXPECT_EQ(log_header[15].message().parts_index(), 0);
     EXPECT_EQ(log_header[16].message().parts_index(), 1);
+
+    // And that the data_stored field is right.
+    EXPECT_THAT(*log_header[2].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+    EXPECT_THAT(*log_header[3].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+    EXPECT_THAT(*log_header[4].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+
+    EXPECT_THAT(*log_header[5].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+    EXPECT_THAT(*log_header[6].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+    EXPECT_THAT(*log_header[7].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA,
+                                       StoredDataType::TIMESTAMPS));
+
+    EXPECT_THAT(*log_header[8].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA));
+    EXPECT_THAT(*log_header[9].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA));
+
+    EXPECT_THAT(*log_header[10].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA));
+    EXPECT_THAT(*log_header[11].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::DATA));
+
+    EXPECT_THAT(*log_header[12].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
+    EXPECT_THAT(*log_header[13].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
+    EXPECT_THAT(*log_header[14].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
+
+    EXPECT_THAT(*log_header[15].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
+    EXPECT_THAT(*log_header[16].message().data_stored(),
+                ::testing::ElementsAre(StoredDataType::REMOTE_TIMESTAMPS));
   }
 
   const std::vector<LogFile> sorted_parts = SortParts(logfiles_);