Don't lose pieces out of the middle of writing files

We didn't get all the accounting right when writing out aligned subsets
of buffers and unaligned subsets.  Fix the correctness issues in that
and add tests (Alexei wrote most of the tests).

Change-Id: I8ac6bcbd6b762174b3f47d41ab5da1dff2cfec44
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_backend.cc b/aos/events/logging/log_backend.cc
index f47b16d..a8b7a43 100644
--- a/aos/events/logging/log_backend.cc
+++ b/aos/events/logging/log_backend.cc
@@ -11,6 +11,10 @@
 DEFINE_bool(direct, false,
             "If true, write using O_DIRECT and write 512 byte aligned blocks "
             "whenever possible.");
+DEFINE_bool(
+    sync, false,
+    "If true, sync data to disk as we go so we don't get too far ahead.  Also "
+    "fadvise that we are done with the memory once it hits disk.");
 
 namespace aos::logger {
 namespace {
@@ -72,34 +76,86 @@
 WriteResult FileHandler::Write(
     const absl::Span<const absl::Span<const uint8_t>> &queue) {
   iovec_.clear();
-  size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
-  iovec_.resize(iovec_size);
+  CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
+  iovec_.resize(queue.size());
+  // Size of the data currently in iovec_.
   size_t counted_size = 0;
 
   // Ok, we now need to figure out if we were aligned, and if we were, how much
   // of the data we are being asked to write is aligned.
   //
-  // The file is aligned if it is a multiple of kSector in length.  The data is
-  // aligned if it's memory is kSector aligned, and the length is a multiple of
-  // kSector in length.
+  // When writing with O_DIRECT, the kernel only will accept writes where the
+  // offset into the file is a multiple of kSector, the data is aligned to
+  // kSector in memory, and the length being written is a multiple of kSector.
+  // Some of the callers use an aligned ResizeableBuffer to generate 512 byte
+  // aligned buffers for this code to find and use.
   bool aligned = (total_write_bytes_ % kSector) == 0;
+
+  // Index we are filling in next.  Keeps resetting back to 0 as we write
+  // intermediates.
   size_t write_index = 0;
-  for (size_t i = 0; i < iovec_size; ++i) {
+  for (size_t i = 0; i < queue.size(); ++i) {
     iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+    iovec_[write_index].iov_len = queue[i].size();
 
     // Make sure the address is aligned, or give up.  This should be uncommon,
     // but is always possible.
-    if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
-         (kSector - 1)) != 0) {
+    if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) % kSector) !=
+        0) {
+      // Flush if we were aligned and have data.
+      if (aligned && write_index != 0) {
+        VLOG(1) << "Was aligned, now is not, writing previous data";
+        const auto code =
+            WriteV(iovec_.data(), write_index, true, counted_size);
+        if (code == WriteCode::kOutOfSpace) {
+          return {
+              .code = code,
+              .messages_written = i,
+          };
+        }
+
+        // Now, everything before here has been written.  Make an iovec out of
+        // the rest and keep going.
+        write_index = 0;
+        counted_size = 0;
+
+        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+        iovec_[write_index].iov_len = queue[i].size();
+      }
       aligned = false;
+    } else {
+      // We are now starting aligned again, and have data worth writing!  Flush
+      // what was there before.
+      if (!aligned && iovec_[write_index].iov_len >= kSector &&
+          write_index != 0) {
+        VLOG(1) << "Was not aligned, now is, writing previous data";
+
+        const auto code =
+            WriteV(iovec_.data(), write_index, false, counted_size);
+        if (code == WriteCode::kOutOfSpace) {
+          return {
+              .code = code,
+              .messages_written = i,
+          };
+        }
+
+        // Now, everything before here has been written.  Make an iovec out of
+        // the rest and keep going.
+        write_index = 0;
+        counted_size = 0;
+
+        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+        iovec_[write_index].iov_len = queue[i].size();
+        aligned = true;
+      }
     }
 
     // Now, see if the length is a multiple of kSector.  The goal is to figure
     // out if/how much memory we can write out with O_DIRECT so that only the
     // last little bit is done with non-direct IO to keep it fast.
-    iovec_[write_index].iov_len = queue[i].size();
     if ((iovec_[write_index].iov_len % kSector) != 0) {
-      VLOG(1) << "Unaligned length on " << filename();
+      VLOG(1) << "Unaligned length " << iovec_[write_index].iov_len << " on "
+              << filename();
       // If we've got over a sector of data to write, write it out with O_DIRECT
       // and then continue writing the rest unaligned.
       if (aligned && iovec_[write_index].iov_len > kSector) {
@@ -110,7 +166,7 @@
         iovec_[write_index].iov_len = aligned_size;
 
         const auto code =
-            WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
+            WriteV(iovec_.data(), write_index + 1, true, counted_size + aligned_size);
         if (code == WriteCode::kOutOfSpace) {
           return {
               .code = code,
@@ -119,11 +175,7 @@
         }
 
         // Now, everything before here has been written.  Make an iovec out of
-        // the last bytes, and keep going.
-        // TODO (Alexei, Austin): is it safe to do here since it can be a
-        // situation when i >= iovec_size
-        iovec_size -= write_index;
-        iovec_.resize(iovec_size);
+        // the rest and keep going.
         write_index = 0;
         counted_size = 0;
 
@@ -141,10 +193,10 @@
 
   // Either write the aligned data if it is all aligned, or write the rest
   // unaligned if we wrote aligned up above.
-  const auto code = WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
+  const auto code = WriteV(iovec_.data(), write_index, aligned, counted_size);
   return {
       .code = code,
-      .messages_written = iovec_size,
+      .messages_written = queue.size(),
   };
 }
 
@@ -159,9 +211,22 @@
   }
 
   const auto start = aos::monotonic_clock::now();
-  const ssize_t written = writev(fd_, iovec_data, iovec_size);
 
-  if (written > 0) {
+  if (VLOG_IS_ON(2)) {
+    size_t to_be_written = 0;
+    for (size_t i = 0; i < iovec_size; ++i) {
+      VLOG(2) << "  iov_base " << static_cast<void *>(iovec_data[i].iov_base)
+              << ", iov_len " << iovec_data[i].iov_len;
+      to_be_written += iovec_data[i].iov_len;
+    }
+    CHECK_GT(to_be_written, 0u);
+    VLOG(2) << "Going to write " << to_be_written;
+  }
+
+  const ssize_t written = writev(fd_, iovec_data, iovec_size);
+  VLOG(2) << "Wrote " << written << ", for iovec size " << iovec_size;
+
+  if (FLAGS_sync && written > 0) {
     // Flush asynchronously and force the data out of the cache.
     sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
     if (last_synced_bytes_ != 0) {
@@ -343,4 +408,4 @@
   }
   return WriteCode::kOk;
 }
-}  // namespace aos::logger
\ No newline at end of file
+}  // namespace aos::logger