Move reboot and header responsibility into DataWriter
Now that we have a class wrapped around the raw flatbuffer writing to
disk, we can start to make it responsible for tracking and writing the
header. It can also track reboots. Each message that is to be written
now needs to be passed in with the corresponding boot UUID.
This removes a ton of code in LogWriter since it is simpler to do it in
DataWriter. We no longer need to detect state and then trigger the right
behavior nearly as often, because the lower layers now track it easily
and just work.
A side effect of delaying these writes is that we shouldn't write
headers with no body, because we delay writing the header until we know
the body. This would potentially be a performance problem with inline
configuration, but with the separate configuration files, this isn't a
problem.
Change-Id: Iec91284fcceeb73e3b6c181aebd2fb7edc7bc1ac
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 71c5750..580a126 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -254,11 +254,6 @@
}
}
- CHECK(node_state_.empty());
- node_state_.resize(configuration::MultiNode(configuration_)
- ? configuration_->nodes()->size()
- : 1u);
-
log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
const aos::monotonic_clock::time_point beginning_time =
@@ -278,7 +273,7 @@
}
// Clear out any old timestamps in case we are re-starting logging.
- for (size_t i = 0; i < node_state_.size(); ++i) {
+ for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
log_namer_->SetStartTimes(
i, monotonic_clock::min_time, realtime_clock::min_time,
monotonic_clock::min_time, realtime_clock::min_time);
@@ -330,7 +325,6 @@
f.timestamp_writer = nullptr;
f.contents_writer = nullptr;
}
- node_state_.clear();
log_event_uuid_ = UUID::Zero();
log_start_uuid_ = std::nullopt;
@@ -358,54 +352,9 @@
const int node_index = configuration::GetNodeIndex(configuration_, node);
MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
realtime_start_time);
- MaybeWriteHeader(node_index, node);
}
}
-void Logger::MaybeWriteHeader(int node_index) {
- if (configuration::MultiNode(configuration_)) {
- return MaybeWriteHeader(node_index,
- configuration_->nodes()->Get(node_index));
- } else {
- return MaybeWriteHeader(node_index, nullptr);
- }
-}
-
-void Logger::MaybeWriteHeader(int node_index, const Node *node) {
- // This function is responsible for writing the header when the header both
- // has valid data, and when it needs to be written.
- if (node_state_[node_index].header_written &&
- node_state_[node_index].header_valid) {
- // The header has been written and is valid, nothing to do.
- return;
- }
- if (!node_state_[node_index].has_source_node_boot_uuid) {
- // Can't write a header if we don't have the boot UUID.
- return;
- }
-
- // WriteHeader writes the first header in a log file. We want to do this only
- // once.
- //
- // Rotate rewrites the same header with a new part ID, but keeps the same part
- // UUID. We don't want that when things reboot, because that implies that
- // parts go together across a reboot.
- //
- // Reboot resets the parts UUID. So, once we've written a header the first
- // time, we want to use Reboot to rotate the log and reset the parts UUID.
- //
- // header_valid is cleared whenever the remote reboots.
- if (node_state_[node_index].header_written) {
- VLOG(1) << "Rebooting";
- log_namer_->Reboot(node);
- } else {
- log_namer_->WriteHeader(node);
-
- node_state_[node_index].header_written = true;
- }
- node_state_[node_index].header_valid = true;
-}
-
void Logger::WriteMissingTimestamps() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -423,12 +372,8 @@
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
server_statistics_fetcher_.context().realtime_event_time)) {
- CHECK(node_state_[node_index].header_written);
- CHECK(node_state_[node_index].header_valid);
VLOG(1) << "Rotating because timestamps changed";
log_namer_->Rotate(node);
- } else {
- MaybeWriteHeader(node_index, node);
}
}
}
@@ -448,9 +393,6 @@
log_namer_->SetStartTimes(node_index, monotonic_start_time,
realtime_start_time, monotonic_start_time,
realtime_start_time);
- log_namer_->SetBootUUID(node_index, event_loop_->boot_uuid());
- node_state_[node_index].header_valid = false;
- node_state_[node_index].has_source_node_boot_uuid = true;
return true;
} else if (server_statistics_fetcher_.get() != nullptr) {
// We must be a remote node now. Look for the connection and see if it is
@@ -623,18 +565,10 @@
break;
}
if (f.writer != nullptr) {
- // Only check if the boot UUID has changed if this is data from another
- // node. Our UUID can't change without restarting the application.
- if (our_node_index != f.data_node_index) {
- // And update our boot UUID if the UUID has changed.
- if (log_namer_->SetBootUUID(f.data_node_index,
- f.fetcher->context().source_boot_uuid)) {
- node_state_[f.data_node_index].header_valid = false;
- node_state_[f.data_node_index].has_source_node_boot_uuid = true;
- MaybeWriteHeader(f.data_node_index);
- }
- }
-
+ const UUID source_node_boot_uuid =
+ our_node_index != f.data_node_index
+ ? f.fetcher->context().source_boot_uuid
+ : event_loop_->boot_uuid();
// Write!
const auto start = event_loop_->monotonic_now();
flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
@@ -656,10 +590,7 @@
max_header_size_ = std::max(max_header_size_,
fbb.GetSize() - f.fetcher->context().size);
- CHECK(node_state_[f.data_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.writer->QueueSizedFlatbuffer(&fbb, end);
+ f.writer->QueueSizedFlatbuffer(&fbb, source_node_boot_uuid, end);
}
if (f.timestamp_writer != nullptr) {
@@ -682,10 +613,11 @@
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
- CHECK(node_state_[f.timestamp_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
+ // TODO(austin): How do I track remote timestamp boot UUIDs? I need to
+ // update the uuid list in the header when one changes and track
+ // timestamps.
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb, event_loop_->boot_uuid(),
+ end);
}
if (f.contents_writer != nullptr) {
@@ -701,12 +633,6 @@
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
- if (log_namer_->SetBootUUID(f.contents_node_index,
- UUID::FromVector(msg->boot_uuid()))) {
- node_state_[f.contents_node_index].header_valid = false;
- node_state_[f.contents_node_index].has_source_node_boot_uuid = true;
- MaybeWriteHeader(f.contents_node_index);
- }
logger::MessageHeader::Builder message_header_builder(fbb);
@@ -745,10 +671,8 @@
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
- CHECK(node_state_[f.contents_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
+ f.contents_writer->QueueSizedFlatbuffer(
+ &fbb, UUID::FromVector(msg->boot_uuid()), end);
}
f.written = true;