Rethink the write alignment

Try to decompose write queue to parts that are aligned prior to writing.

It hopefully makes code a little more obvious.

Change-Id: Ic9ebcd6f1d5b91d39f0ed1db8987fed97ccc0f0f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_backend.cc b/aos/events/logging/log_backend.cc
index 01345c9..9b3f090 100644
--- a/aos/events/logging/log_backend.cc
+++ b/aos/events/logging/log_backend.cc
@@ -16,9 +16,104 @@
     "If true, sync data to disk as we go so we don't get too far ahead.  Also "
     "fadvise that we are done with the memory once it hits disk.");
 
+DEFINE_uint32(queue_reserve, 32, "Pre-reserved size of write queue.");
+
 namespace aos::logger {
 namespace {
 constexpr const char *kTempExtension = ".tmp";
+
+// Assuming that kSector is power of 2, it aligns address to the left size.
+inline size_t AlignToLeft(size_t value) {
+  return value & (~(FileHandler::kSector - 1));
+}
+
+inline bool IsAligned(size_t value) {
+  return value % FileHandler::kSector == 0;
+}
+
+inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
+  return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
+}
+
+inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
+  return (span.size() % FileHandler::kSector) == 0;
+}
+
+}  // namespace
+
+logger::QueueAligner::QueueAligner() {
+  aligned_queue_.reserve(FLAGS_queue_reserve);
+}
+
+void logger::QueueAligner::FillAlignedQueue(
+    const absl::Span<const absl::Span<const uint8_t>> &queue) {
+  aligned_queue_.clear();
+
+  for (const auto &span : queue) {
+    // Generally, every span might have 3 optional parts (i.e. 2^3 cases):
+    // 1. unaligned prefix -  from start till first aligned block.
+    // 2. aligned main - block with aligned start and size
+    // 3. unaligned suffix - block with aligned start, and size less than one
+    // sector. If size of the span is less than 1 sector, let's call it prefix.
+
+    auto *data = span.data();
+    size_t size = span.size();
+    const auto start = reinterpret_cast<size_t>(data);
+    VLOG(2) << "Consider span starting at " << std::hex << start
+            << " with size " << size;
+
+    CHECK_GT(size, 0u) << ": Nobody should be sending empty messages.";
+
+    const auto next_aligned =
+        IsAligned(start) ? start : AlignToLeft(start) + FileHandler::kSector;
+    const auto prefix_size = next_aligned - start;
+    VLOG(2) << "Calculated prefix size " << std::hex << prefix_size;
+
+    if (prefix_size >= size) {
+      // size of prefix >= size of span - alignment is not possible, accept the
+      // whole span
+      VLOG(2) << "Only prefix found";
+      CHECK_GT(size, 0u);
+      aligned_queue_.emplace_back(data, size, false);
+      continue;
+    }
+    CHECK_LT(prefix_size, FileHandler::kSector)
+        << ": Wrong calculation of 'next' aligned position";
+    if (prefix_size > 0) {
+      // Cut the prefix and move to the main part.
+      VLOG(2) << "Cutting prefix at " << std::hex << start << " of size "
+              << prefix_size;
+      aligned_queue_.emplace_back(data, prefix_size, false);
+      data += prefix_size;
+      size -= prefix_size;
+      CHECK(data <= span.data() + span.size()) << " :Boundaries after prefix";
+    }
+
+    if (IsAligned(size)) {
+      // the rest is aligned.
+      VLOG(2) << "Returning aligned main part";
+      CHECK_GT(size, 0u);
+      aligned_queue_.emplace_back(data, size, true);
+      continue;
+    }
+
+    const auto aligned_size = AlignToLeft(size);
+    CHECK(aligned_size < size) << ": Wrong calculation of 'main' size";
+    if (aligned_size > 0) {
+      VLOG(2) << "Cutting main part starting " << std::hex
+              << reinterpret_cast<size_t>(data) << " of size " << aligned_size;
+      aligned_queue_.emplace_back(data, aligned_size, true);
+
+      data += aligned_size;
+      size -= aligned_size;
+      CHECK(data <= span.data() + span.size()) << " :Boundaries after main";
+    }
+
+    VLOG(2) << "Cutting suffix part starting " << std::hex
+            << reinterpret_cast<size_t>(data) << " of size " << size;
+    CHECK_GT(size, 0u);
+    aligned_queue_.emplace_back(data, size, false);
+  }
 }
 
 FileHandler::FileHandler(std::string filename)
@@ -73,25 +168,13 @@
   }
 }
 
-inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
-  return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
-}
-
-inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
-  return (span.size() % FileHandler::kSector) == 0;
-}
-
-inline bool HasAtleastOneSector(const absl::Span<const uint8_t> span) {
-  return span.size() >= FileHandler::kSector;
-}
-
 WriteResult FileHandler::Write(
     const absl::Span<const absl::Span<const uint8_t>> &queue) {
   iovec_.clear();
   CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
-  iovec_.resize(queue.size());
-  // Size of the data currently in iovec_.
-  size_t counted_size = 0;
+
+  queue_aligner_.FillAlignedQueue(queue);
+  CHECK_LE(queue_aligner_.aligned_queue().size(), static_cast<size_t>(IOV_MAX));
 
   // Ok, we now need to figure out if we were aligned, and if we were, how much
   // of the data we are being asked to write is aligned.
@@ -101,148 +184,47 @@
   // kSector in memory, and the length being written is a multiple of kSector.
   // Some of the callers use an aligned ResizeableBuffer to generate 512 byte
   // aligned buffers for this code to find and use.
-  bool was_aligned = (total_write_bytes_ % kSector) == 0;
+  bool was_aligned = IsAligned(total_write_bytes_);
+  VLOG(1) << "Started " << (was_aligned ? "aligned" : "unaligned")
+          << " at offset " << total_write_bytes_ << " on " << filename();
 
-  if (was_aligned) {
-    VLOG(1) << "Started aligned at offset " << total_write_bytes_ << " on "
-            << filename();
-  } else {
-    VLOG(1) << "Started unaligned at offset " << total_write_bytes_ << " on "
-            << filename();
-  }
-
-  // Index we are filling in next.  Keeps resetting back to 0 as we write
-  // intermediates.
-  size_t write_index = 0;
-  for (size_t i = 0; i < queue.size(); ++i) {
-    iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-    iovec_[write_index].iov_len = queue[i].size();
-
-    // Make sure the address is aligned, or give up.  This should be uncommon,
-    // but is always possible.
-    if (!IsAlignedStart(queue[i])) {
-      // Flush if we were aligned and have data.
-      if (was_aligned && write_index != 0) {
-        VLOG(1) << "Was aligned, now is not, writing previous data";
-        const auto code =
-            WriteV(iovec_.data(), write_index, true, counted_size);
+  // Walk through aligned queue and batch writes basel on aligned flag
+  for (const auto &item : queue_aligner_.aligned_queue()) {
+    if (was_aligned != item.aligned) {
+      // Switching aligned context. Let's flush current batch.
+      if (!iovec_.empty()) {
+        // Flush current queue if we need.
+        const auto code = WriteV(iovec_, was_aligned);
         if (code == WriteCode::kOutOfSpace) {
+          // We cannot say anything about what number of messages was written
+          // for sure.
           return {
               .code = code,
-              .messages_written = i,
+              .messages_written = queue.size(),
           };
         }
-
-        // Now, everything before here has been written.  Make an iovec out of
-        // the rest and keep going.
-        write_index = 0;
-        counted_size = 0;
-
-        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-        iovec_[write_index].iov_len = queue[i].size();
+        iovec_.clear();
       }
-      was_aligned = false;
-    } else {
-      // We are now starting aligned again, and have data worth writing! Flush
-      // what was there before.
-      if (!was_aligned && HasAtleastOneSector(queue[i]) &&
-          ((total_write_bytes_ + counted_size) % kSector) == 0 &&
-          write_index != 0) {
-        VLOG(1) << "Was not aligned, now is, writing previous data";
-
-        const auto code =
-            WriteV(iovec_.data(), write_index, false, counted_size);
-        if (code == WriteCode::kOutOfSpace) {
-          return {
-              .code = code,
-              .messages_written = i,
-          };
-        }
-
-        // Now, everything before here has been written.  Make an iovec out of
-        // the rest and keep going.
-        write_index = 0;
-        counted_size = 0;
-
-        iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-        iovec_[write_index].iov_len = queue[i].size();
-        was_aligned = true;
-      }
+      // Write queue is flushed. WriteV updates the total_write_bytes_.
+      was_aligned = IsAligned(total_write_bytes_) && item.aligned;
     }
-
-    // Now, see if the length is a multiple of kSector.  The goal is to figure
-    // out if/how much memory we can write out with O_DIRECT so that only the
-    // last little bit is done with non-direct IO to keep it fast.
-    if (!IsAlignedLength(queue[i])) {
-      VLOG(1) << "Unaligned length " << queue[i].size() << " on " << filename();
-      // If we've got over a sector of data to write, write it out with
-      // O_DIRECT and then continue writing the rest unaligned.
-      if (was_aligned) {
-        if (!HasAtleastOneSector(queue[i])) {
-          if (write_index > 0) {
-            const auto code =
-                WriteV(iovec_.data(), write_index, true, counted_size);
-            if (code == WriteCode::kOutOfSpace) {
-              return {
-                  .code = code,
-                  .messages_written = i,
-              };
-            }
-
-            // Now, everything before here has been written.  Make an iovec out
-            // of the rest and keep going.
-            write_index = 0;
-            counted_size = 0;
-
-            iovec_[write_index].iov_base =
-                const_cast<uint8_t *>(queue[i].data());
-            iovec_[write_index].iov_len = queue[i].size();
-          }
-        } else {
-          const size_t aligned_size =
-              iovec_[write_index].iov_len & (~(kSector - 1));
-          VLOG(1) << "Was aligned, writing last chunk rounded from "
-                  << queue[i].size() << " to " << aligned_size;
-          iovec_[write_index].iov_len = aligned_size;
-
-          const auto code = WriteV(iovec_.data(), write_index + 1, true,
-                                   counted_size + aligned_size);
-          if (code == WriteCode::kOutOfSpace) {
-            return {
-                .code = code,
-                .messages_written = i,
-            };
-          }
-
-          // Now, everything before here has been written.  Make an iovec out of
-          // the rest and keep going.
-          write_index = 0;
-          counted_size = 0;
-
-          iovec_[write_index].iov_base =
-              const_cast<uint8_t *>(queue[i].data() + aligned_size);
-          iovec_[write_index].iov_len = queue[i].size() - aligned_size;
-        }
-      }
-      was_aligned = false;
-    }
-    VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
-            << filename();
-    counted_size += iovec_[write_index].iov_len;
-    ++write_index;
+    iovec_.push_back(
+        {.iov_base = const_cast<uint8_t *>(item.data), .iov_len = item.size});
   }
 
-  // Either write the aligned data if it is all aligned, or write the rest
-  // unaligned if we wrote aligned up above.
-  const auto code = WriteV(iovec_.data(), write_index, was_aligned, counted_size);
+  WriteCode result_code = WriteCode::kOk;
+  if (!iovec_.empty()) {
+    // Flush current queue if we need.
+    result_code = WriteV(iovec_, was_aligned);
+  }
   return {
-      .code = code,
+      .code = result_code,
       .messages_written = queue.size(),
   };
 }
 
-WriteCode FileHandler::WriteV(struct iovec *iovec_data, size_t iovec_size,
-                              bool aligned, size_t counted_size) {
+WriteCode FileHandler::WriteV(const std::vector<struct iovec> &iovec,
+                              bool aligned) {
   // Configure the file descriptor to match the mode we should be in.  This is
   // safe to over-call since it only does the syscall if needed.
   if (aligned) {
@@ -251,55 +233,53 @@
     DisableDirect();
   }
 
-  CHECK_GT(iovec_size, 0u);
+  VLOG(2) << "Flushing queue of " << iovec.size() << " elements, "
+          << (aligned ? "aligned" : "unaligned");
+
+  CHECK_GT(iovec.size(), 0u);
   const auto start = aos::monotonic_clock::now();
 
+  // Validation of alignment assumptions.
   if (aligned) {
-    CHECK_EQ((total_write_bytes_ % FileHandler::kSector), 0u)
+    CHECK(IsAligned(total_write_bytes_))
         << ": Failed after writing " << total_write_bytes_
         << " to the file, attempting aligned write with unaligned start.";
-    for (size_t i = 0; i < iovec_size; ++i) {
+
+    for (const auto &iovec_item : iovec) {
       absl::Span<const uint8_t> data(
-          reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
-          iovec_data[i].iov_len);
-      VLOG(2) << "  iov_base " << static_cast<void *>(iovec_data[i].iov_base)
-              << ", iov_len " << iovec_data[i].iov_len;
+          reinterpret_cast<const uint8_t *>(iovec_item.iov_base),
+          iovec_item.iov_len);
+      VLOG(2) << "  iov_base " << static_cast<void *>(iovec_item.iov_base)
+              << ", iov_len " << iovec_item.iov_len;
       CHECK(IsAlignedStart(data) && IsAlignedLength(data));
-      CHECK_GT(data.size(), 0u);
-    }
-  } else {
-    size_t accumulated_write_bytes = total_write_bytes_;
-    for (size_t i = 0; i < iovec_size; ++i) {
-      absl::Span<const uint8_t> data(
-          reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
-          iovec_data[i].iov_len);
-      VLOG(2) << "  accumulated_write_bytes 0x" << std::hex
-              << accumulated_write_bytes << " (" << std::dec
-              << accumulated_write_bytes << "), iov_base "
-              << static_cast<void *>(iovec_data[i].iov_base) << ", iov_len 0x"
-              << std::hex << iovec_data[i].iov_len << " (" << std::dec
-              << iovec_data[i].iov_len << ")";
-
-      if ((accumulated_write_bytes % FileHandler::kSector) == 0) {
-        CHECK(!IsAlignedStart(data) || !IsAlignedLength(data));
-      }
-
-      accumulated_write_bytes += data.size();
-      CHECK_GT(data.size(), 0u);
     }
   }
 
-  if (VLOG_IS_ON(2)) {
-    size_t to_be_written = 0;
-    for (size_t i = 0; i < iovec_size; ++i) {
-      to_be_written += iovec_data[i].iov_len;
-    }
-    VLOG(2) << "Going to write " << to_be_written;
-    CHECK_GT(to_be_written, 0u);
+  // Calculation of expected written size.
+  size_t counted_size = 0;
+  for (const auto &iovec_item : iovec) {
+    CHECK_GT(iovec_item.iov_len, 0u);
+    counted_size += iovec_item.iov_len;
   }
 
-  const ssize_t written = writev(fd_, iovec_data, iovec_size);
-  VLOG(2) << "Wrote " << written << ", for iovec size " << iovec_size;
+  VLOG(2) << "Going to write " << counted_size;
+  CHECK_GT(counted_size, 0u);
+
+  const ssize_t written = writev(fd_, iovec.data(), iovec.size());
+  VLOG(2) << "Wrote " << written << ", for iovec size " << iovec.size();
+
+  const auto end = aos::monotonic_clock::now();
+  if (written == -1 && errno == ENOSPC) {
+    return WriteCode::kOutOfSpace;
+  }
+  PCHECK(written >= 0) << ": write failed, got " << written;
+  if (written < static_cast<ssize_t>(counted_size)) {
+    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+    // never seems to happen in any other case. If we ever want to log to a
+    // socket, this will happen more often. However, until we get there, we'll
+    // just assume it means we ran out of space.
+    return WriteCode::kOutOfSpace;
+  }
 
   if (FLAGS_sync) {
     // Flush asynchronously and force the data out of the cache.
@@ -321,21 +301,11 @@
     last_synced_bytes_ = total_write_bytes_;
   }
 
-  const auto end = aos::monotonic_clock::now();
-  if (written == -1 && errno == ENOSPC) {
-    return WriteCode::kOutOfSpace;
-  }
-  PCHECK(written >= 0) << ": write failed, got " << written;
-  if (written < static_cast<ssize_t>(counted_size)) {
-    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
-    // never seems to happen in any other case. If we ever want to log to a
-    // socket, this will happen more often. However, until we get there, we'll
-    // just assume it means we ran out of space.
-    return WriteCode::kOutOfSpace;
-  }
-
   total_write_bytes_ += written;
-  write_stats_.UpdateStats(end - start, written, iovec_size);
+  if (aligned) {
+    written_aligned_ += written;
+  }
+  write_stats_.UpdateStats(end - start, written, iovec.size());
   return WriteCode::kOk;
 }
 
@@ -483,4 +453,5 @@
   }
   return WriteCode::kOk;
 }
+
 }  // namespace aos::logger
diff --git a/aos/events/logging/log_backend.h b/aos/events/logging/log_backend.h
index 20ade91..469c120 100644
--- a/aos/events/logging/log_backend.h
+++ b/aos/events/logging/log_backend.h
@@ -77,6 +77,36 @@
   size_t messages_written = 0;
 };
 
+// Source for iovec with additional flag that pointer and size of data is
+// aligned and be ready for O_DIRECT operation.
+struct AlignedIovec {
+  const uint8_t *data;
+  size_t size;
+  bool aligned;
+
+  AlignedIovec(const uint8_t *data, size_t size, bool aligned)
+      : data(data), size(size), aligned(aligned) {}
+};
+
+// Converts queue of pieces to write to the disk to the queue where every
+// element is either aligned for O_DIRECT operation or marked as not aligned.
+class QueueAligner {
+ public:
+  QueueAligner();
+
+  // Reads input queue and fills with aligned and unaligned pieces. It is easy
+  // to deal with smaller pieces and batch it during the write operation.
+  void FillAlignedQueue(
+      const absl::Span<const absl::Span<const uint8_t>> &queue);
+
+  const std::vector<AlignedIovec> &aligned_queue() const {
+    return aligned_queue_;
+  }
+
+ private:
+  std::vector<AlignedIovec> aligned_queue_;
+};
+
 // FileHandler is a replacement for bare filename in log writing and reading
 // operations.
 //
@@ -94,8 +124,6 @@
 //  4) Not all filesystems support O_DIRECT, and different sizes may be optimal
 //     for different machines.  The defaults should work decently anywhere and
 //     be tunable for faster systems.
-// TODO (Alexei): need 2 variations, to support systems with and without
-// O_DIRECT
 class FileHandler {
  public:
   // Size of an aligned sector used to detect when the data is aligned enough to
@@ -139,6 +167,9 @@
   // Get access to statistics related to the write operations.
   WriteStats *WriteStatistics() { return &write_stats_; }
 
+  // Number of bytes written in aligned mode. It is mostly for testing.
+  size_t written_aligned() const { return written_aligned_; }
+
  private:
   // Enables O_DIRECT on the open file if it is supported.  Cheap to call if it
   // is already enabled.
@@ -149,11 +180,9 @@
 
   bool ODirectEnabled() const { return !!(flags_ & O_DIRECT); }
 
-  // Writes a chunk of iovecs.  aligned is true if all the data is kSector byte
-  // aligned and multiples of it in length, and counted_size is the sum of the
-  // sizes of all the chunks of data.
-  WriteCode WriteV(struct iovec *iovec_data, size_t iovec_size, bool aligned,
-                   size_t counted_size);
+  // Writes a chunk of iovecs. aligned is true if all the data is kSector byte
+  // aligned and multiples of it in length.
+  WriteCode WriteV(const std::vector<struct iovec> &iovec, bool aligned);
 
   const std::string filename_;
 
@@ -163,9 +192,13 @@
   // churn.
   std::vector<struct iovec> iovec_;
 
+  QueueAligner queue_aligner_;
+
   int total_write_bytes_ = 0;
   int last_synced_bytes_ = 0;
 
+  size_t written_aligned_ = 0;
+
   bool supports_odirect_ = true;
   int flags_ = 0;
 
diff --git a/aos/events/logging/log_backend_test.cc b/aos/events/logging/log_backend_test.cc
index 1ebcde6..b969218 100644
--- a/aos/events/logging/log_backend_test.cc
+++ b/aos/events/logging/log_backend_test.cc
@@ -92,6 +92,109 @@
   EXPECT_TRUE(std::filesystem::exists(renamed + "test.log"));
 }
 
+TEST(QueueAlignmentTest, Cases) {
+  QueueAligner aligner;
+  uint8_t *start = nullptr;
+  {
+    // Only prefix
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start + 1;
+    queue.emplace_back(current, FileHandler::kSector - 2);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 1);
+    const auto &prefix = aligner.aligned_queue()[0];
+    EXPECT_FALSE(prefix.aligned);
+    EXPECT_EQ(prefix.size, FileHandler::kSector - 2);
+  }
+  {
+    // Only main
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start;
+    queue.emplace_back(current, FileHandler::kSector);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 1);
+    const auto &main = aligner.aligned_queue()[0];
+    EXPECT_TRUE(main.aligned);
+    EXPECT_EQ(main.size, FileHandler::kSector);
+  }
+  {
+    // Empty
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start;
+    queue.emplace_back(current, 0);
+    EXPECT_DEATH(aligner.FillAlignedQueue(queue),
+                 "Nobody should be sending empty messages");
+  }
+  {
+    // Main and suffix
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start;
+    queue.emplace_back(current, FileHandler::kSector + 1);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 2);
+
+    const auto &main = aligner.aligned_queue()[0];
+    EXPECT_TRUE(main.aligned);
+    EXPECT_EQ(main.size, FileHandler::kSector);
+
+    const auto &suffix = aligner.aligned_queue()[1];
+    EXPECT_FALSE(suffix.aligned);
+    EXPECT_EQ(suffix.size, 1);
+  }
+  {
+    // Prefix, main
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start + 1;
+    queue.emplace_back(current, 2 * FileHandler::kSector - 1);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 2);
+
+    const auto &prefix = aligner.aligned_queue()[0];
+    EXPECT_FALSE(prefix.aligned);
+    EXPECT_EQ(prefix.size, FileHandler::kSector - 1);
+
+    const auto &main = aligner.aligned_queue()[1];
+    EXPECT_TRUE(main.aligned);
+    EXPECT_EQ(main.size, FileHandler::kSector);
+  }
+  {
+    // Prefix and suffix
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start + 1;
+    queue.emplace_back(current, 2 * FileHandler::kSector - 2);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 2);
+
+    const auto &prefix = aligner.aligned_queue()[0];
+    EXPECT_FALSE(prefix.aligned);
+    EXPECT_EQ(prefix.size, FileHandler::kSector - 1);
+
+    const auto &suffix = aligner.aligned_queue()[1];
+    EXPECT_FALSE(suffix.aligned);
+    EXPECT_EQ(suffix.size, FileHandler::kSector - 1);
+  }
+  {
+    // Prefix, main and suffix
+    std::vector<absl::Span<const uint8_t>> queue;
+    const uint8_t *current = start + 1;
+    queue.emplace_back(current, 3 * FileHandler::kSector - 2);
+    aligner.FillAlignedQueue(queue);
+    ASSERT_EQ(aligner.aligned_queue().size(), 3);
+
+    const auto &prefix = aligner.aligned_queue()[0];
+    EXPECT_FALSE(prefix.aligned);
+    EXPECT_EQ(prefix.size, FileHandler::kSector - 1);
+
+    const auto &main = aligner.aligned_queue()[1];
+    EXPECT_TRUE(main.aligned);
+    EXPECT_EQ(main.size, FileHandler::kSector);
+
+    const auto &suffix = aligner.aligned_queue()[2];
+    EXPECT_FALSE(suffix.aligned);
+    EXPECT_EQ(suffix.size, FileHandler::kSector - 1);
+  }
+}
+
 // It represents calls to Write function (batching of calls and messages) where
 // int values are sizes of each message in the queue.
 using WriteRecipe = std::vector<std::vector<int>>;
@@ -261,6 +364,7 @@
   auto result = handler->Write(queue);
   EXPECT_EQ(result.code, WriteCode::kOk);
   EXPECT_EQ(result.messages_written, queue.size());
+  EXPECT_GT(handler->written_aligned(), 0);
 
   ASSERT_EQ(handler->Close(), WriteCode::kOk);
   EXPECT_TRUE(std::filesystem::exists(file));