Split DetachedBufferWriter into a separate file.
logger.h is too big. Let's pull out some of the concepts, especially
since we want to reuse them for multiple log files. This is the first
patch of a couple.
Change-Id: Ic8065fde8284ea68c4869777b9a8237eff987866
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
new file mode 100644
index 0000000..dc1801d
--- /dev/null
+++ b/aos/events/logging/logfile_utils.cc
@@ -0,0 +1,120 @@
+#include "aos/events/logging/logfile_utils.h"
+
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+#include <vector>
+
+#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+DEFINE_int32(flush_size, 1000000,
+ "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
+ : fd_(open(std::string(filename).c_str(),
+ O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DetachedBufferWriter::~DetachedBufferWriter() {
+ Flush();
+ PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::DetachedBuffer &&buffer) {
+ queued_size_ += buffer.size();
+ queue_.emplace_back(std::move(buffer));
+
+ // Flush if we are at the max number of iovs per writev, or have written
+ // enough data. Otherwise writev will fail with an invalid argument.
+ if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
+ queue_.size() == IOV_MAX) {
+ Flush();
+ }
+}
+
+void DetachedBufferWriter::Flush() {
+ if (queue_.size() == 0u) {
+ return;
+ }
+ iovec_.clear();
+ iovec_.reserve(queue_.size());
+ size_t counted_size = 0;
+ for (size_t i = 0; i < queue_.size(); ++i) {
+ struct iovec n;
+ n.iov_base = queue_[i].data();
+ n.iov_len = queue_[i].size();
+ counted_size += n.iov_len;
+ iovec_.emplace_back(std::move(n));
+ }
+ CHECK_EQ(counted_size, queued_size_);
+ const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+
+ PCHECK(written == static_cast<ssize_t>(queued_size_))
+ << ": Wrote " << written << " expected " << queued_size_;
+
+ queued_size_ = 0;
+ queue_.clear();
+ // TODO(austin): Handle partial writes in some way other than crashing...
+}
+
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, LogType log_type) {
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+ switch (log_type) {
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ data_offset =
+ fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ break;
+
+ case LogType::kLogDeliveryTimeOnly:
+ break;
+ }
+
+ MessageHeader::Builder message_header_builder(*fbb);
+ message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+
+ switch (log_type) {
+ case LogType::kLogMessage:
+ message_header_builder.add_data(data_offset);
+ break;
+
+ case LogType::kLogMessageAndDeliveryTime:
+ message_header_builder.add_data(data_offset);
+ [[fallthrough]];
+
+ case LogType::kLogDeliveryTimeOnly:
+ message_header_builder.add_monotonic_remote_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(context.remote_queue_index);
+ break;
+ }
+
+ return message_header_builder.Finish();
+}
+
+} // namespace logger
+} // namespace aos