Make Copier and Encoder support writing partial messages
Along with changes to DummyEncoder, this gets us nicely setup to create
nice round buffer sizes for both DummyEncoder and LzmaEncoder. Those
can be written using O_DIRECT in a follow on commit for much more
efficient IO.
To do this, we need to introduce both start and stop bytes to Copier,
and then teach Encoder how to return what it was able to encode, and
then to restart part way through a message.
--flush_size now sets the buffer size.
Change-Id: I7a26d2ce677a28bd0e73333514302dc0612195c2
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 0918545..86c89a8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -129,7 +129,7 @@
return *this;
}
-void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
+void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
aos::monotonic_clock::time_point now) {
if (ran_out_of_space_) {
// We don't want any later data to be written after space becomes
@@ -138,12 +138,23 @@
return;
}
- if (!encoder_->HasSpace(coppier->size())) {
- Flush();
- CHECK(encoder_->HasSpace(coppier->size()));
- }
+ const size_t message_size = copier->size();
+ size_t overall_bytes_written = 0;
- encoder_->Encode(coppier);
+ // Keep writing chunks until we've written it all. If we end up with a
+ // partial write, this means we need to flush to disk.
+ do {
+ const size_t bytes_written = encoder_->Encode(copier, overall_bytes_written);
+ CHECK(bytes_written != 0);
+
+ overall_bytes_written += bytes_written;
+ if (overall_bytes_written < message_size) {
+ VLOG(1) << "Flushing because of a partial write, tried to write "
+ << message_size << " wrote " << overall_bytes_written;
+ Flush(now);
+ }
+ } while (overall_bytes_written < message_size);
+
FlushAtThreshold(now);
}
@@ -153,7 +164,7 @@
}
encoder_->Finish();
while (encoder_->queue_size() > 0) {
- Flush();
+ Flush(monotonic_clock::max_time);
}
if (close(fd_) == -1) {
if (errno == ENOSPC) {
@@ -166,7 +177,8 @@
VLOG(1) << "Closed " << filename();
}
-void DetachedBufferWriter::Flush() {
+void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
+ last_flush_time_ = now;
if (ran_out_of_space_) {
// We don't want any later data to be written after space becomes available,
// so refuse to write anything more once we've dropped data because we ran
@@ -260,13 +272,14 @@
// Flush if we are at the max number of iovs per writev, because there's no
// point queueing up any more data in memory. Also flush once we have enough
// data queued up or if it has been long enough.
- while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+ while (encoder_->space() == 0 ||
+ encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
encoder_->queue_size() >= IOV_MAX ||
now > last_flush_time_ +
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_flush_period))) {
- last_flush_time_ = now;
- Flush();
+ VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_;
+ Flush(now);
}
}