Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging looked like it was a heap fragmentation issue. Tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory. The heap was growing though.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only allocate that buffer space once, and
allocate it to the maximum size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3b82e98..82e7061 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -117,6 +117,8 @@
deps = [
":logger_fbs",
"//aos:configuration_fbs",
+ "//aos:flatbuffers",
+ "//aos/containers:resizeable_buffer",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
@@ -397,6 +399,11 @@
srcs = [
"logger_main.cc",
],
+ copts = select({
+ "//tools:cpu_k8": ["-DLZMA=1"],
+ "//tools:cpu_arm64": ["-DLZMA=1"],
+ "//conditions:default": [],
+ }),
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
index 6ef61d4..ae20440 100644
--- a/aos/events/logging/buffer_encoder.cc
+++ b/aos/events/logging/buffer_encoder.cc
@@ -4,39 +4,60 @@
#include <sys/stat.h>
#include <sys/types.h>
+#include "aos/flatbuffers.h"
#include "glog/logging.h"
namespace aos::logger {
-void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- CHECK(in.data()) << ": Encode called with nullptr.";
+DummyEncoder::DummyEncoder(size_t max_buffer_size) {
+ // TODO(austin): This is going to end up writing > 128k chunks, not 128k
+ // chunks exactly. If we really want to, we could make it always write 128k
+ // chunks by only exposing n * 128k chunks as we go. This might improve write
+ // performance, then again, it might have no effect if the kernel is combining
+ // writes...
+ constexpr size_t kWritePageSize = 128 * 1024;
+ // Round up to the nearest page size.
+ input_buffer_.reserve(
+ ((max_buffer_size + kWritePageSize - 1) / kWritePageSize) *
+ kWritePageSize);
+ return_queue_.resize(1);
+}
- total_bytes_ += in.size();
- queue_.emplace_back(std::move(in));
+bool DummyEncoder::HasSpace(size_t request) const {
+ return request + input_buffer_.size() < input_buffer_.capacity();
+}
+
+void DummyEncoder::Encode(Copier *copy) {
+ DCHECK(HasSpace(copy->size()));
+ const size_t input_buffer_initial_size = input_buffer_.size();
+
+ input_buffer_.resize(input_buffer_initial_size + copy->size());
+ const size_t written_size =
+ copy->Copy(input_buffer_.data() + input_buffer_initial_size);
+ DCHECK_EQ(written_size, copy->size());
+
+ total_bytes_ += written_size;
}
void DummyEncoder::Clear(const int n) {
CHECK_GE(n, 0);
CHECK_LE(static_cast<size_t>(n), queue_size());
- queue_.erase(queue_.begin(), queue_.begin() + n);
+ if (n != 0) {
+ input_buffer_.resize(0u);
+ }
}
-std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
- queue.reserve(queue_.size());
- for (const auto &buffer : queue_) {
- queue.emplace_back(buffer.data(), buffer.size());
+absl::Span<const absl::Span<const uint8_t>> DummyEncoder::queue() {
+ if (input_buffer_.size() != 0) {
+ return_queue_[0] =
+ absl::Span<const uint8_t>(input_buffer_.data(), input_buffer_.size());
+ return return_queue_;
+ } else {
+ return absl::Span<const absl::Span<const uint8_t>>();
}
- return queue;
}
-size_t DummyEncoder::queued_bytes() const {
- size_t bytes = 0;
- for (const auto &buffer : queue_) {
- bytes += buffer.size();
- }
- return bytes;
-}
+size_t DummyEncoder::queued_bytes() const { return input_buffer_.size(); }
DummyDecoder::DummyDecoder(std::string_view filename)
: filename_(filename), fd_(open(filename_.c_str(), O_RDONLY | O_CLOEXEC)) {
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index 235a49c..4cd661d 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -2,27 +2,57 @@
#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
namespace aos::logger {
-// Interface to encode DetachedBuffers as they are written to a file.
-//
-// The interface to a file is represented as a series of buffers, appropriate to
-// pass to writev(2). The backing storage for these is managed internally so
-// subclasses can handle resizing/moving as desired. Implementations must
-// enqueue as many of these backing buffers as demanded, leaving it up to the
-// caller to write them to a file when desired.
-class DetachedBufferEncoder {
+// Interface to encode data as it is written to a file.
+class DataEncoder {
public:
- virtual ~DetachedBufferEncoder() = default;
+ virtual ~DataEncoder() = default;
- // Encodes and enqueues the given DetachedBuffer.
- //
- // Note that in may be moved-from, or it may be left unchanged, depending on
- // what makes the most sense for a given implementation.
- virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+ // Interface to copy data into a buffer.
+ class Copier {
+ public:
+ Copier(size_t size) : size_(size) {}
+
+ // Returns the data this will write.
+ size_t size() const { return size_; }
+
+ // Writes size() bytes to data, and returns the data written.
+ [[nodiscard]] virtual size_t Copy(uint8_t *data) = 0;
+
+ private:
+ size_t size_;
+ };
+
+  // Copies a span. The span must remain valid for as long as the copier is
+  // in use.
+ class SpanCopier : public Copier {
+ public:
+ SpanCopier(absl::Span<const uint8_t> data)
+ : Copier(data.size()), data_(data) {
+ CHECK(data_.data());
+ }
+
+ size_t Copy(uint8_t *data) final {
+ std::memcpy(data, data_.data(), data_.size());
+ return data_.size();
+ }
+
+ private:
+ const absl::Span<const uint8_t> data_;
+ };
+
+  // Returns true if there is space in the buffer for the next request.  If
+  // this returns false, the output needs to be flushed before encoding more.
+ virtual bool HasSpace(size_t request) const = 0;
+
+  // Encodes and enqueues the data described by the provided copier.
+ virtual void Encode(Copier *copy) = 0;
// If this returns true, the encoder may be bypassed by writing directly to
// the file.
@@ -37,8 +67,9 @@
// Clears the first n encoded buffers from the queue.
virtual void Clear(int n) = 0;
- // Returns a view of the queue of encoded buffers.
- virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+ // Returns a view of the queue of encoded buffers. Valid until any other
+ // method on this class is called.
+ virtual absl::Span<const absl::Span<const uint8_t>> queue() = 0;
// Returns the total number of of bytes currently queued up.
virtual size_t queued_bytes() const = 0;
@@ -53,28 +84,32 @@
// This class does not encode the data. It just claims ownership of the raw data
// and queues it up as is.
-class DummyEncoder final : public DetachedBufferEncoder {
+class DummyEncoder final : public DataEncoder {
public:
- DummyEncoder() {}
+ DummyEncoder(size_t max_buffer_size);
DummyEncoder(const DummyEncoder &) = delete;
DummyEncoder(DummyEncoder &&other) = delete;
DummyEncoder &operator=(const DummyEncoder &) = delete;
DummyEncoder &operator=(DummyEncoder &&other) = delete;
~DummyEncoder() override = default;
- // No encoding happens, the raw data is queued up as is.
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool HasSpace(size_t request) const final;
+ void Encode(Copier *copy) final;
bool may_bypass() const final { return true; }
void Finish() final {}
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
size_t total_bytes() const final { return total_bytes_; }
- size_t queue_size() const final { return queue_.size(); }
+ size_t queue_size() const final {
+ return input_buffer_.size() != 0 ? 1u : 0u;
+ }
private:
- std::vector<flatbuffers::DetachedBuffer> queue_;
size_t total_bytes_ = 0;
+
+ ResizeableBuffer input_buffer_;
+ std::vector<absl::Span<const uint8_t>> return_queue_;
};
// Interface to decode chunks of data. Implementations of this interface will
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
index 871e25e..e6c4867 100644
--- a/aos/events/logging/buffer_encoder_param_test.h
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -14,24 +14,55 @@
namespace aos::logger::testing {
-// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// Contains some helpful infrastructure for testing DataEncoder
// implementations.
class BufferEncoderBaseTest : public ::testing::Test {
public:
+ static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
+
+ class DetachedBufferCopier : public DataEncoder::Copier {
+ public:
+ DetachedBufferCopier(flatbuffers::DetachedBuffer &&data)
+ : DataEncoder::Copier(data.size()), data_(std::move(data)) {}
+
+ size_t Copy(uint8_t *data) final {
+ std::memcpy(data, data_.data(), data_.size());
+ return data_.size();
+ }
+
+ private:
+ const flatbuffers::DetachedBuffer data_;
+ };
+
// Creates and encodes n random buffers, returning a copy of the encoded data.
- std::vector<std::vector<uint8_t>> CreateAndEncode(
- int n, DetachedBufferEncoder *encoder) {
+ std::vector<std::vector<uint8_t>> CreateAndEncode(int n,
+ DataEncoder *encoder) {
std::vector<std::vector<uint8_t>> result;
for (int i = 0; i < n; i++) {
flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
result.emplace_back(buffer.data(), buffer.data() + buffer.size());
- encoder->Encode(std::move(buffer));
+ LOG(INFO) << "Encoding " << buffer.size();
+ CHECK(encoder->HasSpace(buffer.size()))
+ << ": The test isn't smart enough to flush, figure out what to do. "
+ "Has "
+ << encoder->queued_bytes() << ", encoding " << buffer.size();
+
+ DetachedBufferCopier coppier(std::move(buffer));
+ encoder->Encode(&coppier);
}
return result;
}
// Returns the total size of a vector full of objects with a size() method.
template <typename T>
+ static size_t TotalSize(const absl::Span<T> buffers) {
+ size_t result = 0;
+ for (const auto &v : buffers) {
+ result += v.size();
+ }
+ return result;
+ }
+ template <typename T>
static size_t TotalSize(const std::vector<T> &buffers) {
size_t result = 0;
for (const auto &v : buffers) {
@@ -86,7 +117,7 @@
std::mt19937(::aos::testing::RandomSeed())};
};
-// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// Tests some generic properties for any DataEncoder+DataDecoder
// implementation pair.
//
// First and second test parameters are methods to create instances of the
@@ -95,13 +126,13 @@
class BufferEncoderTest
: public BufferEncoderBaseTest,
public ::testing::WithParamInterface<std::tuple<
- std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+ std::function<std::unique_ptr<DataEncoder>(size_t)>,
std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
public:
BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
- std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
- return std::get<0>(GetParam())();
+ std::unique_ptr<DataEncoder> MakeEncoder() const {
+ return std::get<0>(GetParam())(BufferEncoderBaseTest::kMaxMessageSize);
}
std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
return std::get<1>(GetParam())(filename);
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
index 5a9b85f..8e12e37 100644
--- a/aos/events/logging/buffer_encoder_test.cc
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -13,13 +13,26 @@
class DummyEncoderTest : public BufferEncoderBaseTest {};
-// Tests that buffers are enqueued without any changes.
+// Tests that buffers are concatenated without being modified.
TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
- DummyEncoder encoder;
+ DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
const auto expected = CreateAndEncode(100, &encoder);
+ std::vector<uint8_t> data = Flatten(expected);
auto queue = encoder.queue();
- EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+ ASSERT_EQ(queue.size(), 1u);
+ EXPECT_EQ(queue[0], absl::Span<const uint8_t>(data));
+}
+
+// Tests that buffers are concatenated without being modified.
+TEST_F(DummyEncoderTest, CoppiesBuffersAsIs) {
+ DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
+ const auto expected = CreateAndEncode(100, &encoder);
+ std::vector<uint8_t> data = Flatten(expected);
+
+ auto queue = encoder.queue();
+ ASSERT_EQ(queue.size(), 1u);
+ EXPECT_EQ(queue[0], absl::Span<const uint8_t>(data));
}
// Checks that DummyDecoder can read into a buffer.
@@ -94,8 +107,8 @@
INSTANTIATE_TEST_SUITE_P(
Dummy, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
- return std::make_unique<DummyEncoder>();
+ ::testing::Combine(::testing::Values([](size_t max_buffer_size) {
+ return std::make_unique<DummyEncoder>(max_buffer_size);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<DummyDecoder>(filename);
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index b0f8ba5..969d59c 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -18,6 +18,11 @@
"If provided, this is the path to the JSON with the log file header. If "
"not provided, _header.json will be appended to --logfile.");
+DEFINE_int32(
+ max_message_size, 128 * 1024 * 1024,
+ "Max size of a message to be written. This sets the buffers inside "
+ "the encoders.");
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(R"(This tool lets us manipulate log files.)");
aos::InitGoogle(&argc, &argv);
@@ -44,7 +49,8 @@
CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
aos::logger::DetachedBufferWriter buffer_writer(
- FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
+ FLAGS_logfile,
+ std::make_unique<aos::logger::DummyEncoder>(FLAGS_max_message_size));
buffer_writer.QueueSpan(header.span());
while (true) {
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a04d0ea..120bd79 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -20,14 +20,16 @@
NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close)
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size)
: node_(node),
node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
logger_node_index_(
configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
log_namer_(log_namer),
reopen_(std::move(reopen)),
- close_(std::move(close)) {
+ close_(std::move(close)),
+ max_message_size_(max_message_size) {
state_.resize(configuration::NodesCount(log_namer->configuration_));
CHECK_LT(node_index_, state_.size());
}
@@ -179,9 +181,9 @@
}
}
-void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &source_node_boot_uuid,
- aos::monotonic_clock::time_point now) {
+void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now) {
// Trigger a reboot if we detect the boot UUID change.
UpdateBoot(source_node_boot_uuid);
@@ -202,7 +204,7 @@
CHECK(writer);
CHECK(header_written_) << ": Attempting to write message before header to "
<< writer->filename();
- writer->QueueSizedFlatbuffer(fbb, now);
+ writer->CopyMessage(coppier, now);
}
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
@@ -228,6 +230,16 @@
CHECK_EQ(state_[node_index_].boot_uuid,
UUID::FromString(header.message().source_node_boot_uuid()));
if (!writer) {
+ // Since we haven't opened the first time, it's still not too late to update
+ // the max message size. Make sure the header fits.
+ //
+ // This won't work well on reboots, but the structure of the header is fixed
+    // by that point in time, so its size is fixed too.
+ //
+ // Most of the time, the minimum buffer size inside the encoder of around
+ // 128k will make this a non-issue.
+ UpdateMaxMessageSize(header.span().size());
+
reopen_(this);
}
@@ -235,10 +247,8 @@
<< aos::FlatbufferToJson(
header, {.multi_line = false, .max_vector_size = 100});
- // TODO(austin): This triggers a dummy allocation that we don't need as part
- // of releasing. Can we skip it?
CHECK(writer);
- writer->QueueSizedFlatbuffer(header.Release());
+ writer->QueueSpan(header.span());
header_written_ = true;
monotonic_start_time_ = log_namer_->monotonic_start_time(
node_index_, state_[node_index_].boot_uuid);
@@ -593,9 +603,10 @@
base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
std::unique_ptr<DetachedBufferWriter> writer =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(header->span().size()));
- writer->QueueSizedFlatbuffer(header->Release());
+ writer->QueueSpan(header->span());
if (!writer->ran_out_of_space()) {
all_filenames_.emplace_back(
@@ -624,8 +635,10 @@
// generated if it is sendable on this node.
if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(PackMessageSize(
+ LogType::kLogRemoteMessage, channel->max_size()));
return data_writer_.get();
}
@@ -647,9 +660,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -672,9 +684,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenForwardedTimestampWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackRemoteMessageSize());
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -690,8 +701,10 @@
}
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(
+ PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
return data_writer_.get();
}
@@ -724,7 +737,8 @@
absl::StrCat("timestamps", channel->name()->string_view(), "/",
channel->type()->string_view(), ".part",
data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
void MultiNodeLogNamer::OpenWriter(const Channel *channel,
@@ -733,10 +747,11 @@
CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
channel->name()->string_view(), "/", channel->type()->string_view(),
".part", data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
-void MultiNodeLogNamer::OpenDataWriter() {
+void MultiNodeLogNamer::MakeDataWriter() {
if (!data_writer_) {
data_writer_ = std::make_unique<NewDataWriter>(
this, node_, node_,
@@ -747,16 +762,20 @@
}
absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
extension_);
- CreateBufferWriter(name, &writer->writer);
+ CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
},
[this](NewDataWriter *data_writer) {
CloseWriter(&data_writer->writer);
- });
+ },
+ // Default size is 0 so it will be obvious if something doesn't fix it
+ // afterwards.
+ 0);
}
}
void MultiNodeLogNamer::CreateBufferWriter(
- std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
+ std::string_view path, size_t max_message_size,
+ std::unique_ptr<DetachedBufferWriter> *destination) {
if (ran_out_of_space_) {
// Refuse to open any new files, which might skip data. Any existing files
// are in the same folder, which means they're on the same filesystem, which
@@ -778,8 +797,8 @@
DetachedBufferWriter::already_out_of_space_t());
return;
}
- *destination =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ *destination = std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}
@@ -793,7 +812,8 @@
return;
}
- *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
+ *destination->get() =
+ DetachedBufferWriter(filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 00ed63f..4e8c990 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -38,7 +38,16 @@
// close is called to close that file and extract any statistics.
NewDataWriter(LogNamer *log_namer, const Node *node, const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close);
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size);
+
+ void UpdateMaxMessageSize(size_t new_size) {
+ if (new_size > max_message_size_) {
+ CHECK(!header_written_);
+ max_message_size_ = new_size;
+ }
+ }
+ size_t max_message_size() const { return max_message_size_; }
NewDataWriter(NewDataWriter &&other) = default;
aos::logger::NewDataWriter &operator=(NewDataWriter &&other) = default;
@@ -58,10 +67,10 @@
bool reliable,
monotonic_clock::time_point monotonic_timestamp_time =
monotonic_clock::min_time);
- // Queues up a message with the provided boot UUID.
- void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &node_boot_uuid,
- aos::monotonic_clock::time_point now);
+  // Copies a message with the provided boot UUID.
+ void CopyMessage(DataEncoder::Copier *coppier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now);
// Updates the current boot for the source node. This is useful when you want
// to queue a message that may trigger a reboot rotation, but then need to
@@ -148,6 +157,8 @@
bool header_written_ = false;
std::vector<State> state_;
+
+ size_t max_message_size_;
};
// Interface describing how to name, track, and add headers to log file parts.
@@ -313,11 +324,12 @@
temp_suffix_ = temp_suffix;
}
- // Sets the function for creating encoders.
+ // Sets the function for creating encoders. The argument is the max message
+ // size (including headers) that will be written into this encoder.
//
// Defaults to just creating DummyEncoders.
void set_encoder_factory(
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory) {
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory) {
encoder_factory_ = std::move(encoder_factory);
}
@@ -449,9 +461,9 @@
void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
// Opens the main data writer file for this node responsible for data_writer_.
- void OpenDataWriter();
+ void MakeDataWriter();
- void CreateBufferWriter(std::string_view path,
+ void CreateBufferWriter(std::string_view path, size_t max_message_size,
std::unique_ptr<DetachedBufferWriter> *destination);
void RenameTempFile(DetachedBufferWriter *destination);
@@ -479,8 +491,10 @@
std::vector<std::string> all_filenames_;
std::string temp_suffix_;
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory_ =
- []() { return std::make_unique<DummyEncoder>(); };
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory_ =
+ [](size_t max_message_size) {
+ return std::make_unique<DummyEncoder>(max_message_size);
+ };
std::string extension_;
// Storage for statistics from previously-rotated DetachedBufferWriters.
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 36116a3..ab7d3c6 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -759,6 +759,64 @@
}
}
+// Class to copy a context into the provided buffer.
+class ContextDataCopier : public DataEncoder::Copier {
+ public:
+ ContextDataCopier(const Context &context, int channel_index,
+ LogType log_type, EventLoop *event_loop)
+ : DataEncoder::Copier(PackMessageSize(log_type, context.size)),
+ context_(context),
+ channel_index_(channel_index),
+ log_type_(log_type),
+ event_loop_(event_loop) {}
+
+ monotonic_clock::time_point end_time() const { return end_time_; }
+
+ size_t Copy(uint8_t *data) final {
+ size_t result =
+ PackMessageInline(data, context_, channel_index_, log_type_);
+ end_time_ = event_loop_->monotonic_now();
+ return result;
+ }
+
+ private:
+ const Context &context_;
+ const int channel_index_;
+ const LogType log_type_;
+ EventLoop *event_loop_;
+ monotonic_clock::time_point end_time_;
+};
+
+// Class to copy a RemoteMessage into the provided buffer.
+class RemoteMessageCopier : public DataEncoder::Copier {
+ public:
+ RemoteMessageCopier(
+ const message_bridge::RemoteMessage *message, int channel_index,
+ aos::monotonic_clock::time_point monotonic_timestamp_time,
+ EventLoop *event_loop)
+ : DataEncoder::Copier(PackRemoteMessageSize()),
+ message_(message),
+ channel_index_(channel_index),
+ monotonic_timestamp_time_(monotonic_timestamp_time),
+ event_loop_(event_loop) {}
+
+ monotonic_clock::time_point end_time() const { return end_time_; }
+
+ size_t Copy(uint8_t *data) final {
+ size_t result = PackRemoteMessageInline(data, message_, channel_index_,
+ monotonic_timestamp_time_);
+ end_time_ = event_loop_->monotonic_now();
+ return result;
+ }
+
+ private:
+ const message_bridge::RemoteMessage *message_;
+ int channel_index_;
+ aos::monotonic_clock::time_point monotonic_timestamp_time_;
+ EventLoop *event_loop_;
+ monotonic_clock::time_point end_time_;
+};
+
void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
if (writer != nullptr) {
const UUID source_node_boot_uuid =
@@ -767,25 +825,17 @@
: event_loop_->boot_uuid();
// Write!
const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
- fbb.FinishSizePrefixed(
- PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
+ ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+ f.log_type, event_loop_);
- max_header_size_ =
- std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+ writer->CopyMessage(&coppier, source_node_boot_uuid, start);
+ RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
<< " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << writer->filename() << " data "
- << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << " to " << writer->filename();
}
}
@@ -793,29 +843,24 @@
const FetcherStruct &f) {
if (timestamp_writer != nullptr) {
// And now handle timestamps.
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
// Tell our writer that we know something about the remote boot.
timestamp_writer->UpdateRemote(
f.data_node_index, f.fetcher->context().source_boot_uuid,
f.fetcher->context().monotonic_remote_time,
f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
- timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+ const auto start = event_loop_->monotonic_now();
+ ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+ LogType::kLogDeliveryTimeOnly, event_loop_);
+
+ timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start);
+ RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
<< " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << " to " << timestamp_writer->filename() << " timestamp";
}
}
@@ -825,20 +870,10 @@
const auto start = event_loop_->monotonic_now();
// And now handle the special message contents channel. Copy the
// message into a FlatBufferBuilder and save it to disk.
- //
- // TODO(austin): We can be more efficient here when we start to
- // care...
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
const RemoteMessage *msg =
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
- // TODO(austin): This needs to check the channel_index and confirm
- // that it should be logged before squirreling away the timestamp to
- // disk. We don't want to log irrelevant timestamps.
-
// Translate from the channel index that the event loop uses to the
// channel index in the log file.
const int channel_index =
@@ -847,11 +882,6 @@
const aos::monotonic_clock::time_point monotonic_timestamp_time =
f.fetcher->context().monotonic_event_time;
- fbb.FinishSizePrefixed(
- PackRemoteMessage(&fbb, msg, channel_index, monotonic_timestamp_time));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
-
// Timestamps tell us information about what happened too!
// Capture any reboots so UpdateRemote is properly recorded.
contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
@@ -871,8 +901,13 @@
chrono::nanoseconds(msg->monotonic_sent_time())),
reliable, monotonic_timestamp_time);
- contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
- end);
+ RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
+ event_loop_);
+
+ contents_writer->CopyMessage(&coppier, UUID::FromVector(msg->boot_uuid()),
+ start);
+
+ RecordCreateMessageTime(start, coppier.end_time(), f);
}
}
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 51b46ae..3e1832e 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -321,10 +321,6 @@
// Last time that data was written for all channels to disk.
monotonic_clock::time_point last_synchronized_time_;
- // Max size that the header has consumed. This much extra data will be
- // reserved in the builder to avoid reallocating.
- size_t max_header_size_ = 0;
-
// If true, write the message header into a separate file.
bool separate_config_ = true;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f32f44..4c06f0a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -29,7 +29,7 @@
#include "aos/events/logging/lzma_encoder.h"
#endif
-DEFINE_int32(flush_size, 128000,
+DEFINE_int32(flush_size, 128 * 1024,
"Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
flush_period, 5.0,
@@ -76,13 +76,13 @@
}
} // namespace
-DetachedBufferWriter::DetachedBufferWriter(
- std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DataEncoder> encoder)
: filename_(filename), encoder_(std::move(encoder)) {
if (!util::MkdirPIfSpace(filename, 0777)) {
ran_out_of_space_ = true;
} else {
- fd_ = open(std::string(filename).c_str(),
+ fd_ = open(filename_.c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
if (fd_ == -1 && errno == ENOSPC) {
ran_out_of_space_ = true;
@@ -136,29 +136,27 @@
return;
}
- aos::monotonic_clock::time_point now;
- if (encoder_->may_bypass() && span.size() > 4096u) {
- // Over this threshold, we'll assume it's cheaper to add an extra
- // syscall to write the data immediately instead of copying it to
- // enqueue.
+ if (!encoder_->HasSpace(span.size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(span.size()));
+ }
+ DataEncoder::SpanCopier coppier(span);
+ encoder_->Encode(&coppier);
+ FlushAtThreshold(aos::monotonic_clock::now());
+}
- // First, flush everything.
- while (encoder_->queue_size() > 0u) {
- Flush();
- }
-
- // Then, write it directly.
- const auto start = aos::monotonic_clock::now();
- const ssize_t written = write(fd_, span.data(), span.size());
- const auto end = aos::monotonic_clock::now();
- HandleWriteReturn(written, span.size());
- UpdateStatsForWrite(end - start, written, 1);
- now = end;
- } else {
- encoder_->Encode(CopySpanAsDetachedBuffer(span));
- now = aos::monotonic_clock::now();
+void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
+ aos::monotonic_clock::time_point now) {
+ if (ran_out_of_space_) {
+ return;
}
+ if (!encoder_->HasSpace(coppier->size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(coppier->size()));
+ }
+
+ encoder_->Encode(coppier);
FlushAtThreshold(now);
}
@@ -575,11 +573,11 @@
}
flatbuffers::uoffset_t PackMessageSize(LogType log_type,
- const Context &context) {
+ size_t data_size) {
static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
"Update size logic please.");
const flatbuffers::uoffset_t aligned_data_length =
- ((context.size + 7) & 0xfffffff8u);
+ ((data_size + 7) & 0xfffffff8u);
switch (log_type) {
case LogType::kLogDeliveryTimeOnly:
return PackMessageHeaderSize(log_type);
@@ -596,8 +594,11 @@
size_t PackMessageInline(uint8_t *buffer, const Context &context,
int channel_index, LogType log_type) {
+ // TODO(austin): Figure out how to copy directly from shared memory instead of
+ // first into the fetcher's memory and then into here. That would save a lot
+ // of memory.
const flatbuffers::uoffset_t message_size =
- PackMessageSize(log_type, context);
+ PackMessageSize(log_type, context.size);
buffer = Push<flatbuffers::uoffset_t>(
buffer, message_size - sizeof(flatbuffers::uoffset_t));
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 16f3675..e034bfe 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -50,7 +50,7 @@
struct already_out_of_space_t {};
DetachedBufferWriter(std::string_view filename,
- std::unique_ptr<DetachedBufferEncoder> encoder);
+ std::unique_ptr<DataEncoder> encoder);
// Creates a dummy instance which won't even open a file. It will act as if
// opening the file ran out of space immediately.
DetachedBufferWriter(already_out_of_space_t) : ran_out_of_space_(true) {}
@@ -73,23 +73,8 @@
// Triggers a flush if there's enough data queued up.
//
// Steals the detached buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
- aos::monotonic_clock::time_point now) {
- QueueSizedFlatbuffer(fbb->Release(), now);
- }
- // May steal the backing storage of buffer, or may leave it alone.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer,
- aos::monotonic_clock::time_point now) {
- QueueSizedFlatbuffer(std::move(buffer));
- FlushAtThreshold(now);
- }
- // Unconditionally queues the buffer.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
- if (ran_out_of_space_) {
- return;
- }
- encoder_->Encode(std::move(buffer));
- }
+ void CopyMessage(DataEncoder::Copier *coppier,
+ aos::monotonic_clock::time_point now);
// Queues up data in span. May copy or may write it to disk immediately.
void QueueSpan(absl::Span<const uint8_t> span);
@@ -173,7 +158,7 @@
void FlushAtThreshold(aos::monotonic_clock::time_point now);
std::string filename_;
- std::unique_ptr<DetachedBufferEncoder> encoder_;
+ std::unique_ptr<DataEncoder> encoder_;
int fd_ = -1;
bool ran_out_of_space_ = false;
@@ -213,8 +198,7 @@
// Returns the size that the packed message from PackMessage or
// PackMessageInline will be.
-flatbuffers::uoffset_t PackMessageSize(LogType log_type,
- const Context &context);
+flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size);
// Packs the provided message pointed to by context into the provided buffer.
// This is equivalent to PackMessage, but doesn't require allocating a
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
index b08bebb..afc4d77 100644
--- a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -15,10 +15,12 @@
FLAGS_flush_size = 1;
CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
- aos::logger::DetachedBufferWriter writer(
- FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
std::array<uint8_t, 10240> data;
data.fill(0);
+
+ aos::logger::DetachedBufferWriter writer(
+ FLAGS_tmpfs + "/file",
+ std::make_unique<aos::logger::DummyEncoder>(data.size()));
for (int i = 0; i < 8; ++i) {
writer.QueueSpan(data);
CHECK(!writer.ran_out_of_space()) << ": " << i;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index c8d16e0..3d99757 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -31,8 +31,11 @@
// test only boilerplate to DetachedBufferWriter.
class TestDetachedBufferWriter : public DetachedBufferWriter {
public:
+ // Pick a max size that is rather conservative.
+ static constexpr size_t kMaxMessageSize = 128 * 1024;
TestDetachedBufferWriter(std::string_view filename)
- : DetachedBufferWriter(filename, std::make_unique<DummyEncoder>()) {}
+ : DetachedBufferWriter(filename,
+ std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
}
@@ -2983,12 +2986,13 @@
fbb.GetBufferSpan().size()));
// Make sure that both the builder and inline method agree on sizes.
- ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context))
+ ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
<< "log type " << static_cast<int>(type);
// Initialize the buffer to something nonzero to make sure all the padding
// bytes are set to 0.
- std::vector<uint8_t> repacked_message(PackMessageSize(type, context), 67);
+ std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
+ 67);
// And verify packing inline works as expected.
EXPECT_EQ(repacked_message.size(),
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 526eb31..81b4e3f 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -3,6 +3,9 @@
#include "aos/configuration.h"
#include "aos/events/logging/log_writer.h"
+#ifdef LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
#include "aos/events/logging/snappy_encoder.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
@@ -18,9 +21,17 @@
DEFINE_bool(snappy_compress, false, "If true, compress log data using snappy.");
+#ifdef LZMA
+DEFINE_bool(xz_compress, false, "If true, compress log data using xz.");
+#endif
+
DEFINE_double(rotate_every, 0.0,
"If set, rotate the logger after this many seconds");
+#ifdef LZMA
+DEFINE_int32(xz_compression_level, 9, "Compression level for the LZMA Encoder");
+#endif
+
int main(int argc, char *argv[]) {
gflags::SetUsageMessage(
"This program provides a simple logger binary that logs all SHMEM data "
@@ -41,8 +52,17 @@
if (FLAGS_snappy_compress) {
log_namer->set_extension(aos::logger::SnappyDecoder::kExtension);
- log_namer->set_encoder_factory(
- []() { return std::make_unique<aos::logger::SnappyEncoder>(); });
+ log_namer->set_encoder_factory([](size_t max_message_size) {
+ return std::make_unique<aos::logger::SnappyEncoder>(max_message_size);
+ });
+#ifdef LZMA
+ } else if (FLAGS_xz_compress) {
+ log_namer->set_extension(aos::logger::LzmaEncoder::kExtension);
+ log_namer->set_encoder_factory([](size_t max_message_size) {
+ return std::make_unique<aos::logger::LzmaEncoder>(
+ max_message_size, FLAGS_xz_compression_level);
+ });
+#endif
}
aos::monotonic_clock::time_point last_rotation_time =
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c5b1768..cf5ec32 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -540,7 +540,8 @@
struct CompressionParams {
std::string_view extension;
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
+ std::function<std::unique_ptr<DataEncoder>(size_t max_message_size)>
+ encoder_factory;
};
std::ostream &operator<<(std::ostream &ostream,
@@ -550,12 +551,19 @@
}
std::vector<CompressionParams> SupportedCompressionAlgorithms() {
- return {{"", []() { return std::make_unique<DummyEncoder>(); }},
+ return {{"",
+ [](size_t max_message_size) {
+ return std::make_unique<DummyEncoder>(max_message_size);
+ }},
{SnappyDecoder::kExtension,
- []() { return std::make_unique<SnappyEncoder>(); }},
+ [](size_t max_message_size) {
+ return std::make_unique<SnappyEncoder>(max_message_size);
+ }},
#ifdef LZMA
{LzmaDecoder::kExtension,
- []() { return std::make_unique<LzmaEncoder>(3); }}
+ [](size_t max_message_size) {
+ return std::make_unique<LzmaEncoder>(max_message_size, 3);
+ }}
#endif // LZMA
};
}
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 5cb9897..f583818 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -50,7 +50,8 @@
} // namespace
-LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
+LzmaEncoder::LzmaEncoder(size_t max_message_size,
+ const uint32_t compression_preset, size_t block_size)
: stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
CHECK_GE(compression_preset_, 0u)
<< ": Compression preset must be in the range [0, 9].";
@@ -78,15 +79,26 @@
stream_.avail_out = 0;
VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+
+ // TODO(austin): We don't write the biggest messages very often. Is it more
+ // efficient to allocate if we go over a threshold to keep the static memory
+ // in use smaller, or just allocate the worst case like we are doing here?
+ input_buffer_.resize(max_message_size);
}
LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
-void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- CHECK(in.data()) << ": Encode called with nullptr.";
+void LzmaEncoder::Encode(Copier *copy) {
+ const size_t copy_size = copy->size();
+ // LZMA compresses the data as it goes along, copying the compressed results
+ // into another buffer. So, there's no need to store more than one message
+ // since lzma is going to take it from here.
+ CHECK_LE(copy_size, input_buffer_.size());
- stream_.next_in = in.data();
- stream_.avail_in = in.size();
+ CHECK_EQ(copy->Copy(input_buffer_.data()), copy_size);
+
+ stream_.next_in = input_buffer_.data();
+ stream_.avail_in = copy_size;
RunLzmaCode(LZMA_RUN);
}
@@ -103,20 +115,21 @@
}
}
-std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
+absl::Span<const absl::Span<const uint8_t>> LzmaEncoder::queue() {
+ return_queue_.clear();
if (queue_.empty()) {
- return queue;
+ return return_queue_;
}
+ return_queue_.reserve(queue_.size());
for (size_t i = 0; i < queue_.size() - 1; ++i) {
- queue.emplace_back(
+ return_queue_.emplace_back(
absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
}
// For the last buffer in the queue, we must account for the possibility that
// the buffer isn't full yet.
- queue.emplace_back(absl::MakeConstSpan(
+ return_queue_.emplace_back(absl::MakeConstSpan(
queue_.back().data(), queue_.back().size() - stream_.avail_out));
- return queue;
+ return return_queue_;
}
size_t LzmaEncoder::queued_bytes() const {
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 14c00eb..bbd739a 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -16,12 +16,15 @@
namespace aos::logger {
// Encodes buffers using liblzma.
-class LzmaEncoder final : public DetachedBufferEncoder {
+class LzmaEncoder final : public DataEncoder {
public:
+ static constexpr std::string_view kExtension = ".xz";
+
// Initializes the LZMA stream and encoder. The block size is the block size
// used by the multithreaded encoder for batching. A block size of 0 tells
// lzma to pick it's favorite block size.
- explicit LzmaEncoder(uint32_t compression_preset, size_t block_size = 0);
+ explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
+ size_t block_size = 0);
LzmaEncoder(const LzmaEncoder &) = delete;
LzmaEncoder(LzmaEncoder &&other) = delete;
LzmaEncoder &operator=(const LzmaEncoder &) = delete;
@@ -29,16 +32,21 @@
// Gracefully shuts down the encoder.
~LzmaEncoder() final;
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool HasSpace(size_t /*request*/) const override {
+ // Since the underlying lzma encoder handles buffering, we always have
+ // space.
+ return true;
+ }
+ void Encode(Copier *copy) final;
void Finish() final;
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
- static constexpr size_t kEncodedBufferSizeBytes{4096 * 10};
+ static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
void RunLzmaCode(lzma_action action);
@@ -49,6 +57,11 @@
// Total bytes that resulted from encoding raw data since the last call to
// Reset.
size_t total_bytes_ = 0;
+
+  // Buffer that messages get copied into for encoding.
+ ResizeableBuffer input_buffer_;
+
+ std::vector<absl::Span<const uint8_t>> return_queue_;
};
// Decompresses data with liblzma.
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 1b8c895..1e814ff 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -11,9 +11,10 @@
INSTANTIATE_TEST_SUITE_P(
MtLzma, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 3;
- return std::make_unique<LzmaEncoder>(2, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 2, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<LzmaDecoder>(filename);
@@ -22,9 +23,10 @@
INSTANTIATE_TEST_SUITE_P(
MtLzmaThreaded, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 3;
- return std::make_unique<LzmaEncoder>(5, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 5, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -33,9 +35,10 @@
INSTANTIATE_TEST_SUITE_P(
Lzma, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 1;
- return std::make_unique<LzmaEncoder>(2, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 2, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<LzmaDecoder>(filename);
@@ -44,9 +47,10 @@
INSTANTIATE_TEST_SUITE_P(
LzmaThreaded, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 1;
- return std::make_unique<LzmaEncoder>(5, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 5, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -63,7 +67,8 @@
std::vector<std::vector<uint8_t>> encoded_buffers;
{
const int encode_chunks = quantity_distribution(*random_number_generator());
- const auto encoder = std::make_unique<LzmaEncoder>(2);
+ const auto encoder = std::make_unique<LzmaEncoder>(
+ BufferEncoderBaseTest::kMaxMessageSize, 2);
encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
encoder->Finish();
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
index 19011da..2ef8363 100644
--- a/aos/events/logging/snappy_encoder.cc
+++ b/aos/events/logging/snappy_encoder.cc
@@ -29,7 +29,11 @@
}
} // namespace
-SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
+ : chunk_size_(chunk_size),
+ // Make sure we have enough space to always add our largest message to the
+ // chunks we will flush.
+ buffer_source_(max_message_size + chunk_size - 1) {
queue_.emplace_back();
queue_.back().resize(kSnappyIdentifierChunk.size());
std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
@@ -39,10 +43,8 @@
void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
-void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- accumulated_checksum_ =
- AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
- buffer_source_.AppendBuffer(std::move(in));
+void SnappyEncoder::Encode(Copier *copy) {
+ buffer_source_.Append(copy);
if (buffer_source_.Available() >= chunk_size_) {
EncodeCurrentBuffer();
@@ -54,7 +56,7 @@
return;
}
const uint32_t uncompressed_checksum =
- MaskChecksum(accumulated_checksum_.value());
+ MaskChecksum(buffer_source_.accumulated_checksum());
queue_.emplace_back();
constexpr size_t kPrefixSize = 8;
queue_.back().resize(kPrefixSize +
@@ -79,7 +81,7 @@
total_bytes_ += queue_.back().size();
- accumulated_checksum_.reset();
+ buffer_source_.ResetAccumulatedChecksum();
CHECK_EQ(0u, buffer_source_.Available());
}
@@ -97,27 +99,26 @@
return bytes;
}
-std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
- queue.reserve(queue_.size());
+absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
+ return_queue_.clear();
+ return_queue_.reserve(queue_.size());
for (const auto &buffer : queue_) {
- queue.emplace_back(buffer.data(), buffer.size());
+ return_queue_.emplace_back(buffer.data(), buffer.size());
}
- return queue;
+ return return_queue_;
+}
+
+SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
+ data_.reserve(buffer_size);
}
size_t SnappyEncoder::DetachedBufferSource::Available() const {
- size_t total_available = 0;
- for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
- total_available += buffer.size();
- }
- return total_available;
+ return data_.size() - index_into_first_buffer_;
}
const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
- *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
- return reinterpret_cast<char *>(buffers_[0].data()) +
- index_into_first_buffer_;
+ *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
+ return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
}
void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
@@ -125,20 +126,25 @@
return;
}
- CHECK(!buffers_.empty());
+ CHECK_NE(data_.size(), 0u);
index_into_first_buffer_ += n;
- CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+ CHECK_LE(index_into_first_buffer_, data_.size())
<< ": " << n << " is too large a skip.";
- if (index_into_first_buffer_ == buffers_[0].size()) {
- buffers_.erase(buffers_.begin());
+ if (index_into_first_buffer_ == data_.size()) {
+ data_.resize(0u);
index_into_first_buffer_ = 0;
}
}
-void SnappyEncoder::DetachedBufferSource::AppendBuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- buffers_.emplace_back(std::move(buffer));
+void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
+ const size_t copy_size = copy->size();
+ CHECK_LE(copy_size + data_.size(), data_.capacity());
+ size_t starting_size = data_.size();
+ data_.resize(starting_size + copy_size);
+ CHECK_EQ(copy->Copy(data_.data() + starting_size), copy_size);
+ accumulated_checksum_ = AccumulateCrc32(
+ {data_.data() + starting_size, copy_size}, accumulated_checksum_);
}
size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
diff --git a/aos/events/logging/snappy_encoder.h b/aos/events/logging/snappy_encoder.h
index 6b39c21..d3edbe5 100644
--- a/aos/events/logging/snappy_encoder.h
+++ b/aos/events/logging/snappy_encoder.h
@@ -14,29 +14,42 @@
namespace aos::logger {
// Encodes buffers using snappy.
-class SnappyEncoder final : public DetachedBufferEncoder {
+class SnappyEncoder final : public DataEncoder {
public:
- explicit SnappyEncoder(size_t chunk_size = 32768);
+ explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 32768);
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ void Encode(Copier *copy) final;
+
void Finish() final;
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
+ bool HasSpace(size_t /*request*/) const final {
+ // Since the output always mallocs space, we have infinite output space.
+ return true;
+ }
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
class DetachedBufferSource : public snappy::Source {
public:
+ DetachedBufferSource(size_t buffer_size);
size_t Available() const final;
const char *Peek(size_t *length) final;
void Skip(size_t n) final;
- void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
+ void Append(Copier *copy);
+
+ uint32_t accumulated_checksum() const {
+ return accumulated_checksum_.value();
+ }
+
+ void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
private:
- std::vector<flatbuffers::DetachedBuffer> buffers_;
+ ResizeableBuffer data_;
size_t index_into_first_buffer_ = 0;
+ std::optional<uint32_t> accumulated_checksum_;
};
// Flushes buffer_source_ and stores the compressed buffer in queue_.
@@ -55,8 +68,9 @@
// Clear() to be written to disk.
const size_t chunk_size_;
DetachedBufferSource buffer_source_;
- std::optional<uint32_t> accumulated_checksum_;
std::vector<ResizeableBuffer> queue_;
+
+ std::vector<absl::Span<const uint8_t>> return_queue_;
size_t total_bytes_ = 0;
};
diff --git a/aos/events/logging/snappy_encoder_test.cc b/aos/events/logging/snappy_encoder_test.cc
index 3334602..7e61f3f 100644
--- a/aos/events/logging/snappy_encoder_test.cc
+++ b/aos/events/logging/snappy_encoder_test.cc
@@ -9,8 +9,8 @@
INSTANTIATE_TEST_SUITE_P(
Snappy, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
- return std::make_unique<SnappyEncoder>();
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
+ return std::make_unique<SnappyEncoder>(max_message_size);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<SnappyDecoder>(filename);
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 64fb27f..48bd2ec 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -1,10 +1,14 @@
{
"applications": [
{
- "name": "ping"
+ "name": "ping",
+ "executable_name": "./ping",
+ "args": ["--config=aos_config.json"]
},
{
- "name": "pong"
+ "name": "pong",
+ "executable_name": "./pong",
+ "args": ["--config=aos_config.json"]
}
],
"channels": [
@@ -22,4 +26,4 @@
"imports": [
"aos.json"
]
-}
\ No newline at end of file
+}
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index 61dbf6b..4d2d395 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -12,7 +12,7 @@
// Merges 2 flat buffers with the provided type table into the builder. Returns
// the offset to the flatbuffers.
// One or both of t1 and t2 must be non-null. If one is null, this method
-// coppies instead of merging.
+// copies instead of merging.
flatbuffers::Offset<flatbuffers::Table> MergeFlatBuffers(
const flatbuffers::TypeTable *typetable, const flatbuffers::Table *t1,
const flatbuffers::Table *t2, flatbuffers::FlatBufferBuilder *fbb);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index a88e801..8c48fa1 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -681,7 +681,7 @@
size_t /*iteration*/) {
// TODO(austin): Can we take a problem directly without
// AddConstraintSlackVariable being called on it and fill everything in?
- // That'll avoid a bunch of allocations and coppies of rather sizable
+ // That'll avoid a bunch of allocations and copies of rather sizable
// matricies.
//
// I think I want to do it as another review after everything works with the
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 5bf3811..d4566a0 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -188,12 +188,16 @@
"$(rootpath //aos/events:ping)",
"$(rootpath //aos/events:pong)",
"$(rootpath :aos_starter)",
+ "$(rootpath //aos:aos_dump)",
+ "$(rootpath //aos/events/logging:logger_main)",
],
data = [
":aos_starter",
":starterd",
+ "//aos:aos_dump",
"//aos/events:ping",
"//aos/events:pingpong_config",
"//aos/events:pong",
+ "//aos/events/logging:logger_main",
],
)
diff --git a/aos/starter/starter_demo.py b/aos/starter/starter_demo.py
index f8fa520..5a50890 100755
--- a/aos/starter/starter_demo.py
+++ b/aos/starter/starter_demo.py
@@ -24,6 +24,8 @@
parser.add_argument("ping", help="Location of ping")
parser.add_argument("pong", help="Location of pong")
parser.add_argument("starter_cmd", help="Location of starter_cmd")
+ parser.add_argument("aos_dump", help="Location of aos_dump")
+ parser.add_argument("logger_main", help="Location of logger_main")
args = parser.parse_args()
# Copy all the interesting files into a temporary directory and run
@@ -38,6 +40,8 @@
shutil.copy(args.ping, tmpdir + "/ping")
shutil.copy(args.pong, tmpdir + "/pong")
shutil.copy(args.starter_cmd, tmpdir + "/starter_cmd")
+ shutil.copy(args.aos_dump, tmpdir + "/aos_dump")
+ shutil.copy(args.logger_main, tmpdir + "/logger_main")
print(f"Running starter from {tmpdir}")
print(f"\n\nTo run starter_cmd, do:\ncd {tmpdir}\n./starter_cmd\n\n")