Don't lose pieces out of the middle of writing files
We didn't get all the accounting right when writing out aligned and
unaligned subsets of buffers. Fix those correctness issues and add
tests (Alexei wrote most of the tests).
Change-Id: I8ac6bcbd6b762174b3f47d41ab5da1dff2cfec44
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 85d27b4..6cb3740 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -94,6 +94,7 @@
target_compatible_with = ["@platforms//os:linux"],
deps = [
":log_backend",
+ "//aos/containers:resizeable_buffer",
"//aos/testing:googletest",
"//aos/testing:tmpdir",
"@com_github_google_glog//:glog",
diff --git a/aos/events/logging/log_backend.cc b/aos/events/logging/log_backend.cc
index f47b16d..a8b7a43 100644
--- a/aos/events/logging/log_backend.cc
+++ b/aos/events/logging/log_backend.cc
@@ -11,6 +11,10 @@
DEFINE_bool(direct, false,
"If true, write using O_DIRECT and write 512 byte aligned blocks "
"whenever possible.");
+DEFINE_bool(
+ sync, false,
+ "If true, sync data to disk as we go so we don't get too far ahead. Also "
+ "fadvise that we are done with the memory once it hits disk.");
namespace aos::logger {
namespace {
@@ -72,34 +76,86 @@
WriteResult FileHandler::Write(
const absl::Span<const absl::Span<const uint8_t>> &queue) {
iovec_.clear();
- size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
- iovec_.resize(iovec_size);
+ CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
+ iovec_.resize(queue.size());
+ // Size of the data currently in iovec_.
size_t counted_size = 0;
// Ok, we now need to figure out if we were aligned, and if we were, how much
// of the data we are being asked to write is aligned.
//
- // The file is aligned if it is a multiple of kSector in length. The data is
- // aligned if it's memory is kSector aligned, and the length is a multiple of
- // kSector in length.
+ // When writing with O_DIRECT, the kernel will only accept writes where the
+ // offset into the file is a multiple of kSector, the data is aligned to
+ // kSector in memory, and the length being written is a multiple of kSector.
+ // Some of the callers use an aligned ResizeableBuffer to generate 512 byte
+ // aligned buffers for this code to find and use.
bool aligned = (total_write_bytes_ % kSector) == 0;
+
+ // Index we are filling in next. Resets back to 0 each time we flush an
+ // intermediate write out to disk.
size_t write_index = 0;
- for (size_t i = 0; i < iovec_size; ++i) {
+ for (size_t i = 0; i < queue.size(); ++i) {
iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[write_index].iov_len = queue[i].size();
// Make sure the address is aligned, or give up. This should be uncommon,
// but is always possible.
- if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
- (kSector - 1)) != 0) {
+ if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) % kSector) !=
+ 0) {
+ // Flush if we were aligned and have data.
+ if (aligned && write_index != 0) {
+ VLOG(1) << "Was aligned, now is not, writing previous data";
+ const auto code =
+ WriteV(iovec_.data(), write_index, true, counted_size);
+ if (code == WriteCode::kOutOfSpace) {
+ return {
+ .code = code,
+ .messages_written = i,
+ };
+ }
+
+ // Now, everything before here has been written. Make an iovec out of
+ // the rest and keep going.
+ write_index = 0;
+ counted_size = 0;
+
+ iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[write_index].iov_len = queue[i].size();
+ }
aligned = false;
+ } else {
+ // We are now starting aligned again, and have data worth writing! Flush
+ // what was there before.
+ if (!aligned && iovec_[write_index].iov_len >= kSector &&
+ write_index != 0) {
+ VLOG(1) << "Was not aligned, now is, writing previous data";
+
+ const auto code =
+ WriteV(iovec_.data(), write_index, false, counted_size);
+ if (code == WriteCode::kOutOfSpace) {
+ return {
+ .code = code,
+ .messages_written = i,
+ };
+ }
+
+ // Now, everything before here has been written. Make an iovec out of
+ // the rest and keep going.
+ write_index = 0;
+ counted_size = 0;
+
+ iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[write_index].iov_len = queue[i].size();
+ aligned = true;
+ }
}
// Now, see if the length is a multiple of kSector. The goal is to figure
// out if/how much memory we can write out with O_DIRECT so that only the
// last little bit is done with non-direct IO to keep it fast.
- iovec_[write_index].iov_len = queue[i].size();
if ((iovec_[write_index].iov_len % kSector) != 0) {
- VLOG(1) << "Unaligned length on " << filename();
+ VLOG(1) << "Unaligned length " << iovec_[write_index].iov_len << " on "
+ << filename();
// If we've got over a sector of data to write, write it out with O_DIRECT
// and then continue writing the rest unaligned.
if (aligned && iovec_[write_index].iov_len > kSector) {
@@ -110,7 +166,7 @@
iovec_[write_index].iov_len = aligned_size;
const auto code =
- WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
+ WriteV(iovec_.data(), write_index + 1, true, counted_size + aligned_size);
if (code == WriteCode::kOutOfSpace) {
return {
.code = code,
@@ -119,11 +175,7 @@
}
// Now, everything before here has been written. Make an iovec out of
- // the last bytes, and keep going.
- // TODO (Alexei, Austin): is it safe to do here since it can be a
- // situation when i >= iovec_size
- iovec_size -= write_index;
- iovec_.resize(iovec_size);
+ // the rest and keep going.
write_index = 0;
counted_size = 0;
@@ -141,10 +193,10 @@
// Either write the aligned data if it is all aligned, or write the rest
// unaligned if we wrote aligned up above.
- const auto code = WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
+ const auto code = WriteV(iovec_.data(), write_index, aligned, counted_size);
return {
.code = code,
- .messages_written = iovec_size,
+ .messages_written = queue.size(),
};
}
@@ -159,9 +211,22 @@
}
const auto start = aos::monotonic_clock::now();
- const ssize_t written = writev(fd_, iovec_data, iovec_size);
- if (written > 0) {
+ if (VLOG_IS_ON(2)) {
+ size_t to_be_written = 0;
+ for (size_t i = 0; i < iovec_size; ++i) {
+ VLOG(2) << " iov_base " << static_cast<void *>(iovec_data[i].iov_base)
+ << ", iov_len " << iovec_data[i].iov_len;
+ to_be_written += iovec_data[i].iov_len;
+ }
+ CHECK_GT(to_be_written, 0u);
+ VLOG(2) << "Going to write " << to_be_written;
+ }
+
+ const ssize_t written = writev(fd_, iovec_data, iovec_size);
+ VLOG(2) << "Wrote " << written << ", for iovec size " << iovec_size;
+
+ if (FLAGS_sync && written > 0) {
// Flush asynchronously and force the data out of the cache.
sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
if (last_synced_bytes_ != 0) {
@@ -343,4 +408,4 @@
}
return WriteCode::kOk;
}
-} // namespace aos::logger
\ No newline at end of file
+} // namespace aos::logger
diff --git a/aos/events/logging/log_backend.h b/aos/events/logging/log_backend.h
index 4dd393e..20ade91 100644
--- a/aos/events/logging/log_backend.h
+++ b/aos/events/logging/log_backend.h
@@ -122,6 +122,11 @@
// Peeks messages from queue and writes it to file. Returns code when
// out-of-space problem occurred along with number of messages from queue that
// was written.
+ //
+ // The spans can be aligned or not, and can have any lengths. This code will
+ // write faster if the spans passed in start at aligned addresses, and are
+ // multiples of kSector long (and the data written so far is also a multiple
+ // of kSector length).
virtual WriteResult Write(
const absl::Span<const absl::Span<const uint8_t>> &queue);
diff --git a/aos/events/logging/log_backend_test.cc b/aos/events/logging/log_backend_test.cc
index 452592f..e940948 100644
--- a/aos/events/logging/log_backend_test.cc
+++ b/aos/events/logging/log_backend_test.cc
@@ -1,8 +1,16 @@
#include "aos/events/logging/log_backend.h"
+#include <algorithm>
#include <filesystem>
+#include <fstream>
+#include <random>
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/testing/tmpdir.h"
+#include "glog/logging.h"
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
namespace aos::logger::testing {
@@ -84,4 +92,229 @@
EXPECT_TRUE(std::filesystem::exists(renamed + "test.log"));
}
-} // namespace aos::logger::testing
\ No newline at end of file
+// Represents a sequence of calls to Write (the outer vector), where each
+// inner vector holds the size of every message passed to that call.
+using WriteRecipe = std::vector<std::vector<int>>;
+
+struct FileWriteTestBase : public ::testing::Test {
+ uint8_t NextRandom() { return distribution(engine); }
+
+ class AlignedReallocator {
+ public:
+ static void *Realloc(void *old, size_t old_size, size_t new_capacity) {
+ void *new_memory = std::aligned_alloc(512, new_capacity);
+ if (old) {
+ memcpy(new_memory, old, old_size);
+ free(old);
+ }
+ return new_memory;
+ }
+ };
+
+ AllocatorResizeableBuffer<AlignedReallocator> buffer;
+
+ void TestRecipe(const WriteRecipe &recipe) {
+ VLOG(1) << "Starting";
+ for (const std::vector<int> &r : recipe) {
+ VLOG(1) << " chunk " << absl::StrJoin(r, ", ");
+ }
+ size_t requested_size = 0;
+ for (const auto &call : recipe) {
+ for (const auto &message_size : call) {
+ requested_size += message_size;
+ }
+ }
+
+ // Grow the cached buffer if it needs to grow. Building a random buffer is
+ // the most expensive part of the test.
+ if (buffer.size() < requested_size) {
+ // Round the size up to a multiple of 512 so we have the best chance
+ // of exercising transitions to and from aligned writes.
+ buffer.resize((requested_size + FileHandler::kSector - 1) &
+ (~(FileHandler::kSector - 1)));
+ std::generate(std::begin(buffer), std::end(buffer),
+ [this]() { return NextRandom(); });
+ }
+
+ // Align the data to the *end* of the buffer so the file contents keep
+ // changing between runs, in case a stale file somehow doesn't get
+ // deleted or collides with another test's output.
+ const uint8_t *adjusted_start =
+ buffer.data() + buffer.size() - requested_size;
+
+ // logevent has to end with '/' to be recognized as a folder.
+ const std::string logevent = aos::testing::TestTmpDir() + "/";
+ const auto file = std::filesystem::path(logevent) / "test.log";
+ std::filesystem::remove_all(file);
+ VLOG(1) << "Writing to " << file.c_str();
+
+ FileBackend backend(logevent);
+ auto handler = backend.RequestFile("test.log");
+ ASSERT_EQ(handler->OpenForWrite(), WriteCode::kOk);
+
+ // Build arguments for Write.
+ size_t position = 0;
+ for (const auto &call : recipe) {
+ std::vector<absl::Span<const uint8_t>> queue;
+ for (const auto &message_size : call) {
+ const uint8_t *current = adjusted_start + position;
+ queue.emplace_back(current, message_size);
+ position += message_size;
+ }
+ auto result = handler->Write(queue);
+ EXPECT_EQ(result.code, WriteCode::kOk);
+ EXPECT_EQ(result.messages_written, call.size());
+ }
+
+ ASSERT_EQ(handler->Close(), WriteCode::kOk);
+ EXPECT_TRUE(std::filesystem::exists(file));
+ EXPECT_EQ(std::filesystem::file_size(file), requested_size);
+
+ // Confirm that the contents then match the original buffer.
+ std::ifstream file_stream(file, std::ios::in | std::ios::binary);
+ std::vector<uint8_t> content((std::istreambuf_iterator<char>(file_stream)),
+ std::istreambuf_iterator<char>());
+ ASSERT_EQ(content.size(), requested_size);
+ bool matches = true;
+ for (size_t i = 0; i < content.size(); ++i) {
+ if (content[i] != adjusted_start[i]) {
+ matches = false;
+ break;
+ }
+ }
+ if (!matches) {
+ ASSERT_TRUE(false);
+ }
+ }
+
+ std::random_device random;
+ std::mt19937 engine{random()};
+ std::uniform_int_distribution<uint8_t> distribution{0, 0xFF};
+};
+
+// Tests that random sets of reads and writes always result in all the data
+// being written.
+TEST_F(FileWriteTestBase, RandomTest) {
+ std::mt19937 engine2{random()};
+ std::uniform_int_distribution<int> count_distribution{1, 5};
+
+ // Pick lengths whose sums land on multiples of 512 so the writes are
+ // forced to transition across the aligned/unaligned boundary.
+ const std::vector<int> lengths = {
+ 0x100b5, 0xff4b, 0x10000, 1024 - 7, 1024 - 6, 1024 - 5, 1024 - 4,
+ 1024 - 3, 1024 - 2, 1024 - 1, 1024, 1024 + 1, 1024 + 2, 1024 + 3,
+ 1024 + 4, 1024 + 5, 1024 + 6, 1024 + 7, 512 - 7, 512 - 6, 512 - 5,
+ 512 - 4, 512 - 3, 512 - 2, 512 - 1, 512, 512 + 1, 512 + 2,
+ 512 + 3, 512 + 4, 512 + 5, 512 + 6, 512 + 7};
+ std::uniform_int_distribution<int> lengths_distribution{
+ 0, static_cast<int>(lengths.size() - 1)};
+
+ for (int i = 0; i < 100000; ++i) {
+ WriteRecipe recipe;
+ int number_of_writes = count_distribution(engine2);
+ for (int j = 0; j < number_of_writes; ++j) {
+ int number_of_chunks = count_distribution(engine2);
+ std::vector<int> r;
+ for (int k = 0; k < number_of_chunks; ++k) {
+ r.emplace_back(lengths[lengths_distribution(engine2)]);
+ }
+ recipe.emplace_back(std::move(r));
+ }
+
+ TestRecipe(recipe);
+ }
+}
+
+// Test an aligned to unaligned transition to make sure everything works.
+TEST_F(FileWriteTestBase, AlignedToUnaligned) {
+ AllocatorResizeableBuffer<AlignedReallocator> aligned_buffer;
+ AllocatorResizeableBuffer<AlignedReallocator> unaligned_buffer;
+
+ aligned_buffer.resize(FileHandler::kSector * 4);
+ std::generate(std::begin(aligned_buffer), std::end(aligned_buffer),
+ [this]() { return NextRandom(); });
+
+ unaligned_buffer.resize(FileHandler::kSector * 4);
+ std::generate(std::begin(unaligned_buffer), std::end(unaligned_buffer),
+ [this]() { return NextRandom(); });
+
+ const size_t kOffset = 53;
+ absl::Span<const uint8_t> unaligned_span(unaligned_buffer.data() + kOffset,
+ aligned_buffer.size() - kOffset);
+
+ std::vector<absl::Span<const uint8_t>> queue;
+
+ queue.emplace_back(aligned_buffer.data(), aligned_buffer.size());
+ queue.emplace_back(unaligned_span);
+ LOG(INFO) << "Queue 0 " << queue[0].size();
+ LOG(INFO) << "Queue 1 " << queue[1].size();
+
+ const std::string logevent = aos::testing::TestTmpDir() + "/";
+ const auto file = std::filesystem::path(logevent) / "test.log";
+ std::filesystem::remove_all(file);
+ VLOG(1) << "Writing to " << file.c_str();
+
+ FileBackend backend(logevent);
+ auto handler = backend.RequestFile("test.log");
+ ASSERT_EQ(handler->OpenForWrite(), WriteCode::kOk);
+
+ auto result = handler->Write(queue);
+ EXPECT_EQ(result.code, WriteCode::kOk);
+ EXPECT_EQ(result.messages_written, queue.size());
+
+ ASSERT_EQ(handler->Close(), WriteCode::kOk);
+ EXPECT_TRUE(std::filesystem::exists(file));
+ const size_t requested_size = queue[0].size() + queue[1].size();
+ EXPECT_EQ(std::filesystem::file_size(file), requested_size);
+
+ // Confirm that the contents then match the original buffer.
+ std::ifstream file_stream(file, std::ios::in | std::ios::binary);
+ std::vector<uint8_t> content((std::istreambuf_iterator<char>(file_stream)),
+ std::istreambuf_iterator<char>());
+ ASSERT_EQ(content.size(), requested_size);
+ bool matches = true;
+ for (size_t i = 0; i < queue[0].size(); ++i) {
+ if (content[i] != aligned_buffer.data()[i]) {
+ matches = false;
+ break;
+ }
+ }
+ for (size_t i = 0; i < queue[1].size(); ++i) {
+ if (content[i + queue[0].size()] != unaligned_span.data()[i]) {
+ matches = false;
+ break;
+ }
+ }
+ if (!matches) {
+ ASSERT_TRUE(false);
+ }
+}
+
+struct FileWriteTestFixture : public ::testing::WithParamInterface<WriteRecipe>,
+ public FileWriteTestBase {};
+
+TEST_P(FileWriteTestFixture, CheckSizeOfWrittenFile) {
+ auto recipe = GetParam();
+ TestRecipe(recipe);
+}
+
+// Try out some well known failure cases transitioning across the alignment
+// boundary.
+INSTANTIATE_TEST_SUITE_P(
+ FileWriteTest, FileWriteTestFixture,
+ ::testing::Values(WriteRecipe{{0x10000}}, WriteRecipe{{0x10000, 0x1000b5}},
+ WriteRecipe{{0x10000, 0x1000b5}, {0xfff4b, 0x10000}},
+ WriteRecipe{{0x1000b5, 0xfff4b}, {0x10000}},
+ WriteRecipe{{65536, 517, 65717}},
+ WriteRecipe{{65536, 517, 518, 511},
+ {514},
+ {505, 514},
+ {505, 514, 65355, 519}},
+ WriteRecipe{{65536, 518, 511, 511},
+ {65717},
+ {65717, 65717, 518},
+ {65536, 65536, 508, 65355},
+ {515, 519}},
+ WriteRecipe{{0x1000b5, 0xfff4b, 0x100000}, {0x10000}}));
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 0338057..990e742 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -210,10 +210,12 @@
while (encoder_->space() == 0 ||
encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
encoder_->queue_size() >= IOV_MAX ||
- now > last_flush_time_ +
- chrono::duration_cast<chrono::nanoseconds>(
- chrono::duration<double>(FLAGS_flush_period))) {
- VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_;
+ (now > last_flush_time_ +
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_flush_period)) &&
+ encoder_->queued_bytes() != 0)) {
+ VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
+ << " queued bytes " << encoder_->queued_bytes();
Flush(now);
}
}