Support stopping and starting logging at runtime
Change-Id: If1e1b7119808d1f56e96efb71ea7000e0fa13fe8
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3032a6f..859e24c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,43 +35,30 @@
namespace logger {
namespace chrono = std::chrono;
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
- std::chrono::milliseconds polling_period)
- : Logger(base_name, event_loop, event_loop->configuration(),
- polling_period) {}
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period)
- : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
- event_loop, configuration, polling_period) {}
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- std::chrono::milliseconds polling_period)
- : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
- polling_period) {}
-
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
- const Configuration *configuration,
- std::chrono::milliseconds polling_period)
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+ std::function<bool(const Channel *)> should_log)
: event_loop_(event_loop),
- uuid_(UUID::Random()),
- log_namer_(std::move(log_namer)),
configuration_(configuration),
name_(network::GetHostname()),
- timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
- polling_period_(polling_period),
+ timer_handler_(event_loop_->AddTimer(
+ [this]() { DoLogData(event_loop_->monotonic_now()); })),
server_statistics_fetcher_(
configuration::MultiNode(event_loop_->configuration())
? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
"/aos")
: aos::Fetcher<message_bridge::ServerStatistics>()) {
- VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
- int channel_index = 0;
+ VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
// Find all the nodes which are logging timestamps on our node.
std::set<const Node *> timestamp_logger_nodes;
for (const Channel *channel : *configuration_->channels()) {
- if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
- !channel->has_destination_nodes()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
+ continue;
+ }
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+ if (!should_log(channel)) {
continue;
}
for (const Connection *connection : *channel->destination_nodes()) {
@@ -103,44 +90,53 @@
<< event_loop_->node()->name()->string_view()
<< " but can't find channel /aos/remote_timestamps/"
<< node->name()->string_view();
+ if (!should_log(channel)) {
+ continue;
+ }
timestamp_logger_channels.insert(std::make_pair(channel, node));
}
const size_t our_node_index =
configuration::GetNodeIndex(configuration_, event_loop_->node());
- for (const Channel *config_channel : *configuration_->channels()) {
+ for (size_t channel_index = 0;
+ channel_index < configuration_->channels()->size(); ++channel_index) {
+ const Channel *const config_channel =
+ configuration_->channels()->Get(channel_index);
// The MakeRawFetcher method needs a channel which is in the event loop
// configuration() object, not the configuration_ object. Go look that up
// from the config.
const Channel *channel = aos::configuration::GetChannel(
event_loop_->configuration(), config_channel->name()->string_view(),
config_channel->type()->string_view(), "", event_loop_->node());
+ if (!should_log(channel)) {
+ continue;
+ }
FetcherStruct fs;
fs.node_index = our_node_index;
+ fs.channel_index = channel_index;
+ fs.channel = channel;
+
const bool is_local =
configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
- const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
- channel, event_loop_->node()) &&
- is_readable;
+ const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+ channel, event_loop_->node());
+ const bool log_message = is_logged && is_readable;
- const bool log_delivery_times =
- (event_loop_->node() == nullptr)
- ? false
- : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, event_loop_->node(), event_loop_->node());
+ bool log_delivery_times = false;
+ if (event_loop_->node() != nullptr) {
+ log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop_->node(), event_loop_->node());
+ }
// Now, detect a MessageHeader timestamp logger where we should just log the
// contents to a file directly.
const bool log_contents = timestamp_logger_channels.find(channel) !=
timestamp_logger_channels.end();
- const Node *timestamp_node =
- log_contents ? timestamp_logger_channels.find(channel)->second
- : nullptr;
if (log_message || log_delivery_times || log_contents) {
fs.fetcher = event_loop->MakeRawFetcher(channel);
@@ -149,11 +145,11 @@
if (log_delivery_times) {
VLOG(1) << " Delivery times";
- fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+ fs.wants_timestamp_writer = true;
}
if (log_message) {
VLOG(1) << " Data";
- fs.writer = log_namer_->MakeWriter(channel);
+ fs.wants_writer = true;
if (!is_local) {
fs.log_type = LogType::kLogRemoteMessage;
}
@@ -161,18 +157,50 @@
if (log_contents) {
VLOG(1) << "Timestamp logger channel "
<< configuration::CleanedChannelToString(channel);
- fs.contents_writer =
- log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+ fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+ fs.wants_contents_writer = true;
fs.node_index =
- configuration::GetNodeIndex(configuration_, timestamp_node);
+ configuration::GetNodeIndex(configuration_, fs.timestamp_node);
}
- fs.channel_index = channel_index;
- fs.written = false;
fetchers_.emplace_back(std::move(fs));
}
- ++channel_index;
+ }
+}
+
+Logger::~Logger() {
+ if (log_namer_) {
+ // If we are replaying a log file, or in simulation, we want to force the
+ // last bit of data to be logged. The easiest way to deal with this is to
+ // poll everything as we go to destroy the class (i.e., shut down the
+ // logger) and write it to disk.
+ StopLogging(event_loop_->monotonic_now());
+ }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer) {
+ CHECK(!log_namer_) << ": Already logging";
+ log_namer_ = std::move(log_namer);
+ uuid_ = UUID::Random();
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+ // We want to do as much work as possible before the initial Fetch. Any delay
+ // between that Fetch and actually starting to log opens up the possibility
+ // of falling off the end of the queue.
+
+ for (FetcherStruct &f : fetchers_) {
+ if (f.wants_writer) {
+ f.writer = log_namer_->MakeWriter(f.channel);
+ }
+ if (f.wants_timestamp_writer) {
+ f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+ }
+ if (f.wants_contents_writer) {
+ f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+ f.channel, CHECK_NOTNULL(f.timestamp_node));
+ }
}
+ CHECK(node_state_.empty());
node_state_.resize(configuration::MultiNode(configuration_)
? configuration_->nodes()->size()
: 1u);
@@ -183,21 +211,6 @@
node_state_[node_index].log_file_header = MakeHeader(node);
}
- // When things start, we want to log the header, then the most recent
- // messages available on each fetcher to capture the previous state, then
- // start polling.
- event_loop_->OnRun([this]() { StartLogging(); });
-}
-
-Logger::~Logger() {
- // If we are replaying a log file, or in simulation, we want to force the last
- // bit of data to be logged. The easiest way to deal with this is to poll
- // everything as we go to destroy the class, ie, shut down the logger, and
- // write it to disk.
- DoLogData();
-}
-
-void Logger::StartLogging() {
// Grab data from each channel right before we declare the log file started
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
@@ -219,6 +232,25 @@
polling_period_);
}
+std::unique_ptr<LogNamer> Logger::StopLogging(
+ aos::monotonic_clock::time_point end_time) {
+ CHECK(log_namer_) << ": Not logging right now";
+
+ if (end_time != aos::monotonic_clock::min_time) {
+ LogUntil(end_time);
+ }
+ timer_handler_->Disable();
+
+ for (FetcherStruct &f : fetchers_) {
+ f.writer = nullptr;
+ f.timestamp_writer = nullptr;
+ f.contents_writer = nullptr;
+ }
+ node_state_.clear();
+
+ return std::move(log_namer_);
+}
+
void Logger::WriteHeader() {
if (configuration::MultiNode(configuration_)) {
server_statistics_fetcher_.Fetch();
@@ -356,6 +388,7 @@
flatbuffers::Offset<flatbuffers::String> name_offset =
fbb.CreateString(name_);
+ CHECK(uuid_ != UUID::Zero());
flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
fbb.CreateString(uuid_.string_view());
@@ -384,8 +417,7 @@
// message. Report back 3x to be extra safe, and because the cost isn't
// huge on the read side.
log_file_header_builder.add_max_out_of_order_duration(
- std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
- .count());
+ std::chrono::nanoseconds(3 * polling_period_).count());
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -522,22 +554,21 @@
last_synchronized_time_ = t;
}
-void Logger::DoLogData() {
- // We want to guarentee that messages aren't out of order by more than
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+ // We want to guarantee that messages aren't out of order by more than
// max_out_of_order_duration. To do this, we need sync points. Every write
// cycle should be a sync point.
- const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
do {
// Move the sync point up by at most polling_period. This forces one sync
// per iteration, even if it is small.
- LogUntil(
- std::min(last_synchronized_time_ + polling_period_, monotonic_now));
+ LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+ on_logged_period_();
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
- } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+ } while (last_synchronized_time_ + polling_period_ < end_time);
}
std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {