Add support for rotated log files
We want to be able to chunk log files. This adds read support for
chunking. Write support is only added to enable testing, and needs more
configuration to be useful.
Change-Id: I0c7bc9980438a934ac0f2692ef06caf2605bbc88
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 714796e..e55b80d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -10,6 +10,7 @@
#include "aos/configuration.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -235,13 +236,45 @@
return result;
}
-SortedMessageReader::SortedMessageReader(std::string_view filename)
- : message_reader_(filename) {
+SortedMessageReader::SortedMessageReader(
+ const std::vector<std::string> &filenames)
+ : filenames_(filenames),
+ log_file_header_(FlatbufferDetachedBuffer<LogFileHeader>::Empty()) {
+ CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
+
+ log_file_header_ = CopyFlatBuffer(message_reader_->log_file_header());
+
channels_.resize(configuration()->channels()->size());
QueueMessages();
}
+bool SortedMessageReader::NextLogFile() {
+ if (next_filename_index_ == filenames_.size()) {
+ return false;
+ }
+ message_reader_ =
+ std::make_unique<MessageReader>(filenames_[next_filename_index_]);
+
+ // We can't support the config diverging between two log file headers. See if
+ // they are the same.
+ if (next_filename_index_ != 0) {
+ // Since we copied before, we need to copy again to guarantee that things
+ // didn't get re-ordered.
+ const FlatbufferDetachedBuffer<LogFileHeader> new_log_file_header =
+ CopyFlatBuffer(message_reader_->log_file_header());
+ CHECK_EQ(new_log_file_header.size(), log_file_header_.size());
+ CHECK(memcmp(new_log_file_header.data(), log_file_header_.data(),
+ log_file_header_.size()) == 0)
+ << ": Header is different between log file chunks "
+ << filenames_[next_filename_index_] << " and "
+ << filenames_[next_filename_index_ - 1] << ", this is not supported.";
+ }
+
+ ++next_filename_index_;
+ return true;
+}
+
void SortedMessageReader::EmplaceDataBack(
FlatbufferVector<MessageHeader> &&new_data) {
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
@@ -289,17 +322,19 @@
// Those messages might be very old. Make sure to read a chunk past the
// starting time.
if (channel_heap_.size() > 0 &&
- message_reader_.newest_timestamp() >
+ message_reader_->newest_timestamp() >
std::max(oldest_message().first, monotonic_start_time()) +
- message_reader_.max_out_of_order_duration()) {
+ message_reader_->max_out_of_order_duration()) {
break;
}
if (std::optional<FlatbufferVector<MessageHeader>> msg =
- message_reader_.ReadMessage()) {
+ message_reader_->ReadMessage()) {
EmplaceDataBack(std::move(msg.value()));
} else {
- break;
+ if (!NextLogFile()) {
+ break;
+ }
}
}
}
@@ -328,7 +363,7 @@
channel.oldest_timestamp = monotonic_clock::min_time;
}
- if (oldest_channel_data.first > message_reader_.queue_data_time()) {
+ if (oldest_channel_data.first > message_reader_->queue_data_time()) {
QueueMessages();
}