Merge "Encode flatbuffers directly into the encoder when logging"
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3b82e98..82e7061 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -117,6 +117,8 @@
deps = [
":logger_fbs",
"//aos:configuration_fbs",
+ "//aos:flatbuffers",
+ "//aos/containers:resizeable_buffer",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
@@ -397,6 +399,11 @@
srcs = [
"logger_main.cc",
],
+ copts = select({
+ "//tools:cpu_k8": ["-DLZMA=1"],
+ "//tools:cpu_arm64": ["-DLZMA=1"],
+ "//conditions:default": [],
+ }),
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
index 6ef61d4..ae20440 100644
--- a/aos/events/logging/buffer_encoder.cc
+++ b/aos/events/logging/buffer_encoder.cc
@@ -4,39 +4,60 @@
#include <sys/stat.h>
#include <sys/types.h>
+#include "aos/flatbuffers.h"
#include "glog/logging.h"
namespace aos::logger {
-void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- CHECK(in.data()) << ": Encode called with nullptr.";
+DummyEncoder::DummyEncoder(size_t max_buffer_size) {
+ // TODO(austin): This is going to end up writing > 128k chunks, not 128k
+ // chunks exactly. If we really want to, we could make it always write 128k
+ // chunks by only exposing n * 128k chunks as we go. This might improve write
+ // performance, then again, it might have no effect if the kernel is combining
+ // writes...
+ constexpr size_t kWritePageSize = 128 * 1024;
+ // Round up to the nearest page size.
+ input_buffer_.reserve(
+ ((max_buffer_size + kWritePageSize - 1) / kWritePageSize) *
+ kWritePageSize);
+ return_queue_.resize(1);
+}
- total_bytes_ += in.size();
- queue_.emplace_back(std::move(in));
+bool DummyEncoder::HasSpace(size_t request) const {  // Exact fit is allowed.
+ return request + input_buffer_.size() <= input_buffer_.capacity();
+}
+
+void DummyEncoder::Encode(Copier *copy) {
+ DCHECK(HasSpace(copy->size()));
+ const size_t input_buffer_initial_size = input_buffer_.size();
+
+ input_buffer_.resize(input_buffer_initial_size + copy->size());
+ const size_t written_size =
+ copy->Copy(input_buffer_.data() + input_buffer_initial_size);
+ DCHECK_EQ(written_size, copy->size());
+
+ total_bytes_ += written_size;
}
void DummyEncoder::Clear(const int n) {
CHECK_GE(n, 0);
CHECK_LE(static_cast<size_t>(n), queue_size());
- queue_.erase(queue_.begin(), queue_.begin() + n);
+ if (n != 0) {
+ input_buffer_.resize(0u);
+ }
}
-std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
- queue.reserve(queue_.size());
- for (const auto &buffer : queue_) {
- queue.emplace_back(buffer.data(), buffer.size());
+absl::Span<const absl::Span<const uint8_t>> DummyEncoder::queue() {
+ if (input_buffer_.size() != 0) {
+ return_queue_[0] =
+ absl::Span<const uint8_t>(input_buffer_.data(), input_buffer_.size());
+ return return_queue_;
+ } else {
+ return absl::Span<const absl::Span<const uint8_t>>();
}
- return queue;
}
-size_t DummyEncoder::queued_bytes() const {
- size_t bytes = 0;
- for (const auto &buffer : queue_) {
- bytes += buffer.size();
- }
- return bytes;
-}
+size_t DummyEncoder::queued_bytes() const { return input_buffer_.size(); }
DummyDecoder::DummyDecoder(std::string_view filename)
: filename_(filename), fd_(open(filename_.c_str(), O_RDONLY | O_CLOEXEC)) {
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index 235a49c..4cd661d 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -2,27 +2,57 @@
#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
namespace aos::logger {
-// Interface to encode DetachedBuffers as they are written to a file.
-//
-// The interface to a file is represented as a series of buffers, appropriate to
-// pass to writev(2). The backing storage for these is managed internally so
-// subclasses can handle resizing/moving as desired. Implementations must
-// enqueue as many of these backing buffers as demanded, leaving it up to the
-// caller to write them to a file when desired.
-class DetachedBufferEncoder {
+// Interface to encode data as it is written to a file.
+class DataEncoder {
public:
- virtual ~DetachedBufferEncoder() = default;
+ virtual ~DataEncoder() = default;
- // Encodes and enqueues the given DetachedBuffer.
- //
- // Note that in may be moved-from, or it may be left unchanged, depending on
- // what makes the most sense for a given implementation.
- virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+ // Interface to copy a known number of bytes into a caller-provided buffer.
+ class Copier {
+ public:
+ explicit Copier(size_t size) : size_(size) {}
+ virtual ~Copier() = default;
+ // Returns the number of bytes Copy() will write.
+ size_t size() const { return size_; }
+
+ // Writes size() bytes to data and returns the number of bytes written.
+ [[nodiscard]] virtual size_t Copy(uint8_t *data) = 0;
+
+ private:
+ size_t size_;
+ };
+
+ // Copies a span into the encoder's buffer. The span must outlive every call
+ // to Copy().
+ class SpanCopier : public Copier {
+ public:
+ explicit SpanCopier(absl::Span<const uint8_t> data)
+ : Copier(data.size()), data_(data) {
+ CHECK(data_.data());
+ }
+
+ size_t Copy(uint8_t *data) final {
+ std::memcpy(data, data_.data(), data_.size());
+ return data_.size();
+ }
+
+ private:
+ const absl::Span<const uint8_t> data_;
+ };
+
+ // Returns true if request more bytes fit in the buffer. false means the
+ // output must be flushed before a request of that size can be encoded.
+ virtual bool HasSpace(size_t request) const = 0;
+
+ // Encodes and enqueues the data from copy. Check HasSpace(copy->size()) first.
+ virtual void Encode(Copier *copy) = 0;
// If this returns true, the encoder may be bypassed by writing directly to
// the file.
@@ -37,8 +67,9 @@
// Clears the first n encoded buffers from the queue.
virtual void Clear(int n) = 0;
- // Returns a view of the queue of encoded buffers.
- virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+ // Returns a view of the queue of encoded buffers. Valid until any other
+ // method on this class is called.
+ virtual absl::Span<const absl::Span<const uint8_t>> queue() = 0;
// Returns the total number of of bytes currently queued up.
virtual size_t queued_bytes() const = 0;
@@ -53,28 +84,32 @@
// This class does not encode the data. It just claims ownership of the raw data
// and queues it up as is.
-class DummyEncoder final : public DetachedBufferEncoder {
+class DummyEncoder final : public DataEncoder {
public:
- DummyEncoder() {}
+ explicit DummyEncoder(size_t max_buffer_size);
DummyEncoder(const DummyEncoder &) = delete;
DummyEncoder(DummyEncoder &&other) = delete;
DummyEncoder &operator=(const DummyEncoder &) = delete;
DummyEncoder &operator=(DummyEncoder &&other) = delete;
~DummyEncoder() override = default;
- // No encoding happens, the raw data is queued up as is.
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool HasSpace(size_t request) const final;
+ void Encode(Copier *copy) final;
bool may_bypass() const final { return true; }
void Finish() final {}
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
size_t total_bytes() const final { return total_bytes_; }
- size_t queue_size() const final { return queue_.size(); }
+ size_t queue_size() const final {
+ return input_buffer_.size() != 0 ? 1u : 0u;
+ }
private:
- std::vector<flatbuffers::DetachedBuffer> queue_;
size_t total_bytes_ = 0;
+ // Concatenated copies of all data encoded since the last non-zero Clear().
+ ResizeableBuffer input_buffer_;
+ std::vector<absl::Span<const uint8_t>> return_queue_;
};
// Interface to decode chunks of data. Implementations of this interface will
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
index 871e25e..e6c4867 100644
--- a/aos/events/logging/buffer_encoder_param_test.h
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -14,24 +14,55 @@
namespace aos::logger::testing {
-// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// Contains some helpful infrastructure for testing DataEncoder
// implementations.
class BufferEncoderBaseTest : public ::testing::Test {
public:
+ static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
+
+ class DetachedBufferCopier : public DataEncoder::Copier {
+ public:
+ DetachedBufferCopier(flatbuffers::DetachedBuffer &&data)
+ : DataEncoder::Copier(data.size()), data_(std::move(data)) {}
+
+ size_t Copy(uint8_t *data) final {
+ std::memcpy(data, data_.data(), data_.size());
+ return data_.size();
+ }
+
+ private:
+ const flatbuffers::DetachedBuffer data_;
+ };
+
// Creates and encodes n random buffers, returning a copy of the encoded data.
- std::vector<std::vector<uint8_t>> CreateAndEncode(
- int n, DetachedBufferEncoder *encoder) {
+ std::vector<std::vector<uint8_t>> CreateAndEncode(int n,
+ DataEncoder *encoder) {
std::vector<std::vector<uint8_t>> result;
for (int i = 0; i < n; i++) {
flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
result.emplace_back(buffer.data(), buffer.data() + buffer.size());
- encoder->Encode(std::move(buffer));
+ VLOG(1) << "Encoding " << buffer.size();
+ CHECK(encoder->HasSpace(buffer.size()))
+ << ": The test isn't smart enough to flush, figure out what to do. "
+ "Has "
+ << encoder->queued_bytes() << ", encoding " << buffer.size();
+
+ DetachedBufferCopier copier(std::move(buffer));
+ encoder->Encode(&copier);
}
return result;
}
// Returns the total size of a vector full of objects with a size() method.
template <typename T>
+ static size_t TotalSize(const absl::Span<T> buffers) {
+ size_t result = 0;
+ for (const auto &v : buffers) {
+ result += v.size();
+ }
+ return result;
+ }
+ template <typename T>
static size_t TotalSize(const std::vector<T> &buffers) {
size_t result = 0;
for (const auto &v : buffers) {
@@ -86,7 +117,7 @@
std::mt19937(::aos::testing::RandomSeed())};
};
-// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// Tests some generic properties for any DataEncoder+DataDecoder
// implementation pair.
//
// First and second test parameters are methods to create instances of the
@@ -95,13 +126,13 @@
class BufferEncoderTest
: public BufferEncoderBaseTest,
public ::testing::WithParamInterface<std::tuple<
- std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+ std::function<std::unique_ptr<DataEncoder>(size_t)>,
std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
public:
BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
- std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
- return std::get<0>(GetParam())();
+ std::unique_ptr<DataEncoder> MakeEncoder() const {
+ return std::get<0>(GetParam())(BufferEncoderBaseTest::kMaxMessageSize);
}
std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
return std::get<1>(GetParam())(filename);
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
index 5a9b85f..8e12e37 100644
--- a/aos/events/logging/buffer_encoder_test.cc
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -13,13 +13,26 @@
class DummyEncoderTest : public BufferEncoderBaseTest {};
-// Tests that buffers are enqueued without any changes.
+// Tests that buffers are concatenated without being modified.
TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
- DummyEncoder encoder;
+ DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
const auto expected = CreateAndEncode(100, &encoder);
+ std::vector<uint8_t> data = Flatten(expected);
auto queue = encoder.queue();
- EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+ ASSERT_EQ(queue.size(), 1u);
+ EXPECT_EQ(queue[0], absl::Span<const uint8_t>(data));
+}
+
+// Tests that Clear drops the concatenated buffer but keeps the byte count.
+TEST_F(DummyEncoderTest, ClearEmptiesQueue) {
+ DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
+ const auto expected = CreateAndEncode(100, &encoder);
+ ASSERT_EQ(encoder.queued_bytes(), TotalSize(expected));
+
+ encoder.Clear(1);
+ EXPECT_TRUE(encoder.queue().empty());
+ EXPECT_EQ(encoder.total_bytes(), TotalSize(expected));
}
// Checks that DummyDecoder can read into a buffer.
@@ -94,8 +107,8 @@
INSTANTIATE_TEST_SUITE_P(
Dummy, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
- return std::make_unique<DummyEncoder>();
+ ::testing::Combine(::testing::Values([](size_t max_buffer_size) {
+ return std::make_unique<DummyEncoder>(max_buffer_size);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<DummyDecoder>(filename);
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index b0f8ba5..969d59c 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -18,6 +18,11 @@
"If provided, this is the path to the JSON with the log file header. If "
"not provided, _header.json will be appended to --logfile.");
+DEFINE_int32(
+ max_message_size, 128 * 1024 * 1024,
+ "Max size of a message to be written. This sets the buffers inside "
+ "the encoders.");
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(R"(This tool lets us manipulate log files.)");
aos::InitGoogle(&argc, &argv);
@@ -44,7 +49,8 @@
CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
aos::logger::DetachedBufferWriter buffer_writer(
- FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
+ FLAGS_logfile,
+ std::make_unique<aos::logger::DummyEncoder>(FLAGS_max_message_size));
buffer_writer.QueueSpan(header.span());
while (true) {
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a04d0ea..120bd79 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -20,14 +20,16 @@
NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close)
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size)
: node_(node),
node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
logger_node_index_(
configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
log_namer_(log_namer),
reopen_(std::move(reopen)),
- close_(std::move(close)) {
+ close_(std::move(close)),
+ max_message_size_(max_message_size) {
state_.resize(configuration::NodesCount(log_namer->configuration_));
CHECK_LT(node_index_, state_.size());
}
@@ -179,9 +181,9 @@
}
}
-void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &source_node_boot_uuid,
- aos::monotonic_clock::time_point now) {
+void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now) {
// Trigger a reboot if we detect the boot UUID change.
UpdateBoot(source_node_boot_uuid);
@@ -202,7 +204,7 @@
CHECK(writer);
CHECK(header_written_) << ": Attempting to write message before header to "
<< writer->filename();
- writer->QueueSizedFlatbuffer(fbb, now);
+ writer->CopyMessage(coppier, now);
}
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
@@ -228,6 +230,16 @@
CHECK_EQ(state_[node_index_].boot_uuid,
UUID::FromString(header.message().source_node_boot_uuid()));
if (!writer) {
+ // Since we haven't opened the first time, it's still not too late to update
+ // the max message size. Make sure the header fits.
+ //
+ // This won't work well on reboots, but the structure of the header is fixed
+ // by that point in time, so it's size is fixed too.
+ //
+ // Most of the time, the minimum buffer size inside the encoder of around
+ // 128k will make this a non-issue.
+ UpdateMaxMessageSize(header.span().size());
+
reopen_(this);
}
@@ -235,10 +247,8 @@
<< aos::FlatbufferToJson(
header, {.multi_line = false, .max_vector_size = 100});
- // TODO(austin): This triggers a dummy allocation that we don't need as part
- // of releasing. Can we skip it?
CHECK(writer);
- writer->QueueSizedFlatbuffer(header.Release());
+ writer->QueueSpan(header.span());
header_written_ = true;
monotonic_start_time_ = log_namer_->monotonic_start_time(
node_index_, state_[node_index_].boot_uuid);
@@ -593,9 +603,10 @@
base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
std::unique_ptr<DetachedBufferWriter> writer =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(header->span().size()));
- writer->QueueSizedFlatbuffer(header->Release());
+ writer->QueueSpan(header->span());
if (!writer->ran_out_of_space()) {
all_filenames_.emplace_back(
@@ -624,8 +635,10 @@
// generated if it is sendable on this node.
if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(PackMessageSize(
+ LogType::kLogRemoteMessage, channel->max_size()));
return data_writer_.get();
}
@@ -647,9 +660,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -672,9 +684,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenForwardedTimestampWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackRemoteMessageSize());
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -690,8 +701,10 @@
}
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(
+ PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
return data_writer_.get();
}
@@ -724,7 +737,8 @@
absl::StrCat("timestamps", channel->name()->string_view(), "/",
channel->type()->string_view(), ".part",
data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
void MultiNodeLogNamer::OpenWriter(const Channel *channel,
@@ -733,10 +747,11 @@
CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
channel->name()->string_view(), "/", channel->type()->string_view(),
".part", data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
-void MultiNodeLogNamer::OpenDataWriter() {
+void MultiNodeLogNamer::MakeDataWriter() {
if (!data_writer_) {
data_writer_ = std::make_unique<NewDataWriter>(
this, node_, node_,
@@ -747,16 +762,20 @@
}
absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
extension_);
- CreateBufferWriter(name, &writer->writer);
+ CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
},
[this](NewDataWriter *data_writer) {
CloseWriter(&data_writer->writer);
- });
+ },
+ // Default size is 0 so it will be obvious if something doesn't fix it
+ // afterwards.
+ 0);
}
}
void MultiNodeLogNamer::CreateBufferWriter(
- std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
+ std::string_view path, size_t max_message_size,
+ std::unique_ptr<DetachedBufferWriter> *destination) {
if (ran_out_of_space_) {
// Refuse to open any new files, which might skip data. Any existing files
// are in the same folder, which means they're on the same filesystem, which
@@ -778,8 +797,8 @@
DetachedBufferWriter::already_out_of_space_t());
return;
}
- *destination =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ *destination = std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}
@@ -793,7 +812,8 @@
return;
}
- *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
+ *destination->get() =
+ DetachedBufferWriter(filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 00ed63f..4e8c990 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -38,7 +38,16 @@
// close is called to close that file and extract any statistics.
NewDataWriter(LogNamer *log_namer, const Node *node, const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close);
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size);
+
+ void UpdateMaxMessageSize(size_t new_size) {
+ if (new_size > max_message_size_) {
+ CHECK(!header_written_);
+ max_message_size_ = new_size;
+ }
+ }
+ size_t max_message_size() const { return max_message_size_; }
NewDataWriter(NewDataWriter &&other) = default;
aos::logger::NewDataWriter &operator=(NewDataWriter &&other) = default;
@@ -58,10 +67,10 @@
bool reliable,
monotonic_clock::time_point monotonic_timestamp_time =
monotonic_clock::min_time);
- // Queues up a message with the provided boot UUID.
- void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &node_boot_uuid,
- aos::monotonic_clock::time_point now);
+ // Copies a message, first updating the boot if source_node_boot_uuid changed.
+ void CopyMessage(DataEncoder::Copier *copier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now);
// Updates the current boot for the source node. This is useful when you want
// to queue a message that may trigger a reboot rotation, but then need to
@@ -148,6 +157,8 @@
bool header_written_ = false;
std::vector<State> state_;
+
+ size_t max_message_size_;
};
// Interface describing how to name, track, and add headers to log file parts.
@@ -313,11 +324,12 @@
temp_suffix_ = temp_suffix;
}
- // Sets the function for creating encoders.
+ // Sets the function for creating encoders. The argument is the max message
+ // size (including headers) that will be written into this encoder.
//
// Defaults to just creating DummyEncoders.
void set_encoder_factory(
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory) {
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory) {
encoder_factory_ = std::move(encoder_factory);
}
@@ -449,9 +461,9 @@
void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
// Opens the main data writer file for this node responsible for data_writer_.
- void OpenDataWriter();
+ void MakeDataWriter();
- void CreateBufferWriter(std::string_view path,
+ void CreateBufferWriter(std::string_view path, size_t max_message_size,
std::unique_ptr<DetachedBufferWriter> *destination);
void RenameTempFile(DetachedBufferWriter *destination);
@@ -479,8 +491,10 @@
std::vector<std::string> all_filenames_;
std::string temp_suffix_;
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory_ =
- []() { return std::make_unique<DummyEncoder>(); };
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory_ =
+ [](size_t max_message_size) {
+ return std::make_unique<DummyEncoder>(max_message_size);
+ };
std::string extension_;
// Storage for statistics from previously-rotated DetachedBufferWriters.
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 36116a3..ab7d3c6 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -759,6 +759,64 @@
}
}
+// Class to copy a context into the provided buffer.
+class ContextDataCopier : public DataEncoder::Copier {
+ public:
+ ContextDataCopier(const Context &context, int channel_index,
+ LogType log_type, EventLoop *event_loop)
+ : DataEncoder::Copier(PackMessageSize(log_type, context.size)),
+ context_(context),
+ channel_index_(channel_index),
+ log_type_(log_type),
+ event_loop_(event_loop) {}
+
+ monotonic_clock::time_point end_time() const { return end_time_; }
+
+ size_t Copy(uint8_t *data) final {
+ size_t result =
+ PackMessageInline(data, context_, channel_index_, log_type_);
+ end_time_ = event_loop_->monotonic_now();
+ return result;
+ }
+
+ private:
+ const Context &context_;
+ const int channel_index_;
+ const LogType log_type_;
+ EventLoop *event_loop_;
+ monotonic_clock::time_point end_time_;
+};
+
+// Class to copy a RemoteMessage into the provided buffer.
+class RemoteMessageCopier : public DataEncoder::Copier {
+ public:
+ RemoteMessageCopier(
+ const message_bridge::RemoteMessage *message, int channel_index,
+ aos::monotonic_clock::time_point monotonic_timestamp_time,
+ EventLoop *event_loop)
+ : DataEncoder::Copier(PackRemoteMessageSize()),
+ message_(message),
+ channel_index_(channel_index),
+ monotonic_timestamp_time_(monotonic_timestamp_time),
+ event_loop_(event_loop) {}
+
+ monotonic_clock::time_point end_time() const { return end_time_; }
+
+ size_t Copy(uint8_t *data) final {
+ size_t result = PackRemoteMessageInline(data, message_, channel_index_,
+ monotonic_timestamp_time_);
+ end_time_ = event_loop_->monotonic_now();
+ return result;
+ }
+
+ private:
+ const message_bridge::RemoteMessage *message_;
+ int channel_index_;
+ aos::monotonic_clock::time_point monotonic_timestamp_time_;
+ EventLoop *event_loop_;
+ monotonic_clock::time_point end_time_;
+};
+
void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
if (writer != nullptr) {
const UUID source_node_boot_uuid =
@@ -767,25 +825,17 @@
: event_loop_->boot_uuid();
// Write!
const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
- fbb.FinishSizePrefixed(
- PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
+ ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+ f.log_type, event_loop_);
- max_header_size_ =
- std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+ writer->CopyMessage(&coppier, source_node_boot_uuid, start);
+ RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
<< " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << writer->filename() << " data "
- << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << " to " << writer->filename();
}
}
@@ -793,29 +843,24 @@
const FetcherStruct &f) {
if (timestamp_writer != nullptr) {
// And now handle timestamps.
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
// Tell our writer that we know something about the remote boot.
timestamp_writer->UpdateRemote(
f.data_node_index, f.fetcher->context().source_boot_uuid,
f.fetcher->context().monotonic_remote_time,
f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
- timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+ const auto start = event_loop_->monotonic_now();
+ ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+ LogType::kLogDeliveryTimeOnly, event_loop_);
+
+ timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start);
+ RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
<< " for channel "
<< configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
+ << " to " << timestamp_writer->filename() << " timestamp";
}
}
@@ -825,20 +870,10 @@
const auto start = event_loop_->monotonic_now();
// And now handle the special message contents channel. Copy the
// message into a FlatBufferBuilder and save it to disk.
- //
- // TODO(austin): We can be more efficient here when we start to
- // care...
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
const RemoteMessage *msg =
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
- // TODO(austin): This needs to check the channel_index and confirm
- // that it should be logged before squirreling away the timestamp to
- // disk. We don't want to log irrelevant timestamps.
-
// Translate from the channel index that the event loop uses to the
// channel index in the log file.
const int channel_index =
@@ -847,11 +882,6 @@
const aos::monotonic_clock::time_point monotonic_timestamp_time =
f.fetcher->context().monotonic_event_time;
- fbb.FinishSizePrefixed(
- PackRemoteMessage(&fbb, msg, channel_index, monotonic_timestamp_time));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, f);
-
// Timestamps tell us information about what happened too!
// Capture any reboots so UpdateRemote is properly recorded.
contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
@@ -871,8 +901,13 @@
chrono::nanoseconds(msg->monotonic_sent_time())),
reliable, monotonic_timestamp_time);
- contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
- end);
+ RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
+ event_loop_);
+
+ contents_writer->CopyMessage(&coppier, UUID::FromVector(msg->boot_uuid()),
+ start);
+
+ RecordCreateMessageTime(start, coppier.end_time(), f);
}
}
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 51b46ae..3e1832e 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -321,10 +321,6 @@
// Last time that data was written for all channels to disk.
monotonic_clock::time_point last_synchronized_time_;
- // Max size that the header has consumed. This much extra data will be
- // reserved in the builder to avoid reallocating.
- size_t max_header_size_ = 0;
-
// If true, write the message header into a separate file.
bool separate_config_ = true;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f32f44..4c06f0a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -29,7 +29,7 @@
#include "aos/events/logging/lzma_encoder.h"
#endif
-DEFINE_int32(flush_size, 128000,
+DEFINE_int32(flush_size, 128 * 1024,
"Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
flush_period, 5.0,
@@ -76,13 +76,13 @@
}
} // namespace
-DetachedBufferWriter::DetachedBufferWriter(
- std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DataEncoder> encoder)
: filename_(filename), encoder_(std::move(encoder)) {
if (!util::MkdirPIfSpace(filename, 0777)) {
ran_out_of_space_ = true;
} else {
- fd_ = open(std::string(filename).c_str(),
+ fd_ = open(filename_.c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
if (fd_ == -1 && errno == ENOSPC) {
ran_out_of_space_ = true;
@@ -136,29 +136,27 @@
return;
}
- aos::monotonic_clock::time_point now;
- if (encoder_->may_bypass() && span.size() > 4096u) {
- // Over this threshold, we'll assume it's cheaper to add an extra
- // syscall to write the data immediately instead of copying it to
- // enqueue.
+ if (!encoder_->HasSpace(span.size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(span.size()));
+ }
+ DataEncoder::SpanCopier coppier(span);
+ encoder_->Encode(&coppier);
+ FlushAtThreshold(aos::monotonic_clock::now());
+}
- // First, flush everything.
- while (encoder_->queue_size() > 0u) {
- Flush();
- }
-
- // Then, write it directly.
- const auto start = aos::monotonic_clock::now();
- const ssize_t written = write(fd_, span.data(), span.size());
- const auto end = aos::monotonic_clock::now();
- HandleWriteReturn(written, span.size());
- UpdateStatsForWrite(end - start, written, 1);
- now = end;
- } else {
- encoder_->Encode(CopySpanAsDetachedBuffer(span));
- now = aos::monotonic_clock::now();
+void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
+ aos::monotonic_clock::time_point now) {
+ if (ran_out_of_space_) {
+ return;
}
+ if (!encoder_->HasSpace(coppier->size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(coppier->size()));
+ }
+
+ encoder_->Encode(coppier);
FlushAtThreshold(now);
}
@@ -575,11 +573,11 @@
}
flatbuffers::uoffset_t PackMessageSize(LogType log_type,
- const Context &context) {
+ size_t data_size) {
static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
"Update size logic please.");
const flatbuffers::uoffset_t aligned_data_length =
- ((context.size + 7) & 0xfffffff8u);
+ ((data_size + 7) & 0xfffffff8u);
switch (log_type) {
case LogType::kLogDeliveryTimeOnly:
return PackMessageHeaderSize(log_type);
@@ -596,8 +594,11 @@
size_t PackMessageInline(uint8_t *buffer, const Context &context,
int channel_index, LogType log_type) {
+ // TODO(austin): Figure out how to copy directly from shared memory instead of
+ // first into the fetcher's memory and then into here. That would save a lot
+ // of memory.
const flatbuffers::uoffset_t message_size =
- PackMessageSize(log_type, context);
+ PackMessageSize(log_type, context.size);
buffer = Push<flatbuffers::uoffset_t>(
buffer, message_size - sizeof(flatbuffers::uoffset_t));
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 16f3675..e034bfe 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -50,7 +50,7 @@
struct already_out_of_space_t {};
DetachedBufferWriter(std::string_view filename,
- std::unique_ptr<DetachedBufferEncoder> encoder);
+ std::unique_ptr<DataEncoder> encoder);
// Creates a dummy instance which won't even open a file. It will act as if
// opening the file ran out of space immediately.
DetachedBufferWriter(already_out_of_space_t) : ran_out_of_space_(true) {}
@@ -73,23 +73,8 @@
// Triggers a flush if there's enough data queued up.
//
// Steals the detached buffer from it.
- void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
- aos::monotonic_clock::time_point now) {
- QueueSizedFlatbuffer(fbb->Release(), now);
- }
- // May steal the backing storage of buffer, or may leave it alone.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer,
- aos::monotonic_clock::time_point now) {
- QueueSizedFlatbuffer(std::move(buffer));
- FlushAtThreshold(now);
- }
- // Unconditionally queues the buffer.
- void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
- if (ran_out_of_space_) {
- return;
- }
- encoder_->Encode(std::move(buffer));
- }
+ void CopyMessage(DataEncoder::Copier *coppier,
+ aos::monotonic_clock::time_point now);
// Queues up data in span. May copy or may write it to disk immediately.
void QueueSpan(absl::Span<const uint8_t> span);
@@ -173,7 +158,7 @@
void FlushAtThreshold(aos::monotonic_clock::time_point now);
std::string filename_;
- std::unique_ptr<DetachedBufferEncoder> encoder_;
+ std::unique_ptr<DataEncoder> encoder_;
int fd_ = -1;
bool ran_out_of_space_ = false;
@@ -213,8 +198,7 @@
// Returns the size that the packed message from PackMessage or
// PackMessageInline will be.
-flatbuffers::uoffset_t PackMessageSize(LogType log_type,
- const Context &context);
+flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size);
// Packs the provided message pointed to by context into the provided buffer.
// This is equivalent to PackMessage, but doesn't require allocating a
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
index b08bebb..afc4d77 100644
--- a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -15,10 +15,12 @@
FLAGS_flush_size = 1;
CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
- aos::logger::DetachedBufferWriter writer(
- FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
std::array<uint8_t, 10240> data;
data.fill(0);
+
+ aos::logger::DetachedBufferWriter writer(
+ FLAGS_tmpfs + "/file",
+ std::make_unique<aos::logger::DummyEncoder>(data.size()));
for (int i = 0; i < 8; ++i) {
writer.QueueSpan(data);
CHECK(!writer.ran_out_of_space()) << ": " << i;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index c8d16e0..3d99757 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -31,8 +31,11 @@
// test only boilerplate to DetachedBufferWriter.
class TestDetachedBufferWriter : public DetachedBufferWriter {
public:
+ // Pick a max size that is rather conservative.
+ static constexpr size_t kMaxMessageSize = 128 * 1024;
TestDetachedBufferWriter(std::string_view filename)
- : DetachedBufferWriter(filename, std::make_unique<DummyEncoder>()) {}
+ : DetachedBufferWriter(filename,
+ std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
}
@@ -2983,12 +2986,13 @@
fbb.GetBufferSpan().size()));
// Make sure that both the builder and inline method agree on sizes.
- ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context))
+ ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
<< "log type " << static_cast<int>(type);
// Initialize the buffer to something nonzero to make sure all the padding
// bytes are set to 0.
- std::vector<uint8_t> repacked_message(PackMessageSize(type, context), 67);
+ std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
+ 67);
// And verify packing inline works as expected.
EXPECT_EQ(repacked_message.size(),
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 526eb31..81b4e3f 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -3,6 +3,9 @@
#include "aos/configuration.h"
#include "aos/events/logging/log_writer.h"
+#ifdef LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
#include "aos/events/logging/snappy_encoder.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
@@ -18,9 +21,17 @@
DEFINE_bool(snappy_compress, false, "If true, compress log data using snappy.");
+#ifdef LZMA
+DEFINE_bool(xz_compress, false, "If true, compress log data using xz.");
+#endif
+
DEFINE_double(rotate_every, 0.0,
"If set, rotate the logger after this many seconds");
+#ifdef LZMA
+DEFINE_int32(xz_compression_level, 9, "Compression level for the LZMA Encoder");
+#endif
+
int main(int argc, char *argv[]) {
gflags::SetUsageMessage(
"This program provides a simple logger binary that logs all SHMEM data "
@@ -41,8 +52,17 @@
if (FLAGS_snappy_compress) {
log_namer->set_extension(aos::logger::SnappyDecoder::kExtension);
- log_namer->set_encoder_factory(
- []() { return std::make_unique<aos::logger::SnappyEncoder>(); });
+ log_namer->set_encoder_factory([](size_t max_message_size) {
+ return std::make_unique<aos::logger::SnappyEncoder>(max_message_size);
+ });
+#ifdef LZMA
+ } else if (FLAGS_xz_compress) {
+ log_namer->set_extension(aos::logger::LzmaEncoder::kExtension);
+ log_namer->set_encoder_factory([](size_t max_message_size) {
+ return std::make_unique<aos::logger::LzmaEncoder>(
+ max_message_size, FLAGS_xz_compression_level);
+ });
+#endif
}
aos::monotonic_clock::time_point last_rotation_time =
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c5b1768..cf5ec32 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -540,7 +540,8 @@
struct CompressionParams {
std::string_view extension;
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
+ std::function<std::unique_ptr<DataEncoder>(size_t max_message_size)>
+ encoder_factory;
};
std::ostream &operator<<(std::ostream &ostream,
@@ -550,12 +551,19 @@
}
std::vector<CompressionParams> SupportedCompressionAlgorithms() {
- return {{"", []() { return std::make_unique<DummyEncoder>(); }},
+ return {{"",
+ [](size_t max_message_size) {
+ return std::make_unique<DummyEncoder>(max_message_size);
+ }},
{SnappyDecoder::kExtension,
- []() { return std::make_unique<SnappyEncoder>(); }},
+ [](size_t max_message_size) {
+ return std::make_unique<SnappyEncoder>(max_message_size);
+ }},
#ifdef LZMA
{LzmaDecoder::kExtension,
- []() { return std::make_unique<LzmaEncoder>(3); }}
+ [](size_t max_message_size) {
+ return std::make_unique<LzmaEncoder>(max_message_size, 3);
+ }}
#endif // LZMA
};
}
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 5cb9897..f583818 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -50,7 +50,8 @@
} // namespace
-LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
+LzmaEncoder::LzmaEncoder(size_t max_message_size,
+ const uint32_t compression_preset, size_t block_size)
: stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
CHECK_GE(compression_preset_, 0u)
<< ": Compression preset must be in the range [0, 9].";
@@ -78,15 +79,26 @@
stream_.avail_out = 0;
VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+
+ // TODO(austin): We don't write the biggest messages very often. Is it more
+ // efficient to allocate if we go over a threshold to keep the static memory
+ // in use smaller, or just allocate the worst case like we are doing here?
+ input_buffer_.resize(max_message_size);
}
LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
-void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- CHECK(in.data()) << ": Encode called with nullptr.";
+void LzmaEncoder::Encode(Copier *copy) {
+ const size_t copy_size = copy->size();
+ // LZMA compresses the data as it goes along, copying the compressed results
+ // into another buffer. So, there's no need to store more than one message
+ // since lzma is going to take it from here.
+ CHECK_LE(copy_size, input_buffer_.size());
- stream_.next_in = in.data();
- stream_.avail_in = in.size();
+ CHECK_EQ(copy->Copy(input_buffer_.data()), copy_size);
+
+ stream_.next_in = input_buffer_.data();
+ stream_.avail_in = copy_size;
RunLzmaCode(LZMA_RUN);
}
@@ -103,20 +115,21 @@
}
}
-std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
+absl::Span<const absl::Span<const uint8_t>> LzmaEncoder::queue() {
+ return_queue_.clear();
if (queue_.empty()) {
- return queue;
+ return return_queue_;
}
+ return_queue_.reserve(queue_.size());
for (size_t i = 0; i < queue_.size() - 1; ++i) {
- queue.emplace_back(
+ return_queue_.emplace_back(
absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
}
// For the last buffer in the queue, we must account for the possibility that
// the buffer isn't full yet.
- queue.emplace_back(absl::MakeConstSpan(
+ return_queue_.emplace_back(absl::MakeConstSpan(
queue_.back().data(), queue_.back().size() - stream_.avail_out));
- return queue;
+ return return_queue_;
}
size_t LzmaEncoder::queued_bytes() const {
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 14c00eb..bbd739a 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -16,12 +16,15 @@
namespace aos::logger {
// Encodes buffers using liblzma.
-class LzmaEncoder final : public DetachedBufferEncoder {
+class LzmaEncoder final : public DataEncoder {
public:
+ static constexpr std::string_view kExtension = ".xz";
+
// Initializes the LZMA stream and encoder. The block size is the block size
// used by the multithreaded encoder for batching. A block size of 0 tells
// lzma to pick it's favorite block size.
- explicit LzmaEncoder(uint32_t compression_preset, size_t block_size = 0);
+ explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
+ size_t block_size = 0);
LzmaEncoder(const LzmaEncoder &) = delete;
LzmaEncoder(LzmaEncoder &&other) = delete;
LzmaEncoder &operator=(const LzmaEncoder &) = delete;
@@ -29,16 +32,21 @@
// Gracefully shuts down the encoder.
~LzmaEncoder() final;
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ bool HasSpace(size_t /*request*/) const override {
+ // Since the underlying lzma encoder handles buffering, we always have
+ // space.
+ return true;
+ }
+ void Encode(Copier *copy) final;
void Finish() final;
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
- static constexpr size_t kEncodedBufferSizeBytes{4096 * 10};
+ static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
void RunLzmaCode(lzma_action action);
@@ -49,6 +57,11 @@
// Total bytes that resulted from encoding raw data since the last call to
// Reset.
size_t total_bytes_ = 0;
+
+ // Buffer that messages get copied into for encoding.
+ ResizeableBuffer input_buffer_;
+
+ std::vector<absl::Span<const uint8_t>> return_queue_;
};
// Decompresses data with liblzma.
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 1b8c895..1e814ff 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -11,9 +11,10 @@
INSTANTIATE_TEST_SUITE_P(
MtLzma, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 3;
- return std::make_unique<LzmaEncoder>(2, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 2, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<LzmaDecoder>(filename);
@@ -22,9 +23,10 @@
INSTANTIATE_TEST_SUITE_P(
MtLzmaThreaded, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 3;
- return std::make_unique<LzmaEncoder>(5, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 5, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -33,9 +35,10 @@
INSTANTIATE_TEST_SUITE_P(
Lzma, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 1;
- return std::make_unique<LzmaEncoder>(2, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 2, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<LzmaDecoder>(filename);
@@ -44,9 +47,10 @@
INSTANTIATE_TEST_SUITE_P(
LzmaThreaded, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
FLAGS_lzma_threads = 1;
- return std::make_unique<LzmaEncoder>(5, 4096);
+ return std::make_unique<LzmaEncoder>(max_message_size,
+ 5, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -63,7 +67,8 @@
std::vector<std::vector<uint8_t>> encoded_buffers;
{
const int encode_chunks = quantity_distribution(*random_number_generator());
- const auto encoder = std::make_unique<LzmaEncoder>(2);
+ const auto encoder = std::make_unique<LzmaEncoder>(
+ BufferEncoderBaseTest::kMaxMessageSize, 2);
encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
encoder->Finish();
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
index 19011da..2ef8363 100644
--- a/aos/events/logging/snappy_encoder.cc
+++ b/aos/events/logging/snappy_encoder.cc
@@ -29,7 +29,11 @@
}
} // namespace
-SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
+ : chunk_size_(chunk_size),
+ // Make sure we have enough space to always add our largest message to the
+ // chunks we will flush.
+ buffer_source_(max_message_size + chunk_size - 1) {
queue_.emplace_back();
queue_.back().resize(kSnappyIdentifierChunk.size());
std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
@@ -39,10 +43,8 @@
void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
-void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- accumulated_checksum_ =
- AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
- buffer_source_.AppendBuffer(std::move(in));
+void SnappyEncoder::Encode(Copier *copy) {
+ buffer_source_.Append(copy);
if (buffer_source_.Available() >= chunk_size_) {
EncodeCurrentBuffer();
@@ -54,7 +56,7 @@
return;
}
const uint32_t uncompressed_checksum =
- MaskChecksum(accumulated_checksum_.value());
+ MaskChecksum(buffer_source_.accumulated_checksum());
queue_.emplace_back();
constexpr size_t kPrefixSize = 8;
queue_.back().resize(kPrefixSize +
@@ -79,7 +81,7 @@
total_bytes_ += queue_.back().size();
- accumulated_checksum_.reset();
+ buffer_source_.ResetAccumulatedChecksum();
CHECK_EQ(0u, buffer_source_.Available());
}
@@ -97,27 +99,26 @@
return bytes;
}
-std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
- queue.reserve(queue_.size());
+absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
+ return_queue_.clear();
+ return_queue_.reserve(queue_.size());
for (const auto &buffer : queue_) {
- queue.emplace_back(buffer.data(), buffer.size());
+ return_queue_.emplace_back(buffer.data(), buffer.size());
}
- return queue;
+ return return_queue_;
+}
+
+SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
+ data_.reserve(buffer_size);
}
size_t SnappyEncoder::DetachedBufferSource::Available() const {
- size_t total_available = 0;
- for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
- total_available += buffer.size();
- }
- return total_available;
+ return data_.size() - index_into_first_buffer_;
}
const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
- *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
- return reinterpret_cast<char *>(buffers_[0].data()) +
- index_into_first_buffer_;
+ *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
+ return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
}
void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
@@ -125,20 +126,25 @@
return;
}
- CHECK(!buffers_.empty());
+ CHECK_NE(data_.size(), 0u);
index_into_first_buffer_ += n;
- CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+ CHECK_LE(index_into_first_buffer_, data_.size())
<< ": " << n << " is too large a skip.";
- if (index_into_first_buffer_ == buffers_[0].size()) {
- buffers_.erase(buffers_.begin());
+ if (index_into_first_buffer_ == data_.size()) {
+ data_.resize(0u);
index_into_first_buffer_ = 0;
}
}
-void SnappyEncoder::DetachedBufferSource::AppendBuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- buffers_.emplace_back(std::move(buffer));
+void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
+ const size_t copy_size = copy->size();
+ CHECK_LE(copy_size + data_.size(), data_.capacity());
+ size_t starting_size = data_.size();
+ data_.resize(starting_size + copy_size);
+ CHECK_EQ(copy->Copy(data_.data() + starting_size), copy_size);
+ accumulated_checksum_ = AccumulateCrc32(
+ {data_.data() + starting_size, copy_size}, accumulated_checksum_);
}
size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
diff --git a/aos/events/logging/snappy_encoder.h b/aos/events/logging/snappy_encoder.h
index 6b39c21..d3edbe5 100644
--- a/aos/events/logging/snappy_encoder.h
+++ b/aos/events/logging/snappy_encoder.h
@@ -14,29 +14,42 @@
namespace aos::logger {
// Encodes buffers using snappy.
-class SnappyEncoder final : public DetachedBufferEncoder {
+class SnappyEncoder final : public DataEncoder {
public:
- explicit SnappyEncoder(size_t chunk_size = 32768);
+ explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 32768);
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ void Encode(Copier *copy) final;
+
void Finish() final;
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
+ bool HasSpace(size_t /*request*/) const final {
+ // Since the output always mallocs space, we have infinite output space.
+ return true;
+ }
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
class DetachedBufferSource : public snappy::Source {
public:
+ DetachedBufferSource(size_t buffer_size);
size_t Available() const final;
const char *Peek(size_t *length) final;
void Skip(size_t n) final;
- void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
+ void Append(Copier *copy);
+
+ uint32_t accumulated_checksum() const {
+ return accumulated_checksum_.value();
+ }
+
+ void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
private:
- std::vector<flatbuffers::DetachedBuffer> buffers_;
+ ResizeableBuffer data_;
size_t index_into_first_buffer_ = 0;
+ std::optional<uint32_t> accumulated_checksum_;
};
// Flushes buffer_source_ and stores the compressed buffer in queue_.
@@ -55,8 +68,9 @@
// Clear() to be written to disk.
const size_t chunk_size_;
DetachedBufferSource buffer_source_;
- std::optional<uint32_t> accumulated_checksum_;
std::vector<ResizeableBuffer> queue_;
+
+ std::vector<absl::Span<const uint8_t>> return_queue_;
size_t total_bytes_ = 0;
};
diff --git a/aos/events/logging/snappy_encoder_test.cc b/aos/events/logging/snappy_encoder_test.cc
index 3334602..7e61f3f 100644
--- a/aos/events/logging/snappy_encoder_test.cc
+++ b/aos/events/logging/snappy_encoder_test.cc
@@ -9,8 +9,8 @@
INSTANTIATE_TEST_SUITE_P(
Snappy, BufferEncoderTest,
- ::testing::Combine(::testing::Values([]() {
- return std::make_unique<SnappyEncoder>();
+ ::testing::Combine(::testing::Values([](size_t max_message_size) {
+ return std::make_unique<SnappyEncoder>(max_message_size);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<SnappyDecoder>(filename);
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 64fb27f..48bd2ec 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -1,10 +1,14 @@
{
"applications": [
{
- "name": "ping"
+ "name": "ping",
+ "executable_name": "./ping",
+ "args": ["--config=aos_config.json"]
},
{
- "name": "pong"
+ "name": "pong",
+ "executable_name": "./pong",
+ "args": ["--config=aos_config.json"]
}
],
"channels": [
@@ -22,4 +26,4 @@
"imports": [
"aos.json"
]
-}
\ No newline at end of file
+}
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index 61dbf6b..4d2d395 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -12,7 +12,7 @@
// Merges 2 flat buffers with the provided type table into the builder. Returns
// the offset to the flatbuffers.
// One or both of t1 and t2 must be non-null. If one is null, this method
-// coppies instead of merging.
+// copies instead of merging.
flatbuffers::Offset<flatbuffers::Table> MergeFlatBuffers(
const flatbuffers::TypeTable *typetable, const flatbuffers::Table *t1,
const flatbuffers::Table *t2, flatbuffers::FlatBufferBuilder *fbb);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index a88e801..8c48fa1 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -681,7 +681,7 @@
size_t /*iteration*/) {
// TODO(austin): Can we take a problem directly without
// AddConstraintSlackVariable being called on it and fill everything in?
- // That'll avoid a bunch of allocations and coppies of rather sizable
+ // That'll avoid a bunch of allocations and copies of rather sizable
// matricies.
//
// I think I want to do it as another review after everything works with the
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 5bf3811..d4566a0 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -188,12 +188,16 @@
"$(rootpath //aos/events:ping)",
"$(rootpath //aos/events:pong)",
"$(rootpath :aos_starter)",
+ "$(rootpath //aos:aos_dump)",
+ "$(rootpath //aos/events/logging:logger_main)",
],
data = [
":aos_starter",
":starterd",
+ "//aos:aos_dump",
"//aos/events:ping",
"//aos/events:pingpong_config",
"//aos/events:pong",
+ "//aos/events/logging:logger_main",
],
)
diff --git a/aos/starter/starter_demo.py b/aos/starter/starter_demo.py
index f8fa520..5a50890 100755
--- a/aos/starter/starter_demo.py
+++ b/aos/starter/starter_demo.py
@@ -24,6 +24,8 @@
parser.add_argument("ping", help="Location of ping")
parser.add_argument("pong", help="Location of pong")
parser.add_argument("starter_cmd", help="Location of starter_cmd")
+ parser.add_argument("aos_dump", help="Location of aos_dump")
+ parser.add_argument("logger_main", help="Location of logger_main")
args = parser.parse_args()
# Copy all the interesting files into a temporary directory and run
@@ -38,6 +40,8 @@
shutil.copy(args.ping, tmpdir + "/ping")
shutil.copy(args.pong, tmpdir + "/pong")
shutil.copy(args.starter_cmd, tmpdir + "/starter_cmd")
+ shutil.copy(args.aos_dump, tmpdir + "/aos_dump")
+ shutil.copy(args.logger_main, tmpdir + "/logger_main")
print(f"Running starter from {tmpdir}")
print(f"\n\nTo run starter_cmd, do:\ncd {tmpdir}\n./starter_cmd\n\n")