Introduce interfaces for compressing and decompressing log files
Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
diff --git a/aos/containers/BUILD b/aos/containers/BUILD
index 3e24fab..8277212 100644
--- a/aos/containers/BUILD
+++ b/aos/containers/BUILD
@@ -56,3 +56,13 @@
"//aos/testing:googletest",
],
)
+
+cc_library(
+ name = "resizeable_buffer",
+ hdrs = [
+ "resizeable_buffer.h",
+ ],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
new file mode 100644
index 0000000..b94f8cc
--- /dev/null
+++ b/aos/containers/resizeable_buffer.h
@@ -0,0 +1,101 @@
+#ifndef AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+#define AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+
+#include <stdlib.h>
+
+#include <memory>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// Kind of like a subset of vector<uint8_t>, but with less destructor calls.
+// When building unoptimized, especially with sanitizers, the vector<uint8_t>
+// version ends up being really slow in tests.
+class ResizeableBuffer {
+ public:
+ ResizeableBuffer() = default;
+
+ ResizeableBuffer(const ResizeableBuffer &other) { *this = other; }
+ ResizeableBuffer(ResizeableBuffer &&other) { *this = std::move(other); }
+ ResizeableBuffer &operator=(const ResizeableBuffer &other) {
+ resize(other.size());
+ memcpy(storage_.get(), other.storage_.get(), size());
+ return *this;
+ }
+ ResizeableBuffer &operator=(ResizeableBuffer &&other) {
+ std::swap(storage_, other.storage_);
+ std::swap(size_, other.size_);
+ std::swap(capacity_, other.capacity_);
+ return *this;
+ }
+
+ uint8_t *data() { return static_cast<uint8_t *>(storage_.get()); }
+ const uint8_t *data() const {
+ return static_cast<const uint8_t *>(storage_.get());
+ }
+
+ uint8_t *begin() { return data(); }
+ const uint8_t *begin() const { return data(); }
+ uint8_t *end() { return data() + size(); }
+ const uint8_t *end() const { return data() + size(); }
+
+ size_t size() const { return size_; }
+ size_t capacity() const { return capacity_; }
+
+ void resize(size_t new_size) {
+ if (new_size > capacity_) {
+ Allocate(new_size);
+ }
+ size_ = new_size;
+ }
+
+ void erase_front(size_t count) {
+ if (count == 0) {
+ return;
+ }
+ CHECK_LE(count, size_);
+ memmove(static_cast<void *>(data()), static_cast<void *>(data() + count),
+ size_ - count);
+ size_ -= count;
+ }
+
+ void push_back(uint8_t x) {
+ if (size_ == capacity_) {
+ Allocate(std::max<size_t>(16, size_ * 2));
+ }
+ *end() = x;
+ ++size_;
+ CHECK_LE(size_, capacity_);
+ }
+
+ private:
+ // We need this silly function because C++ is bad, and extensions to it which
+ // we work with make it a true nightmare.
+ //
+ // (a) You can't easily write out the signature of free because it depends on
+ // whether exceptions are enabled or not. You could use decltype, but see (b).
+ //
+ // (b) You can't easily write &free because CUDA overloads it with a
+ // __device__ version. You could cast to the appropriate version, but see (a).
+ //
+ // There's probably some kind of SFINAE thing which could find the matching
+ // signature from a set of choices, and then we could just
+ // static_cast<TheSignature>(&free). However, that sounds like a nightmare,
+ // especially because it has to conditionally enable the part mentioning CUDA
+ // identifiers in the preprocessor. This little function is way simpler.
+ static void DoFree(void *p) { free(p); }
+
+ void Allocate(size_t new_capacity) {
+ void *const old = storage_.release();
+ storage_.reset(CHECK_NOTNULL(realloc(old, new_capacity)));
+ capacity_ = new_capacity;
+ }
+
+ std::unique_ptr<void, decltype(&DoFree)> storage_{nullptr, &DoFree};
+ size_t size_ = 0, capacity_ = 0;
+};
+
+} // namespace aos
+
+#endif // AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 48409bb..3f43e07 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -21,9 +21,12 @@
],
visibility = ["//visibility:public"],
deps = [
+ ":buffer_encoder",
":logger_fbs",
"//aos:configuration",
"//aos:flatbuffer_merge",
+ "//aos:flatbuffers",
+ "//aos/containers:resizeable_buffer",
"//aos/events:event_loop",
"//aos/util:file",
"@com_github_gflags_gflags//:gflags",
@@ -34,6 +37,56 @@
)
cc_library(
+ name = "buffer_encoder",
+ srcs = [
+ "buffer_encoder.cc",
+ ],
+ hdrs = [
+ "buffer_encoder.h",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":logger_fbs",
+ "//aos:configuration_fbs",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/types:span",
+ ],
+)
+
+cc_test(
+ name = "buffer_encoder_test",
+ srcs = [
+ "buffer_encoder_test.cc",
+ ],
+ shard_count = 4,
+ deps = [
+ ":buffer_encoder",
+ ":buffer_encoder_param_test",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
+ name = "buffer_encoder_param_test",
+ testonly = True,
+ srcs = [
+ "buffer_encoder_param_test.cc",
+ ],
+ hdrs = [
+ "buffer_encoder_param_test.h",
+ ],
+ deps = [
+ ":logfile_utils",
+ ":logger_fbs",
+ "//aos/testing:googletest",
+ "//aos/testing:random_seed",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
name = "logger",
srcs = [
"log_namer.cc",
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
new file mode 100644
index 0000000..10a7ed1
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.cc
@@ -0,0 +1,65 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+
+void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+ CHECK(in.data()) << ": Encode called with nullptr.";
+
+ total_bytes_ += in.size();
+ queue_.emplace_back(std::move(in));
+}
+
+void DummyEncoder::Clear(const int n) {
+ CHECK_GE(n, 0);
+ CHECK_LE(static_cast<size_t>(n), queue_size());
+ queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
+ std::vector<absl::Span<const uint8_t>> queue;
+ queue.reserve(queue_.size());
+ for (const auto &buffer : queue_) {
+ queue.emplace_back(buffer.data(), buffer.size());
+ }
+ return queue;
+}
+
+size_t DummyEncoder::queued_bytes() const {
+ size_t bytes = 0;
+ for (const auto &buffer : queue_) {
+ bytes += buffer.size();
+ }
+ return bytes;
+}
+
+DummyDecoder::DummyDecoder(std::string_view filename)
+ : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+ PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DummyDecoder::~DummyDecoder() {
+ int status = close(fd_);
+ if (status != 0) {
+ PLOG(ERROR) << "DummyDecoder: Failed to close file";
+ }
+}
+
+size_t DummyDecoder::Read(uint8_t *begin, uint8_t *end) {
+ if (end_of_file_) {
+ return 0;
+ }
+ const ssize_t count = read(fd_, begin, end - begin);
+ PCHECK(count >= 0) << ": Failed to read from file";
+ if (count == 0) {
+ end_of_file_ = true;
+ }
+ return static_cast<size_t>(count);
+}
+
+} // namespace aos::logger
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
new file mode 100644
index 0000000..cc16fea
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.h
@@ -0,0 +1,108 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Interface to encode DetachedBuffers as they are written to a file.
+//
+// The interface to a file is represented as a series of buffers, appropriate to
+// pass to writev(2). The backing storage for these is managed internally so
+// subclasses can handle resizing/moving as desired. Implementations must
+// enqueue as many of these backing buffers as demanded, leaving it up to the
+// caller to write them to a file when desired.
+class DetachedBufferEncoder {
+ public:
+ virtual ~DetachedBufferEncoder() = default;
+
+ // Encodes and enqueues the given DetachedBuffer.
+ //
+ // Note that in may be moved-from, or it may be left unchanged, depending on
+ // what makes the most sense for a given implementation.
+ virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+
+ // If this returns true, the encoder may be bypassed by writing directly to
+ // the file.
+ virtual bool may_bypass() const { return false; }
+
+ // Finalizes the encoding process. After this, queue_size() represents the
+ // full extent of data which will be written to this file.
+ //
+ // Encode may not be called after this method.
+ virtual void Finish() = 0;
+
+ // Clears the first n encoded buffers from the queue.
+ virtual void Clear(int n) = 0;
+
+ // Returns a view of the queue of encoded buffers.
+ virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+
+ // Returns the total number of of bytes currently queued up.
+ virtual size_t queued_bytes() const = 0;
+
+ // Returns the cumulative number of bytes which have been queued. This
+ // includes data which has been removed via Clear.
+ virtual size_t total_bytes() const = 0;
+
+ // Returns the number of elements in the queue.
+ virtual size_t queue_size() const = 0;
+};
+
+// This class does not encode the data. It just claims ownership of the raw data
+// and queues it up as is.
+class DummyEncoder final : public DetachedBufferEncoder {
+ public:
+ ~DummyEncoder() override = default;
+
+ // No encoding happens, the raw data is queued up as is.
+ void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool may_bypass() const final { return true; }
+ void Finish() final {}
+ void Clear(int n) final;
+ std::vector<absl::Span<const uint8_t>> queue() const final;
+ size_t queued_bytes() const final;
+ size_t total_bytes() const final { return total_bytes_; }
+ size_t queue_size() const final { return queue_.size(); }
+
+ private:
+ std::vector<flatbuffers::DetachedBuffer> queue_;
+ size_t total_bytes_ = 0;
+};
+
+// Interface to decode chunks of data. Implementations of this interface will
+// manage opening, reading, and closing the file stream.
+class DataDecoder {
+ public:
+ virtual ~DataDecoder() = default;
+
+ // Reads data into the given range. Returns the number of bytes read.
+ //
+ // Returns less than end-begin if all bytes have been read. Otherwise, this
+ // will always fill the whole range.
+ virtual size_t Read(uint8_t *begin, uint8_t *end) = 0;
+};
+
+// Simply reads the contents of the file into the target buffer.
+class DummyDecoder final : public DataDecoder {
+ public:
+ explicit DummyDecoder(std::string_view filename);
+ ~DummyDecoder() override;
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ // File descriptor for the log file.
+ int fd_;
+
+ // Cached bit for if we have reached the end of the file. Otherwise we will
+ // hammer on the kernel asking for more data each time we send.
+ bool end_of_file_ = false;
+};
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
diff --git a/aos/events/logging/buffer_encoder_param_test.cc b/aos/events/logging/buffer_encoder_param_test.cc
new file mode 100644
index 0000000..825696e
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.cc
@@ -0,0 +1,92 @@
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+#include "gmock/gmock.h"
+
+namespace aos::logger::testing {
+
+// Verifies that Clear affects the sizes as expected.
+TEST_P(BufferEncoderTest, ClearAndSizes) {
+ const auto encoder = MakeEncoder();
+
+ std::uniform_int_distribution<int> quantity_distribution(500, 600);
+ const int first_quantity = quantity_distribution(*random_number_generator());
+ const int second_quantity = quantity_distribution(*random_number_generator());
+
+ const auto first_buffers = CreateAndEncode(first_quantity, encoder.get());
+ const auto first_size = encoder->queued_bytes();
+ // We want at least 2 buffers to ensure this test is actually validating the
+ // behavior.
+ ASSERT_GT(first_size, 2u) << ": Encoder chunk size is too big for this test";
+ ASSERT_EQ(encoder->total_bytes(), first_size);
+ ASSERT_EQ(TotalSize(encoder->queue()), first_size);
+ ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+
+ encoder->Clear(encoder->queue_size());
+ ASSERT_EQ(encoder->queued_bytes(), 0);
+ ASSERT_EQ(encoder->total_bytes(), first_size);
+ ASSERT_EQ(encoder->queue().size(), 0);
+ ASSERT_EQ(encoder->queue_size(), 0);
+
+ const auto second_buffers = CreateAndEncode(second_quantity, encoder.get());
+ const auto second_size = encoder->queued_bytes();
+ ASSERT_GT(second_size, 2u) << ": Encoder chunk size is too big for this test";
+ ASSERT_EQ(encoder->total_bytes(), first_size + second_size);
+ ASSERT_EQ(TotalSize(encoder->queue()), second_size);
+ ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+}
+
+// Runs data in randomly-chunked sizes through the encoder and decoder to verify
+// it comes back out the same.
+TEST_P(BufferEncoderTest, RoundTrip) {
+ std::uniform_int_distribution<int> quantity_distribution(20, 60);
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+
+ std::vector<std::vector<uint8_t>> encoded_buffers;
+ {
+ const int encode_chunks = quantity_distribution(*random_number_generator());
+ const auto encoder = MakeEncoder();
+ encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
+ encoder->Finish();
+
+ std::ofstream output_file(file_path, std::ios::binary);
+ for (auto span : encoder->queue()) {
+ output_file.write(reinterpret_cast<const char *>(span.data()),
+ span.size());
+ }
+ }
+
+ const size_t total_encoded_size = TotalSize(encoded_buffers);
+
+ // Try decoding in multiple random chunkings.
+ for (int i = 0; i < 20; ++i) {
+ const auto decoder = MakeDecoder(file_path);
+ std::vector<std::vector<uint8_t>> decoded_buffers;
+ size_t total_decoded_size = 0;
+ while (true) {
+ const int chunk_size = quantity_distribution(*random_number_generator());
+ std::vector<uint8_t> chunk(chunk_size);
+ const size_t read_result =
+ decoder->Read(chunk.data(), chunk.data() + chunk_size);
+ if (read_result + total_decoded_size != total_encoded_size) {
+ // We didn't read anything, so we should've read the complete chunk.
+ ASSERT_EQ(read_result, chunk_size);
+ }
+ // Eventually we'll get here, once the decoder is really sure it's done.
+ if (read_result == 0) {
+ // Sanity check the math in the test code.
+ CHECK_EQ(total_decoded_size, TotalSize(decoded_buffers));
+ // Bail out because we're done.
+ break;
+ }
+ // If we're at the end, trim off the 0s so our comparison later works out.
+ chunk.resize(read_result);
+ total_decoded_size += read_result;
+ decoded_buffers.emplace_back(std::move(chunk));
+ }
+ ASSERT_THAT(Flatten(decoded_buffers),
+ ::testing::Eq(Flatten(encoded_buffers)));
+ }
+}
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
new file mode 100644
index 0000000..fd1a00c
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -0,0 +1,114 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+
+#include <functional>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/testing/random_seed.h"
+
+namespace aos::logger::testing {
+
+// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// implementations.
+class BufferEncoderBaseTest : public ::testing::Test {
+ public:
+ // Creates and encodes n random buffers, returning a copy of the encoded data.
+ std::vector<std::vector<uint8_t>> CreateAndEncode(
+ int n, DetachedBufferEncoder *encoder) {
+ std::vector<std::vector<uint8_t>> result;
+ for (int i = 0; i < n; i++) {
+ flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
+ result.emplace_back(buffer.data(), buffer.data() + buffer.size());
+ encoder->Encode(std::move(buffer));
+ }
+ return result;
+ }
+
+ // Returns the total size of a vector full of objects with a size() method.
+ template <typename T>
+ static size_t TotalSize(const std::vector<T> &buffers) {
+ size_t result = 0;
+ for (const auto &v : buffers) {
+ result += v.size();
+ }
+ return result;
+ }
+
+ // Returns a flattened copy of a vector of sequences.
+ template <typename T>
+ static std::vector<typename T::value_type> Flatten(
+ const std::vector<T> &buffers) {
+ std::vector<typename T::value_type> result;
+ for (const auto &buffer : buffers) {
+ result.insert(result.end(), buffer.begin(), buffer.end());
+ }
+ return result;
+ }
+
+ void Reseed(int seed) {
+ // This algorithm allows a reasonable --runs_per_test to cover new seeds.
+ random_number_generator_ =
+ std::mt19937(::aos::testing::RandomSeed() + 1000 * seed);
+ }
+
+ std::mt19937 *random_number_generator() { return &random_number_generator_; }
+
+ private:
+ flatbuffers::DetachedBuffer CreateRandomBuffer() {
+ std::uniform_int_distribution<int> length_distribution(2800, 3000);
+ const size_t length = length_distribution(random_number_generator_);
+
+ flatbuffers::FlatBufferBuilder fbb;
+ uint8_t *data;
+ const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+ fbb.CreateUninitializedVector(length, &data);
+ {
+ std::independent_bits_engine<std::mt19937, CHAR_BIT, uint8_t> engine(
+ std::ref(random_number_generator_));
+ const uint8_t byte = engine();
+ std::fill(data, data + length, byte);
+ }
+
+ MessageHeader::Builder builder(fbb);
+ builder.add_data(data_offset);
+ fbb.FinishSizePrefixed(builder.Finish());
+
+ return fbb.Release();
+ }
+
+ std::mt19937 random_number_generator_{
+ std::mt19937(::aos::testing::RandomSeed())};
+};
+
+// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// implementation pair.
+//
+// First and second test parameters are methods to create instances of the
+// encoder and decoder. Third test parameter is a random seed, to allow testing
+// many different data patterns.
+class BufferEncoderTest
+ : public BufferEncoderBaseTest,
+ public ::testing::WithParamInterface<std::tuple<
+ std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+ std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
+ public:
+ BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
+
+ std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
+ return std::get<0>(GetParam())();
+ }
+ std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
+ return std::get<1>(GetParam())(filename);
+ }
+};
+
+} // namespace aos::logger::testing
+
+#endif // AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
new file mode 100644
index 0000000..a9c2bbb
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -0,0 +1,106 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <algorithm>
+#include <fstream>
+#include <string>
+
+#include "glog/logging.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+namespace aos::logger::testing {
+
+class DummyEncoderTest : public BufferEncoderBaseTest {};
+
+// Tests that buffers are enqueued without any changes.
+TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
+ DummyEncoder encoder;
+ const auto expected = CreateAndEncode(100, &encoder);
+
+ auto queue = encoder.queue();
+ EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+}
+
+// Checks that DummyDecoder can read into a buffer.
+TEST(DummyDecoderTest, ReadsIntoExactBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ // Read the contents of the file into the buffer.
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer(kTestString.size());
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+ for (int i = 0; i < 10; ++i) {
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+ }
+}
+
+// Checks that DummyDecoder can read into a buffer that can accommodate all the
+// data in the file.
+TEST(DummyDecoderTest, ReadsIntoLargerBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer(100);
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ buffer.resize(count);
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+// Checks that DummyDecoder can repeatedly read the contents of the file into a
+// smaller buffer until there is no more to read.
+TEST(DummyDecoderTest, ReadsRepeatedlyIntoSmallerBuffer) {
+ static const std::string kTestString{"Just some random words."};
+
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+ std::ofstream(file_path, std::ios::binary) << kTestString;
+
+ DummyDecoder dummy_decoder(file_path.c_str());
+ std::vector<uint8_t> buffer((kTestString.size() + 1) / 2);
+
+ {
+ // Read into our buffer once, and verify the contents.
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(std::string(buffer.data(), buffer.data() + count),
+ kTestString.substr(0, buffer.size()));
+ }
+
+ {
+ // Read into the same buffer again, and verify the contents.
+ const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+ ASSERT_EQ(
+ std::string(buffer.data(), buffer.data() + count),
+ kTestString.substr(buffer.size(), kTestString.size() - buffer.size()));
+ }
+
+ // Verify that there is no more data to read from the file.
+ ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ Dummy, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<DummyEncoder>();
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<DummyDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 4078f9b..9961bd8 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -38,7 +38,8 @@
aos::logger::SpanReader span_reader(orig_path);
CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
- aos::logger::DetachedBufferWriter buffer_writer(FLAGS_logfile);
+ aos::logger::DetachedBufferWriter buffer_writer(
+ FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
buffer_writer.QueueSizedFlatbuffer(&fbb);
while (true) {
@@ -47,7 +48,7 @@
break;
}
- buffer_writer.WriteSizedFlatbuffer(msg_data);
+ buffer_writer.QueueSpan(msg_data);
}
} else {
aos::logger::MessageReader reader(FLAGS_logfile);
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index def0842..a635682 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
const Node *node) {
CHECK_EQ(node, this->node());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
}
DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -47,7 +47,7 @@
++part_number_;
*data_writer_ = std::move(*OpenDataWriter());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
}
DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -81,14 +81,14 @@
const Node *node) {
if (node == this->node()) {
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ data_writer.second.writer->QueueSpan(header->full_span());
}
}
}
@@ -101,7 +101,7 @@
++part_number_;
*data_writer_ = std::move(*OpenDataWriter());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
+ data_writer_->QueueSpan(header->full_span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
@@ -110,7 +110,7 @@
data_writer.second.rotate(data_writer.first, &data_writer.second);
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ data_writer.second.writer->QueueSpan(header->full_span());
}
}
}
@@ -208,9 +208,11 @@
data_writer->part_number, ".bfbs");
if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(
+ filename, std::make_unique<DummyEncoder>());
} else {
- *data_writer->writer = DetachedBufferWriter(filename);
+ *data_writer->writer =
+ DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
}
}
@@ -221,16 +223,19 @@
channel->name()->string_view(), "/", channel->type()->string_view(),
".part", data_writer->part_number, ".bfbs");
if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(
+ filename, std::make_unique<DummyEncoder>());
} else {
- *data_writer->writer = DetachedBufferWriter(filename);
+ *data_writer->writer =
+ DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
}
}
std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
return std::make_unique<DetachedBufferWriter>(
absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
- part_number_, ".bfbs"));
+ part_number_, ".bfbs"),
+ std::make_unique<DummyEncoder>());
}
} // namespace logger
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 7611cb8..1bef629 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -108,7 +108,8 @@
// Creates a new data writer with the new part number.
std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
return std::make_unique<DetachedBufferWriter>(
- absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+ absl::StrCat(base_name_, ".part", part_number_, ".bfbs"),
+ std::make_unique<aos::logger::DummyEncoder>());
}
const std::string base_name_;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 26c3825..c7f013d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,16 +1,15 @@
#include "aos/events/logging/logfile_utils.h"
#include <fcntl.h>
-#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
-#include <vector>
+#include <algorithm>
+#include <climits>
#include "absl/strings/escaping.h"
#include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
@@ -20,13 +19,13 @@
DEFINE_int32(flush_size, 128000,
"Number of outstanding bytes to allow before flushing to disk.");
-namespace aos {
-namespace logger {
+namespace aos::logger {
namespace chrono = std::chrono;
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
- : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+ std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+ : filename_(filename), encoder_(std::move(encoder)) {
util::MkdirP(filename, 0777);
fd_ = open(std::string(filename).c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,7 +34,10 @@
}
DetachedBufferWriter::~DetachedBufferWriter() {
- Flush();
+ encoder_->Finish();
+ while (encoder_->queue_size() > 0) {
+ Flush();
+ }
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
VLOG(1) << "Closed " << filename_;
}
@@ -50,77 +52,95 @@
DetachedBufferWriter &DetachedBufferWriter::operator=(
DetachedBufferWriter &&other) {
std::swap(filename_, other.filename_);
+ std::swap(encoder_, other.encoder_);
std::swap(fd_, other.fd_);
- std::swap(queued_size_, other.queued_size_);
- std::swap(written_size_, other.written_size_);
- std::swap(queue_, other.queue_);
std::swap(iovec_, other.iovec_);
+ std::swap(max_write_time_, other.max_write_time_);
+ std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+ std::swap(max_write_time_messages_, other.max_write_time_messages_);
+ std::swap(total_write_time_, other.total_write_time_);
+ std::swap(total_write_count_, other.total_write_count_);
+ std::swap(total_write_messages_, other.total_write_messages_);
+ std::swap(total_write_bytes_, other.total_write_bytes_);
return *this;
}
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::FlatBufferBuilder *fbb) {
- QueueSizedFlatbuffer(fbb->Release());
-}
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+ if (encoder_->may_bypass() && span.size() > 4096u) {
+ // Over this threshold, we'll assume it's cheaper to add an extra
+ // syscall to write the data immediately instead of copying it to
+ // enqueue.
-void DetachedBufferWriter::WriteSizedFlatbuffer(
- absl::Span<const uint8_t> span) {
- // Cheat aggressively... Write out the queued up data, and then write this
- // data once without buffering. It is hard to make a DetachedBuffer out of
- // this data, and we don't want to worry about lifetimes.
- Flush();
- iovec_.clear();
- iovec_.reserve(1);
+ // First, flush everything.
+ while (encoder_->queue_size() > 0u) {
+ Flush();
+ }
- struct iovec n;
- n.iov_base = const_cast<uint8_t *>(span.data());
- n.iov_len = span.size();
- iovec_.emplace_back(n);
-
- const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(n.iov_len))
- << ": Wrote " << written << " expected " << n.iov_len;
- written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- queued_size_ += buffer.size();
- queue_.emplace_back(std::move(buffer));
-
- // Flush if we are at the max number of iovs per writev, or have written
- // enough data. Otherwise writev will fail with an invalid argument.
- if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
- queue_.size() == IOV_MAX) {
- Flush();
+ // Then, write it directly.
+ const auto start = aos::monotonic_clock::now();
+ const ssize_t written = write(fd_, span.data(), span.size());
+ const auto end = aos::monotonic_clock::now();
+ PCHECK(written >= 0) << ": write failed";
+ CHECK_EQ(written, static_cast<ssize_t>(span.size()))
+ << ": Wrote " << written << " expected " << span.size();
+ UpdateStatsForWrite(end - start, written, 1);
+ } else {
+ encoder_->Encode(CopySpanAsDetachedBuffer(span));
}
+
+ FlushAtThreshold();
}
void DetachedBufferWriter::Flush() {
- if (queue_.size() == 0u) {
+ const auto queue = encoder_->queue();
+ if (queue.empty()) {
return;
}
+
iovec_.clear();
- iovec_.reserve(queue_.size());
+ const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+ iovec_.resize(iovec_size);
size_t counted_size = 0;
- for (size_t i = 0; i < queue_.size(); ++i) {
- struct iovec n;
- n.iov_base = queue_[i].data();
- n.iov_len = queue_[i].size();
- counted_size += n.iov_len;
- iovec_.emplace_back(std::move(n));
+ for (size_t i = 0; i < iovec_size; ++i) {
+ iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[i].iov_len = queue[i].size();
+ counted_size += iovec_[i].iov_len;
}
- CHECK_EQ(counted_size, queued_size_);
+
+ const auto start = aos::monotonic_clock::now();
const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(queued_size_))
- << ": Wrote " << written << " expected " << queued_size_;
- written_size_ += written;
-
- queued_size_ = 0;
- queue_.clear();
+ const auto end = aos::monotonic_clock::now();
+ PCHECK(written >= 0) << ": writev failed";
// TODO(austin): Handle partial writes in some way other than crashing...
+ CHECK_EQ(written, static_cast<ssize_t>(counted_size))
+ << ": Wrote " << written << " expected " << counted_size;
+
+ encoder_->Clear(iovec_size);
+
+ UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+ aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+ if (duration > max_write_time_) {
+ max_write_time_ = duration;
+ max_write_time_bytes_ = written;
+ max_write_time_messages_ = iovec_size;
+ }
+ total_write_time_ += duration;
+ ++total_write_count_;
+ total_write_messages_ += iovec_size;
+ total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+ // Flush if we are at the max number of iovs per writev, because there's no
+ // point queueing up any more data in memory. Also flush once we have enough
+ // data queued up.
+ while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+ encoder_->queue_size() >= IOV_MAX) {
+ Flush();
+ }
}
flatbuffers::Offset<MessageHeader> PackMessage(
@@ -185,10 +205,10 @@
return message_header_builder.Finish();
}
-SpanReader::SpanReader(std::string_view filename)
- : filename_(filename),
- fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+ // Support for other kinds of decoders based on the filename should be added
+ // here.
+ decoder_ = std::make_unique<DummyDecoder>(filename);
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -244,18 +264,13 @@
}
bool SpanReader::ReadBlock() {
- if (end_of_file_) {
- return false;
- }
-
- // Appends 256k. This is enough that the read call is efficient. We don't
- // want to spend too much time reading small chunks because the syscalls for
- // that will be expensive.
+ // This is the amount of data we grab at a time. Doing larger chunks minimizes
+ // syscalls and helps decompressors batch things more efficiently.
constexpr size_t kReadSize = 256 * 1024;
// Strip off any unused data at the front.
if (consumed_data_ != 0) {
- data_.erase(data_.begin(), data_.begin() + consumed_data_);
+ data_.erase_front(consumed_data_);
consumed_data_ = 0;
}
@@ -264,15 +279,14 @@
// This should automatically grow the backing store. It won't shrink if we
// get a small chunk later. This reduces allocations when we want to append
// more data.
- data_.resize(data_.size() + kReadSize);
+ data_.resize(starting_size + kReadSize);
- ssize_t count = read(fd_, &data_[starting_size], kReadSize);
- data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+ const size_t count =
+ decoder_->Read(data_.begin() + starting_size, data_.end());
+ data_.resize(starting_size + count);
if (count == 0) {
- end_of_file_ = true;
return false;
}
- PCHECK(count > 0);
return true;
}
@@ -1520,5 +1534,4 @@
return "";
}
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..f64881c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
#include <vector>
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
#include "flatbuffers/flatbuffers.h"
-namespace aos {
-namespace logger {
+namespace aos::logger {
enum class LogType : uint8_t {
// The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
};
// This class manages efficiently writing a sequence of detached buffers to a
-// file. It queues them up and batches the write operation.
+// file. It encodes them, queues them up, and batches the write operation.
class DetachedBufferWriter {
public:
- DetachedBufferWriter(std::string_view filename);
+ DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DetachedBufferEncoder> encoder);
DetachedBufferWriter(DetachedBufferWriter &&other);
DetachedBufferWriter(const DetachedBufferWriter &) = delete;
@@ -55,39 +58,86 @@
// it. The main use case is updating start times after a log file starts.
void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
- // TODO(austin): Snappy compress the log file if it ends with .snappy!
+ // Queues up a finished FlatBufferBuilder to be encoded and written.
+ //
+ // Triggers a flush if there's enough data queued up.
+ //
+ // Steals the detached buffer from it.
+ void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+ QueueSizedFlatbuffer(fbb->Release());
+ }
+ // May steal the backing storage of buffer, or may leave it alone.
+ void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+ encoder_->Encode(std::move(buffer));
+ FlushAtThreshold();
+ }
- // Queues up a finished FlatBufferBuilder to be written. Steals the detached
- // buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
- // Queues up a detached buffer directly.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
- // Writes a Span. This is not terribly optimized right now.
- void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+ // Queues up data in span. May copy or may write it to disk immediately.
+ void QueueSpan(absl::Span<const uint8_t> span);
- // Triggers data to be provided to the kernel and written.
- void Flush();
+ // Returns the total number of bytes written and currently queued.
+ size_t total_bytes() const { return encoder_->total_bytes(); }
- // Returns the number of bytes written.
- size_t written_size() const { return written_size_; }
-
- // Returns the number of bytes written or currently queued.
- size_t total_size() const { return written_size_ + queued_size_; }
+ // The maximum time for a single write call, or 0 if none have been performed.
+ std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+ // The number of bytes in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_bytes() const { return max_write_time_bytes_; }
+ // The number of buffers in the longest write call, or -1 if none have been
+ // performed.
+ int max_write_time_messages() const { return max_write_time_messages_; }
+ // The total time spent in write calls.
+ std::chrono::nanoseconds total_write_time() const {
+ return total_write_time_;
+ }
+ // The total number of writes which have been performed.
+ int total_write_count() const { return total_write_count_; }
+ // The total number of messages which have been written.
+ int total_write_messages() const { return total_write_messages_; }
+ // The total number of bytes which have been written.
+ int total_write_bytes() const { return total_write_bytes_; }
+ void ResetStatistics() {
+ max_write_time_ = std::chrono::nanoseconds::zero();
+ max_write_time_bytes_ = -1;
+ max_write_time_messages_ = -1;
+ total_write_time_ = std::chrono::nanoseconds::zero();
+ total_write_count_ = 0;
+ total_write_messages_ = 0;
+ total_write_bytes_ = 0;
+ }
private:
+ // Performs a single writev call with as much of the data we have queued up as
+ // possible.
+ //
+ // This will normally take all of the data we have queued up, unless an
+ // encoder has spit out a big enough chunk all at once that we can't manage
+ // all of it.
+ void Flush();
+
+ void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+ ssize_t written, int iovec_size);
+
+ // Flushes data if we've reached the threshold to do that as part of normal
+ // operation.
+ void FlushAtThreshold();
+
std::string filename_;
+ std::unique_ptr<DetachedBufferEncoder> encoder_;
int fd_ = -1;
- // Size of all the data in the queue.
- size_t queued_size_ = 0;
- size_t written_size_ = 0;
-
- // List of buffers to flush.
- std::vector<flatbuffers::DetachedBuffer> queue_;
// List of iovecs to use with writev. This is a member variable to avoid
// churn.
std::vector<struct iovec> iovec_;
+
+ std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+ int max_write_time_bytes_ = -1;
+ int max_write_time_messages_ = -1;
+ std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+ int total_write_count_ = 0;
+ int total_write_messages_ = 0;
+ int total_write_bytes_ = 0;
};
// Packes a message pointed to by the context into a MessageHeader.
@@ -104,8 +154,6 @@
public:
SpanReader(std::string_view filename);
- ~SpanReader() { close(fd_); }
-
std::string_view filename() const { return filename_; }
// Returns a span with the data for a message from the log file, excluding
@@ -121,50 +169,22 @@
// Allocate the 256k blocks like we do today. But, refcount them with
// shared_ptr pointed to by the messageheader that is returned. This avoids
// the copy. Need to do more benchmarking.
+ // And (Brian): Consider just mmapping the file and handing out refcounted
+ // pointers into that too.
// Reads a chunk of data into data_. Returns false if no data was read.
bool ReadBlock();
const std::string filename_;
- // File descriptor for the log file.
- int fd_ = -1;
+ // File reader and data decoder.
+ std::unique_ptr<DataDecoder> decoder_;
- // Allocator which doesn't zero initialize memory.
- template <typename T>
- struct DefaultInitAllocator {
- typedef T value_type;
-
- template <typename U>
- void construct(U *p) {
- ::new (static_cast<void *>(p)) U;
- }
-
- template <typename U, typename... Args>
- void construct(U *p, Args &&... args) {
- ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
- }
-
- T *allocate(std::size_t n) {
- return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
- }
-
- template <typename U>
- void deallocate(U *p, std::size_t /*n*/) {
- ::operator delete(static_cast<void *>(p));
- }
- };
-
- // Vector to read into. This uses an allocator which doesn't zero
- // initialize the memory.
- std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+ // Vector to read into.
+ ResizeableBuffer data_;
// Amount of data consumed already in data_.
size_t consumed_data_ = 0;
-
- // Cached bit for if we have reached the end of the file. Otherwise we will
- // hammer on the kernel asking for more data each time we send.
- bool end_of_file_ = false;
};
// Class which handles reading the header and messages from the log file. This
@@ -668,7 +688,6 @@
// a single node.
std::string MaybeNodeName(const Node *);
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index bab006a..33d50d3 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -359,6 +359,16 @@
flatbuffers::DetachedBuffer buffer_;
};
+inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
+ absl::Span<const uint8_t> span) {
+ // Copy the data from the span.
+ uint8_t *buf = flatbuffers::DefaultAllocator().allocate(span.size());
+ memcpy(buf, span.data(), span.size());
+ // Then give it to a DetachedBuffer to manage.
+ return flatbuffers::DetachedBuffer(nullptr, false, buf, span.size(), buf,
+ span.size());
+}
+
} // namespace aos
#endif // AOS_FLATBUFFERS_H_