Add option to run the logger X seconds behind.
This lets us rotate when an event happens, but trigger the rotation
before the event is written to disk.
Change-Id: Ic275702db768b332a175a8eb86fa37844c7dbff2
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 312a6ae..5ccf15b 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -27,8 +27,9 @@
node_(configuration::GetNode(configuration_, event_loop->node())),
node_index_(configuration::GetNodeIndex(configuration_, node_)),
name_(network::GetHostname()),
- timer_handler_(event_loop_->AddTimer(
- [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
+ timer_handler_(event_loop_->AddTimer([this]() {
+ DoLogData(event_loop_->monotonic_now() - logging_delay_, true);
+ })),
server_statistics_fetcher_(
configuration::MultiNode(event_loop_->configuration())
? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
@@ -291,6 +292,8 @@
const aos::monotonic_clock::time_point beginning_time =
event_loop_->monotonic_now();
+ log_until_time_ = beginning_time;
+
// Grab data from each channel right before we declare the log file started
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
@@ -338,77 +341,45 @@
}
std::unique_ptr<LogNamer> Logger::RestartLogging(
- std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid) {
+ std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid,
+ std::optional<monotonic_clock::time_point> end_time) {
CHECK(log_namer_) << ": Unexpected restart while not logging";
VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
- // Make sure not to write past now so we don't risk out of order problems. We
- // don't want to get into a situation where we write out up to now + 0.1 sec,
- // and that operation takes ~0.1 seconds, so we end up writing a different
- // amount of the early and late channels. That would then result in the next
- // go around finding more than 0.1 sec of data on the early channels.
- //
- // Make sure we read up until "now" and log it. This sets us up so that we
- // are unlikely to fetch a message far in the future and have a ton of data
- // before the offical start time.
- monotonic_clock::time_point newest_record = monotonic_clock::min_time;
- while (true) {
- aos::monotonic_clock::time_point next_time =
- last_synchronized_time_ + polling_period_;
- const aos::monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
- if (next_time > monotonic_now) {
- next_time = monotonic_now;
- }
+ // Grab a representative time on both the RT and monotonic clock.
+ // Average a monotonic clock before and after to reduce the error.
+ const aos::monotonic_clock::time_point monotonic_now1 =
+ event_loop_->monotonic_now();
+ const aos::realtime_clock::time_point realtime_now =
+ event_loop_->realtime_now();
+ const aos::monotonic_clock::time_point monotonic_now2 =
+ event_loop_->monotonic_now();
- bool wrote_messages = false;
- std::tie(wrote_messages, newest_record) = LogUntil(next_time);
-
- if (next_time == monotonic_now &&
- (!wrote_messages || newest_record < monotonic_now + polling_period_)) {
- // If we stopped writing messages, then we probably have stopped making
- // progress. If the newest record (unwritten or written) on a channel is
- // very close to the current time, then there won't be much data
- // officially after the end of the last log but before the start of the
- // current one. We need to pick the start of the current log to be after
- // the last message on record so we don't have holes in the log.
- break;
+ // Log until the provided end time.
+ if (end_time) {
+ CHECK_LE(*end_time, monotonic_now1) << ": Can't log into the future.";
+ // DoLogData is a bit fragile.
+ if (*end_time > last_synchronized_time_) {
+ DoLogData(*end_time, false);
}
}
- // We are now synchronized up to last_synchronized_time_. Our start time can
- // safely be "newest_record". But, we need to guarentee that the start time
- // is after the newest message we have a record of, and that we don't skip any
- // messages as we rotate. This means we can't call Fetch anywhere.
+ // We are now synchronized up to last_synchronized_time_. We only have record
+ // of messages from before last_synchronized_time_, so it is a safe start
+ // time.
std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
log_namer_ = std::move(log_namer);
- // Now grab a representative time on both the RT and monotonic clock.
- // Average a monotonic clock before and after to reduce the error.
- const aos::monotonic_clock::time_point beginning_time =
- event_loop_->monotonic_now();
- const aos::realtime_clock::time_point beginning_time_rt =
- event_loop_->realtime_now();
- const aos::monotonic_clock::time_point beginning_time2 =
- event_loop_->monotonic_now();
-
- if (beginning_time > last_synchronized_time_) {
- LOG(WARNING) << "Took over " << polling_period_.count()
- << "ns to swap log_namer";
- }
-
- // Our start time is now the newest message we have a record of. We will
- // declare the old log "done", and start in on the new one, double-logging
- // anything we have a record of so we have all the messages from before the
- // start.
- const aos::monotonic_clock::time_point monotonic_start_time = newest_record;
+ // Our start time is now how far we logged until before.
+ const aos::monotonic_clock::time_point monotonic_start_time =
+ last_synchronized_time_;
const aos::realtime_clock::time_point realtime_start_time =
- (beginning_time_rt + (monotonic_start_time.time_since_epoch() -
- ((beginning_time.time_since_epoch() +
- beginning_time2.time_since_epoch()) /
- 2)));
+ (realtime_now + (last_synchronized_time_.time_since_epoch() -
+ ((monotonic_now1.time_since_epoch() +
+ monotonic_now2.time_since_epoch()) /
+ 2)));
auto config_sha256 = WriteConfiguration(log_namer_.get());
@@ -459,7 +430,7 @@
VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
<< last_synchronized_time_ << ", took "
- << chrono::duration<double>(header_time - beginning_time).count()
+ << chrono::duration<double>(header_time - monotonic_now1).count()
<< " to prepare and write header, "
<< chrono::duration<double>(channel_time - header_time).count()
<< " to write initial channel messages, boot uuid "
@@ -827,6 +798,8 @@
bool wrote_messages = false;
monotonic_clock::time_point newest_record = monotonic_clock::min_time;
+ log_until_time_ = t;
+
// Grab the latest ServerStatistics message. This will always have the
// oppertunity to be >= to the current time, so it will always represent any
// reboots which may have happened.
@@ -842,7 +815,8 @@
while (true) {
if (f.written) {
const auto start = event_loop_->monotonic_now();
- const bool got_new = f.fetcher->FetchNext();
+ const bool got_new =
+ f.fetcher->FetchNextIf(std::ref(fetch_next_if_fn_));
const auto end = event_loop_->monotonic_now();
RecordFetchResult(start, end, got_new, &f);
if (!got_new) {
@@ -857,7 +831,11 @@
}
// TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time >= t) {
+ CHECK_LE(f.fetcher->context().monotonic_event_time, t);
+
+ // At startup, we can end up grabbing a message at the current time.
+ // Ignore it.
+ if (f.fetcher->context().monotonic_event_time == t) {
break;
}
@@ -874,6 +852,9 @@
void Logger::DoLogData(const monotonic_clock::time_point end_time,
bool run_on_logged) {
+ if (end_time < last_synchronized_time_) return;
+
+ DCHECK(is_started());
// We want to guarantee that messages aren't out of order by more than
// max_out_of_order_duration. To do this, we need sync points. Every write
// cycle should be a sync point.
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 837a02a..3d3d7f2 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -66,6 +66,15 @@
separate_config_ = separate_config;
}
+ // Sets the amount to run the logger behind the current time. This lets us
+ // make decisions about rotating or stopping logging before something happens.
+ // Using this to start logging in the past isn't yet supported. This can be
+ // changed at runtime, but will only influence future writes, not what is
+ // already written.
+ void set_logging_delay(std::chrono::nanoseconds logging_delay) {
+ logging_delay_ = logging_delay;
+ }
+
// Sets the period between polling the data. Defaults to 100ms.
//
// Changing this while a set of files is being written may result in
@@ -139,11 +148,14 @@
void StartLogging(std::unique_ptr<LogNamer> log_namer,
std::optional<UUID> log_start_uuid = std::nullopt);
- // Restart logging using a new naming scheme. Intended for log rotation.
- // Returns a unique_ptr to the prior log_namer instance.
+ // Restarts logging using a new naming scheme. Intended for log rotation.
+ // Returns a unique_ptr to the prior log_namer instance. If provided,
+ // end_time is the time to log until. It must be in the past. Times before
+ // the last_synchronized_time are ignored.
std::unique_ptr<LogNamer> RestartLogging(
std::unique_ptr<LogNamer> log_namer,
- std::optional<UUID> log_start_uuid = std::nullopt);
+ std::optional<UUID> log_start_uuid = std::nullopt,
+ std::optional<monotonic_clock::time_point> end_time = std::nullopt);
// Stops logging. Ensures any messages through end_time make it into the log.
//
@@ -331,6 +343,16 @@
// Fetcher for all the statistics from all the nodes.
aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+ monotonic_clock::time_point log_until_time_ = monotonic_clock::min_time;
+
+ std::function<bool(const Context &)> fetch_next_if_fn_ =
+ [this](const Context &context) {
+ return context.monotonic_event_time < log_until_time_;
+ };
+
+ // Amount of time to run the logger behind now.
+ std::chrono::nanoseconds logging_delay_ = std::chrono::nanoseconds(0);
};
} // namespace logger