blob: 6cd9a7de3afb26bce4f0d1410a2e94a6c1609adb [file] [log] [blame]
#include "aos/events/logging/log_backend.h"
#include <dirent.h>
#include <filesystem>
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "glog/logging.h"
#include "aos/events/logging/file_operations.h"
#include "aos/util/file.h"
DEFINE_bool(
sync, false,
"If true, sync data to disk as we go so we don't get too far ahead. Also "
"fadvise that we are done with the memory once it hits disk.");
DEFINE_uint32(queue_reserve, 32, "Pre-reserved size of write queue.");
namespace aos::logger {
namespace {
constexpr const char *kTempExtension = ".tmp";
// Assuming that kSector is power of 2, it aligns address to the left size.
inline size_t AlignToLeft(size_t value) {
return value & (~(FileHandler::kSector - 1));
}
inline bool IsAligned(size_t value) {
return value % FileHandler::kSector == 0;
}
inline bool IsAlignedStart(const absl::Span<const uint8_t> span) {
return (reinterpret_cast<size_t>(span.data()) % FileHandler::kSector) == 0;
}
inline bool IsAlignedLength(const absl::Span<const uint8_t> span) {
return (span.size() % FileHandler::kSector) == 0;
}
} // namespace
logger::QueueAligner::QueueAligner() {
aligned_queue_.reserve(FLAGS_queue_reserve);
}
void logger::QueueAligner::FillAlignedQueue(
const absl::Span<const absl::Span<const uint8_t>> &queue) {
aligned_queue_.clear();
size_t queue_index = 0;
for (const auto &span : queue) {
++queue_index;
// Generally, every span might have 3 optional parts (i.e. 2^3 cases):
// 1. unaligned prefix - from start till first aligned block.
// 2. aligned main - block with aligned start and size
// 3. unaligned suffix - block with aligned start, and size less than one
// sector. If size of the span is less than 1 sector, let's call it prefix.
auto *data = span.data();
size_t size = span.size();
const auto start = reinterpret_cast<size_t>(data);
VLOG(2) << "Consider span starting at " << std::hex << start
<< " with size " << size;
CHECK_GT(size, 0u)
<< ": Nobody should be sending empty messages. Queue index "
<< (queue_index - 1) << " out of " << queue.size();
const auto next_aligned =
IsAligned(start) ? start : AlignToLeft(start) + FileHandler::kSector;
const auto prefix_size = next_aligned - start;
VLOG(2) << "Calculated prefix size " << std::hex << prefix_size;
if (prefix_size >= size) {
// size of prefix >= size of span - alignment is not possible, accept the
// whole span
VLOG(2) << "Only prefix found";
CHECK_GT(size, 0u);
aligned_queue_.emplace_back(data, size, false);
continue;
}
CHECK_LT(prefix_size, FileHandler::kSector)
<< ": Wrong calculation of 'next' aligned position";
if (prefix_size > 0) {
// Cut the prefix and move to the main part.
VLOG(2) << "Cutting prefix at " << std::hex << start << " of size "
<< prefix_size;
aligned_queue_.emplace_back(data, prefix_size, false);
data += prefix_size;
size -= prefix_size;
CHECK(data <= span.data() + span.size()) << " :Boundaries after prefix";
}
if (IsAligned(size)) {
// the rest is aligned.
VLOG(2) << "Returning aligned main part";
CHECK_GT(size, 0u);
aligned_queue_.emplace_back(data, size, true);
continue;
}
const auto aligned_size = AlignToLeft(size);
CHECK(aligned_size < size) << ": Wrong calculation of 'main' size";
if (aligned_size > 0) {
VLOG(2) << "Cutting main part starting " << std::hex
<< reinterpret_cast<size_t>(data) << " of size " << aligned_size;
aligned_queue_.emplace_back(data, aligned_size, true);
data += aligned_size;
size -= aligned_size;
CHECK(data <= span.data() + span.size()) << " :Boundaries after main";
}
VLOG(2) << "Cutting suffix part starting " << std::hex
<< reinterpret_cast<size_t>(data) << " of size " << size;
CHECK_GT(size, 0u);
aligned_queue_.emplace_back(data, size, false);
}
}
FileHandler::FileHandler(std::string filename, bool supports_odirect)
: filename_(std::move(filename)), supports_odirect_(supports_odirect) {}
FileHandler::~FileHandler() { Close(); }
WriteCode FileHandler::OpenForWrite() {
iovec_.reserve(10);
if (!aos::util::MkdirPIfSpace(filename_, 0777)) {
return WriteCode::kOutOfSpace;
} else {
fd_ = open(filename_.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
if (fd_ == -1 && errno == ENOSPC) {
return WriteCode::kOutOfSpace;
} else {
PCHECK(fd_ != -1) << ": Failed to open " << filename_ << " for writing";
VLOG(1) << "Opened " << filename_ << " for writing";
}
flags_ = fcntl(fd_, F_GETFL, 0);
PCHECK(flags_ >= 0) << ": Failed to get flags for " << filename_;
EnableDirect();
CHECK(std::filesystem::exists(filename_));
return WriteCode::kOk;
}
}
void FileHandler::EnableDirect() {
if (supports_odirect_ && !ODirectEnabled()) {
const int new_flags = flags_ | O_DIRECT;
// Track if we failed to set O_DIRECT. Note: Austin hasn't seen this call
// fail. The write call tends to fail instead.
if (fcntl(fd_, F_SETFL, new_flags) == -1) {
PLOG(WARNING) << "Failed to set O_DIRECT on " << filename_;
supports_odirect_ = false;
} else {
VLOG(1) << "Enabled O_DIRECT on " << filename_;
flags_ = new_flags;
}
}
}
void FileHandler::DisableDirect() {
if (supports_odirect_ && ODirectEnabled()) {
flags_ = flags_ & (~O_DIRECT);
PCHECK(fcntl(fd_, F_SETFL, flags_) != -1) << ": Failed to disable O_DIRECT";
VLOG(1) << "Disabled O_DIRECT on " << filename_;
}
}
WriteResult FileHandler::Write(
const absl::Span<const absl::Span<const uint8_t>> &queue) {
iovec_.clear();
CHECK_LE(queue.size(), static_cast<size_t>(IOV_MAX));
queue_aligner_.FillAlignedQueue(queue);
CHECK_LE(queue_aligner_.aligned_queue().size(), static_cast<size_t>(IOV_MAX));
// Ok, we now need to figure out if we were aligned, and if we were, how much
// of the data we are being asked to write is aligned.
//
// When writing with O_DIRECT, the kernel only will accept writes where the
// offset into the file is a multiple of kSector, the data is aligned to
// kSector in memory, and the length being written is a multiple of kSector.
// Some of the callers use an aligned ResizeableBuffer to generate 512 byte
// aligned buffers for this code to find and use.
bool was_aligned = IsAligned(total_write_bytes_);
VLOG(1) << "Started " << (was_aligned ? "aligned" : "unaligned")
<< " at offset " << total_write_bytes_ << " on " << filename();
// Walk through aligned queue and batch writes based on aligned flag
for (const auto &item : queue_aligner_.aligned_queue()) {
if (was_aligned != item.aligned) {
// Switching aligned context. Let's flush current batch.
if (!iovec_.empty()) {
// Flush current queue if we need.
const auto code = WriteV(iovec_, was_aligned);
if (code == WriteCode::kOutOfSpace) {
// We cannot say anything about what number of messages was written
// for sure.
return {
.code = code,
.messages_written = queue.size(),
};
}
iovec_.clear();
}
// Write queue is flushed. WriteV updates the total_write_bytes_.
was_aligned = IsAligned(total_write_bytes_) && item.aligned;
}
iovec_.push_back(
{.iov_base = const_cast<uint8_t *>(item.data), .iov_len = item.size});
}
WriteCode result_code = WriteCode::kOk;
if (!iovec_.empty()) {
// Flush current queue if we need.
result_code = WriteV(iovec_, was_aligned);
}
return {
.code = result_code,
.messages_written = queue.size(),
};
}
WriteCode FileHandler::WriteV(const std::vector<struct iovec> &iovec,
bool aligned) {
// Configure the file descriptor to match the mode we should be in. This is
// safe to over-call since it only does the syscall if needed.
if (aligned) {
EnableDirect();
} else {
DisableDirect();
}
VLOG(2) << "Flushing queue of " << iovec.size() << " elements, "
<< (aligned ? "aligned" : "unaligned");
CHECK_GT(iovec.size(), 0u);
const auto start = aos::monotonic_clock::now();
// Validation of alignment assumptions.
if (aligned) {
CHECK(IsAligned(total_write_bytes_))
<< ": Failed after writing " << total_write_bytes_
<< " to the file, attempting aligned write with unaligned start.";
for (const auto &iovec_item : iovec) {
absl::Span<const uint8_t> data(
reinterpret_cast<const uint8_t *>(iovec_item.iov_base),
iovec_item.iov_len);
VLOG(2) << " iov_base " << static_cast<void *>(iovec_item.iov_base)
<< ", iov_len " << iovec_item.iov_len;
CHECK(IsAlignedStart(data) && IsAlignedLength(data));
}
}
// Calculation of expected written size.
size_t counted_size = 0;
for (const auto &iovec_item : iovec) {
CHECK_GT(iovec_item.iov_len, 0u);
counted_size += iovec_item.iov_len;
}
VLOG(2) << "Going to write " << counted_size;
CHECK_GT(counted_size, 0u);
const ssize_t written = writev(fd_, iovec.data(), iovec.size());
VLOG(2) << "Wrote " << written << ", for iovec size " << iovec.size();
const auto end = aos::monotonic_clock::now();
if (written == -1 && errno == ENOSPC) {
return WriteCode::kOutOfSpace;
}
PCHECK(written >= 0) << ": write failed, got " << written;
if (written < static_cast<ssize_t>(counted_size)) {
// Sometimes this happens instead of ENOSPC. On a real filesystem, this
// never seems to happen in any other case. If we ever want to log to a
// socket, this will happen more often. However, until we get there, we'll
// just assume it means we ran out of space.
return WriteCode::kOutOfSpace;
}
if (FLAGS_sync) {
// Flush asynchronously and force the data out of the cache.
sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
if (last_synced_bytes_ != 0) {
// Per Linus' recommendation online on how to do fast file IO, do a
// blocking flush of the previous write chunk, and then tell the kernel to
// drop the pages from the cache. This makes sure we can't get too far
// ahead.
sync_file_range(fd_, last_synced_bytes_,
total_write_bytes_ - last_synced_bytes_,
SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER);
posix_fadvise(fd_, last_synced_bytes_,
total_write_bytes_ - last_synced_bytes_,
POSIX_FADV_DONTNEED);
}
last_synced_bytes_ = total_write_bytes_;
}
total_write_bytes_ += written;
if (aligned) {
written_aligned_ += written;
}
WriteStatistics()->UpdateStats(end - start, written, iovec.size());
return WriteCode::kOk;
}
WriteCode FileHandler::Close() {
if (!is_open()) {
return WriteCode::kOk;
}
bool ran_out_of_space = false;
if (close(fd_) == -1) {
if (errno == ENOSPC) {
ran_out_of_space = true;
} else {
PLOG(ERROR) << "Closing log file failed";
}
}
fd_ = -1;
VLOG(1) << "Closed " << filename_;
return ran_out_of_space ? WriteCode::kOutOfSpace : WriteCode::kOk;
}
FileBackend::FileBackend(std::string_view base_name, bool supports_odirect)
: supports_odirect_(supports_odirect),
base_name_(base_name),
separator_(base_name_.back() == '/' ? "" : "_") {}
std::unique_ptr<LogSink> FileBackend::RequestFile(std::string_view id) {
const std::string filename = absl::StrCat(base_name_, separator_, id);
return std::make_unique<FileHandler>(filename, supports_odirect_);
}
std::vector<std::string> FileBackend::ListFiles() const {
std::filesystem::path directory(base_name_);
if (!is_directory(directory)) {
directory = directory.parent_path();
}
internal::LocalFileOperations operations(directory.string());
std::vector<std::string> files;
operations.FindLogs(&files);
std::vector<std::string> names;
const std::string prefix = absl::StrCat(base_name_, separator_);
for (const auto &file : files) {
CHECK(absl::StartsWith(file, prefix))
<< ": File " << file << ", prefix " << prefix;
names.push_back(file.substr(prefix.size()));
}
return names;
}
std::unique_ptr<DataDecoder> FileBackend::GetDecoder(
std::string_view id) const {
const std::string filename = absl::StrCat(base_name_, separator_, id);
CHECK(std::filesystem::exists(filename));
return std::make_unique<DummyDecoder>(filename);
}
RenamableFileBackend::RenamableFileBackend(std::string_view base_name,
bool supports_odirect)
: supports_odirect_(supports_odirect),
base_name_(base_name),
separator_(base_name_.back() == '/' ? "" : "_") {}
std::unique_ptr<LogSink> RenamableFileBackend::RequestFile(
std::string_view id) {
const std::string filename =
absl::StrCat(base_name_, separator_, id, temp_suffix_);
return std::make_unique<RenamableFileHandler>(this, filename,
supports_odirect_);
}
void RenamableFileBackend::EnableTempFiles() {
use_temp_files_ = true;
temp_suffix_ = kTempExtension;
}
bool RenamableFileBackend::RenameLogBase(std::string_view new_base_name) {
if (new_base_name == base_name_) {
return true;
}
CHECK(old_base_name_.empty())
<< "Only one change of base_name is supported. Was: " << old_base_name_;
std::string current_directory = base_name_;
std::string new_directory(new_base_name);
auto current_path_split = current_directory.rfind("/");
CHECK(current_path_split != std::string::npos)
<< "Could not find / in the current directory path";
auto new_path_split = new_directory.rfind("/");
CHECK(new_path_split != std::string::npos)
<< "Could not find / in the new directory path";
CHECK(new_base_name.substr(new_path_split) ==
current_directory.substr(current_path_split))
<< "Rename of file base from " << current_directory << " to "
<< new_directory << " is not supported.";
current_directory.resize(current_path_split);
new_directory.resize(new_path_split);
DIR *dir = opendir(current_directory.c_str());
if (dir) {
closedir(dir);
const int result = rename(current_directory.c_str(), new_directory.c_str());
if (result != 0) {
PLOG(ERROR) << "Unable to rename " << current_directory << " to "
<< new_directory;
return false;
}
} else {
// Handle if directory was already renamed.
dir = opendir(new_directory.c_str());
if (!dir) {
LOG(ERROR) << "Old directory " << current_directory
<< " missing and new directory " << new_directory
<< " not present.";
return false;
}
closedir(dir);
}
old_base_name_ = base_name_;
base_name_ = std::string(new_base_name);
separator_ = base_name_.back() == '/' ? "" : "_";
return true;
}
WriteCode RenamableFileBackend::RenameFileAfterClose(
std::string_view filename) {
// Fast check that we can skip rename.
if (!use_temp_files_ && old_base_name_.empty()) {
return WriteCode::kOk;
}
std::string current_filename(filename);
// When changing the base name, we rename the log folder while there active
// buffer writers. Therefore, the name of that active buffer may still refer
// to the old file location rather than the new one.
if (!old_base_name_.empty()) {
auto offset = current_filename.find(old_base_name_);
if (offset != std::string::npos) {
current_filename.replace(offset, old_base_name_.length(), base_name_);
}
}
std::string final_filename = current_filename;
if (use_temp_files_) {
CHECK(current_filename.size() > temp_suffix_.size());
final_filename = current_filename.substr(
0, current_filename.size() - temp_suffix_.size());
}
int result = rename(current_filename.c_str(), final_filename.c_str());
bool ran_out_of_space = false;
if (result != 0) {
if (errno == ENOSPC) {
ran_out_of_space = true;
} else {
PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
<< " failed";
}
} else {
VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
}
return ran_out_of_space ? WriteCode::kOutOfSpace : WriteCode::kOk;
}
WriteCode RenamableFileBackend::RenamableFileHandler::Close() {
if (!is_open()) {
return WriteCode::kOk;
}
if (FileHandler::Close() == WriteCode::kOutOfSpace) {
return WriteCode::kOutOfSpace;
}
if (owner_->RenameFileAfterClose(filename()) == WriteCode::kOutOfSpace) {
return WriteCode::kOutOfSpace;
}
return WriteCode::kOk;
}
} // namespace aos::logger