blob: dc1801d080a538bc093489befddeebb5eeeadfdf [file] [log] [blame]
#include "aos/events/logging/logfile_utils.h"
#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <vector>
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
DEFINE_int32(flush_size, 1000000,
"Number of outstanding bytes to allow before flushing to disk.");
namespace aos {
namespace logger {
DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
: fd_(open(std::string(filename).c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
PCHECK(fd_ != -1) << ": Failed to open " << filename;
}
DetachedBufferWriter::~DetachedBufferWriter() {
Flush();
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
}
void DetachedBufferWriter::QueueSizedFlatbuffer(
flatbuffers::FlatBufferBuilder *fbb) {
QueueSizedFlatbuffer(fbb->Release());
}
void DetachedBufferWriter::QueueSizedFlatbuffer(
flatbuffers::DetachedBuffer &&buffer) {
queued_size_ += buffer.size();
queue_.emplace_back(std::move(buffer));
// Flush if we are at the max number of iovs per writev, or have written
// enough data. Otherwise writev will fail with an invalid argument.
if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
queue_.size() == IOV_MAX) {
Flush();
}
}
void DetachedBufferWriter::Flush() {
if (queue_.size() == 0u) {
return;
}
iovec_.clear();
iovec_.reserve(queue_.size());
size_t counted_size = 0;
for (size_t i = 0; i < queue_.size(); ++i) {
struct iovec n;
n.iov_base = queue_[i].data();
n.iov_len = queue_[i].size();
counted_size += n.iov_len;
iovec_.emplace_back(std::move(n));
}
CHECK_EQ(counted_size, queued_size_);
const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
PCHECK(written == static_cast<ssize_t>(queued_size_))
<< ": Wrote " << written << " expected " << queued_size_;
queued_size_ = 0;
queue_.clear();
// TODO(austin): Handle partial writes in some way other than crashing...
}
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type) {
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
switch (log_type) {
case LogType::kLogMessage:
case LogType::kLogMessageAndDeliveryTime:
data_offset =
fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
break;
case LogType::kLogDeliveryTimeOnly:
break;
}
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
message_header_builder.add_queue_index(context.queue_index);
message_header_builder.add_monotonic_sent_time(
context.monotonic_event_time.time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
context.realtime_event_time.time_since_epoch().count());
switch (log_type) {
case LogType::kLogMessage:
message_header_builder.add_data(data_offset);
break;
case LogType::kLogMessageAndDeliveryTime:
message_header_builder.add_data(data_offset);
[[fallthrough]];
case LogType::kLogDeliveryTimeOnly:
message_header_builder.add_monotonic_remote_time(
context.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_remote_time(
context.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(context.remote_queue_index);
break;
}
return message_header_builder.Finish();
}
} // namespace logger
} // namespace aos