Handle empty and corrupted log parts.

We used to crash.  Now, sort them as "corrupted", and also use what we
can extract.

Corrupted files come from unsafe shutdowns.  It is impractical to be
perfect and never have any.

Change-Id: I68acc05d482c484f17ad335f4ab2054e180d8a37
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 57d274b..c55b18b 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -12,6 +12,10 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
+#ifdef LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
 namespace aos {
 namespace logger {
 namespace testing {
@@ -261,7 +265,7 @@
     // parts_index increments.
     std::vector<FlatbufferVector<LogFileHeader>> log_header;
     for (std::string_view f : {logfile0, logfile1}) {
-      log_header.emplace_back(ReadHeader(f));
+      log_header.emplace_back(ReadHeader(f).value());
     }
 
     EXPECT_EQ(log_header[0].message().log_event_uuid()->string_view(),
@@ -422,20 +426,114 @@
     return {factory->MakeEventLoop("logger", node), {}};
   }
 
-  void StartLogger(LoggerState *logger, std::string logfile_base = "") {
+  void StartLogger(LoggerState *logger, std::string logfile_base = "",
+                   bool compress = false) {
     if (logfile_base.empty()) {
       logfile_base = logfile_base_;
     }
 
     logger->logger = std::make_unique<Logger>(logger->event_loop.get());
     logger->logger->set_polling_period(std::chrono::milliseconds(100));
-    logger->event_loop->OnRun([logger, logfile_base]() {
-      logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
-          logfile_base, logger->event_loop->configuration(),
-          logger->event_loop->node()));
+    logger->event_loop->OnRun([logger, logfile_base, compress]() {
+      std::unique_ptr<MultiNodeLogNamer> namer =
+          std::make_unique<MultiNodeLogNamer>(
+              logfile_base, logger->event_loop->configuration(),
+              logger->event_loop->node());
+      if (compress) {
+#ifdef LZMA
+        namer->set_extension(".xz");
+        namer->set_encoder_factory(
+            []() { return std::make_unique<aos::logger::LzmaEncoder>(3); });
+#else
+        LOG(FATAL) << "Compression unsupported";
+#endif
+      }
+
+      logger->logger->StartLogging(std::move(namer));
     });
   }
 
+  void VerifyParts(const std::vector<LogFile> &sorted_parts,
+                   const std::vector<std::string> &corrupted_parts = {}) {
+    EXPECT_EQ(sorted_parts.size(), 2u);
+
+    // Count up the number of UUIDs and make sure they are what we expect as a
+    // sanity check.
+    std::set<std::string> log_event_uuids;
+    std::set<std::string> parts_uuids;
+    std::set<std::string> both_uuids;
+
+    size_t missing_rt_count = 0;
+
+    std::vector<std::string> logger_nodes;
+    for (const LogFile &log_file : sorted_parts) {
+      EXPECT_FALSE(log_file.log_event_uuid.empty());
+      log_event_uuids.insert(log_file.log_event_uuid);
+      logger_nodes.emplace_back(log_file.logger_node);
+      both_uuids.insert(log_file.log_event_uuid);
+
+      for (const LogParts &part : log_file.parts) {
+        EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
+            << ": " << part;
+        missing_rt_count +=
+            part.realtime_start_time == aos::realtime_clock::min_time;
+
+        EXPECT_TRUE(log_event_uuids.find(part.log_event_uuid) !=
+                    log_event_uuids.end());
+        EXPECT_NE(part.node, "");
+        parts_uuids.insert(part.parts_uuid);
+        both_uuids.insert(part.parts_uuid);
+      }
+    }
+
+    // We won't have RT timestamps for 5 log files.  We don't log the RT start
+    // time on remote nodes because we don't know it and would be guessing.  And
+    // the log reader can actually do a better job.
+    EXPECT_EQ(missing_rt_count, 5u);
+
+    EXPECT_EQ(log_event_uuids.size(), 2u);
+    EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
+    EXPECT_EQ(log_event_uuids.size() + parts_uuids.size(), both_uuids.size());
+
+    // Test that each list of parts is in order.  Don't worry about the ordering
+    // between part file lists though.
+    // (inner vectors all need to be in order, but outer one doesn't matter).
+    EXPECT_THAT(ToLogReaderVector(sorted_parts),
+                ::testing::UnorderedElementsAreArray(structured_logfiles_));
+
+    EXPECT_THAT(logger_nodes, ::testing::UnorderedElementsAre("pi1", "pi2"));
+
+    EXPECT_NE(sorted_parts[0].realtime_start_time,
+              aos::realtime_clock::min_time);
+    EXPECT_NE(sorted_parts[1].realtime_start_time,
+              aos::realtime_clock::min_time);
+
+    EXPECT_NE(sorted_parts[0].monotonic_start_time,
+              aos::monotonic_clock::min_time);
+    EXPECT_NE(sorted_parts[1].monotonic_start_time,
+              aos::monotonic_clock::min_time);
+
+    EXPECT_THAT(sorted_parts[0].corrupted, ::testing::Eq(corrupted_parts));
+    EXPECT_THAT(sorted_parts[1].corrupted, ::testing::Eq(corrupted_parts));
+  }
+
+  void AddExtension(std::string_view extension) {
+    std::transform(logfiles_.begin(), logfiles_.end(), logfiles_.begin(),
+                   [extension](const std::string &in) {
+                     return absl::StrCat(in, extension);
+                   });
+
+    std::transform(structured_logfiles_.begin(), structured_logfiles_.end(),
+                   structured_logfiles_.begin(),
+                   [extension](std::vector<std::string> in) {
+                     std::transform(in.begin(), in.end(), in.begin(),
+                                    [extension](const std::string &in_str) {
+                                      return absl::StrCat(in_str, extension);
+                                    });
+                     return in;
+                   });
+  }
+
   // Config and factory.
   aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
   SimulatedEventLoopFactory event_loop_factory_;
@@ -540,7 +638,7 @@
     // UUIDs and parts UUIDs.
     std::vector<FlatbufferVector<LogFileHeader>> log_header;
     for (std::string_view f : logfiles_) {
-      log_header.emplace_back(ReadHeader(f));
+      log_header.emplace_back(ReadHeader(f).value());
       logfile_uuids.insert(log_header.back().message().log_event_uuid()->str());
       parts_uuids.insert(log_header.back().message().parts_uuid()->str());
     }
@@ -1116,64 +1214,96 @@
   }
 
   const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+  VerifyParts(sorted_parts);
+}
 
-  EXPECT_EQ(sorted_parts.size(), 2u);
+// Tests that we can sort a bunch of parts with an empty part.  We should ignore
+// it and remove it from the sorted list.
+TEST_F(MultinodeLoggerTest, SortEmptyParts) {
+  // Make a bunch of parts.
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
 
-  // Count up the number of UUIDs and make sure they are what we expect as a
-  // sanity check.
-  std::set<std::string> log_event_uuids;
-  std::set<std::string> parts_uuids;
-  std::set<std::string> both_uuids;
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-  size_t missing_rt_count = 0;
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
 
-  std::vector<std::string> logger_nodes;
-  for (const LogFile &log_file : sorted_parts) {
-    EXPECT_FALSE(log_file.log_event_uuid.empty());
-    log_event_uuids.insert(log_file.log_event_uuid);
-    logger_nodes.emplace_back(log_file.logger_node);
-    both_uuids.insert(log_file.log_event_uuid);
-
-    for (const LogParts &part : log_file.parts) {
-      EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
-          << ": " << part;
-      missing_rt_count +=
-          part.realtime_start_time == aos::realtime_clock::min_time;
-
-      EXPECT_TRUE(log_event_uuids.find(part.log_event_uuid) !=
-                  log_event_uuids.end());
-      EXPECT_NE(part.node, "");
-      parts_uuids.insert(part.parts_uuid);
-      both_uuids.insert(part.parts_uuid);
-    }
+    event_loop_factory_.RunFor(chrono::milliseconds(2000));
   }
 
-  // We won't have RT timestamps for 5 log files.  We don't log the RT start
-  // time on remote nodes because we don't know it and would be guessing.  And
-  // the log reader can actually do a better job.
-  EXPECT_EQ(missing_rt_count, 5u);
+  // TODO(austin): Should we flip out if the file can't open?
+  const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs");
 
-  EXPECT_EQ(log_event_uuids.size(), 2u);
-  EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
-  EXPECT_EQ(log_event_uuids.size() + parts_uuids.size(), both_uuids.size());
+  aos::util::WriteStringToFileOrDie(kEmptyFile, "");
+  logfiles_.emplace_back(kEmptyFile);
 
-  // Test that each list of parts is in order.  Don't worry about the ordering
-  // between part file lists though.
-  // (inner vectors all need to be in order, but outer one doesn't matter).
-  EXPECT_THAT(ToLogReaderVector(sorted_parts),
-              ::testing::UnorderedElementsAreArray(structured_logfiles_));
-
-  EXPECT_THAT(logger_nodes, ::testing::UnorderedElementsAre("pi1", "pi2"));
-
-  EXPECT_NE(sorted_parts[0].realtime_start_time, aos::realtime_clock::min_time);
-  EXPECT_NE(sorted_parts[1].realtime_start_time, aos::realtime_clock::min_time);
-
-  EXPECT_NE(sorted_parts[0].monotonic_start_time,
-            aos::monotonic_clock::min_time);
-  EXPECT_NE(sorted_parts[1].monotonic_start_time,
-            aos::monotonic_clock::min_time);
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+  VerifyParts(sorted_parts, {kEmptyFile});
 }
 
+#ifdef LZMA
+// Tests that we can sort a bunch of parts with an empty .xz file in there.  The
+// empty file should be ignored.
+TEST_F(MultinodeLoggerTest, SortEmptyCompressedParts) {
+  // Make a bunch of parts.
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger, "", true);
+    StartLogger(&pi2_logger, "", true);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(2000));
+  }
+
+  // TODO(austin): Should we flip out if the file can't open?
+  const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs.xz");
+
+  AddExtension(".xz");
+
+  aos::util::WriteStringToFileOrDie(kEmptyFile, "");
+  logfiles_.emplace_back(kEmptyFile);
+
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+  VerifyParts(sorted_parts, {kEmptyFile});
+}
+
+// Tests that we can sort a bunch of parts with the end missing off a compressed
+// file.  We should use the part we can read.
+TEST_F(MultinodeLoggerTest, SortTruncatedCompressedParts) {
+  // Make a bunch of parts.
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger, "", true);
+    StartLogger(&pi2_logger, "", true);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(2000));
+  }
+
+  // Append everything with .xz.
+  AddExtension(".xz");
+
+  // Strip off the end of one of the files.  Pick one with a lot of data.
+  ::std::string compressed_contents =
+      aos::util::ReadFileToStringOrDie(logfiles_[0]);
+
+  aos::util::WriteStringToFileOrDie(
+      logfiles_[0],
+      compressed_contents.substr(0, compressed_contents.size() - 100));
+
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+  VerifyParts(sorted_parts);
+}
+#endif
+
 // Tests that if we remap a remapped channel, it shows up correctly.
 TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
   {