Support graceful logging failures when running out of disk space

It's the most obvious failure mode when logging, so let's allow the
higher-level code to report a status instead of just crash-looping.

Change-Id: Iff223fd8b6b0f7f4b21d154a4dda5cce80fa6af2
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 573b9aa..ccec7fa 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -40,6 +40,33 @@
     }),
 )
 
+cc_binary(
+    name = "logfile_utils_out_of_space_test_runner",
+    testonly = True,
+    srcs = [
+        "logfile_utils_out_of_space_test_runner.cc",
+    ],
+    deps = [
+        ":logfile_utils",
+        "//aos:init",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+sh_test(
+    name = "logfile_utils_out_of_space_test",
+    srcs = [
+        "logfile_utils_out_of_space_test.sh",
+    ],
+    data = [
+        ":logfile_utils_out_of_space_test_runner",
+    ],
+    deps = [
+        "@bazel_tools//tools/bash/runfiles",
+    ],
+)
+
 cc_library(
     name = "buffer_encoder",
     srcs = [
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a635682..a332929 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -188,11 +188,11 @@
 
 DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
     const Channel *channel) {
-  const bool log_delivery_times =
-      (this->node() == nullptr)
-          ? false
-          : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                channel, this->node(), this->node());
+  bool log_delivery_times = false;
+  if (this->node() != nullptr) {
+    log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+        channel, this->node(), this->node());
+  }
   if (!log_delivery_times) {
     return nullptr;
   }
@@ -200,20 +200,33 @@
   return data_writer_.get();
 }
 
+void MultiNodeLogNamer::Close() {
+  for (std::pair<const Channel *const, DataWriter> &data_writer :
+       data_writers_) {
+    if (data_writer.second.writer) {
+      data_writer.second.writer->Close();
+      if (data_writer.second.writer->ran_out_of_space()) {
+        ran_out_of_space_ = true;
+        data_writer.second.writer->acknowledge_out_of_space();
+      }
+    }
+  }
+  if (data_writer_) {
+    data_writer_->Close();
+    if (data_writer_->ran_out_of_space()) {
+      ran_out_of_space_ = true;
+      data_writer_->acknowledge_out_of_space();
+    }
+  }
+}
+
 void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
                                                      DataWriter *data_writer) {
   std::string filename =
       absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
                    "/", channel->type()->string_view(), ".part",
                    data_writer->part_number, ".bfbs");
-
-  if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(
-        filename, std::make_unique<DummyEncoder>());
-  } else {
-    *data_writer->writer =
-        DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
-  }
+  CreateBufferWriter(filename, &data_writer->writer);
 }
 
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
@@ -222,13 +235,7 @@
       base_name_, "_", channel->source_node()->string_view(), "_data",
       channel->name()->string_view(), "/", channel->type()->string_view(),
       ".part", data_writer->part_number, ".bfbs");
-  if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(
-        filename, std::make_unique<DummyEncoder>());
-  } else {
-    *data_writer->writer =
-        DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
-  }
+  CreateBufferWriter(filename, &data_writer->writer);
 }
 
 std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
@@ -238,5 +245,28 @@
       std::make_unique<DummyEncoder>());
 }
 
+void MultiNodeLogNamer::CreateBufferWriter(
+    std::string_view filename,
+    std::unique_ptr<DetachedBufferWriter> *destination) {
+  if (ran_out_of_space_) {
+    // Refuse to open any new files, which might skip data. Any existing files
+    // are in the same folder, which means they're on the same filesystem, which
+    // means they're probably going to run out of space and get stuck too.
+    return;
+  }
+  if (!destination->get()) {
+    *destination = std::make_unique<DetachedBufferWriter>(
+        filename, std::make_unique<DummyEncoder>());
+    return;
+  }
+  destination->get()->Close();
+  if (destination->get()->ran_out_of_space()) {
+    ran_out_of_space_ = true;
+    return;
+  }
+  *destination->get() =
+      DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 1bef629..5265f24 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -88,6 +88,7 @@
         base_name_(base_name),
         uuid_(UUID::Random()),
         data_writer_(OpenDataWriter()) {}
+  ~LocalLogNamer() override = default;
 
   void WriteHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -123,6 +124,7 @@
  public:
   MultiNodeLogNamer(std::string_view base_name,
                     const Configuration *configuration, const Node *node);
+  ~MultiNodeLogNamer() override = default;
 
   void WriteHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -139,6 +141,20 @@
 
   DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
 
+  // Indicates that at least one file ran out of space. Once this happens, we
+  // stop trying to open new files, to avoid writing any files with holes from
+  // previous parts.
+  //
+  // Besides this function, this object will silently stop logging data when
+  // this occurs. If you want to ensure log files are complete, you must call
+  // this method.
+  bool ran_out_of_space() const { return ran_out_of_space_; }
+
+  // Closes all existing log files. No more data may be written after this.
+  //
+  // This may set ran_out_of_space().
+  void Close();
+
  private:
   // Files to write remote data to.  We want one per channel.  Maps the channel
   // to the writer, Node, and part number.
@@ -160,12 +176,17 @@
   // Opens the main data writer file for this node responsible for data_writer_.
   std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
 
+  void CreateBufferWriter(std::string_view filename,
+                          std::unique_ptr<DetachedBufferWriter> *destination);
+
   const std::string base_name_;
   const Configuration *const configuration_;
   const UUID uuid_;
 
   size_t part_number_ = 0;
 
+  bool ran_out_of_space_ = false;
+
   // File to write both delivery timestamps and local data to.
   std::unique_ptr<DetachedBufferWriter> data_writer_;
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7ddc658..5226154 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -46,12 +46,11 @@
 }
 
 DetachedBufferWriter::~DetachedBufferWriter() {
-  encoder_->Finish();
-  while (encoder_->queue_size() > 0) {
-    Flush();
+  Close();
+  if (ran_out_of_space_) {
+    CHECK(acknowledge_ran_out_of_space_)
+        << ": Unacknowledged out of disk space, log file was not completed";
   }
-  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
-  VLOG(1) << "Closed " << filename_;
 }
 
 DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
@@ -66,6 +65,8 @@
   std::swap(filename_, other.filename_);
   std::swap(encoder_, other.encoder_);
   std::swap(fd_, other.fd_);
+  std::swap(ran_out_of_space_, other.ran_out_of_space_);
+  std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
   std::swap(iovec_, other.iovec_);
   std::swap(max_write_time_, other.max_write_time_);
   std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
@@ -83,6 +84,14 @@
     // syscall to write the data immediately instead of copying it to
     // enqueue.
 
+    if (ran_out_of_space_) {
+      // We don't want any later data to be written after space becomes
+      // available, so refuse to write anything more once we've dropped data
+      // because we ran out of space.
+      VLOG(1) << "Ignoring span: " << span.size();
+      return;
+    }
+
     // First, flush everything.
     while (encoder_->queue_size() > 0u) {
       Flush();
@@ -92,9 +101,7 @@
     const auto start = aos::monotonic_clock::now();
     const ssize_t written = write(fd_, span.data(), span.size());
     const auto end = aos::monotonic_clock::now();
-    PCHECK(written >= 0) << ": write failed";
-    CHECK_EQ(written, static_cast<ssize_t>(span.size()))
-        << ": Wrote " << written << " expected " << span.size();
+    HandleWriteReturn(written, span.size());
     UpdateStatsForWrite(end - start, written, 1);
   } else {
     encoder_->Encode(CopySpanAsDetachedBuffer(span));
@@ -103,11 +110,38 @@
   FlushAtThreshold();
 }
 
+void DetachedBufferWriter::Close() {
+  if (fd_ == -1) {
+    return;
+  }
+  encoder_->Finish();
+  while (encoder_->queue_size() > 0) {
+    Flush();
+  }
+  if (close(fd_) == -1) {
+    if (errno == ENOSPC) {
+      ran_out_of_space_ = true;
+    } else {
+      PLOG(ERROR) << "Closing log file failed";
+    }
+  }
+  fd_ = -1;
+  VLOG(1) << "Closed " << filename_;
+}
+
 void DetachedBufferWriter::Flush() {
   const auto queue = encoder_->queue();
   if (queue.empty()) {
     return;
   }
+  if (ran_out_of_space_) {
+    // We don't want any later data to be written after space becomes available,
+    // so refuse to write anything more once we've dropped data because we ran
+    // out of space.
+    VLOG(1) << "Ignoring queue: " << queue.size();
+    encoder_->Clear(queue.size());
+    return;
+  }
 
   iovec_.clear();
   const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
@@ -122,16 +156,30 @@
   const auto start = aos::monotonic_clock::now();
   const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
   const auto end = aos::monotonic_clock::now();
-  PCHECK(written >= 0) << ": writev failed";
-  // TODO(austin): Handle partial writes in some way other than crashing...
-  CHECK_EQ(written, static_cast<ssize_t>(counted_size))
-      << ": Wrote " << written << " expected " << counted_size;
+  HandleWriteReturn(written, counted_size);
 
   encoder_->Clear(iovec_size);
 
   UpdateStatsForWrite(end - start, written, iovec_size);
 }
 
+void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
+                                             size_t write_size) {
+  if (write_return == -1 && errno == ENOSPC) {
+    ran_out_of_space_ = true;
+    return;
+  }
+  PCHECK(write_return >= 0) << ": write failed";
+  if (write_return < static_cast<ssize_t>(write_size)) {
+    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+    // never seems to happen in any other case. If we ever want to log to a
+    // socket, this will happen more often. However, until we get there, we'll
+    // just assume it means we ran out of space.
+    ran_out_of_space_ = true;
+    return;
+  }
+}
+
 void DetachedBufferWriter::UpdateStatsForWrite(
     aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
   if (duration > max_write_time_) {
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f64881c..3a312bc 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -75,6 +75,26 @@
   // Queues up data in span. May copy or may write it to disk immediately.
   void QueueSpan(absl::Span<const uint8_t> span);
 
+  // Indicates we got ENOSPC when trying to write. After this returns true, no
+  // further data is written.
+  bool ran_out_of_space() const { return ran_out_of_space_; }
+
+  // To avoid silently failing to write logfiles, you must call this before
+  // destruction if ran_out_of_space() is true and the situation has been
+  // handled.
+  void acknowledge_out_of_space() {
+    CHECK(ran_out_of_space_);
+    acknowledge_ran_out_of_space_ = true;
+  }
+
+  // Fully flushes and closes the underlying file now. No additional data may be
+  // enqueued after calling this.
+  //
+  // This will be performed in the destructor automatically.
+  //
+  // Note that this may set ran_out_of_space().
+  void Close();
+
   // Returns the total number of bytes written and currently queued.
   size_t total_bytes() const { return encoder_->total_bytes(); }
 
@@ -115,6 +135,10 @@
   // all of it.
   void Flush();
 
+  // write_return is what write(2) or writev(2) returned. write_size is the
+  // number of bytes we expected it to write.
+  void HandleWriteReturn(ssize_t write_return, size_t write_size);
+
   void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
                            ssize_t written, int iovec_size);
 
@@ -126,6 +150,8 @@
   std::unique_ptr<DetachedBufferEncoder> encoder_;
 
   int fd_ = -1;
+  bool ran_out_of_space_ = false;
+  bool acknowledge_ran_out_of_space_ = false;
 
   // List of iovecs to use with writev.  This is a member variable to avoid
   // churn.
diff --git a/aos/events/logging/logfile_utils_out_of_space_test.sh b/aos/events/logging/logfile_utils_out_of_space_test.sh
new file mode 100755
index 0000000..f412e0d
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Tests DetachedBufferWriter's behavior when running out of disk space. We do
+# this by creating a small tmpfs in a new mount namespace, and then running a
+# test binary in that namespace. There's no easy way for a process with user
+# code to run itself in a new mount+user namespace, so we do that outside the
+# test process itself.
+
+# --- begin runfiles.bash initialization v2 ---
+# Copy-pasted from the Bazel Bash runfiles library v2.
+set -uo pipefail; f=bazel_tools/tools/bash/runfiles/runfiles.bash
+source "${RUNFILES_DIR:-/dev/null}/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "${RUNFILES_MANIFEST_FILE:-/dev/null}" | cut -f2- -d' ')" 2>/dev/null || \
+source "$0.runfiles/$f" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+source "$(grep -sm1 "^$f " "$0.exe.runfiles_manifest" | cut -f2- -d' ')" 2>/dev/null || \
+  { echo>&2 "ERROR: cannot find $f"; exit 1; }; f=; set -e
+# --- end runfiles.bash initialization v2 ---
+
+set -euo pipefail
+
+TMPFS="${TEST_TMPDIR}/small_tmpfs"
+rm -rf "${TMPFS}"
+mkdir "${TMPFS}"
+
+function test {
+  SIZE="$1"
+  echo "Running test with ${SIZE}..." >&2
+  unshare --mount --map-root-user bash <<END
+set -euo pipefail
+
+mount -t tmpfs tmpfs -o size=${SIZE} "${TMPFS}"
+
+exec "$(rlocation org_frc971/aos/events/logging/logfile_utils_out_of_space_test_runner)" -tmpfs "${TMPFS}"
+END
+  echo "Finished test with ${SIZE}" >&2
+}
+
+# Run out of space exactly at the beginning of a block.
+test 81920
+
+# Run out of space 1 byte into a block.
+test 81921
+
+# Run out of space in the middle of a block.
+test 87040
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
new file mode 100644
index 0000000..3010a6f
--- /dev/null
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -0,0 +1,30 @@
+// See :logfile_utils_out_of_space_test for usage and details.
+
+#include <array>
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/init.h"
+
+DECLARE_int32(flush_size);
+DEFINE_string(tmpfs, "", "tmpfs with the desired size");
+
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+  FLAGS_flush_size = 1;
+  CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
+
+  aos::logger::DetachedBufferWriter writer(
+      FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
+  std::array<uint8_t, 10240> data;
+  data.fill(0);
+  for (int i = 0; i < 8; ++i) {
+    writer.QueueSpan(data);
+    CHECK(!writer.ran_out_of_space()) << ": " << i;
+  }
+  writer.QueueSpan(data);
+  CHECK(writer.ran_out_of_space());
+  writer.acknowledge_out_of_space();
+}