Repair possible message loss during log set restarts.
Moves the restart logic down inside aos, with a slight refactor
therein to encapsulate the common Start/Restart logic.
Change-Id: I4f4e6c88fe6af7bc2d37710c35a3e96d7867e876
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 0e1dece..b8d0c7b 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -272,12 +272,9 @@
return true;
}
-void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
- std::optional<UUID> log_start_uuid) {
- CHECK(!log_namer_) << ": Already logging";
- log_namer_ = std::move(log_namer);
-
+std::string Logger::WriteConfiguration(LogNamer* log_namer) {
std::string config_sha256;
+
if (separate_config_) {
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<aos::Configuration> configuration_offset =
@@ -289,12 +286,24 @@
fbb.Release());
config_sha256 = Sha256(config_header.span());
LOG(INFO) << "Config sha256 of " << config_sha256;
- log_namer_->WriteConfiguration(&config_header, config_sha256);
+ log_namer->WriteConfiguration(&config_header, config_sha256);
}
+ return config_sha256;
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
+ std::optional<UUID> log_start_uuid) {
+ CHECK(!log_namer_) << ": Already logging";
+
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
+
+ auto config_sha256 = WriteConfiguration(log_namer.get());
+
+ log_namer_ = std::move(log_namer);
+
log_event_uuid_ = UUID::Random();
log_start_uuid_ = log_start_uuid;
- VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
// We want to do as much work as possible before the initial Fetch. Time
// between that and actually starting to log opens up the possibility of
@@ -342,12 +351,12 @@
const aos::monotonic_clock::time_point header_time =
event_loop_->monotonic_now();
- LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
- << " start_time " << last_synchronized_time_ << ", took "
- << chrono::duration<double>(fetch_time - beginning_time).count()
- << " to fetch, "
- << chrono::duration<double>(header_time - fetch_time).count()
- << " to write headers, boot uuid " << event_loop_->boot_uuid();
+ VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
+ << " start_time " << last_synchronized_time_ << ", took "
+ << chrono::duration<double>(fetch_time - beginning_time).count()
+ << " to fetch, "
+ << chrono::duration<double>(header_time - fetch_time).count()
+ << " to write headers, boot uuid " << event_loop_->boot_uuid();
// Force logging up until the start of the log file now, so the messages at
// the start are always ordered before the rest of the messages.
@@ -364,6 +373,113 @@
polling_period_);
}
+std::unique_ptr<LogNamer> Logger::RestartLogging(std::unique_ptr<LogNamer> log_namer,
+ std::optional<UUID> log_start_uuid) {
+ CHECK(log_namer_) << ": Unexpected restart while not logging";
+
+ VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
+
+ // Force out every currently pending message, pointing all fetchers at the
+ // last (currently available) records. Note that LogUntil() updates
+ // last_synchronized_time_ to the time value that it receives.
+ while(LogUntil(last_synchronized_time_ + polling_period_));
+
+ std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
+ log_namer_ = std::move(log_namer);
+
+ const aos::monotonic_clock::time_point beginning_time =
+ event_loop_->monotonic_now();
+
+ auto config_sha256 = WriteConfiguration(log_namer_.get());
+
+ log_event_uuid_ = UUID::Random();
+ log_start_uuid_ = log_start_uuid;
+
+ log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
+
+ // Note that WriteHeader updates last_synchronized_time_ to be the
+ // current time when it is called, which is then the "start time"
+ // of the new (restarted) log. This timestamp will be after
+ // the timestamp of the last message fetched on each channel.
+ WriteHeader();
+
+ const aos::monotonic_clock::time_point header_time =
+ event_loop_->monotonic_now();
+
+ // Write the transition record(s) for each channel ...
+ for (FetcherStruct &f : fetchers_) {
+
+ // Create writers from the new namer
+ NewDataWriter *next_writer = nullptr;
+ NewDataWriter *next_timestamp_writer = nullptr;
+ NewDataWriter *next_contents_writer = nullptr;
+
+ if (f.wants_writer) {
+ next_writer = log_namer_->MakeWriter(f.channel);
+ }
+ if (f.wants_timestamp_writer) {
+ next_timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+ }
+ if (f.wants_contents_writer) {
+ next_contents_writer = log_namer_->MakeForwardedTimestampWriter(
+ f.channel, CHECK_NOTNULL(f.timestamp_node));
+ }
+
+ if (f.fetcher->context().data != nullptr) {
+
+ // Write the last message fetched as the first of the new log of this type.
+ // The timestamps on these will all be before the new start time.
+ WriteData(next_writer, f);
+ WriteTimestamps(next_timestamp_writer, f);
+ WriteContent(next_contents_writer, f);
+
+ // It is possible that a few more snuck in. Write them all out also, including
+ // any that should also be in the old log.
+ while (true) {
+ // Get the next message ...
+ const auto start = event_loop_->monotonic_now();
+ const bool got_new = f.fetcher->FetchNext();
+ const auto end = event_loop_->monotonic_now();
+ RecordFetchResult(start, end, got_new, &f);
+
+ if (got_new) {
+ if (f.fetcher->context().monotonic_event_time < last_synchronized_time_) {
+ WriteFetchedRecord(f);
+ }
+
+ WriteData(next_writer, f);
+ WriteTimestamps(next_timestamp_writer, f);
+ WriteContent(next_contents_writer, f);
+
+ if (f.fetcher->context().monotonic_event_time > last_synchronized_time_) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ // Switch fully over to the new writers.
+ f.writer = next_writer;
+ f.timestamp_writer = next_timestamp_writer;
+ f.contents_writer = next_contents_writer;
+ f.written = true;
+ }
+
+ const aos::monotonic_clock::time_point channel_time =
+ event_loop_->monotonic_now();
+
+ VLOG(1) << "Logging node as " << FlatbufferToJson(node_)
+ << " restart_time " << last_synchronized_time_ << ", took "
+ << chrono::duration<double>(header_time - beginning_time).count()
+ << " to prepare and write header, "
+ << chrono::duration<double>(channel_time - header_time).count()
+ << " to write initial channel messages, boot uuid " << event_loop_->boot_uuid();
+
+ return old_log_namer;
+}
+
std::unique_ptr<LogNamer> Logger::StopLogging(
aos::monotonic_clock::time_point end_time) {
CHECK(log_namer_) << ": Not logging right now";
@@ -445,6 +561,7 @@
monotonic_clock::min_time) {
return false;
}
+
// There are no offsets to compute for ourself, so always succeed.
log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
monotonic_start_time, realtime_start_time,
@@ -603,7 +720,148 @@
}
}
-void Logger::LogUntil(monotonic_clock::time_point t) {
+void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
+ if (writer != nullptr) {
+ const UUID source_node_boot_uuid =
+ static_cast<int>(node_index_) != f.data_node_index
+ ? f.fetcher->context().source_boot_uuid
+ : event_loop_->boot_uuid();
+ // Write!
+ const auto start = event_loop_->monotonic_now();
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index, f.log_type));
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, f);
+
+ max_header_size_ = std::max(max_header_size_,
+ fbb.GetSize() - f.fetcher->context().size);
+ writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+
+ VLOG(2) << "Wrote data as node "
+ << FlatbufferToJson(node_) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << writer->filename() << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+ }
+}
+
+void Logger::WriteTimestamps(NewDataWriter *timestamp_writer, const FetcherStruct &f) {
+ if (timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ const auto start = event_loop_->monotonic_now();
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, f);
+
+ // Tell our writer that we know something about the remote boot.
+ timestamp_writer->UpdateRemote(
+ f.data_node_index, f.fetcher->context().source_boot_uuid,
+ f.fetcher->context().monotonic_remote_time,
+ f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
+ timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+ VLOG(2) << "Wrote timestamps as node "
+ << FlatbufferToJson(node_) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << timestamp_writer->filename() << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+ }
+}
+
+void Logger::WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f) {
+ if (contents_writer != nullptr) {
+ const auto start = event_loop_->monotonic_now();
+ // And now handle the special message contents channel. Copy the
+ // message into a FlatBufferBuilder and save it to disk.
+ // TODO(austin): We can be more efficient here when we start to
+ // care...
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ const RemoteMessage *msg =
+ flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+
+ CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ // TODO(austin): This needs to check the channel_index and confirm
+ // that it should be logged before squirreling away the timestamp to
+ // disk. We don't want to log irrelevant timestamps.
+
+ // Note: this must match the same order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+
+ // Translate from the channel index that the event loop uses to the
+ // channel index in the log file.
+ message_header_builder.add_channel_index(
+ event_loop_to_logged_channel_index_[msg->channel_index()]);
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(
+ msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(msg->remote_queue_index());
+
+ const aos::monotonic_clock::time_point monotonic_timestamp_time =
+ f.fetcher->context().monotonic_event_time;
+ message_header_builder.add_monotonic_timestamp_time(
+ monotonic_timestamp_time.time_since_epoch().count());
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, f);
+
+ // Timestamps tell us information about what happened too!
+ // Capture any reboots so UpdateRemote is properly recorded.
+ contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
+
+ // Start with recording info about the data flowing from our node to the
+ // remote.
+ const bool reliable =
+ f.channel_reliable_contents.size() != 0u
+ ? f.channel_reliable_contents[msg->channel_index()]
+ : f.reliable_contents;
+
+ contents_writer->UpdateRemote(node_index_, event_loop_->boot_uuid(),
+ monotonic_clock::time_point(
+ chrono::nanoseconds(msg->monotonic_remote_time())),
+ monotonic_clock::time_point(
+ chrono::nanoseconds(msg->monotonic_sent_time())),
+ reliable, monotonic_timestamp_time);
+
+ contents_writer->QueueMessage(
+ &fbb, UUID::FromVector(msg->boot_uuid()), end);
+ }
+}
+
+void Logger::WriteFetchedRecord(FetcherStruct &f) {
+ WriteData(f.writer, f);
+ WriteTimestamps(f.timestamp_writer, f);
+ WriteContent(f.contents_writer, f);
+}
+
+bool Logger::LogUntil(monotonic_clock::time_point t) {
+ bool has_pending_messages = false;
+
// Grab the latest ServerStatistics message. This will always have the
// oppertunity to be >= to the current time, so it will always represent any
// reboots which may have happened.
@@ -628,143 +886,18 @@
// TODO(james): Write tests to exercise this logic.
if (f.fetcher->context().monotonic_event_time >= t) {
+ has_pending_messages = true;
break;
}
- if (f.writer != nullptr) {
- const UUID source_node_boot_uuid =
- static_cast<int>(node_index_) != f.data_node_index
- ? f.fetcher->context().source_boot_uuid
- : event_loop_->boot_uuid();
- // Write!
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- VLOG(2) << "Writing data as node "
- << FlatbufferToJson(node_) << " for channel "
- << configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << f.writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- max_header_size_ = std::max(max_header_size_,
- fbb.GetSize() - f.fetcher->context().size);
- f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
- }
-
- if (f.timestamp_writer != nullptr) {
- // And now handle timestamps.
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- VLOG(2) << "Writing timestamps as node "
- << FlatbufferToJson(node_) << " for channel "
- << configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << f.timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- // Tell our writer that we know something about the remote boot.
- f.timestamp_writer->UpdateRemote(
- f.data_node_index, f.fetcher->context().source_boot_uuid,
- f.fetcher->context().monotonic_remote_time,
- f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
- f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
- }
-
- if (f.contents_writer != nullptr) {
- const auto start = event_loop_->monotonic_now();
- // And now handle the special message contents channel. Copy the
- // message into a FlatBufferBuilder and save it to disk.
- // TODO(austin): We can be more efficient here when we start to
- // care...
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- const RemoteMessage *msg =
- flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
-
- CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-
- logger::MessageHeader::Builder message_header_builder(fbb);
-
- // TODO(austin): This needs to check the channel_index and confirm
- // that it should be logged before squirreling away the timestamp to
- // disk. We don't want to log irrelevant timestamps.
-
- // Note: this must match the same order as MessageBridgeServer and
- // PackMessage. We want identical headers to have identical
- // on-the-wire formats to make comparing them easier.
-
- // Translate from the channel index that the event loop uses to the
- // channel index in the log file.
- message_header_builder.add_channel_index(
- event_loop_to_logged_channel_index_[msg->channel_index()]);
-
- message_header_builder.add_queue_index(msg->queue_index());
- message_header_builder.add_monotonic_sent_time(
- msg->monotonic_sent_time());
- message_header_builder.add_realtime_sent_time(
- msg->realtime_sent_time());
-
- message_header_builder.add_monotonic_remote_time(
- msg->monotonic_remote_time());
- message_header_builder.add_realtime_remote_time(
- msg->realtime_remote_time());
- message_header_builder.add_remote_queue_index(
- msg->remote_queue_index());
-
- const aos::monotonic_clock::time_point monotonic_timestamp_time =
- f.fetcher->context().monotonic_event_time;
- message_header_builder.add_monotonic_timestamp_time(
- monotonic_timestamp_time.time_since_epoch().count());
-
- fbb.FinishSizePrefixed(message_header_builder.Finish());
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- // Timestamps tell us information about what happened too!
- // Capture any reboots so UpdateRemote is properly recorded.
- f.contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
-
- // Start with recording info about the data flowing from our node to the
- // remote.
- const bool reliable =
- f.channel_reliable_contents.size() != 0u
- ? f.channel_reliable_contents[msg->channel_index()]
- : f.reliable_contents;
-
- f.contents_writer->UpdateRemote(
- node_index_, event_loop_->boot_uuid(),
- monotonic_clock::time_point(
- chrono::nanoseconds(msg->monotonic_remote_time())),
- monotonic_clock::time_point(
- chrono::nanoseconds(msg->monotonic_sent_time())),
- reliable, monotonic_timestamp_time);
-
- f.contents_writer->QueueMessage(
- &fbb, UUID::FromVector(msg->boot_uuid()), end);
- }
+ WriteFetchedRecord(f);
f.written = true;
}
}
last_synchronized_time_ = t;
+
+ return has_pending_messages;
}
void Logger::DoLogData(const monotonic_clock::time_point end_time,
@@ -808,15 +941,15 @@
void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
aos::monotonic_clock::time_point end,
- FetcherStruct *fetcher) {
+ const FetcherStruct &fetcher) {
const auto duration = end - start;
total_copy_time_ += duration;
++total_copy_count_;
- total_copy_bytes_ += fetcher->fetcher->context().size;
+ total_copy_bytes_ += fetcher.fetcher->context().size;
if (duration > max_copy_time_) {
max_copy_time_ = duration;
- max_copy_time_channel_ = fetcher->channel_index;
- max_copy_time_size_ = fetcher->fetcher->context().size;
+ max_copy_time_channel_ = fetcher.channel_index;
+ max_copy_time_size_ = fetcher.fetcher->context().size;
}
}
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 6b04f5d..3c75f29 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -128,6 +128,11 @@
void StartLogging(std::unique_ptr<LogNamer> log_namer,
std::optional<UUID> log_start_uuid = std::nullopt);
+ // Restart logging using a new naming scheme. Intended for log rotation.
+ // Returns a unique_ptr to the prior log_namer instance.
+ std::unique_ptr<LogNamer> RestartLogging(std::unique_ptr<LogNamer> log_namer,
+ std::optional<UUID> log_start_uuid = std::nullopt);
+
// Moves the current log location to the new name. Returns true if a change
// was made, false otherwise.
// Only renaming the folder is supported, not the file base name.
@@ -216,6 +221,9 @@
// channel index.
std::vector<int> event_loop_to_logged_channel_index_;
+ // Start/Restart write configuration into LogNamer space.
+ std::string WriteConfiguration(LogNamer* log_namer);
+
void WriteHeader();
// Makes a template header for all the follower nodes.
@@ -232,10 +240,18 @@
void WriteMissingTimestamps();
+ void WriteData(NewDataWriter *writer, const FetcherStruct &f);
+ void WriteTimestamps(NewDataWriter *timestamps_writer, const FetcherStruct &f);
+ void WriteContent(NewDataWriter *contents_writer, const FetcherStruct &f);
+
+ void WriteFetchedRecord(FetcherStruct &f);
+
// Fetches from each channel until all the data is logged. This is dangerous
// because it lets you log for more than 1 period. All calls need to verify
// that t isn't greater than 1 period in the future.
- void LogUntil(monotonic_clock::time_point t);
+ // Returns true if there is at least one message that has been fetched but
+ // not yet written.
+ bool LogUntil(monotonic_clock::time_point t);
void RecordFetchResult(aos::monotonic_clock::time_point start,
aos::monotonic_clock::time_point end, bool got_new,
@@ -243,7 +259,7 @@
void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
aos::monotonic_clock::time_point end,
- FetcherStruct *fetcher);
+ const FetcherStruct &fetcher);
EventLoop *const event_loop_;
// The configuration to place at the top of the log file.