Split DetachedBufferWriter into a separate file.
logger.h is too big. Let's pull out some of the concepts, especially
since we want to reuse them for multiple log files. This is the first
patch of a couple.
Change-Id: Ic8065fde8284ea68c4869777b9a8237eff987866
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 80d8798..93caf31 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -12,8 +12,14 @@
cc_library(
name = "logger",
- srcs = ["logger.cc"],
- hdrs = ["logger.h"],
+ srcs = [
+ "logfile_utils.cc",
+ "logger.cc",
+ ],
+ hdrs = [
+ "logfile_utils.h",
+ "logger.h",
+ ],
visibility = ["//visibility:public"],
deps = [
":logger_fbs",
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
new file mode 100644
index 0000000..dc1801d
--- /dev/null
+++ b/aos/events/logging/logfile_utils.cc
@@ -0,0 +1,120 @@
+#include "aos/events/logging/logfile_utils.h"
+
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+#include <vector>
+
+#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+DEFINE_int32(flush_size, 1000000,
+ "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
+ : fd_(open(std::string(filename).c_str(),
+ O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DetachedBufferWriter::~DetachedBufferWriter() {
+ Flush();
+ PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+ flatbuffers::DetachedBuffer &&buffer) {
+ queued_size_ += buffer.size();
+ queue_.emplace_back(std::move(buffer));
+
+ // Flush if we are at the max number of iovs per writev, or have written
+ // enough data. Otherwise writev will fail with an invalid argument.
+ if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
+ queue_.size() == IOV_MAX) {
+ Flush();
+ }
+}
+
+void DetachedBufferWriter::Flush() {
+ if (queue_.size() == 0u) {
+ return;
+ }
+ iovec_.clear();
+ iovec_.reserve(queue_.size());
+ size_t counted_size = 0;
+ for (size_t i = 0; i < queue_.size(); ++i) {
+ struct iovec n;
+ n.iov_base = queue_[i].data();
+ n.iov_len = queue_[i].size();
+ counted_size += n.iov_len;
+ iovec_.emplace_back(std::move(n));
+ }
+ CHECK_EQ(counted_size, queued_size_);
+ const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+
+ PCHECK(written == static_cast<ssize_t>(queued_size_))
+ << ": Wrote " << written << " expected " << queued_size_;
+
+ queued_size_ = 0;
+ queue_.clear();
+ // TODO(austin): Handle partial writes in some way other than crashing...
+}
+
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, LogType log_type) {
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+ switch (log_type) {
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ data_offset =
+ fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ break;
+
+ case LogType::kLogDeliveryTimeOnly:
+ break;
+ }
+
+ MessageHeader::Builder message_header_builder(*fbb);
+ message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+
+ switch (log_type) {
+ case LogType::kLogMessage:
+ message_header_builder.add_data(data_offset);
+ break;
+
+ case LogType::kLogMessageAndDeliveryTime:
+ message_header_builder.add_data(data_offset);
+ [[fallthrough]];
+
+ case LogType::kLogDeliveryTimeOnly:
+ message_header_builder.add_monotonic_remote_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(context.remote_queue_index);
+ break;
+ }
+
+ return message_header_builder.Finish();
+}
+
+} // namespace logger
+} // namespace aos
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
new file mode 100644
index 0000000..0892290
--- /dev/null
+++ b/aos/events/logging/logfile_utils.h
@@ -0,0 +1,78 @@
+#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
+#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
+
+#include <sys/uio.h>
+
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+enum class LogType : uint8_t {
+ // The message originated on this node and should be logged here.
+ kLogMessage,
+ // The message originated on another node, but only the delivery times are
+ // logged here.
+ kLogDeliveryTimeOnly,
+ // The message originated on another node. Log it and the delivery times
+ // together. The message_gateway is responsible for logging any messages
+ // which didn't get delivered.
+ kLogMessageAndDeliveryTime
+};
+
+
+// This class manages efficiently writing a sequence of detached buffers to a
+// file. It queues them up and batches the write operation.
+class DetachedBufferWriter {
+ public:
+ DetachedBufferWriter(std::string_view filename);
+ ~DetachedBufferWriter();
+
+ // TODO(austin): Snappy compress the log file if it ends with .snappy!
+
+ // Queues up a finished FlatBufferBuilder to be written. Steals the detached
+ // buffer from it.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
+ // Queues up a detached buffer directly.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
+
+ // Triggers data to be provided to the kernel and written.
+ void Flush();
+
+ private:
+ int fd_ = -1;
+
+ // Size of all the data in the queue.
+ size_t queued_size_ = 0;
+
+ // List of buffers to flush.
+ std::vector<flatbuffers::DetachedBuffer> queue_;
+ // List of iovecs to use with writev. This is a member variable to avoid
+ // churn.
+ std::vector<struct iovec> iovec_;
+};
+
+// Packes a message pointed to by the context into a MessageHeader.
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, LogType log_type);
+
+// TODO(austin): 3 objects:
+// 1) log chunk reader. Returns span.
+// 2) Sorted message header reader. Returns sorted messages.
+// 3) LogReader, which does all the registration.
+//
+// Then, we can do a multi-node sim which forwards data nicely, try logging it, and then try replaying it.
+
+// Optimization:
+// Allocate the 256k blocks like we do today. But, refcount them with shared_ptr pointed to by the messageheader that is returned. This avoids the copy.
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b247944..e7d0d3b 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -16,8 +16,6 @@
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
-DEFINE_int32(flush_size, 1000000,
- "Number of outstanding bytes to allow before flushing to disk.");
DEFINE_bool(skip_missing_forwarding_entries, false,
"If true, drop any forwarding entries with missing data. If "
"false, CHECK.");
@@ -27,60 +25,6 @@
namespace chrono = std::chrono;
-DetachedBufferWriter::DetachedBufferWriter(absl::string_view filename)
- : fd_(open(std::string(filename).c_str(),
- O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
-}
-
-DetachedBufferWriter::~DetachedBufferWriter() {
- Flush();
- PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::FlatBufferBuilder *fbb) {
- QueueSizedFlatbuffer(fbb->Release());
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- queued_size_ += buffer.size();
- queue_.emplace_back(std::move(buffer));
-
- // Flush if we are at the max number of iovs per writev, or have written
- // enough data. Otherwise writev will fail with an invalid argument.
- if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
- queue_.size() == IOV_MAX) {
- Flush();
- }
-}
-
-void DetachedBufferWriter::Flush() {
- if (queue_.size() == 0u) {
- return;
- }
- iovec_.clear();
- iovec_.reserve(queue_.size());
- size_t counted_size = 0;
- for (size_t i = 0; i < queue_.size(); ++i) {
- struct iovec n;
- n.iov_base = queue_[i].data();
- n.iov_len = queue_[i].size();
- counted_size += n.iov_len;
- iovec_.emplace_back(std::move(n));
- }
- CHECK_EQ(counted_size, queued_size_);
- const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(queued_size_))
- << ": Wrote " << written << " expected " << queued_size_;
-
- queued_size_ = 0;
- queue_.clear();
- // TODO(austin): Handle partial writes in some way other than crashing...
-}
-
Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
: event_loop_(event_loop),
@@ -208,51 +152,6 @@
});
}
-flatbuffers::Offset<MessageHeader> PackMessage(
- flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index, LogType log_type) {
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
-
- switch(log_type) {
- case LogType::kLogMessage:
- case LogType::kLogMessageAndDeliveryTime:
- data_offset =
- fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
- break;
-
- case LogType::kLogDeliveryTimeOnly:
- break;
- }
-
- MessageHeader::Builder message_header_builder(*fbb);
- message_header_builder.add_channel_index(channel_index);
- message_header_builder.add_queue_index(context.queue_index);
- message_header_builder.add_monotonic_sent_time(
- context.monotonic_event_time.time_since_epoch().count());
- message_header_builder.add_realtime_sent_time(
- context.realtime_event_time.time_since_epoch().count());
-
- switch (log_type) {
- case LogType::kLogMessage:
- message_header_builder.add_data(data_offset);
- break;
-
- case LogType::kLogMessageAndDeliveryTime:
- message_header_builder.add_data(data_offset);
- [[fallthrough]];
-
- case LogType::kLogDeliveryTimeOnly:
- message_header_builder.add_monotonic_remote_time(
- context.monotonic_remote_time.time_since_epoch().count());
- message_header_builder.add_realtime_remote_time(
- context.realtime_remote_time.time_since_epoch().count());
- message_header_builder.add_remote_queue_index(context.remote_queue_index);
- break;
- }
-
- return message_header_builder.Finish();
-}
-
void Logger::DoLogData() {
// We want to guarentee that messages aren't out of order by more than
// max_out_of_order_duration. To do this, we need sync points. Every write
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d44a885..e44f65f 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -7,6 +7,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/time/time.h"
@@ -15,49 +16,6 @@
namespace aos {
namespace logger {
-// This class manages efficiently writing a sequence of detached buffers to a
-// file. It queues them up and batches the write operation.
-class DetachedBufferWriter {
- public:
- DetachedBufferWriter(absl::string_view filename);
- ~DetachedBufferWriter();
-
- // TODO(austin): Snappy compress the log file if it ends with .snappy!
-
- // Queues up a finished FlatBufferBuilder to be written. Steals the detached
- // buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
- // Queues up a detached buffer directly.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
-
- // Triggers data to be provided to the kernel and written.
- void Flush();
-
- private:
- int fd_ = -1;
-
- // Size of all the data in the queue.
- size_t queued_size_ = 0;
-
- // List of buffers to flush.
- std::vector<flatbuffers::DetachedBuffer> queue_;
- // List of iovecs to use with writev. This is a member variable to avoid
- // churn.
- std::vector<struct iovec> iovec_;
-};
-
-enum class LogType : uint8_t {
- // The message originated on this node and should be logged here.
- kLogMessage,
- // The message originated on another node, but only the delivery times are
- // logged here.
- kLogDeliveryTimeOnly,
- // The message originated on another node. Log it and the delivery times
- // together. The message_gateway is responsible for logging any messages
- // which didn't get delivered.
- kLogMessageAndDeliveryTime
-};
-
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rately on a channel and would affect execution.
@@ -98,11 +56,6 @@
size_t max_header_size_ = 0;
};
-// Packes a message pointed to by the context into a MessageHeader.
-flatbuffers::Offset<MessageHeader> PackMessage(
- flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index, LogType log_type);
-
// Replays all the channels in the logfile to the event loop.
class LogReader {
public: