Introduce interfaces for compressing and decompressing log files
Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..f64881c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
#include <vector>
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
#include "flatbuffers/flatbuffers.h"
-namespace aos {
-namespace logger {
+namespace aos::logger {
enum class LogType : uint8_t {
// The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
};
// This class manages efficiently writing a sequence of detached buffers to a
-// file. It queues them up and batches the write operation.
+// file. It encodes them, queues them up, and batches the write operation.
class DetachedBufferWriter {
public:
- DetachedBufferWriter(std::string_view filename);
+ DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DetachedBufferEncoder> encoder);
DetachedBufferWriter(DetachedBufferWriter &&other);
DetachedBufferWriter(const DetachedBufferWriter &) = delete;
@@ -55,39 +58,86 @@
// it. The main use case is updating start times after a log file starts.
void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
- // TODO(austin): Snappy compress the log file if it ends with .snappy!
+ // Queues up a finished FlatBufferBuilder to be encoded and written.
+ //
+ // Steals the detached buffer from fbb (via fbb->Release()) and forwards it to
+ // the DetachedBuffer overload, which encodes it and then triggers a flush if
+ // there's enough data queued up.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+ }
+ // Encodes buffer, which may steal its backing storage or leave it alone.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+ encoder_->Encode(std::move(buffer));
+ FlushAtThreshold();  // Flushes only once the flush threshold is reached.
+ }
- // Queues up a finished FlatBufferBuilder to be written. Steals the detached
- // buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
- // Queues up a detached buffer directly.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
- // Writes a Span. This is not terribly optimized right now.
- void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+ // Queues up data in span. May copy or may write it to disk immediately.
+ void QueueSpan(absl::Span<const uint8_t> span);
- // Triggers data to be provided to the kernel and written.
- void Flush();
+ // Returns the encoder's count of bytes written plus bytes currently queued.
+ size_t total_bytes() const { return encoder_->total_bytes(); }
- // Returns the number of bytes written.
- size_t written_size() const { return written_size_; }
-
- // Returns the number of bytes written or currently queued.
- size_t total_size() const { return written_size_ + queued_size_; }
+ // The maximum time for a single write call, or 0 if none have been performed.
+ std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+ // The number of bytes in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_bytes() const { return max_write_time_bytes_; }
+ // The number of buffers in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_messages() const { return max_write_time_messages_; }
+ // The total time spent in write calls.
+ std::chrono::nanoseconds total_write_time() const {
+ return total_write_time_;
+ }
+ // The total number of writes which have been performed.
+ int total_write_count() const { return total_write_count_; }
+ // The total number of messages which have been written.
+ int total_write_messages() const { return total_write_messages_; }
+ // Total bytes written. NOTE(review): int overflows past INT_MAX; widen?
+ int total_write_bytes() const { return total_write_bytes_; }
+ void ResetStatistics() {  // Restores every statistic to its initial value.
+ max_write_time_ = std::chrono::nanoseconds::zero();
+ max_write_time_bytes_ = -1;  // -1 means no write has been performed.
+ max_write_time_messages_ = -1;  // -1 means no write has been performed.
+ total_write_time_ = std::chrono::nanoseconds::zero();
+ total_write_count_ = 0;
+ total_write_messages_ = 0;
+ total_write_bytes_ = 0;
+ }
private:
+ // Performs a single writev call with as much of the data we have queued up as
+ // possible.
+ //
+ // This will normally take all of the data we have queued up, unless an
+ // encoder has spit out a big enough chunk all at once that we can't manage
+ // all of it.
+ void Flush();
+
+ void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+ ssize_t written, int iovec_size);  // NOTE(review): presumably folds one write into the statistics; confirm.
+
+ // Flushes data if we've reached the threshold to do that as part of normal
+ // operation.
+ void FlushAtThreshold();
+
std::string filename_;
+ std::unique_ptr<DetachedBufferEncoder> encoder_;
int fd_ = -1;
- // Size of all the data in the queue.
- size_t queued_size_ = 0;
- size_t written_size_ = 0;
-
- // List of buffers to flush.
- std::vector<flatbuffers::DetachedBuffer> queue_;
// List of iovecs to use with writev. This is a member variable to avoid
// churn.
std::vector<struct iovec> iovec_;
+
+ std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+ int max_write_time_bytes_ = -1;
+ int max_write_time_messages_ = -1;
+ std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+ int total_write_count_ = 0;
+ int total_write_messages_ = 0;
+ int total_write_bytes_ = 0;
};
// Packes a message pointed to by the context into a MessageHeader.
@@ -104,8 +154,6 @@
public:
SpanReader(std::string_view filename);
- ~SpanReader() { close(fd_); }
-
std::string_view filename() const { return filename_; }
// Returns a span with the data for a message from the log file, excluding
@@ -121,50 +169,22 @@
// Allocate the 256k blocks like we do today. But, refcount them with
// shared_ptr pointed to by the messageheader that is returned. This avoids
// the copy. Need to do more benchmarking.
+ // And (Brian): Consider just mmapping the file and handing out refcounted
+ // pointers into that too.
// Reads a chunk of data into data_. Returns false if no data was read.
bool ReadBlock();
const std::string filename_;
- // File descriptor for the log file.
- int fd_ = -1;
+ // File reader and data decoder.
+ std::unique_ptr<DataDecoder> decoder_;
- // Allocator which doesn't zero initialize memory.
- template <typename T>
- struct DefaultInitAllocator {
- typedef T value_type;
-
- template <typename U>
- void construct(U *p) {
- ::new (static_cast<void *>(p)) U;
- }
-
- template <typename U, typename... Args>
- void construct(U *p, Args &&... args) {
- ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
- }
-
- T *allocate(std::size_t n) {
- return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
- }
-
- template <typename U>
- void deallocate(U *p, std::size_t /*n*/) {
- ::operator delete(static_cast<void *>(p));
- }
- };
-
- // Vector to read into. This uses an allocator which doesn't zero
- // initialize the memory.
- std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+ // Buffer to read into.
+ ResizeableBuffer data_;
// Amount of data consumed already in data_.
size_t consumed_data_ = 0;
-
- // Cached bit for if we have reached the end of the file. Otherwise we will
- // hammer on the kernel asking for more data each time we send.
- bool end_of_file_ = false;
};
// Class which handles reading the header and messages from the log file. This
@@ -668,7 +688,6 @@
// a single node.
std::string MaybeNodeName(const Node *);
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_