Use O_DIRECT when possible for faster log writing

O_DIRECT skips the kernel buffers.  On the rockpi, I get 2x the write
performance for significantly less CPU utilization.  It is disabled by
default.

Change-Id: If0938e2720286f168383ecca2401f28c1ac121b8
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 86c89a8..02a9fdd 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -64,6 +64,10 @@
             "corrupt message found by MessageReader be silently ignored, "
             "providing access to all uncorrupted messages in a logfile.");
 
+DEFINE_bool(direct, false,
+            "If true, write using O_DIRECT and write 512 byte aligned blocks "
+            "whenever possible.");
+
 namespace aos::logger {
 namespace {
 
@@ -81,7 +85,10 @@
 
 DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
                                            std::unique_ptr<DataEncoder> encoder)
-    : filename_(filename), encoder_(std::move(encoder)) {
+    : filename_(filename),
+      encoder_(std::move(encoder)),
+      supports_odirect_(FLAGS_direct) {
+  iovec_.reserve(10);
   if (!util::MkdirPIfSpace(filename, 0777)) {
     ran_out_of_space_ = true;
   } else {
@@ -92,10 +99,38 @@
       PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
                         << " for writing";
       VLOG(1) << "Opened " << this->filename() << " for writing";
+
+      flags_ = fcntl(fd_, F_GETFL, 0);
+      PCHECK(flags_ >= 0) << ": Failed to get flags for " << this->filename();
+
+      EnableDirect();
     }
   }
 }
 
+void DetachedBufferWriter::EnableDirect() {
+  if (supports_odirect_ && !ODirectEnabled()) {
+    const int new_flags = flags_ | O_DIRECT;
+    // Track if we failed to set O_DIRECT.  Note: Austin hasn't seen this call
+    // fail.  The write call tends to fail instead.
+    if (fcntl(fd_, F_SETFL, new_flags) == -1) {
+      PLOG(WARNING) << "Failed to set O_DIRECT on " << filename();
+      supports_odirect_ = false;
+    } else {
+      VLOG(1) << "Enabled O_DIRECT on " << filename();
+      flags_ = new_flags;
+    }
+  }
+}
+
+void DetachedBufferWriter::DisableDirect() {
+  if (supports_odirect_ && ODirectEnabled()) {
+    flags_ = flags_ & (~O_DIRECT);
+    PCHECK(fcntl(fd_, F_SETFL, flags_) != -1) << ": Failed to disable O_DIRECT";
+    VLOG(1) << "Disabled O_DIRECT on " << filename();
+  }
+}
+
 DetachedBufferWriter::~DetachedBufferWriter() {
   Close();
   if (ran_out_of_space_) {
@@ -126,6 +161,10 @@
   std::swap(total_write_count_, other.total_write_count_);
   std::swap(total_write_messages_, other.total_write_messages_);
   std::swap(total_write_bytes_, other.total_write_bytes_);
+  std::swap(last_synced_bytes_, other.last_synced_bytes_);
+  std::swap(supports_odirect_, other.supports_odirect_);
+  std::swap(flags_, other.flags_);
+  std::swap(last_flush_time_, other.last_flush_time_);
   return *this;
 }
 
@@ -198,23 +237,110 @@
   }
 
   iovec_.clear();
-  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+  size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
   iovec_.resize(iovec_size);
   size_t counted_size = 0;
+
+  // Ok, we now need to figure out if we were aligned, and if we were, how much
+  // of the data we are being asked to write is aligned.
+  //
+  // The file is aligned if it is a multiple of kSector in length.  The data is
+  // aligned if it's memory is kSector aligned, and the length is a multiple of
+  // kSector in length.
+  bool aligned = (total_write_bytes_ % kSector) == 0;
+  size_t write_index = 0;
   for (size_t i = 0; i < iovec_size; ++i) {
-    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
-    iovec_[i].iov_len = queue[i].size();
-    counted_size += iovec_[i].iov_len;
+    iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+
+    // Make sure the address is aligned, or give up.  This should be uncommon,
+    // but is always possible.
+    if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
+         (kSector - 1)) != 0) {
+      aligned = false;
+    }
+
+    // Now, see if the length is a multiple of kSector.  The goal is to figure
+    // out if/how much memory we can write out with O_DIRECT so that only the
+    // last little bit is done with non-direct IO to keep it fast.
+    iovec_[write_index].iov_len = queue[i].size();
+    if ((iovec_[write_index].iov_len % kSector) != 0) {
+      VLOG(1) << "Unaligned length on " << filename();
+      // If we've got over a sector of data to write, write it out with O_DIRECT
+      // and then continue writing the rest unaligned.
+      if (aligned && iovec_[write_index].iov_len > kSector) {
+        const size_t aligned_size =
+            iovec_[write_index].iov_len & (~(kSector - 1));
+        VLOG(1) << "Was aligned, writing last chunk rounded from "
+                << queue[i].size() << " to " << aligned_size;
+        iovec_[write_index].iov_len = aligned_size;
+
+        WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
+
+        // Now, everything before here has been written.  Make an iovec out of
+        // the last bytes, and keep going.
+        iovec_size -= write_index;
+        iovec_.resize(iovec_size);
+        write_index = 0;
+        counted_size = 0;
+
+        iovec_[write_index].iov_base =
+            const_cast<uint8_t *>(queue[i].data() + aligned_size);
+        iovec_[write_index].iov_len = queue[i].size() - aligned_size;
+      }
+      aligned = false;
+    }
+    VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
+            << filename();
+    counted_size += iovec_[write_index].iov_len;
+    ++write_index;
+  }
+
+  // Either write the aligned data if it is all aligned, or write the rest
+  // unaligned if we wrote aligned up above.
+  WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
+
+  encoder_->Clear(iovec_size);
+}
+
+size_t DetachedBufferWriter::WriteV(struct iovec *iovec_data, size_t iovec_size,
+                                    bool aligned, size_t counted_size) {
+  // Configure the file descriptor to match the mode we should be in.  This is
+  // safe to over-call since it only does the syscall if needed.
+  if (aligned) {
+    EnableDirect();
+  } else {
+    DisableDirect();
   }
 
   const auto start = aos::monotonic_clock::now();
-  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+  const ssize_t written = writev(fd_, iovec_data, iovec_size);
+
+  if (written > 0) {
+    // Flush asynchronously and force the data out of the cache.
+    sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
+    if (last_synced_bytes_ != 0) {
+      // Per Linus' recommendation online on how to do fast file IO, do a
+      // blocking flush of the previous write chunk, and then tell the kernel to
+      // drop the pages from the cache.  This makes sure we can't get too far
+      // ahead.
+      sync_file_range(fd_, last_synced_bytes_,
+                      total_write_bytes_ - last_synced_bytes_,
+                      SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
+                          SYNC_FILE_RANGE_WAIT_AFTER);
+      posix_fadvise(fd_, last_synced_bytes_,
+                    total_write_bytes_ - last_synced_bytes_,
+                    POSIX_FADV_DONTNEED);
+
+      last_synced_bytes_ = total_write_bytes_;
+    }
+  }
+
   const auto end = aos::monotonic_clock::now();
   HandleWriteReturn(written, counted_size);
 
-  encoder_->Clear(iovec_size);
-
   UpdateStatsForWrite(end - start, written, iovec_size);
+
+  return written;
 }
 
 void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
@@ -223,7 +349,7 @@
     ran_out_of_space_ = true;
     return;
   }
-  PCHECK(write_return >= 0) << ": write failed";
+  PCHECK(write_return >= 0) << ": write failed, got " << write_return;
   if (write_return < static_cast<ssize_t>(write_size)) {
     // Sometimes this happens instead of ENOSPC. On a real filesystem, this
     // never seems to happen in any other case. If we ever want to log to a
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 1dbcbd8..50f6b40 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -44,11 +44,30 @@
 
 // This class manages efficiently writing a sequence of detached buffers to a
 // file.  It encodes them, queues them up, and batches the write operation.
+//
+// There are a couple over-arching constraints on writing to keep track of.
+//  1) The kernel is both faster and more efficient at writing large, aligned
+//     chunks with O_DIRECT set on the file.  The alignment needed is specified
+//     by kSector and is file system dependent.
+//  2) Not all encoders support generating round multiples of kSector of data.
+//     Rather than burden the API for detecting when that is the case, we want
+//     DetachedBufferWriter to be as efficient as it can at writing what given.
+//  3) Some files are small and not updated frequently.  They need to be
+//     flushed or we will lose data on power off.  It is most efficient to write
+//     as much as we can aligned by kSector and then fall back to the non direct
+//     method when it has been flushed.
+//  4) Not all filesystems support O_DIRECT, and different sizes may be optimal
+//     for different machines.  The defaults should work decently anywhere and
+//     be tuneable for faster systems.
 class DetachedBufferWriter {
  public:
   // Marker struct for one of our constructor overloads.
   struct already_out_of_space_t {};
 
+  // Size of an aligned sector used to detect when the data is aligned enough to
+  // use O_DIRECT instead.
+  static constexpr size_t kSector = 512u;
+
   DetachedBufferWriter(std::string_view filename,
                        std::unique_ptr<DataEncoder> encoder);
   // Creates a dummy instance which won't even open a file. It will act as if
@@ -155,6 +174,21 @@
   // the current time.  It just needs to be close.
   void FlushAtThreshold(aos::monotonic_clock::time_point now);
 
+  // Enables O_DIRECT on the open file if it is supported.  Cheap to call if it
+  // is already enabled.
+  void EnableDirect();
+  // Disables O_DIRECT on the open file if it is supported.  Cheap to call if it
+  // is already disabld.
+  void DisableDirect();
+
+  // Writes a chunk of iovecs.  aligned is true if all the data is kSector byte
+  // aligned and multiples of it in length, and counted_size is the sum of the
+  // sizes of all the chunks of data.  Returns the size of data written.
+  size_t WriteV(struct iovec *iovec_data, size_t iovec_size, bool aligned,
+                size_t counted_size);
+
+  bool ODirectEnabled() { return !!(flags_ & O_DIRECT); }
+
   std::string filename_;
   std::unique_ptr<DataEncoder> encoder_;
 
@@ -173,6 +207,10 @@
   int total_write_count_ = 0;
   int total_write_messages_ = 0;
   int total_write_bytes_ = 0;
+  int last_synced_bytes_ = 0;
+
+  bool supports_odirect_ = true;
+  int flags_ = 0;
 
   aos::monotonic_clock::time_point last_flush_time_ =
       aos::monotonic_clock::min_time;