Merge "Bound reconnect time for message_bridge_client"
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 02a9fdd..58c0fc3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -237,7 +237,8 @@
}
iovec_.clear();
- size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+ const size_t original_iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+ size_t iovec_size = original_iovec_size;
iovec_.resize(iovec_size);
size_t counted_size = 0;
@@ -247,7 +248,7 @@
// The file is aligned if it is a multiple of kSector in length. The data is
// aligned if its memory is kSector aligned and its length is a multiple of
// kSector.
- bool aligned = (total_write_bytes_ % kSector) == 0;
+ bool aligned = (file_written_bytes_ % kSector) == 0;
size_t write_index = 0;
for (size_t i = 0; i < iovec_size; ++i) {
iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
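
For reference, the alignment rules in the comment above are the usual O_DIRECT requirements. A minimal sketch of the predicate, assuming a hypothetical kSector of 512 (the real constant lives in logfile_utils):

#include <cstddef>
#include <cstdint>

constexpr size_t kSector = 512;  // Hypothetical; stands in for the real kSector.

// True if writing `size` bytes from `data` at file offset `offset` keeps
// both the file and the memory kSector aligned, per the rules above.
bool IsAlignedWrite(size_t offset, const uint8_t *data, size_t size) {
  return (offset % kSector) == 0 &&
         (reinterpret_cast<uintptr_t>(data) % kSector) == 0 &&
         (size % kSector) == 0;
}
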
@@ -295,11 +296,13 @@
++write_index;
}
- // Either write the aligned data if it is all aligned, or write the rest
- // unaligned if we wrote aligned up above.
- WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
+ if (counted_size > 0) {
+ // Either write the aligned data if it is all aligned, or write the rest
+ // unaligned if we wrote aligned up above.
+ WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
- encoder_->Clear(iovec_size);
+ encoder_->Clear(original_iovec_size);
+ }
}
size_t DetachedBufferWriter::WriteV(struct iovec *iovec_data, size_t iovec_size,
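
As a standalone illustration of what Flush() is doing, here is a sketch of gathering queued buffers into a single writev() call capped at IOV_MAX. The names are hypothetical, and short-write and O_DIRECT handling are omitted:

#include <limits.h>   // IOV_MAX
#include <sys/uio.h>  // writev, struct iovec
#include <algorithm>
#include <cstdint>
#include <vector>

// Write up to IOV_MAX queued buffers with one writev() and return how many
// buffers were submitted. A real implementation must handle short writes.
size_t WriteQueued(int fd, const std::vector<std::vector<uint8_t>> &queue) {
  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
  if (iovec_size == 0) return 0;
  std::vector<struct iovec> iov(iovec_size);
  for (size_t i = 0; i < iovec_size; ++i) {
    iov[i].iov_base = const_cast<uint8_t *>(queue[i].data());
    iov[i].iov_len = queue[i].size();
  }
  return writev(fd, iov.data(), iov.size()) < 0 ? 0 : iovec_size;
}
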
@@ -317,21 +320,21 @@
if (written > 0) {
// Flush asynchronously and force the data out of the cache.
- sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
- if (last_synced_bytes_ != 0) {
+ sync_file_range(fd_, file_written_bytes_, written, SYNC_FILE_RANGE_WRITE);
+ if (file_written_bytes_ != 0) {
// Per Linus' online recommendation for fast file IO, do a blocking flush
// of the previous write chunk, and then tell the kernel to drop those
// pages from the cache. This makes sure we can't get too far ahead.
sync_file_range(fd_, last_synced_bytes_,
- total_write_bytes_ - last_synced_bytes_,
+ file_written_bytes_ - last_synced_bytes_,
SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER);
posix_fadvise(fd_, last_synced_bytes_,
- total_write_bytes_ - last_synced_bytes_,
+ file_written_bytes_ - last_synced_bytes_,
POSIX_FADV_DONTNEED);
- last_synced_bytes_ = total_write_bytes_;
+ last_synced_bytes_ = file_written_bytes_;
}
}
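
The block above is the classic write-behind pattern: start asynchronous writeback of the bytes just written, then block until the previous chunk is on disk and evict its pages so dirty data cannot pile up in the page cache. Restated as a self-contained sketch (Linux-specific, requires _GNU_SOURCE; the bookkeeping parameters are hypothetical):

#define _GNU_SOURCE  // For sync_file_range.
#include <fcntl.h>

// Called after each successful write of `written` bytes that started at
// offset `file_written_bytes`; `last_synced_bytes` tracks the flush frontier.
void WriteBehind(int fd, off_t file_written_bytes, off_t written,
                 off_t *last_synced_bytes) {
  // Kick off asynchronous writeback of the bytes just written.
  sync_file_range(fd, file_written_bytes, written, SYNC_FILE_RANGE_WRITE);
  if (file_written_bytes != 0) {
    // Wait for the previous chunk to reach disk...
    sync_file_range(fd, *last_synced_bytes,
                    file_written_bytes - *last_synced_bytes,
                    SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
                        SYNC_FILE_RANGE_WAIT_AFTER);
    // ...then drop those pages from the page cache.
    posix_fadvise(fd, *last_synced_bytes,
                  file_written_bytes - *last_synced_bytes,
                  POSIX_FADV_DONTNEED);
    *last_synced_bytes = file_written_bytes;
  }
}
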
@@ -371,6 +374,7 @@
++total_write_count_;
total_write_messages_ += iovec_size;
total_write_bytes_ += written;
+ file_written_bytes_ += written;
}
void DetachedBufferWriter::FlushAtThreshold(
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 50f6b40..0c0ef7f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -212,6 +212,8 @@
bool supports_odirect_ = true;
int flags_ = 0;
+ size_t file_written_bytes_ = 0;
+
aos::monotonic_clock::time_point last_flush_time_ =
aos::monotonic_clock::min_time;
};
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index f354638..d348b70 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -84,6 +84,13 @@
// efficient to allocate if we go over a threshold to keep the static memory
// in use smaller, or just allocate the worst case like we are doing here?
input_buffer_.resize(max_message_size);
+
+ // Start our queues out with a reasonable space allocation. The cost of this
+ // is negligible compared to the buffer cost, but removes a bunch of
+ // allocations at runtime.
+ queue_.reserve(4);
+ free_queue_.reserve(4);
+ return_queue_.reserve(4);
}
LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
@@ -111,6 +118,9 @@
void LzmaEncoder::Clear(const int n) {
CHECK_GE(n, 0);
CHECK_LE(static_cast<size_t>(n), queue_size());
+ for (int i = 0; i < n; ++i) {
+ free_queue_.emplace_back(std::move(queue_[i]));
+ }
queue_.erase(queue_.begin(), queue_.begin() + n);
if (queue_.empty()) {
stream_.next_out = nullptr;
@@ -155,8 +165,13 @@
// construction or a Reset, or when an input buffer is large enough to fill
// more than one output buffer.
if (stream_.avail_out == 0) {
- queue_.emplace_back();
- queue_.back().resize(kEncodedBufferSizeBytes);
+ if (!free_queue_.empty()) {
+ queue_.emplace_back(std::move(free_queue_.back()));
+ free_queue_.pop_back();
+ } else {
+ queue_.emplace_back();
+ queue_.back().resize(kEncodedBufferSizeBytes);
+ }
stream_.next_out = queue_.back().data();
stream_.avail_out = kEncodedBufferSizeBytes;
// Update the byte count.
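
Taken together, the Clear() and encode-loop changes implement a small free list: consumed output buffers are parked rather than freed, and reused before any new allocation. A distilled sketch of the idea, using std::vector<uint8_t> as a stand-in for ResizeableBuffer and a hypothetical buffer size:

#include <cstdint>
#include <utility>
#include <vector>

using Buffer = std::vector<uint8_t>;              // Stand-in for ResizeableBuffer.
constexpr size_t kBufferSizeBytes = 1024 * 1024;  // Hypothetical size.

std::vector<Buffer> queue_;       // Buffers holding encoded data.
std::vector<Buffer> free_queue_;  // Consumed buffers awaiting reuse.

// Grab an output buffer for the encoder: reuse a parked one if available,
// otherwise allocate a fresh one.
void PushOutputBuffer() {
  if (!free_queue_.empty()) {
    queue_.emplace_back(std::move(free_queue_.back()));
    free_queue_.pop_back();
  } else {
    queue_.emplace_back();
    queue_.back().resize(kBufferSizeBytes);
  }
}

// Retire the first n buffers once their bytes hit disk; their backing
// allocations go to the free list instead of back to the heap.
void Clear(size_t n) {
  for (size_t i = 0; i < n; ++i) {
    free_queue_.emplace_back(std::move(queue_[i]));
  }
  queue_.erase(queue_.begin(), queue_.begin() + n);
}

In steady state the encoder cycles through the same few allocations, which is what keeps the heap from fragmenting.
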
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index b4964fb..3136d93 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -54,6 +54,11 @@
lzma_stream stream_;
uint32_t compression_preset_;
std::vector<ResizeableBuffer> queue_;
+ // Since we pretty much just allocate a couple of buffers, then allocate and
+ // release them over and over with very similar memory usage and without much
+ // variation in the peak usage, put the allocated chunks in a free queue to
+ // reduce fragmentation.
+ std::vector<ResizeableBuffer> free_queue_;
bool finished_ = false;
// Total bytes that resulted from encoding raw data since the last call to
// Reset.