Gentle introduction of log backend
The goal is to be able to write short logs to the pre-allocated
memory. To do that, we want to decouple file operations from logs
behind light abstraction.
There are a couple of TODO added. I hope to fix them with next iteration
where actual memory log backend will be implemented.
Change-Id: I65e80825b1e080375efc54f35b270df1ceb17a0d
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 02a9fdd..0338057 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -7,6 +7,7 @@
#include <algorithm>
#include <climits>
+#include <filesystem>
#include "absl/strings/escaping.h"
#include "aos/configuration.h"
@@ -64,10 +65,6 @@
"corrupt message found by MessageReader be silently ignored, "
"providing access to all uncorrupted messages in a logfile.");
-DEFINE_bool(direct, false,
- "If true, write using O_DIRECT and write 512 byte aligned blocks "
- "whenever possible.");
-
namespace aos::logger {
namespace {
@@ -83,51 +80,14 @@
}
} // namespace
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
- std::unique_ptr<DataEncoder> encoder)
- : filename_(filename),
- encoder_(std::move(encoder)),
- supports_odirect_(FLAGS_direct) {
- iovec_.reserve(10);
- if (!util::MkdirPIfSpace(filename, 0777)) {
- ran_out_of_space_ = true;
- } else {
- fd_ = open(filename_.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
- if (fd_ == -1 && errno == ENOSPC) {
- ran_out_of_space_ = true;
- } else {
- PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
- << " for writing";
- VLOG(1) << "Opened " << this->filename() << " for writing";
-
- flags_ = fcntl(fd_, F_GETFL, 0);
- PCHECK(flags_ >= 0) << ": Failed to get flags for " << this->filename();
-
- EnableDirect();
- }
- }
-}
-
-void DetachedBufferWriter::EnableDirect() {
- if (supports_odirect_ && !ODirectEnabled()) {
- const int new_flags = flags_ | O_DIRECT;
- // Track if we failed to set O_DIRECT. Note: Austin hasn't seen this call
- // fail. The write call tends to fail instead.
- if (fcntl(fd_, F_SETFL, new_flags) == -1) {
- PLOG(WARNING) << "Failed to set O_DIRECT on " << filename();
- supports_odirect_ = false;
- } else {
- VLOG(1) << "Enabled O_DIRECT on " << filename();
- flags_ = new_flags;
- }
- }
-}
-
-void DetachedBufferWriter::DisableDirect() {
- if (supports_odirect_ && ODirectEnabled()) {
- flags_ = flags_ & (~O_DIRECT);
- PCHECK(fcntl(fd_, F_SETFL, flags_) != -1) << ": Failed to disable O_DIRECT";
- VLOG(1) << "Disabled O_DIRECT on " << filename();
+DetachedBufferWriter::DetachedBufferWriter(
+ std::unique_ptr<FileHandler> file_handler,
+ std::unique_ptr<DataEncoder> encoder)
+ : file_handler_(std::move(file_handler)), encoder_(std::move(encoder)) {
+ CHECK(file_handler_);
+ ran_out_of_space_ = file_handler_->OpenForWrite() == WriteCode::kOutOfSpace;
+ if (ran_out_of_space_) {
+ LOG(WARNING) << "And we are out of space";
}
}
@@ -148,22 +108,10 @@
// (because that data will then be its data).
DetachedBufferWriter &DetachedBufferWriter::operator=(
DetachedBufferWriter &&other) {
- std::swap(filename_, other.filename_);
+ std::swap(file_handler_, other.file_handler_);
std::swap(encoder_, other.encoder_);
- std::swap(fd_, other.fd_);
std::swap(ran_out_of_space_, other.ran_out_of_space_);
std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
- std::swap(iovec_, other.iovec_);
- std::swap(max_write_time_, other.max_write_time_);
- std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
- std::swap(max_write_time_messages_, other.max_write_time_messages_);
- std::swap(total_write_time_, other.total_write_time_);
- std::swap(total_write_count_, other.total_write_count_);
- std::swap(total_write_messages_, other.total_write_messages_);
- std::swap(total_write_bytes_, other.total_write_bytes_);
- std::swap(last_synced_bytes_, other.last_synced_bytes_);
- std::swap(supports_odirect_, other.supports_odirect_);
- std::swap(flags_, other.flags_);
std::swap(last_flush_time_, other.last_flush_time_);
return *this;
}
@@ -183,7 +131,8 @@
// Keep writing chunks until we've written it all. If we end up with a
// partial write, this means we need to flush to disk.
do {
- const size_t bytes_written = encoder_->Encode(copier, overall_bytes_written);
+ const size_t bytes_written =
+ encoder_->Encode(copier, overall_bytes_written);
CHECK(bytes_written != 0);
overall_bytes_written += bytes_written;
@@ -198,22 +147,14 @@
}
void DetachedBufferWriter::Close() {
- if (fd_ == -1) {
+ if (!file_handler_->is_open()) {
return;
}
encoder_->Finish();
while (encoder_->queue_size() > 0) {
Flush(monotonic_clock::max_time);
}
- if (close(fd_) == -1) {
- if (errno == ENOSPC) {
- ran_out_of_space_ = true;
- } else {
- PLOG(ERROR) << "Closing log file failed";
- }
- }
- fd_ = -1;
- VLOG(1) << "Closed " << filename();
+ ran_out_of_space_ = file_handler_->Close() == WriteCode::kOutOfSpace;
}
void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
@@ -236,141 +177,9 @@
return;
}
- iovec_.clear();
- size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
- iovec_.resize(iovec_size);
- size_t counted_size = 0;
-
- // Ok, we now need to figure out if we were aligned, and if we were, how much
- // of the data we are being asked to write is aligned.
- //
- // The file is aligned if it is a multiple of kSector in length. The data is
- // aligned if it's memory is kSector aligned, and the length is a multiple of
- // kSector in length.
- bool aligned = (total_write_bytes_ % kSector) == 0;
- size_t write_index = 0;
- for (size_t i = 0; i < iovec_size; ++i) {
- iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
-
- // Make sure the address is aligned, or give up. This should be uncommon,
- // but is always possible.
- if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
- (kSector - 1)) != 0) {
- aligned = false;
- }
-
- // Now, see if the length is a multiple of kSector. The goal is to figure
- // out if/how much memory we can write out with O_DIRECT so that only the
- // last little bit is done with non-direct IO to keep it fast.
- iovec_[write_index].iov_len = queue[i].size();
- if ((iovec_[write_index].iov_len % kSector) != 0) {
- VLOG(1) << "Unaligned length on " << filename();
- // If we've got over a sector of data to write, write it out with O_DIRECT
- // and then continue writing the rest unaligned.
- if (aligned && iovec_[write_index].iov_len > kSector) {
- const size_t aligned_size =
- iovec_[write_index].iov_len & (~(kSector - 1));
- VLOG(1) << "Was aligned, writing last chunk rounded from "
- << queue[i].size() << " to " << aligned_size;
- iovec_[write_index].iov_len = aligned_size;
-
- WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
-
- // Now, everything before here has been written. Make an iovec out of
- // the last bytes, and keep going.
- iovec_size -= write_index;
- iovec_.resize(iovec_size);
- write_index = 0;
- counted_size = 0;
-
- iovec_[write_index].iov_base =
- const_cast<uint8_t *>(queue[i].data() + aligned_size);
- iovec_[write_index].iov_len = queue[i].size() - aligned_size;
- }
- aligned = false;
- }
- VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
- << filename();
- counted_size += iovec_[write_index].iov_len;
- ++write_index;
- }
-
- // Either write the aligned data if it is all aligned, or write the rest
- // unaligned if we wrote aligned up above.
- WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
-
- encoder_->Clear(iovec_size);
-}
-
-size_t DetachedBufferWriter::WriteV(struct iovec *iovec_data, size_t iovec_size,
- bool aligned, size_t counted_size) {
- // Configure the file descriptor to match the mode we should be in. This is
- // safe to over-call since it only does the syscall if needed.
- if (aligned) {
- EnableDirect();
- } else {
- DisableDirect();
- }
-
- const auto start = aos::monotonic_clock::now();
- const ssize_t written = writev(fd_, iovec_data, iovec_size);
-
- if (written > 0) {
- // Flush asynchronously and force the data out of the cache.
- sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
- if (last_synced_bytes_ != 0) {
- // Per Linus' recommendation online on how to do fast file IO, do a
- // blocking flush of the previous write chunk, and then tell the kernel to
- // drop the pages from the cache. This makes sure we can't get too far
- // ahead.
- sync_file_range(fd_, last_synced_bytes_,
- total_write_bytes_ - last_synced_bytes_,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- posix_fadvise(fd_, last_synced_bytes_,
- total_write_bytes_ - last_synced_bytes_,
- POSIX_FADV_DONTNEED);
-
- last_synced_bytes_ = total_write_bytes_;
- }
- }
-
- const auto end = aos::monotonic_clock::now();
- HandleWriteReturn(written, counted_size);
-
- UpdateStatsForWrite(end - start, written, iovec_size);
-
- return written;
-}
-
-void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
- size_t write_size) {
- if (write_return == -1 && errno == ENOSPC) {
- ran_out_of_space_ = true;
- return;
- }
- PCHECK(write_return >= 0) << ": write failed, got " << write_return;
- if (write_return < static_cast<ssize_t>(write_size)) {
- // Sometimes this happens instead of ENOSPC. On a real filesystem, this
- // never seems to happen in any other case. If we ever want to log to a
- // socket, this will happen more often. However, until we get there, we'll
- // just assume it means we ran out of space.
- ran_out_of_space_ = true;
- return;
- }
-}
-
-void DetachedBufferWriter::UpdateStatsForWrite(
- aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
- if (duration > max_write_time_) {
- max_write_time_ = duration;
- max_write_time_bytes_ = written;
- max_write_time_messages_ = iovec_size;
- }
- total_write_time_ += duration;
- ++total_write_count_;
- total_write_messages_ += iovec_size;
- total_write_bytes_ += written;
+ const WriteResult result = file_handler_->Write(queue);
+ encoder_->Clear(result.messages_written);
+ ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
}
void DetachedBufferWriter::FlushAtThreshold(