Rethink the write alignment

Split the write queue into aligned and unaligned pieces before writing,
then batch consecutive pieces of the same kind into one writev() call.

This should make the alignment logic easier to follow.

Change-Id: Ic9ebcd6f1d5b91d39f0ed1db8987fed97ccc0f0f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_backend.cc b/aos/events/logging/log_backend.cc
index 01345c9..9b3f090 100644
--- a/aos/events/logging/log_backend.cc
+++ b/aos/events/logging/log_backend.cc
@@ -16,9 +16,104 @@
     "If true, sync data to disk as we go so we don't get too far ahead.  Also "
     "fadvise that we are done with the memory once it hits disk.");
 
+DEFINE_uint32(queue_reserve, 32, "Pre-reserved size of write queue.");
+
 namespace aos::logger {
 namespace {
 constexpr const char *kTempExtension = ".tmp";
+
+// Rounds value down to a multiple of kSector; assumes kSector is a power of 2.
+inline size_t AlignToLeft(size_t value) {
+  return value & (~(FileHandler::kSector - 1));
+}
+
+inline bool IsAligned(size_t value) {
+  return value % FileHandler::kSector == 0;
+}
+
+inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
+  return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
+}
+
+inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
+  return (span.size() % FileHandler::kSector) == 0;
+}
+
+}  // namespace
+
+logger::QueueAligner::QueueAligner() {
+  aligned_queue_.reserve(FLAGS_queue_reserve);
+}
+
+void logger::QueueAligner::FillAlignedQueue(
+    const absl::Span<const absl::Span<const uint8_t>> &queue) {
+  aligned_queue_.clear();
+
+  for (const auto &span : queue) {
+    // Every span is split into up to 3 optional parts:
+    // 1. unaligned prefix - from the start up to the first aligned address.
+    // 2. aligned main - aligned start and a multiple of kSector in length.
+    // 3. unaligned suffix - aligned start, shorter than one sector.
+    // A span ending at or before the first aligned address is all prefix.
+
+    auto *data = span.data();
+    size_t size = span.size();
+    const auto start = reinterpret_cast<size_t>(data);
+    VLOG(2) << "Consider span starting at " << std::hex << start
+            << " with size " << size;
+
+    CHECK_GT(size, 0u) << ": Nobody should be sending empty messages.";
+
+    const auto next_aligned =
+        IsAligned(start) ? start : AlignToLeft(start) + FileHandler::kSector;
+    const auto prefix_size = next_aligned - start;
+    VLOG(2) << "Calculated prefix size " << std::hex << prefix_size;
+
+    if (prefix_size >= size) {
+      // The span ends at or before the next aligned address, so nothing in it
+      // can be aligned; emit it whole as an unaligned piece.
+      VLOG(2) << "Only prefix found";
+      CHECK_GT(size, 0u);
+      aligned_queue_.emplace_back(data, size, false);
+      continue;
+    }
+    CHECK_LT(prefix_size, FileHandler::kSector)
+        << ": Wrong calculation of 'next' aligned position";
+    if (prefix_size > 0) {
+      // Cut the prefix and move to the main part.
+      VLOG(2) << "Cutting prefix at " << std::hex << start << " of size "
+              << prefix_size;
+      aligned_queue_.emplace_back(data, prefix_size, false);
+      data += prefix_size;
+      size -= prefix_size;
+      CHECK(data <= span.data() + span.size()) << " :Boundaries after prefix";
+    }
+
+    if (IsAligned(size)) {
+      // The rest is a whole number of sectors; emit it as one aligned piece.
+      VLOG(2) << "Returning aligned main part";
+      CHECK_GT(size, 0u);
+      aligned_queue_.emplace_back(data, size, true);
+      continue;
+    }
+
+    const auto aligned_size = AlignToLeft(size);
+    CHECK(aligned_size < size) << ": Wrong calculation of 'main' size";
+    if (aligned_size > 0) {
+      VLOG(2) << "Cutting main part starting " << std::hex
+              << reinterpret_cast<size_t>(data) << " of size " << aligned_size;
+      aligned_queue_.emplace_back(data, aligned_size, true);
+
+      data += aligned_size;
+      size -= aligned_size;
+      CHECK(data <= span.data() + span.size()) << " :Boundaries after main";
+    }
+
+    VLOG(2) << "Cutting suffix part starting " << std::hex
+            << reinterpret_cast<size_t>(data) << " of size " << size;
+    CHECK_GT(size, 0u);
+    aligned_queue_.emplace_back(data, size, false);
+  }
 }
 
 FileHandler::FileHandler(std::string filename)
@@ -73,25 +168,14 @@
   }
 }
 
-inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
-  return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
-}
-
-inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
-  return (span.size() % FileHandler::kSector) == 0;
-}
-
-inline bool HasAtleastOneSector(const absl::Span<const uint8_t> span) {
-  return span.size() >= FileHandler::kSector;
-}
-
 WriteResult FileHandler::Write(
     const absl::Span<const absl::Span<const uint8_t>> &queue) {
   iovec_.clear();
   CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
-  iovec_.resize(queue.size());
-  // Size of the data currently in iovec_.
-  size_t counted_size = 0;
+
+  // Each span may be split into up to three pieces, so the aligned queue can
+  // be longer than IOV_MAX.  The loop below flushes before exceeding that.
+  queue_aligner_.FillAlignedQueue(queue);
 
   // Ok, we now need to figure out if we were aligned, and if we were, how much
   // of the data we are being asked to write is aligned.
@@ -101,148 +185,49 @@
   // kSector in memory, and the length being written is a multiple of kSector.
   // Some of the callers use an aligned ResizeableBuffer to generate 512 byte
   // aligned buffers for this code to find and use.
-  bool was_aligned = (total_write_bytes_ % kSector) == 0;
+  bool was_aligned = IsAligned(total_write_bytes_);
+  VLOG(1) << "Started " << (was_aligned ? "aligned" : "unaligned")
+          << " at offset " << total_write_bytes_ << " on " << filename();
 
-  if (was_aligned) {
-    VLOG(1) << "Started aligned at offset " << total_write_bytes_ << " on "
-            << filename();
-  } else {
-    VLOG(1) << "Started unaligned at offset " << total_write_bytes_ << " on "
-            << filename();
-  }
-
-  // Index we are filling in next.  Keeps resetting back to 0 as we write
-  // intermediates.
-  size_t write_index = 0;
-  for (size_t i = 0; i < queue.size(); ++i) {
-    iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-    iovec_[write_index].iov_len = queue[i].size();
-
-    // Make sure the address is aligned, or give up.  This should be uncommon,
-    // but is always possible.
-    if (!IsAlignedStart(queue[i])) {
-      // Flush if we were aligned and have data.
-      if (was_aligned && write_index != 0) {
-        VLOG(1) << "Was aligned, now is not, writing previous data";
-        const auto code =
-            WriteV(iovec_.data(), write_index, true, counted_size);
+  // Walk through the aligned queue and batch writes based on the aligned flag.
+  // A batch is flushed whenever the alignment mode changes or it reaches
+  // IOV_MAX entries, since writev() rejects larger iovec arrays.
+  for (const auto &item : queue_aligner_.aligned_queue()) {
+    if (was_aligned != item.aligned ||
+        iovec_.size() == static_cast<size_t>(IOV_MAX)) {
+      // Switching aligned context or the batch is full. Flush current batch.
+      if (!iovec_.empty()) {
+        const auto code = WriteV(iovec_, was_aligned);
         if (code == WriteCode::kOutOfSpace) {
+          // We cannot say anything about what number of messages was written
+          // for sure.
           return {
               .code = code,
-              .messages_written = i,
+              .messages_written = queue.size(),
           };
         }
-
-        // Now, everything before here has been written.  Make an iovec out of
-        // the rest and keep going.
-        write_index = 0;
-        counted_size = 0;
-
-        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-        iovec_[write_index].iov_len = queue[i].size();
+        iovec_.clear();
       }
-      was_aligned = false;
-    } else {
-      // We are now starting aligned again, and have data worth writing! Flush
-      // what was there before.
-      if (!was_aligned && HasAtleastOneSector(queue[i]) &&
-          ((total_write_bytes_ + counted_size) % kSector) == 0 &&
-          write_index != 0) {
-        VLOG(1) << "Was not aligned, now is, writing previous data";
-
-        const auto code =
-            WriteV(iovec_.data(), write_index, false, counted_size);
-        if (code == WriteCode::kOutOfSpace) {
-          return {
-              .code = code,
-              .messages_written = i,
-          };
-        }
-
-        // Now, everything before here has been written.  Make an iovec out of
-        // the rest and keep going.
-        write_index = 0;
-        counted_size = 0;
-
-        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-        iovec_[write_index].iov_len = queue[i].size();
-        was_aligned = true;
-      }
+      // Any pending batch was written and total_write_bytes_ is current, so
+      // go aligned only if both the file offset and this item allow it.
+      was_aligned = IsAligned(total_write_bytes_) && item.aligned;
     }
-
-    // Now, see if the length is a multiple of kSector.  The goal is to figure
-    // out if/how much memory we can write out with O_DIRECT so that only the
-    // last little bit is done with non-direct IO to keep it fast.
-    if (!IsAlignedLength(queue[i])) {
-      VLOG(1) << "Unaligned length " << queue[i].size() << " on " << filename();
-      // If we've got over a sector of data to write, write it out with
-      // O_DIRECT and then continue writing the rest unaligned.
-      if (was_aligned) {
-        if (!HasAtleastOneSector(queue[i])) {
-          if (write_index > 0) {
-            const auto code =
-                WriteV(iovec_.data(), write_index, true, counted_size);
-            if (code == WriteCode::kOutOfSpace) {
-              return {
-                  .code = code,
-                  .messages_written = i,
-              };
-            }
-
-            // Now, everything before here has been written.  Make an iovec out
-            // of the rest and keep going.
-            write_index = 0;
-            counted_size = 0;
-
-            iovec_[write_index].iov_base =
-                const_cast<uint8_t *>(queue[i].data());
-            iovec_[write_index].iov_len = queue[i].size();
-          }
-        } else {
-          const size_t aligned_size =
-              iovec_[write_index].iov_len & (~(kSector - 1));
-          VLOG(1) << "Was aligned, writing last chunk rounded from "
-                  << queue[i].size() << " to " << aligned_size;
-          iovec_[write_index].iov_len = aligned_size;
-
-          const auto code = WriteV(iovec_.data(), write_index + 1, true,
-                                   counted_size + aligned_size);
-          if (code == WriteCode::kOutOfSpace) {
-            return {
-                .code = code,
-                .messages_written = i,
-            };
-          }
-
-          // Now, everything before here has been written.  Make an iovec out of
-          // the rest and keep going.
-          write_index = 0;
-          counted_size = 0;
-
-          iovec_[write_index].iov_base =
-              const_cast<uint8_t *>(queue[i].data() + aligned_size);
-          iovec_[write_index].iov_len = queue[i].size() - aligned_size;
-        }
-      }
-      was_aligned = false;
-    }
-    VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
-            << filename();
-    counted_size += iovec_[write_index].iov_len;
-    ++write_index;
+    iovec_.push_back(
+        {.iov_base = const_cast<uint8_t *>(item.data), .iov_len = item.size});
   }
 
-  // Either write the aligned data if it is all aligned, or write the rest
-  // unaligned if we wrote aligned up above.
-  const auto code = WriteV(iovec_.data(), write_index, was_aligned, counted_size);
+  WriteCode result_code = WriteCode::kOk;
+  if (!iovec_.empty()) {
+    // Flush the final batch.
+    result_code = WriteV(iovec_, was_aligned);
+  }
   return {
-      .code = code,
+      .code = result_code,
       .messages_written = queue.size(),
   };
 }
 
-WriteCode FileHandler::WriteV(struct iovec *iovec_data, size_t iovec_size,
-                              bool aligned, size_t counted_size) {
+WriteCode FileHandler::WriteV(const std::vector<struct iovec> &iovec,
+                              bool aligned) {
   // Configure the file descriptor to match the mode we should be in.  This is
   // safe to over-call since it only does the syscall if needed.
   if (aligned) {
@@ -251,55 +233,53 @@
     DisableDirect();
   }
 
-  CHECK_GT(iovec_size, 0u);
+  VLOG(2) << "Flushing queue of " << iovec.size() << " elements, "
+          << (aligned ? "aligned" : "unaligned");
+
+  CHECK_GT(iovec.size(), 0u);
   const auto start = aos::monotonic_clock::now();
 
+  // Aligned writes need a sector-aligned offset and sector-aligned buffers.
   if (aligned) {
-    CHECK_EQ((total_write_bytes_ % FileHandler::kSector), 0u)
+    CHECK(IsAligned(total_write_bytes_))
         << ": Failed after writing " << total_write_bytes_
         << " to the file, attempting aligned write with unaligned start.";
-    for (size_t i = 0; i < iovec_size; ++i) {
+
+    for (const auto &iovec_item : iovec) {
       absl::Span<const uint8_t> data(
-          reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
-          iovec_data[i].iov_len);
-      VLOG(2) << "  iov_base " << static_cast<void *>(iovec_data[i].iov_base)
-              << ", iov_len " << iovec_data[i].iov_len;
+          reinterpret_cast<const uint8_t *>(iovec_item.iov_base),
+          iovec_item.iov_len);
+      VLOG(2) << "  iov_base " << static_cast<void *>(iovec_item.iov_base)
+              << ", iov_len " << iovec_item.iov_len;
       CHECK(IsAlignedStart(data) && IsAlignedLength(data));
-      CHECK_GT(data.size(), 0u);
-    }
-  } else {
-    size_t accumulated_write_bytes = total_write_bytes_;
-    for (size_t i = 0; i < iovec_size; ++i) {
-      absl::Span<const uint8_t> data(
-          reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
-          iovec_data[i].iov_len);
-      VLOG(2) << "  accumulated_write_bytes 0x" << std::hex
-              << accumulated_write_bytes << " (" << std::dec
-              << accumulated_write_bytes << "), iov_base "
-              << static_cast<void *>(iovec_data[i].iov_base) << ", iov_len 0x"
-              << std::hex << iovec_data[i].iov_len << " (" << std::dec
-              << iovec_data[i].iov_len << ")";
-
-      if ((accumulated_write_bytes % FileHandler::kSector) == 0) {
-        CHECK(!IsAlignedStart(data) || !IsAlignedLength(data));
-      }
-
-      accumulated_write_bytes += data.size();
-      CHECK_GT(data.size(), 0u);
     }
   }
 
-  if (VLOG_IS_ON(2)) {
-    size_t to_be_written = 0;
-    for (size_t i = 0; i < iovec_size; ++i) {
-      to_be_written += iovec_data[i].iov_len;
-    }
-    VLOG(2) << "Going to write " << to_be_written;
-    CHECK_GT(to_be_written, 0u);
+  // Sum the requested bytes so a short write can be detected below.
+  size_t counted_size = 0;
+  for (const auto &iovec_item : iovec) {
+    CHECK_GT(iovec_item.iov_len, 0u);
+    counted_size += iovec_item.iov_len;
   }
 
-  const ssize_t written = writev(fd_, iovec_data, iovec_size);
-  VLOG(2) << "Wrote " << written << ", for iovec size " << iovec_size;
+  VLOG(2) << "Going to write " << counted_size;
+  CHECK_GT(counted_size, 0u);
+
+  const ssize_t written = writev(fd_, iovec.data(), iovec.size());
+  VLOG(2) << "Wrote " << written << ", for iovec size " << iovec.size();
+
+  const auto end = aos::monotonic_clock::now();
+  if (written == -1 && errno == ENOSPC) {
+    return WriteCode::kOutOfSpace;
+  }
+  PCHECK(written >= 0) << ": write failed, got " << written;
+  if (written < static_cast<ssize_t>(counted_size)) {
+    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+    // never seems to happen in any other case. If we ever want to log to a
+    // socket, this will happen more often. However, until we get there, we'll
+    // just assume it means we ran out of space.
+    return WriteCode::kOutOfSpace;
+  }
 
   if (FLAGS_sync) {
     // Flush asynchronously and force the data out of the cache.
@@ -321,21 +301,11 @@
     last_synced_bytes_ = total_write_bytes_;
   }
 
-  const auto end = aos::monotonic_clock::now();
-  if (written == -1 && errno == ENOSPC) {
-    return WriteCode::kOutOfSpace;
-  }
-  PCHECK(written >= 0) << ": write failed, got " << written;
-  if (written < static_cast<ssize_t>(counted_size)) {
-    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
-    // never seems to happen in any other case. If we ever want to log to a
-    // socket, this will happen more often. However, until we get there, we'll
-    // just assume it means we ran out of space.
-    return WriteCode::kOutOfSpace;
-  }
-
   total_write_bytes_ += written;
-  write_stats_.UpdateStats(end - start, written, iovec_size);
+  if (aligned) {
+    written_aligned_ += written;  // Running total of aligned-mode bytes.
+  }
+  write_stats_.UpdateStats(end - start, written, iovec.size());
   return WriteCode::kOk;
 }
 
@@ -483,4 +453,5 @@
   }
   return WriteCode::kOk;
 }
+
 }  // namespace aos::logger