Rotate log files when the remote reboots
Add a logger_node_boot_uuid alongside source_node_boot_uuid so the logger
node's and the remote node's boot UUIDs can be told apart, and also record
a logger start time. This gives us enough information to enforce only 1
boot per node, and to also detect which log came first if a node rebooted.
Order detection isn't used today, but we should fix the issue before we
generate more logs.
Also, enforce one boot per node when replaying. We don't manage
timestamps well enough to do anything else.
Change-Id: Ib5ba9f881a2c17d05b143e38ee20a209553acca8
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index a73d678..654c7ce 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -50,6 +50,14 @@
return result.value();
}
+// Streams every LogFile in log_files into one string, one entry per line.
+std::string LogFileVectorToString(std::vector<LogFile> log_files) {
+  std::stringstream ss;
+  // Bind by reference: copying each LogFile just to print it is wasteful.
+  for (const auto &f : log_files) ss << f << "\n";
+  return ss.str();
+}
+
// Copies the channel, removing the schema as we go. If new_name is provided,
// it is used instead of the name inside the channel. If new_type is provided,
// it is used instead of the type in the channel.
@@ -118,8 +126,6 @@
std::function<bool(const Channel *)> should_log)
: event_loop_(event_loop),
configuration_(configuration),
- boot_uuid_(
- util::ReadFileToStringOrDie("/proc/sys/kernel/random/boot_id")),
name_(network::GetHostname()),
timer_handler_(event_loop_->AddTimer(
[this]() { DoLogData(event_loop_->monotonic_now()); })),
@@ -182,7 +188,6 @@
}
FetcherStruct fs;
- fs.node_index = our_node_index;
fs.channel_index = channel_index;
fs.channel = channel;
@@ -214,12 +219,19 @@
if (log_delivery_times) {
VLOG(1) << " Delivery times";
fs.wants_timestamp_writer = true;
+ fs.timestamp_node_index = our_node_index;
}
if (log_message) {
VLOG(1) << " Data";
fs.wants_writer = true;
if (!is_local) {
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+ fs.data_node_index =
+ configuration::GetNodeIndex(configuration_, source_node);
fs.log_type = LogType::kLogRemoteMessage;
+ } else {
+ fs.data_node_index = our_node_index;
}
}
if (log_contents) {
@@ -227,7 +239,7 @@
<< configuration::CleanedChannelToString(channel);
fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
fs.wants_contents_writer = true;
- fs.node_index =
+ fs.contents_node_index =
configuration::GetNodeIndex(configuration_, fs.timestamp_node);
}
fetchers_.emplace_back(std::move(fs));
@@ -319,7 +331,8 @@
// Clear out any old timestamps in case we are re-starting logging.
for (size_t i = 0; i < node_state_.size(); ++i) {
- SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+ monotonic_clock::min_time, realtime_clock::min_time);
}
WriteHeader();
@@ -327,6 +340,13 @@
LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
<< " start_time " << last_synchronized_time_;
+ // Force logging up until the start of the log file now, so the messages at
+ // the start are always ordered before the rest of the messages.
+ // Note: this ship may have already sailed, but we don't have to make it
+ // worse.
+ // TODO(austin): Test...
+ LogUntil(last_synchronized_time_);
+
timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
polling_period_);
}
@@ -373,10 +393,53 @@
const int node_index = configuration::GetNodeIndex(configuration_, node);
MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
realtime_start_time);
- log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+ MaybeWriteHeader(node_index, node);
}
}
+void Logger::MaybeWriteHeader(int node_index) {
+  // Multi-node configurations look the node up by index; otherwise nullptr is
+  // handed to the two-argument overload as the node.
+  const Node *const node = configuration::MultiNode(configuration_)
+                               ? configuration_->nodes()->Get(node_index)
+                               : nullptr;
+  MaybeWriteHeader(node_index, node);
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+  // Emits the log file header for node_index, but only once the header has
+  // valid data and actually still needs to be written.
+  auto &state = node_state_[node_index];
+
+  if (state.header_written && state.header_valid) {
+    // Already written and still valid, so there is nothing to do.
+    return;
+  }
+  if (!state.has_source_node_boot_uuid) {
+    // Without the boot UUID the header can't be written yet.
+    return;
+  }
+
+  // Picking the right log_namer_ call matters here:
+  //  * WriteHeader writes the first header in a log file, which we only want
+  //    to do once.
+  //  * Rotate rewrites the same header with a new part ID but keeps the parts
+  //    UUID.  That would imply the parts go together across a reboot, so it
+  //    is not what we want when things reboot.
+  //  * Reboot resets the parts UUID, so once the first header is out we use
+  //    it to rotate the log and reset the parts UUID.
+  //
+  // header_valid is cleared whenever the remote reboots.
+  if (!state.header_written) {
+    log_namer_->WriteHeader(&state.log_file_header, node);
+    state.header_written = true;
+  } else {
+    log_namer_->Reboot(node, &state.log_file_header);
+  }
+
+  state.header_valid = true;
+}
+
void Logger::WriteMissingTimestamps() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -394,14 +457,20 @@
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
server_statistics_fetcher_.context().realtime_event_time)) {
+ CHECK(node_state_[node_index].header_written);
+ CHECK(node_state_[node_index].header_valid);
log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ } else {
+ MaybeWriteHeader(node_index, node);
}
}
}
-void Logger::SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time) {
+void Logger::SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time) {
node_state_[node_index].monotonic_start_time = monotonic_start_time;
node_state_[node_index].realtime_start_time = realtime_start_time;
node_state_[node_index]
@@ -410,6 +479,30 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
monotonic_start_time.time_since_epoch())
.count());
+
+ // Add logger start times if they are available in the log file header.
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_monotonic_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_monotonic_start_time.time_since_epoch())
+ .count());
+ }
+
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_realtime_start_time.time_since_epoch())
+ .count());
+ }
+
if (node_state_[node_index]
.log_file_header.mutable_message()
->has_realtime_start_time()) {
@@ -431,47 +524,57 @@
monotonic_clock::min_time) {
return false;
}
- if (configuration::MultiNode(configuration_)) {
- if (event_loop_->node() == node) {
- // There are no offsets to compute for ourself, so always succeed.
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- } else if (server_statistics_fetcher_.get() != nullptr) {
- // We must be a remote node now. Look for the connection and see if it is
- // connected.
-
- for (const message_bridge::ServerConnection *connection :
- *server_statistics_fetcher_->connections()) {
- if (connection->node()->name()->string_view() !=
- node->name()->string_view()) {
- continue;
- }
-
- if (connection->state() != message_bridge::State::CONNECTED) {
- VLOG(1) << node->name()->string_view()
- << " is not connected, can't start it yet.";
- break;
- }
-
- if (!connection->has_monotonic_offset()) {
- VLOG(1) << "Missing monotonic offset for setting start time for node "
- << aos::FlatbufferToJson(node);
- break;
- }
-
- VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
-
- // Found it and it is connected. Compensate and go.
- monotonic_start_time +=
- std::chrono::nanoseconds(connection->monotonic_offset());
-
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- }
- }
- } else {
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ if (event_loop_->node() == node ||
+ !configuration::MultiNode(configuration_)) {
+ // There are no offsets to compute for ourself, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+ monotonic_start_time, realtime_start_time);
+ node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ // Update the boot UUID as soon as we know we are connected.
+ if (!connection->has_boot_uuid()) {
+ VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ if (!node_state_[node_index].has_source_node_boot_uuid ||
+ node_state_[node_index].source_node_boot_uuid !=
+ connection->boot_uuid()->string_view()) {
+ node_state_[node_index].SetBootUUID(
+ connection->boot_uuid()->string_view());
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ // Found it and it is connected. Compensate and go.
+ SetStartTime(node_index,
+ monotonic_start_time +
+ std::chrono::nanoseconds(connection->monotonic_offset()),
+ realtime_start_time, monotonic_start_time,
+ realtime_start_time);
+ return true;
+ }
}
return false;
}
@@ -502,8 +605,11 @@
log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
}
- const flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
- fbb.CreateString(boot_uuid_);
+ const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
fbb.CreateString("00000000-0000-4000-8000-000000000000");
@@ -545,6 +651,15 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
realtime_clock::min_time.time_since_epoch())
.count());
+ } else {
+ log_file_header_builder.add_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_clock::min_time.time_since_epoch())
+ .count());
+ log_file_header_builder.add_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
@@ -552,7 +667,10 @@
if (!log_start_uuid_offset.IsNull()) {
log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
}
- log_file_header_builder.add_boot_uuid(boot_uuid_offset);
+ log_file_header_builder.add_logger_node_boot_uuid(
+ logger_node_boot_uuid_offset);
+ log_file_header_builder.add_source_node_boot_uuid(
+ source_node_boot_uuid_offset);
log_file_header_builder.add_parts_uuid(parts_uuid_offset);
log_file_header_builder.add_parts_index(0);
@@ -591,6 +709,9 @@
}
void Logger::LogUntil(monotonic_clock::time_point t) {
+ // Grab the latest ServerStatistics message. This will always have the
+ // opportunity to be >= the current time, so it will always represent any
+ // reboots which may have happened.
WriteMissingTimestamps();
// Write each channel to disk, one at a time.
@@ -636,6 +757,9 @@
max_header_size_ = std::max(max_header_size_,
fbb.GetSize() - f.fetcher->context().size);
+ CHECK(node_state_[f.data_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.writer->QueueSizedFlatbuffer(&fbb);
}
@@ -659,6 +783,9 @@
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
+ CHECK(node_state_[f.timestamp_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -674,6 +801,16 @@
const RemoteMessage *msg =
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+ CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+ if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+ node_state_[f.contents_node_index].source_node_boot_uuid !=
+ msg->boot_uuid()->string_view()) {
+ node_state_[f.contents_node_index].SetBootUUID(
+ msg->boot_uuid()->string_view());
+
+ MaybeWriteHeader(f.contents_node_index);
+ }
+
logger::MessageHeader::Builder message_header_builder(fbb);
// TODO(austin): This needs to check the channel_index and confirm
@@ -706,6 +843,9 @@
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
+ CHECK(node_state_[f.contents_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.contents_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -917,6 +1057,18 @@
configuration::GetNodeIndex(configuration(), node);
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+
+ // Confirm that all the parts are from the same boot if there are enough
+ // parts to not be from the same boot.
+ if (filtered_parts.size() > 1u) {
+ for (size_t i = 1; i < filtered_parts.size(); ++i) {
+ CHECK_EQ(filtered_parts[i].source_boot_uuid,
+ filtered_parts[0].source_boot_uuid)
+ << ": Found parts from different boots "
+ << LogFileVectorToString(log_files_);
+ }
+ }
+
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
@@ -1418,6 +1570,10 @@
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
+ << " while trying to send a message on "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
<< " " << state->DebugString();
} else if (timestamped_message.monotonic_remote_time >=
state->monotonic_remote_now(
@@ -2017,6 +2173,9 @@
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+
RemoteMessage::Builder message_header_builder =
builder.MakeBuilder<RemoteMessage>();
@@ -2037,6 +2196,7 @@
timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
+ message_header_builder.add_boot_uuid(boot_uuid_offset);
builder.Send(message_header_builder.Finish());
}