Rotate log files when the remote reboots
Add a logger_boot_uuid so we can tell the logger node's boot UUID apart
from the remote node's, and also record a logger start time. This gives
us enough information to enforce only one boot per node, and to detect
which log came first if a node rebooted. Order detection isn't used
today, but we should start recording the data before we generate more
logs.

Also, enforce one boot per node when replaying. We don't handle
timestamps well enough to do anything else.
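
The replay-side enforcement boils down to requiring that every sorted log
part for a node comes from the same boot. A simplified sketch of the check
this change adds to LogReader (see the logger.cc hunk below):

  // All parts for this node must share the source boot UUID of the first
  // part; otherwise the log spans a reboot and we refuse to replay it.
  std::vector<LogParts> filtered_parts = FilterPartsForNode(
      log_files_, node != nullptr ? node->name()->string_view() : "");
  for (size_t i = 1; i < filtered_parts.size(); ++i) {
    CHECK_EQ(filtered_parts[i].source_boot_uuid,
             filtered_parts[0].source_boot_uuid)
        << ": Found parts from different boots "
        << LogFileVectorToString(log_files_);
  }
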
Change-Id: Ib5ba9f881a2c17d05b143e38ee20a209553acca8
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index ec60143..a73be9b 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -50,6 +50,11 @@
UpdateHeader(header, uuid_, part_number_);
data_writer_->QueueSpan(header->span());
}
+void LocalLogNamer::Reboot(
+ const Node * /*node*/,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> * /*header*/) {
+ LOG(FATAL) << "Can't reboot a single node.";
+}
DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
const Channel *channel) {
@@ -104,8 +109,23 @@
void MultiNodeLogNamer::Rotate(
const Node *node,
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ DoRotate(node, header, false);
+}
+
+void MultiNodeLogNamer::Reboot(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ DoRotate(node, header, true);
+}
+
+void MultiNodeLogNamer::DoRotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header, bool reboot) {
if (node == this->node()) {
if (data_writer_.writer) {
+ if (reboot) {
+ data_writer_.uuid = UUID::Random();
+ }
++data_writer_.part_number;
}
OpenDataWriter();
@@ -115,6 +135,9 @@
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
+ if (reboot) {
+ data_writer.second.uuid = UUID::Random();
+ }
++data_writer.second.part_number;
data_writer.second.rotate(data_writer.first, &data_writer.second);
UpdateHeader(header, data_writer.second.uuid,
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 5384ab2..5534131 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -62,6 +62,12 @@
const Node *node,
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+ // Rotates all log files for the provided node to handle a reboot. The
+ // provided header will be modified and written per WriteHeader above, and
+ // the parts UUIDs are reset.
+ virtual void Reboot(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
// Returns all the nodes that data is being written for.
const std::vector<const Node *> &nodes() const { return nodes_; }
@@ -100,6 +106,10 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
override;
+ void Reboot(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
DetachedBufferWriter *MakeForwardedTimestampWriter(
@@ -167,6 +177,10 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
override;
+ void Reboot(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
DetachedBufferWriter *MakeWriter(const Channel *channel) override;
DetachedBufferWriter *MakeForwardedTimestampWriter(const Channel *channel,
@@ -275,10 +289,17 @@
std::unique_ptr<DetachedBufferWriter> writer = nullptr;
const Node *node;
size_t part_number = 0;
- const UUID uuid = UUID::Random();
+ UUID uuid = UUID::Random();
std::function<void(const Channel *, DataWriter *)> rotate;
};
+ // Implements Rotate and Reboot, controlled by the 'reboot' flag. The only
+ // difference between the two is whether DataWriter::uuid is reset.
+ void DoRotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ bool reboot);
+
// Opens up a writer for timestamps forwarded back.
void OpenForwardedTimestampWriter(const Channel *channel,
DataWriter *data_writer);
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 1998119..22fc513 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -81,9 +81,17 @@
aos::monotonic_clock::time_point monotonic_start_time;
aos::realtime_clock::time_point realtime_start_time;
+ aos::monotonic_clock::time_point logger_monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point logger_realtime_start_time =
+ aos::realtime_clock::min_time;
+
// Node to save.
std::string node;
+ // The boot UUID of the node which generated this data.
+ std::string source_boot_uuid;
+
// Pairs of the filename and the part index for sorting.
std::vector<std::pair<std::string, int>> parts;
};
@@ -91,6 +99,9 @@
// Struct to hold both the node, and the parts associated with it.
struct UnsortedLogPartsMap {
std::string logger_node;
+ // The boot UUID of the node this log file was created on.
+ std::string logger_boot_uuid;
+
aos::monotonic_clock::time_point monotonic_start_time =
aos::monotonic_clock::min_time;
aos::realtime_clock::time_point realtime_start_time =
@@ -133,6 +144,12 @@
chrono::nanoseconds(log_header->message().monotonic_start_time()));
const realtime_clock::time_point realtime_start_time(
chrono::nanoseconds(log_header->message().realtime_start_time()));
+ const monotonic_clock::time_point logger_monotonic_start_time(
+ chrono::nanoseconds(
+ log_header->message().logger_monotonic_start_time()));
+ const realtime_clock::time_point logger_realtime_start_time(
+ chrono::nanoseconds(
+ log_header->message().logger_realtime_start_time()));
const std::string_view node =
log_header->message().has_node()
@@ -144,6 +161,16 @@
? log_header->message().logger_node()->name()->string_view()
: "";
+ const std::string_view logger_boot_uuid =
+ log_header->message().has_logger_node_boot_uuid()
+ ? log_header->message().logger_node_boot_uuid()->string_view()
+ : "";
+
+ const std::string_view source_boot_uuid =
+ log_header->message().has_source_node_boot_uuid()
+ ? log_header->message().source_node_boot_uuid()->string_view()
+ : "";
+
// Looks like an old log. No UUID, index, and also single node. We have
// little to no multi-node log files in the wild without part UUIDs and
// indexes which we care much about.
@@ -200,8 +227,10 @@
.insert(std::make_pair(log_event_uuid, UnsortedLogPartsMap()))
.first;
log_it->second.logger_node = logger_node;
+ log_it->second.logger_boot_uuid = logger_boot_uuid;
} else {
CHECK_EQ(log_it->second.logger_node, logger_node);
+ CHECK_EQ(log_it->second.logger_boot_uuid, logger_boot_uuid);
}
if (node == log_it->second.logger_node) {
@@ -222,14 +251,25 @@
.first;
it->second.monotonic_start_time = monotonic_start_time;
it->second.realtime_start_time = realtime_start_time;
+ it->second.logger_monotonic_start_time = logger_monotonic_start_time;
+ it->second.logger_realtime_start_time = logger_realtime_start_time;
it->second.node = std::string(node);
+ it->second.source_boot_uuid = source_boot_uuid;
+ } else {
+ CHECK_EQ(it->second.source_boot_uuid, source_boot_uuid);
}
// First part might be min_time. If it is, try to put a better time on it.
if (it->second.monotonic_start_time == monotonic_clock::min_time) {
it->second.monotonic_start_time = monotonic_start_time;
+ it->second.logger_monotonic_start_time = logger_monotonic_start_time;
+ it->second.logger_realtime_start_time = logger_realtime_start_time;
} else if (monotonic_start_time != monotonic_clock::min_time) {
CHECK_EQ(it->second.monotonic_start_time, monotonic_start_time);
+ CHECK_EQ(it->second.logger_monotonic_start_time,
+ logger_monotonic_start_time);
+ CHECK_EQ(it->second.logger_realtime_start_time,
+ logger_realtime_start_time);
}
if (it->second.realtime_start_time == realtime_clock::min_time) {
it->second.realtime_start_time = realtime_start_time;
@@ -285,6 +325,7 @@
LogFile new_file;
new_file.log_event_uuid = logs.first;
new_file.logger_node = logs.second.logger_node;
+ new_file.logger_boot_uuid = logs.second.logger_boot_uuid;
new_file.monotonic_start_time = logs.second.monotonic_start_time;
new_file.realtime_start_time = logs.second.realtime_start_time;
new_file.corrupted = corrupted;
@@ -293,7 +334,12 @@
LogParts new_parts;
new_parts.monotonic_start_time = parts.second.monotonic_start_time;
new_parts.realtime_start_time = parts.second.realtime_start_time;
+ new_parts.logger_monotonic_start_time =
+ parts.second.logger_monotonic_start_time;
+ new_parts.logger_realtime_start_time =
+ parts.second.logger_realtime_start_time;
new_parts.log_event_uuid = logs.first;
+ new_parts.source_boot_uuid = parts.second.source_boot_uuid;
new_parts.parts_uuid = parts.first;
new_parts.node = std::move(parts.second.node);
@@ -341,38 +387,46 @@
}
std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
- stream << "{";
+ stream << "{\n";
if (!file.log_event_uuid.empty()) {
- stream << "\"log_event_uuid\": \"" << file.log_event_uuid << "\", ";
+ stream << " \"log_event_uuid\": \"" << file.log_event_uuid << "\",\n";
}
if (!file.logger_node.empty()) {
- stream << "\"logger_node\": \"" << file.logger_node << "\", ";
+ stream << " \"logger_node\": \"" << file.logger_node << "\",\n";
}
- stream << "\"monotonic_start_time\": " << file.monotonic_start_time
- << ", \"realtime_start_time\": " << file.realtime_start_time << ", [";
- stream << "\"parts\": [";
+ if (!file.logger_boot_uuid.empty()) {
+ stream << " \"logger_boot_uuid\": \"" << file.logger_boot_uuid << "\",\n";
+ }
+ stream << " \"monotonic_start_time\": " << file.monotonic_start_time
+ << ",\n \"realtime_start_time\": " << file.realtime_start_time
+ << ",\n";
+ stream << " \"parts\": [\n";
for (size_t i = 0; i < file.parts.size(); ++i) {
if (i != 0u) {
- stream << ", ";
+ stream << ",\n";
}
stream << file.parts[i];
}
- stream << "]}";
+ stream << " ]\n}";
return stream;
}
std::ostream &operator<<(std::ostream &stream, const LogParts &parts) {
- stream << "{";
+ stream << " {\n";
if (!parts.log_event_uuid.empty()) {
- stream << "\"log_event_uuid\": \"" << parts.log_event_uuid << "\", ";
+ stream << " \"log_event_uuid\": \"" << parts.log_event_uuid << "\",\n";
}
if (!parts.parts_uuid.empty()) {
- stream << "\"parts_uuid\": \"" << parts.parts_uuid << "\", ";
+ stream << " \"parts_uuid\": \"" << parts.parts_uuid << "\",\n";
}
if (!parts.node.empty()) {
- stream << "\"node\": \"" << parts.node << "\", ";
+ stream << " \"node\": \"" << parts.node << "\",\n";
}
- stream << "\"monotonic_start_time\": " << parts.monotonic_start_time
- << ", \"realtime_start_time\": " << parts.realtime_start_time << ", [";
+ if (!parts.source_boot_uuid.empty()) {
+ stream << " \"source_boot_uuid\": \"" << parts.source_boot_uuid << "\",\n";
+ }
+ stream << " \"monotonic_start_time\": " << parts.monotonic_start_time
+ << ",\n \"realtime_start_time\": " << parts.realtime_start_time
+ << ",\n \"parts\": [";
for (size_t i = 0; i < parts.parts.size(); ++i) {
if (i != 0u) {
@@ -381,7 +435,7 @@
stream << parts.parts[i];
}
- stream << "]}";
+ stream << "]\n }";
return stream;
}
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index b955a09..bfe1aa4 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -19,6 +19,12 @@
aos::monotonic_clock::time_point monotonic_start_time;
aos::realtime_clock::time_point realtime_start_time;
+ // Time on the logger node (if applicable) at which this log file started.
+ aos::monotonic_clock::time_point logger_monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point logger_realtime_start_time =
+ aos::realtime_clock::min_time;
+
// UUIDs if available.
std::string log_event_uuid;
std::string parts_uuid;
@@ -26,6 +32,11 @@
// The node this represents, or empty if we are in a single node world.
std::string node;
+ // Boot UUID of the node which generated this data, if available. For local
+ // data and timestamps, this is the same as the logger_boot_uuid. For remote
+ // data, this is the boot_uuid of the remote node.
+ std::string source_boot_uuid;
+
// Pre-sorted list of parts.
std::vector<std::string> parts;
};
@@ -38,6 +49,8 @@
// The node the logger was running on (if available)
std::string logger_node;
+ // Boot UUID of the node running the logger.
+ std::string logger_boot_uuid;
// The start time on the logger node.
aos::monotonic_clock::time_point monotonic_start_time;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3f85cc7..79d66f4 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -454,10 +454,16 @@
// time.
// TODO(austin): Does this work with startup when we don't know the remote
// start time too? Look at one of those logs to compare.
- if (monotonic_sent_time > parts_.monotonic_start_time) {
+ if (monotonic_sent_time >
+ parts_.monotonic_start_time + max_out_of_order_duration()) {
+ after_start_ = true;
+ }
+ if (after_start_) {
CHECK_GE(monotonic_sent_time,
newest_timestamp_ - max_out_of_order_duration())
- << ": Max out of order exceeded. " << parts_;
+ << ": Max out of order exceeded. " << parts_ << ", start time is "
+ << parts_.monotonic_start_time << " currently reading "
+ << filename();
}
return message;
}
@@ -581,7 +587,8 @@
return nullptr;
}
- CHECK_GE(messages_.begin()->timestamp, last_message_time_);
+ CHECK_GE(messages_.begin()->timestamp, last_message_time_)
+ << DebugString() << " reading " << parts_message_reader_.filename();
last_message_time_ = messages_.begin()->timestamp;
return &(*messages_.begin());
}
@@ -591,8 +598,16 @@
std::string LogPartsSorter::DebugString() const {
std::stringstream ss;
ss << "messages: [\n";
+ int count = 0;
+ bool no_dots = true;
for (const Message &m : messages_) {
- ss << m << "\n";
+ if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
+ ss << m << "\n";
+ } else if (no_dots) {
+ ss << "...\n";
+ no_dots = false;
+ }
+ ++count;
}
ss << "] <- " << parts_message_reader_.filename();
return ss.str();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index fd600d3..61f16aa 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -314,6 +314,15 @@
bool done_ = false;
MessageReader message_reader_;
+ // True after we have seen a message after the start of the log. The
+ // guarantees on logging essentially are that all data from before the
+ // starting time of the log may be arbitrarily out of order, but once we get
+ // max_out_of_order_duration past the start, everything will remain within
+ // max_out_of_order_duration. We shouldn't see anything before the start
+ // after we've seen a message that is at least max_out_of_order_duration after
+ // the start.
+ bool after_start_ = false;
+
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
};
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index a73d678..654c7ce 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -50,6 +50,14 @@
return result.value();
}
+std::string LogFileVectorToString(const std::vector<LogFile> &log_files) {
+ std::stringstream ss;
+ for (const LogFile &f : log_files) {
+ ss << f << "\n";
+ }
+ return ss.str();
+}
+
// Copies the channel, removing the schema as we go. If new_name is provided,
// it is used instead of the name inside the channel. If new_type is provided,
// it is used instead of the type in the channel.
@@ -118,8 +126,6 @@
std::function<bool(const Channel *)> should_log)
: event_loop_(event_loop),
configuration_(configuration),
- boot_uuid_(
- util::ReadFileToStringOrDie("/proc/sys/kernel/random/boot_id")),
name_(network::GetHostname()),
timer_handler_(event_loop_->AddTimer(
[this]() { DoLogData(event_loop_->monotonic_now()); })),
@@ -182,7 +188,6 @@
}
FetcherStruct fs;
- fs.node_index = our_node_index;
fs.channel_index = channel_index;
fs.channel = channel;
@@ -214,12 +219,19 @@
if (log_delivery_times) {
VLOG(1) << " Delivery times";
fs.wants_timestamp_writer = true;
+ fs.timestamp_node_index = our_node_index;
}
if (log_message) {
VLOG(1) << " Data";
fs.wants_writer = true;
if (!is_local) {
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+ fs.data_node_index =
+ configuration::GetNodeIndex(configuration_, source_node);
fs.log_type = LogType::kLogRemoteMessage;
+ } else {
+ fs.data_node_index = our_node_index;
}
}
if (log_contents) {
@@ -227,7 +239,7 @@
<< configuration::CleanedChannelToString(channel);
fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
fs.wants_contents_writer = true;
- fs.node_index =
+ fs.contents_node_index =
configuration::GetNodeIndex(configuration_, fs.timestamp_node);
}
fetchers_.emplace_back(std::move(fs));
@@ -319,7 +331,8 @@
// Clear out any old timestamps in case we are re-starting logging.
for (size_t i = 0; i < node_state_.size(); ++i) {
- SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+ monotonic_clock::min_time, realtime_clock::min_time);
}
WriteHeader();
@@ -327,6 +340,13 @@
LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
<< " start_time " << last_synchronized_time_;
+ // Force logging up until the start of the log file now, so the messages at
+ // the start are always ordered before the rest of the messages.
+ // Note: this ship may have already sailed, but we don't have to make it
+ // worse.
+ // TODO(austin): Test...
+ LogUntil(last_synchronized_time_);
+
timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
polling_period_);
}
@@ -373,10 +393,53 @@
const int node_index = configuration::GetNodeIndex(configuration_, node);
MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
realtime_start_time);
- log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+ MaybeWriteHeader(node_index, node);
}
}
+void Logger::MaybeWriteHeader(int node_index) {
+ if (configuration::MultiNode(configuration_)) {
+ return MaybeWriteHeader(node_index,
+ configuration_->nodes()->Get(node_index));
+ } else {
+ return MaybeWriteHeader(node_index, nullptr);
+ }
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+ // This function is responsible for writing the header once it both has
+ // valid data and needs to be written.
+ if (node_state_[node_index].header_written &&
+ node_state_[node_index].header_valid) {
+ // The header has been written and is valid, nothing to do.
+ return;
+ }
+ if (!node_state_[node_index].has_source_node_boot_uuid) {
+ // Can't write a header if we don't have the boot UUID.
+ return;
+ }
+
+ // WriteHeader writes the first header in a log file. We want to do this only
+ // once.
+ //
+ // Rotate rewrites the same header with a new part index, but keeps the same
+ // parts UUID. We don't want that when a node reboots, because it would imply
+ // that the parts go together across the reboot.
+ //
+ // Reboot resets the parts UUID. So, once we've written a header the first
+ // time, we want to use Reboot to rotate the log and reset the parts UUID.
+ //
+ // header_valid is cleared whenever the remote reboots.
+ if (node_state_[node_index].header_written) {
+ log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+ } else {
+ log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+
+ node_state_[node_index].header_written = true;
+ }
+ node_state_[node_index].header_valid = true;
+}
+
void Logger::WriteMissingTimestamps() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -394,14 +457,20 @@
node, node_index,
server_statistics_fetcher_.context().monotonic_event_time,
server_statistics_fetcher_.context().realtime_event_time)) {
+ CHECK(node_state_[node_index].header_written);
+ CHECK(node_state_[node_index].header_valid);
log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ } else {
+ MaybeWriteHeader(node_index, node);
}
}
}
-void Logger::SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time) {
+void Logger::SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time) {
node_state_[node_index].monotonic_start_time = monotonic_start_time;
node_state_[node_index].realtime_start_time = realtime_start_time;
node_state_[node_index]
@@ -410,6 +479,30 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
monotonic_start_time.time_since_epoch())
.count());
+
+ // Add logger start times if they are available in the log file header.
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_monotonic_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_monotonic_start_time.time_since_epoch())
+ .count());
+ }
+
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_realtime_start_time.time_since_epoch())
+ .count());
+ }
+
if (node_state_[node_index]
.log_file_header.mutable_message()
->has_realtime_start_time()) {
@@ -431,47 +524,57 @@
monotonic_clock::min_time) {
return false;
}
- if (configuration::MultiNode(configuration_)) {
- if (event_loop_->node() == node) {
- // There are no offsets to compute for ourself, so always succeed.
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- } else if (server_statistics_fetcher_.get() != nullptr) {
- // We must be a remote node now. Look for the connection and see if it is
- // connected.
-
- for (const message_bridge::ServerConnection *connection :
- *server_statistics_fetcher_->connections()) {
- if (connection->node()->name()->string_view() !=
- node->name()->string_view()) {
- continue;
- }
-
- if (connection->state() != message_bridge::State::CONNECTED) {
- VLOG(1) << node->name()->string_view()
- << " is not connected, can't start it yet.";
- break;
- }
-
- if (!connection->has_monotonic_offset()) {
- VLOG(1) << "Missing monotonic offset for setting start time for node "
- << aos::FlatbufferToJson(node);
- break;
- }
-
- VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
-
- // Found it and it is connected. Compensate and go.
- monotonic_start_time +=
- std::chrono::nanoseconds(connection->monotonic_offset());
-
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
- return true;
- }
- }
- } else {
- SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ if (event_loop_->node() == node ||
+ !configuration::MultiNode(configuration_)) {
+ // There are no offsets to compute for ourself, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+ monotonic_start_time, realtime_start_time);
+ node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ // Update the boot UUID as soon as we know we are connected.
+ if (!connection->has_boot_uuid()) {
+ VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ if (!node_state_[node_index].has_source_node_boot_uuid ||
+ node_state_[node_index].source_node_boot_uuid !=
+ connection->boot_uuid()->string_view()) {
+ node_state_[node_index].SetBootUUID(
+ connection->boot_uuid()->string_view());
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ // Found it and it is connected. Compensate and go.
+ SetStartTime(node_index,
+ monotonic_start_time +
+ std::chrono::nanoseconds(connection->monotonic_offset()),
+ realtime_start_time, monotonic_start_time,
+ realtime_start_time);
+ return true;
+ }
}
return false;
}
@@ -502,8 +605,11 @@
log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
}
- const flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
- fbb.CreateString(boot_uuid_);
+ const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
fbb.CreateString("00000000-0000-4000-8000-000000000000");
@@ -545,6 +651,15 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
realtime_clock::min_time.time_since_epoch())
.count());
+ } else {
+ log_file_header_builder.add_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_clock::min_time.time_since_epoch())
+ .count());
+ log_file_header_builder.add_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
@@ -552,7 +667,10 @@
if (!log_start_uuid_offset.IsNull()) {
log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
}
- log_file_header_builder.add_boot_uuid(boot_uuid_offset);
+ log_file_header_builder.add_logger_node_boot_uuid(
+ logger_node_boot_uuid_offset);
+ log_file_header_builder.add_source_node_boot_uuid(
+ source_node_boot_uuid_offset);
log_file_header_builder.add_parts_uuid(parts_uuid_offset);
log_file_header_builder.add_parts_index(0);
@@ -591,6 +709,9 @@
}
void Logger::LogUntil(monotonic_clock::time_point t) {
+ // Grab the latest ServerStatistics message. This will always have the
+ // opportunity to be >= the current time, so it will always represent any
+ // reboots which may have happened.
WriteMissingTimestamps();
// Write each channel to disk, one at a time.
@@ -636,6 +757,9 @@
max_header_size_ = std::max(max_header_size_,
fbb.GetSize() - f.fetcher->context().size);
+ CHECK(node_state_[f.data_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.writer->QueueSizedFlatbuffer(&fbb);
}
@@ -659,6 +783,9 @@
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
+ CHECK(node_state_[f.timestamp_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -674,6 +801,16 @@
const RemoteMessage *msg =
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+ CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+ if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+ node_state_[f.contents_node_index].source_node_boot_uuid !=
+ msg->boot_uuid()->string_view()) {
+ node_state_[f.contents_node_index].SetBootUUID(
+ msg->boot_uuid()->string_view());
+
+ MaybeWriteHeader(f.contents_node_index);
+ }
+
logger::MessageHeader::Builder message_header_builder(fbb);
// TODO(austin): This needs to check the channel_index and confirm
@@ -706,6 +843,9 @@
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
+ CHECK(node_state_[f.contents_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
f.contents_writer->QueueSizedFlatbuffer(&fbb);
}
@@ -917,6 +1057,18 @@
configuration::GetNodeIndex(configuration(), node);
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+
+ // Confirm that all the parts are from the same boot if there are enough
+ // parts to not be from the same boot.
+ if (filtered_parts.size() > 1u) {
+ for (size_t i = 1; i < filtered_parts.size(); ++i) {
+ CHECK_EQ(filtered_parts[i].source_boot_uuid,
+ filtered_parts[0].source_boot_uuid)
+ << ": Found parts from different boots "
+ << LogFileVectorToString(log_files_);
+ }
+ }
+
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
@@ -1418,6 +1570,10 @@
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
+ << " while trying to send a message on "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
<< " " << state->DebugString();
} else if (timestamped_message.monotonic_remote_time >=
state->monotonic_remote_now(
@@ -2017,6 +2173,9 @@
remote_timestamp_senders_[timestamped_message.channel_index]
->MakeBuilder();
+ flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
+ builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+
RemoteMessage::Builder message_header_builder =
builder.MakeBuilder<RemoteMessage>();
@@ -2037,6 +2196,7 @@
timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
+ message_header_builder.add_boot_uuid(boot_uuid_offset);
builder.Send(message_header_builder.Finish());
}
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index fecfd6e..fef68f4 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -56,7 +56,15 @@
// All log parts generated on a single node while it is booted will have the
// same value here. It also matches with the one used by systemd.
- boot_uuid: string (id: 11);
+ logger_node_boot_uuid: string (id: 11);
+
+ // Boot UUID of the node which generated this data; empty if not known yet.
+ source_node_boot_uuid: string (id: 13);
+
+ // Timestamps at which this log file started, on the logger node's clocks.
+ // This is mostly useful when trying to deduce the order of node reboots.
+ logger_monotonic_start_time:int64 = -9223372036854775808 (id: 14);
+ logger_realtime_start_time:int64 = -9223372036854775808 (id: 15);
// All log events across all nodes produced by a single high-level start event
// will have the same value here.
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a248f80..d042f90 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -180,7 +180,12 @@
bool wants_contents_writer = false;
DetachedBufferWriter *contents_writer = nullptr;
- int node_index = 0;
+ // Node which this data is from, or -1 if it is unknown.
+ int data_node_index = -1;
+ // Node that this timestamp is for, or -1 if it is unknown.
+ int timestamp_node_index = -1;
+ // Node that the contents this contents_writer will log are from.
+ int contents_node_index = -1;
};
// Vector mapping from the channel index from the event loop to the logged
@@ -193,14 +198,46 @@
aos::realtime_clock::time_point realtime_start_time =
aos::realtime_clock::min_time;
+ bool has_source_node_boot_uuid = false;
+
+ // This is an initial UUID which is syntactically a valid UUID4, but is
+ // obviously a placeholder rather than a real boot UUID.
+ std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
+
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+
+ // True if a header has been written to the start of a log file.
+ bool header_written = false;
+ // True if the currently written header matches the contents which will
+ // follow. This is cleared when the boot UUID is known to no longer match.
+ bool header_valid = false;
+
+ // Sets the source_node_boot_uuid, properly updating everything.
+ void SetBootUUID(std::string_view new_source_node_boot_uuid) {
+ source_node_boot_uuid = new_source_node_boot_uuid;
+ header_valid = false;
+ has_source_node_boot_uuid = true;
+
+ flatbuffers::String *source_node_boot_uuid_string =
+ log_file_header.mutable_message()->mutable_source_node_boot_uuid();
+ CHECK_EQ(source_node_boot_uuid.size(),
+ source_node_boot_uuid_string->size());
+ memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
+ source_node_boot_uuid.size());
+ }
};
void WriteHeader();
+
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
const Node *node);
+ // Writes the header for the provided node if enough information is valid.
+ void MaybeWriteHeader(int node_index);
+ // Overload for when we already know node as well.
+ void MaybeWriteHeader(int node_index, const Node *node);
+
bool MaybeUpdateTimestamp(
const Node *node, int node_index,
aos::monotonic_clock::time_point monotonic_start_time,
@@ -222,9 +259,11 @@
FetcherStruct *fetcher);
// Sets the start time for a specific node.
- void SetStartTime(size_t node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time);
+ void SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time);
EventLoop *const event_loop_;
// The configuration to place at the top of the log file.
@@ -235,7 +274,6 @@
std::unique_ptr<LogNamer> log_namer_;
// Empty indicates there isn't one.
std::string log_start_uuid_;
- const std::string boot_uuid_;
// Name to save in the log file. Defaults to hostname.
std::string name_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 8115400..3f4893e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,5 +1,6 @@
#include "aos/events/logging/logger.h"
+#include "absl/strings/str_format.h"
#include "aos/events/event_loop.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
@@ -360,6 +361,30 @@
}
}
+std::vector<std::string> MakeLogFiles(std::string logfile_base) {
+ return std::vector<std::string>(
+ {logfile_base + "_pi1_data.part0.bfbs",
+ logfile_base + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+ logfile_base + "_pi2_data.part0.bfbs",
+ logfile_base + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
+ logfile_base + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
+ logfile_base + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.message_bridge.RemoteMessage.part0.bfbs",
+ logfile_base + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.message_bridge.RemoteMessage.part1.bfbs",
+ logfile_base +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"});
+}
+
class MultinodeLoggerTest : public ::testing::Test {
public:
MultinodeLoggerTest()
@@ -372,27 +397,24 @@
configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
tmp_dir_(aos::testing::TestTmpDir()),
logfile_base_(tmp_dir_ + "/multi_logfile"),
- logfiles_(
+ pi1_reboot_logfiles_(
{logfile_base_ + "_pi1_data.part0.bfbs",
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
- logfile_base_ + "_pi2_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part2.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part0.bfbs",
logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
"aos.message_bridge.RemoteMessage.part1.bfbs",
- logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part0.bfbs",
- logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
- "aos.message_bridge.RemoteMessage.part1.bfbs",
- logfile_base_ +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
- logfile_base_ +
- "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.message_bridge.RemoteMessage.part2.bfbs",
logfile_base_ +
"_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
logfile_base_ +
- "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part2.bfbs"}),
+ logfiles_(MakeLogFiles(logfile_base_)),
structured_logfiles_{
std::vector<std::string>{logfiles_[0]},
std::vector<std::string>{logfiles_[1], logfiles_[2]},
@@ -411,6 +433,14 @@
unlink((file + ".xz").c_str());
}
+ for (const auto file : MakeLogFiles("relogged")) {
+ unlink(file.c_str());
+ }
+
+ for (const auto file : pi1_reboot_logfiles_) {
+ unlink(file.c_str());
+ }
+
LOG(INFO) << "Logging data to " << logfiles_[0] << ", " << logfiles_[1]
<< " and " << logfiles_[2];
}
@@ -545,6 +575,7 @@
std::string tmp_dir_;
std::string logfile_base_;
+ std::vector<std::string> pi1_reboot_logfiles_;
std::vector<std::string> logfiles_;
std::vector<std::vector<std::string>> structured_logfiles_;
@@ -1478,9 +1509,9 @@
pi1_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi2",
- [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
- &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
- &ping_on_pi1_fetcher,
+ [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
+ ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
+ &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
&ping_on_pi2_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -1511,6 +1542,10 @@
header.channel_index()));
}
+ ASSERT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ pi2_event_loop->boot_uuid().string_view());
+
EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
EXPECT_EQ(pi2_context->queue_index, header.queue_index());
@@ -1530,9 +1565,9 @@
});
pi2_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi1",
- [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
- &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
- &pong_on_pi2_fetcher,
+ [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
+ pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
+ &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
&pong_on_pi1_fetcher](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
@@ -1563,6 +1598,10 @@
header.channel_index()));
}
+ ASSERT_TRUE(header.has_boot_uuid());
+ EXPECT_EQ(header.boot_uuid()->string_view(),
+ pi1_event_loop->boot_uuid().string_view());
+
EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
EXPECT_EQ(pi1_context->queue_index, header.queue_index());
@@ -1602,9 +1641,92 @@
reader.Deregister();
}
+// Tests that we properly populate and extract the logger start times by
+// setting up a clock difference between 2 nodes and looking at the resulting
+// parts.
+TEST_F(MultinodeLoggerTest, LoggerStartTime) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ NodeEventLoopFactory *pi2 =
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+
+ pi2->SetDistributedOffset(chrono::seconds(1000), 1.0);
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+ }
+
+ for (const LogFile &log_file : SortParts(logfiles_)) {
+ for (const LogParts &log_part : log_file.parts) {
+ if (log_part.node == log_file.logger_node) {
+ EXPECT_EQ(log_part.logger_monotonic_start_time,
+ aos::monotonic_clock::min_time);
+ EXPECT_EQ(log_part.logger_realtime_start_time,
+ aos::realtime_clock::min_time);
+ } else {
+ const chrono::seconds offset = log_file.logger_node == "pi1"
+ ? -chrono::seconds(1000)
+ : chrono::seconds(1000);
+ EXPECT_EQ(log_part.logger_monotonic_start_time,
+ log_part.monotonic_start_time + offset);
+ EXPECT_EQ(log_part.logger_realtime_start_time,
+ log_file.realtime_start_time +
+ (log_part.logger_monotonic_start_time -
+ log_file.monotonic_start_time));
+ }
+ }
+ }
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
+// Tests that we detect a remote node rebooting partway through a log and
+// refuse to replay the resulting parts, since they come from different
+// boots.
+TEST_F(MultinodeLoggerDeathTest, RemoteReboot) {
+ std::string pi2_boot1;
+ std::string pi2_boot2;
+ {
+ pi2_boot1 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
+ ->boot_uuid()
+ .string_view();
+ LoggerState pi1_logger = MakeLogger(pi1_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(10000));
+
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Reboot();
+
+ pi2_boot2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_)
+ ->boot_uuid()
+ .string_view();
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ // Confirm that we refuse to replay logs with parts from different boots.
+ EXPECT_DEATH(
+ {
+ LogReader reader(SortParts(pi1_reboot_logfiles_));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of
+ // the log file.
+ reader.Register(&log_reader_factory);
+ },
+ absl::StrFormat("(%s|%s).*(%s|%s).*Found parts from different boots",
+ pi2_boot1, pi2_boot2, pi2_boot2, pi2_boot1));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos