Rethink the write alignment
Decompose the write queue into aligned and unaligned parts before writing.
This hopefully makes the code a little easier to follow.
Change-Id: Ic9ebcd6f1d5b91d39f0ed1db8987fed97ccc0f0f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_backend.cc b/aos/events/logging/log_backend.cc
index 01345c9..9b3f090 100644
--- a/aos/events/logging/log_backend.cc
+++ b/aos/events/logging/log_backend.cc
@@ -16,9 +16,104 @@
"If true, sync data to disk as we go so we don't get too far ahead. Also "
"fadvise that we are done with the memory once it hits disk.");
+DEFINE_uint32(queue_reserve, 32, "Pre-reserved size of write queue.");
+
namespace aos::logger {
namespace {
constexpr const char *kTempExtension = ".tmp";
+
+// Assuming that kSector is a power of 2, rounds value down to a multiple of it.
+inline size_t AlignToLeft(size_t value) {
+ return value & (~(FileHandler::kSector - 1));
+}
+
+inline bool IsAligned(size_t value) {
+ return value % FileHandler::kSector == 0;
+}
+
+inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
+ return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
+}
+
+inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
+ return (span.size() % FileHandler::kSector) == 0;
+}
+
+} // namespace
+
+logger::QueueAligner::QueueAligner() {
+ aligned_queue_.reserve(FLAGS_queue_reserve);
+}
+
+void logger::QueueAligner::FillAlignedQueue(
+ const absl::Span<const absl::Span<const uint8_t>> &queue) {
+ aligned_queue_.clear();
+
+ for (const auto &span : queue) {
+ // Generally, every span might have 3 optional parts (i.e. 2^3 cases):
+ // 1. unaligned prefix - from start till first aligned block.
+ // 2. aligned main - block with aligned start and size
+ // 3. unaligned suffix - block with aligned start, and size less than one
+ // sector. If size of the span is less than 1 sector, let's call it prefix.
+
+ auto *data = span.data();
+ size_t size = span.size();
+ const auto start = reinterpret_cast<size_t>(data);
+ VLOG(2) << "Consider span starting at " << std::hex << start
+ << " with size " << size;
+
+ CHECK_GT(size, 0u) << ": Nobody should be sending empty messages.";
+
+ const auto next_aligned =
+ IsAligned(start) ? start : AlignToLeft(start) + FileHandler::kSector;
+ const auto prefix_size = next_aligned - start;
+ VLOG(2) << "Calculated prefix size " << std::hex << prefix_size;
+
+ if (prefix_size >= size) {
+ // size of prefix >= size of span - alignment is not possible, accept the
+ // whole span
+ VLOG(2) << "Only prefix found";
+ CHECK_GT(size, 0u);
+ aligned_queue_.emplace_back(data, size, false);
+ continue;
+ }
+ CHECK_LT(prefix_size, FileHandler::kSector)
+ << ": Wrong calculation of 'next' aligned position";
+ if (prefix_size > 0) {
+ // Cut the prefix and move to the main part.
+ VLOG(2) << "Cutting prefix at " << std::hex << start << " of size "
+ << prefix_size;
+ aligned_queue_.emplace_back(data, prefix_size, false);
+ data += prefix_size;
+ size -= prefix_size;
+ CHECK(data <= span.data() + span.size()) << " :Boundaries after prefix";
+ }
+
+ if (IsAligned(size)) {
+ // the rest is aligned.
+ VLOG(2) << "Returning aligned main part";
+ CHECK_GT(size, 0u);
+ aligned_queue_.emplace_back(data, size, true);
+ continue;
+ }
+
+ const auto aligned_size = AlignToLeft(size);
+ CHECK(aligned_size < size) << ": Wrong calculation of 'main' size";
+ if (aligned_size > 0) {
+ VLOG(2) << "Cutting main part starting " << std::hex
+ << reinterpret_cast<size_t>(data) << " of size " << aligned_size;
+ aligned_queue_.emplace_back(data, aligned_size, true);
+
+ data += aligned_size;
+ size -= aligned_size;
+ CHECK(data <= span.data() + span.size()) << " :Boundaries after main";
+ }
+
+ VLOG(2) << "Cutting suffix part starting " << std::hex
+ << reinterpret_cast<size_t>(data) << " of size " << size;
+ CHECK_GT(size, 0u);
+ aligned_queue_.emplace_back(data, size, false);
+ }
}
FileHandler::FileHandler(std::string filename)
@@ -73,25 +168,13 @@
}
}
-inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
- return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
-}
-
-inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
- return (span.size() % FileHandler::kSector) == 0;
-}
-
-inline bool HasAtleastOneSector(const absl::Span<const uint8_t> span) {
- return span.size() >= FileHandler::kSector;
-}
-
WriteResult FileHandler::Write(
const absl::Span<const absl::Span<const uint8_t>> &queue) {
iovec_.clear();
CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
- iovec_.resize(queue.size());
- // Size of the data currently in iovec_.
- size_t counted_size = 0;
+
+ queue_aligner_.FillAlignedQueue(queue);
+ CHECK_LE(queue_aligner_.aligned_queue().size(), static_cast<size_t>(IOV_MAX));
// Ok, we now need to figure out if we were aligned, and if we were, how much
// of the data we are being asked to write is aligned.
@@ -101,148 +184,47 @@
// kSector in memory, and the length being written is a multiple of kSector.
// Some of the callers use an aligned ResizeableBuffer to generate 512 byte
// aligned buffers for this code to find and use.
- bool was_aligned = (total_write_bytes_ % kSector) == 0;
+ bool was_aligned = IsAligned(total_write_bytes_);
+ VLOG(1) << "Started " << (was_aligned ? "aligned" : "unaligned")
+ << " at offset " << total_write_bytes_ << " on " << filename();
- if (was_aligned) {
- VLOG(1) << "Started aligned at offset " << total_write_bytes_ << " on "
- << filename();
- } else {
- VLOG(1) << "Started unaligned at offset " << total_write_bytes_ << " on "
- << filename();
- }
-
- // Index we are filling in next. Keeps resetting back to 0 as we write
- // intermediates.
- size_t write_index = 0;
- for (size_t i = 0; i < queue.size(); ++i) {
- iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
- iovec_[write_index].iov_len = queue[i].size();
-
- // Make sure the address is aligned, or give up. This should be uncommon,
- // but is always possible.
- if (!IsAlignedStart(queue[i])) {
- // Flush if we were aligned and have data.
- if (was_aligned && write_index != 0) {
- VLOG(1) << "Was aligned, now is not, writing previous data";
- const auto code =
- WriteV(iovec_.data(), write_index, true, counted_size);
+ // Walk through the aligned queue and batch writes based on the aligned flag.
+ for (const auto &item : queue_aligner_.aligned_queue()) {
+ if (was_aligned != item.aligned) {
+ // Switching aligned context. Let's flush current batch.
+ if (!iovec_.empty()) {
+ // Flush current queue if we need.
+ const auto code = WriteV(iovec_, was_aligned);
if (code == WriteCode::kOutOfSpace) {
+ // We cannot tell for sure how many of the messages were actually
+ // written.
return {
.code = code,
- .messages_written = i,
+ .messages_written = queue.size(),
};
}
-
- // Now, everything before here has been written. Make an iovec out of
- // the rest and keep going.
- write_index = 0;
- counted_size = 0;
-
- iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
- iovec_[write_index].iov_len = queue[i].size();
+ iovec_.clear();
}
- was_aligned = false;
- } else {
- // We are now starting aligned again, and have data worth writing! Flush
- // what was there before.
- if (!was_aligned && HasAtleastOneSector(queue[i]) &&
- ((total_write_bytes_ + counted_size) % kSector) == 0 &&
- write_index != 0) {
- VLOG(1) << "Was not aligned, now is, writing previous data";
-
- const auto code =
- WriteV(iovec_.data(), write_index, false, counted_size);
- if (code == WriteCode::kOutOfSpace) {
- return {
- .code = code,
- .messages_written = i,
- };
- }
-
- // Now, everything before here has been written. Make an iovec out of
- // the rest and keep going.
- write_index = 0;
- counted_size = 0;
-
- iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
- iovec_[write_index].iov_len = queue[i].size();
- was_aligned = true;
- }
+ // Write queue is flushed. WriteV updates the total_write_bytes_.
+ was_aligned = IsAligned(total_write_bytes_) && item.aligned;
}
-
- // Now, see if the length is a multiple of kSector. The goal is to figure
- // out if/how much memory we can write out with O_DIRECT so that only the
- // last little bit is done with non-direct IO to keep it fast.
- if (!IsAlignedLength(queue[i])) {
- VLOG(1) << "Unaligned length " << queue[i].size() << " on " << filename();
- // If we've got over a sector of data to write, write it out with
- // O_DIRECT and then continue writing the rest unaligned.
- if (was_aligned) {
- if (!HasAtleastOneSector(queue[i])) {
- if (write_index > 0) {
- const auto code =
- WriteV(iovec_.data(), write_index, true, counted_size);
- if (code == WriteCode::kOutOfSpace) {
- return {
- .code = code,
- .messages_written = i,
- };
- }
-
- // Now, everything before here has been written. Make an iovec out
- // of the rest and keep going.
- write_index = 0;
- counted_size = 0;
-
- iovec_[write_index].iov_base =
- const_cast<uint8_t *>(queue[i].data());
- iovec_[write_index].iov_len = queue[i].size();
- }
- } else {
- const size_t aligned_size =
- iovec_[write_index].iov_len & (~(kSector - 1));
- VLOG(1) << "Was aligned, writing last chunk rounded from "
- << queue[i].size() << " to " << aligned_size;
- iovec_[write_index].iov_len = aligned_size;
-
- const auto code = WriteV(iovec_.data(), write_index + 1, true,
- counted_size + aligned_size);
- if (code == WriteCode::kOutOfSpace) {
- return {
- .code = code,
- .messages_written = i,
- };
- }
-
- // Now, everything before here has been written. Make an iovec out of
- // the rest and keep going.
- write_index = 0;
- counted_size = 0;
-
- iovec_[write_index].iov_base =
- const_cast<uint8_t *>(queue[i].data() + aligned_size);
- iovec_[write_index].iov_len = queue[i].size() - aligned_size;
- }
- }
- was_aligned = false;
- }
- VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
- << filename();
- counted_size += iovec_[write_index].iov_len;
- ++write_index;
+ iovec_.push_back(
+ {.iov_base = const_cast<uint8_t *>(item.data), .iov_len = item.size});
}
- // Either write the aligned data if it is all aligned, or write the rest
- // unaligned if we wrote aligned up above.
- const auto code = WriteV(iovec_.data(), write_index, was_aligned, counted_size);
+ WriteCode result_code = WriteCode::kOk;
+ if (!iovec_.empty()) {
+ // Flush current queue if we need.
+ result_code = WriteV(iovec_, was_aligned);
+ }
return {
- .code = code,
+ .code = result_code,
.messages_written = queue.size(),
};
}
-WriteCode FileHandler::WriteV(struct iovec *iovec_data, size_t iovec_size,
- bool aligned, size_t counted_size) {
+WriteCode FileHandler::WriteV(const std::vector<struct iovec> &iovec,
+ bool aligned) {
// Configure the file descriptor to match the mode we should be in. This is
// safe to over-call since it only does the syscall if needed.
if (aligned) {
@@ -251,55 +233,53 @@
DisableDirect();
}
- CHECK_GT(iovec_size, 0u);
+ VLOG(2) << "Flushing queue of " << iovec.size() << " elements, "
+ << (aligned ? "aligned" : "unaligned");
+
+ CHECK_GT(iovec.size(), 0u);
const auto start = aos::monotonic_clock::now();
+ // Validation of alignment assumptions.
if (aligned) {
- CHECK_EQ((total_write_bytes_ % FileHandler::kSector), 0u)
+ CHECK(IsAligned(total_write_bytes_))
<< ": Failed after writing " << total_write_bytes_
<< " to the file, attempting aligned write with unaligned start.";
- for (size_t i = 0; i < iovec_size; ++i) {
+
+ for (const auto &iovec_item : iovec) {
absl::Span<const uint8_t> data(
- reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
- iovec_data[i].iov_len);
- VLOG(2) << " iov_base " << static_cast<void *>(iovec_data[i].iov_base)
- << ", iov_len " << iovec_data[i].iov_len;
+ reinterpret_cast<const uint8_t *>(iovec_item.iov_base),
+ iovec_item.iov_len);
+ VLOG(2) << " iov_base " << static_cast<void *>(iovec_item.iov_base)
+ << ", iov_len " << iovec_item.iov_len;
CHECK(IsAlignedStart(data) && IsAlignedLength(data));
- CHECK_GT(data.size(), 0u);
- }
- } else {
- size_t accumulated_write_bytes = total_write_bytes_;
- for (size_t i = 0; i < iovec_size; ++i) {
- absl::Span<const uint8_t> data(
- reinterpret_cast<const uint8_t *>(iovec_data[i].iov_base),
- iovec_data[i].iov_len);
- VLOG(2) << " accumulated_write_bytes 0x" << std::hex
- << accumulated_write_bytes << " (" << std::dec
- << accumulated_write_bytes << "), iov_base "
- << static_cast<void *>(iovec_data[i].iov_base) << ", iov_len 0x"
- << std::hex << iovec_data[i].iov_len << " (" << std::dec
- << iovec_data[i].iov_len << ")";
-
- if ((accumulated_write_bytes % FileHandler::kSector) == 0) {
- CHECK(!IsAlignedStart(data) || !IsAlignedLength(data));
- }
-
- accumulated_write_bytes += data.size();
- CHECK_GT(data.size(), 0u);
}
}
- if (VLOG_IS_ON(2)) {
- size_t to_be_written = 0;
- for (size_t i = 0; i < iovec_size; ++i) {
- to_be_written += iovec_data[i].iov_len;
- }
- VLOG(2) << "Going to write " << to_be_written;
- CHECK_GT(to_be_written, 0u);
+ // Calculation of expected written size.
+ size_t counted_size = 0;
+ for (const auto &iovec_item : iovec) {
+ CHECK_GT(iovec_item.iov_len, 0u);
+ counted_size += iovec_item.iov_len;
}
- const ssize_t written = writev(fd_, iovec_data, iovec_size);
- VLOG(2) << "Wrote " << written << ", for iovec size " << iovec_size;
+ VLOG(2) << "Going to write " << counted_size;
+ CHECK_GT(counted_size, 0u);
+
+ const ssize_t written = writev(fd_, iovec.data(), iovec.size());
+ VLOG(2) << "Wrote " << written << ", for iovec size " << iovec.size();
+
+ const auto end = aos::monotonic_clock::now();
+ if (written == -1 && errno == ENOSPC) {
+ return WriteCode::kOutOfSpace;
+ }
+ PCHECK(written >= 0) << ": write failed, got " << written;
+ if (written < static_cast<ssize_t>(counted_size)) {
+ // Sometimes this happens instead of ENOSPC. On a real filesystem, this
+ // never seems to happen in any other case. If we ever want to log to a
+ // socket, this will happen more often. However, until we get there, we'll
+ // just assume it means we ran out of space.
+ return WriteCode::kOutOfSpace;
+ }
if (FLAGS_sync) {
// Flush asynchronously and force the data out of the cache.
@@ -321,21 +301,11 @@
last_synced_bytes_ = total_write_bytes_;
}
- const auto end = aos::monotonic_clock::now();
- if (written == -1 && errno == ENOSPC) {
- return WriteCode::kOutOfSpace;
- }
- PCHECK(written >= 0) << ": write failed, got " << written;
- if (written < static_cast<ssize_t>(counted_size)) {
- // Sometimes this happens instead of ENOSPC. On a real filesystem, this
- // never seems to happen in any other case. If we ever want to log to a
- // socket, this will happen more often. However, until we get there, we'll
- // just assume it means we ran out of space.
- return WriteCode::kOutOfSpace;
- }
-
total_write_bytes_ += written;
- write_stats_.UpdateStats(end - start, written, iovec_size);
+ if (aligned) {
+ written_aligned_ += written;
+ }
+ write_stats_.UpdateStats(end - start, written, iovec.size());
return WriteCode::kOk;
}
@@ -483,4 +453,5 @@
}
return WriteCode::kOk;
}
+
} // namespace aos::logger