Split logger.h into log_reader and log_writer
Much simpler!
Change-Id: I6c4ee363b56b67dac40c456261bbed79d01b8eb6
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b88478d..fcf7683 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -166,18 +166,52 @@
)
cc_library(
- name = "logger",
+ name = "log_namer",
srcs = [
"log_namer.cc",
- "logger.cc",
],
hdrs = [
"log_namer.h",
- "logger.h",
+ ],
+ deps = [
+ ":logfile_utils",
+ ":logger_fbs",
+ ":uuid",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ ],
+)
+
+cc_library(
+ name = "log_writer",
+ srcs = [
+ "log_writer.cc",
+ ],
+ hdrs = [
+ "log_writer.h",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":log_namer",
+ "//aos:configuration",
+ "//aos/events:event_loop",
+ "//aos/events:simulated_event_loop",
+ "//aos/network:message_bridge_server_fbs",
+ ],
+)
+
+cc_library(
+ name = "log_reader",
+ srcs = [
+ "log_reader.cc",
+ ],
+ hdrs = [
+ "log_reader.h",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
+ ":log_namer",
+ ":log_writer",
":logfile_utils",
":logger_fbs",
":uuid",
@@ -204,7 +238,7 @@
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
- ":logger",
+ ":log_reader",
"//aos:configuration",
"//aos:init",
"//aos:json_to_flatbuffer",
@@ -239,7 +273,7 @@
],
target_compatible_with = ["@platforms//os:linux"],
deps = [
- ":logger",
+ ":log_reader",
"//aos:configuration",
"//aos:init",
"//aos:json_to_flatbuffer",
@@ -256,7 +290,7 @@
],
target_compatible_with = ["@platforms//os:linux"],
deps = [
- ":logger",
+ ":log_reader",
"//aos:configuration",
"//aos:init",
"//aos:json_to_flatbuffer",
@@ -275,7 +309,7 @@
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
- ":logger",
+ ":log_writer",
"//aos:configuration",
"//aos:init",
"//aos/events:shm_event_loop",
@@ -315,7 +349,8 @@
shard_count = 5,
target_compatible_with = ["@platforms//os:linux"],
deps = [
- ":logger",
+ ":log_reader",
+ ":log_writer",
"//aos/events:message_counter",
"//aos/events:ping_lib",
"//aos/events:pong_lib",
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 2e23e0a..2032f9f 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -7,7 +7,7 @@
#include <vector>
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 02a1402..7eff9e7 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -1,7 +1,7 @@
#include <iostream>
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/log_reader.cc
similarity index 62%
rename from aos/events/logging/logger.cc
rename to aos/events/logging/log_reader.cc
index ff45143..c111a73 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1,4 +1,4 @@
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include <fcntl.h>
#include <limits.h>
@@ -113,839 +113,6 @@
using message_bridge::RemoteMessage;
} // namespace
-Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
- std::function<bool(const Channel *)> should_log)
- : event_loop_(event_loop),
- configuration_(configuration),
- name_(network::GetHostname()),
- timer_handler_(event_loop_->AddTimer(
- [this]() { DoLogData(event_loop_->monotonic_now()); })),
- server_statistics_fetcher_(
- configuration::MultiNode(event_loop_->configuration())
- ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
- "/aos")
- : aos::Fetcher<message_bridge::ServerStatistics>()) {
- VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
-
- // Find all the nodes which are logging timestamps on our node. This may
- // over-estimate if should_log is specified.
- std::vector<const Node *> timestamp_logger_nodes =
- configuration::TimestampNodes(configuration_, event_loop_->node());
-
- std::map<const Channel *, const Node *> timestamp_logger_channels;
-
- // Now that we have all the nodes accumulated, make remote timestamp loggers
- // for them.
- for (const Node *node : timestamp_logger_nodes) {
- // Note: since we are doing a find using the event loop channel, we need to
- // make sure this channel pointer is part of the event loop configuration,
- // not configuration_. This only matters when configuration_ !=
- // event_loop->configuration();
- const Channel *channel = configuration::GetChannel(
- event_loop->configuration(),
- absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
- RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
- event_loop_->node());
-
- CHECK(channel != nullptr)
- << ": Remote timestamps are logged on "
- << event_loop_->node()->name()->string_view()
- << " but can't find channel /aos/remote_timestamps/"
- << node->name()->string_view();
- if (!should_log(channel)) {
- continue;
- }
- timestamp_logger_channels.insert(std::make_pair(channel, node));
- }
-
- const size_t our_node_index =
- configuration::GetNodeIndex(configuration_, event_loop_->node());
-
- for (size_t channel_index = 0;
- channel_index < configuration_->channels()->size(); ++channel_index) {
- const Channel *const config_channel =
- configuration_->channels()->Get(channel_index);
- // The MakeRawFetcher method needs a channel which is in the event loop
- // configuration() object, not the configuration_ object. Go look that up
- // from the config.
- const Channel *channel = aos::configuration::GetChannel(
- event_loop_->configuration(), config_channel->name()->string_view(),
- config_channel->type()->string_view(), "", event_loop_->node());
- CHECK(channel != nullptr)
- << ": Failed to look up channel "
- << aos::configuration::CleanedChannelToString(config_channel);
- if (!should_log(channel)) {
- continue;
- }
-
- FetcherStruct fs;
- fs.channel_index = channel_index;
- fs.channel = channel;
-
- const bool is_local =
- configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
-
- const bool is_readable =
- configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
- const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
- channel, event_loop_->node());
- const bool log_message = is_logged && is_readable;
-
- bool log_delivery_times = false;
- if (event_loop_->node() != nullptr) {
- log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, event_loop_->node(), event_loop_->node());
- }
-
- // Now, detect a RemoteMessage timestamp logger where we should just log the
- // contents to a file directly.
- const bool log_contents = timestamp_logger_channels.find(channel) !=
- timestamp_logger_channels.end();
-
- if (log_message || log_delivery_times || log_contents) {
- fs.fetcher = event_loop->MakeRawFetcher(channel);
- VLOG(1) << "Logging channel "
- << configuration::CleanedChannelToString(channel);
-
- if (log_delivery_times) {
- VLOG(1) << " Delivery times";
- fs.wants_timestamp_writer = true;
- fs.timestamp_node_index = our_node_index;
- }
- if (log_message) {
- VLOG(1) << " Data";
- fs.wants_writer = true;
- if (!is_local) {
- const Node *source_node = configuration::GetNode(
- configuration_, channel->source_node()->string_view());
- fs.data_node_index =
- configuration::GetNodeIndex(configuration_, source_node);
- fs.log_type = LogType::kLogRemoteMessage;
- } else {
- fs.data_node_index = our_node_index;
- }
- }
- if (log_contents) {
- VLOG(1) << "Timestamp logger channel "
- << configuration::CleanedChannelToString(channel);
- fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
- fs.wants_contents_writer = true;
- fs.contents_node_index =
- configuration::GetNodeIndex(configuration_, fs.timestamp_node);
- }
- fetchers_.emplace_back(std::move(fs));
- }
- }
-
- // When we are logging remote timestamps, we need to be able to translate from
- // the channel index that the event loop uses to the channel index in the
- // config in the log file.
- event_loop_to_logged_channel_index_.resize(
- event_loop->configuration()->channels()->size(), -1);
- for (size_t event_loop_channel_index = 0;
- event_loop_channel_index <
- event_loop->configuration()->channels()->size();
- ++event_loop_channel_index) {
- const Channel *event_loop_channel =
- event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
- const Channel *logged_channel = aos::configuration::GetChannel(
- configuration_, event_loop_channel->name()->string_view(),
- event_loop_channel->type()->string_view(), "",
- configuration::GetNode(configuration_, event_loop_->node()));
-
- if (logged_channel != nullptr) {
- event_loop_to_logged_channel_index_[event_loop_channel_index] =
- configuration::ChannelIndex(configuration_, logged_channel);
- }
- }
-}
-
-Logger::~Logger() {
- if (log_namer_) {
- // If we are replaying a log file, or in simulation, we want to force the
- // last bit of data to be logged. The easiest way to deal with this is to
- // poll everything as we go to destroy the class, ie, shut down the logger,
- // and write it to disk.
- StopLogging(event_loop_->monotonic_now());
- }
-}
-
-void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
- std::string_view log_start_uuid) {
- CHECK(!log_namer_) << ": Already logging";
- log_namer_ = std::move(log_namer);
-
- std::string config_sha256;
- if (separate_config_) {
- flatbuffers::FlatBufferBuilder fbb;
- flatbuffers::Offset<aos::Configuration> configuration_offset =
- CopyFlatBuffer(configuration_, &fbb);
- LogFileHeader::Builder log_file_header_builder(fbb);
- log_file_header_builder.add_configuration(configuration_offset);
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
- fbb.Release());
- config_sha256 = Sha256(config_header.span());
- LOG(INFO) << "Config sha256 of " << config_sha256;
- log_namer_->WriteConfiguration(&config_header, config_sha256);
- }
-
- log_event_uuid_ = UUID::Random();
- log_start_uuid_ = log_start_uuid;
- VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
-
- // We want to do as much work as possible before the initial Fetch. Time
- // between that and actually starting to log opens up the possibility of
- // falling off the end of the queue during that time.
-
- for (FetcherStruct &f : fetchers_) {
- if (f.wants_writer) {
- f.writer = log_namer_->MakeWriter(f.channel);
- }
- if (f.wants_timestamp_writer) {
- f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
- }
- if (f.wants_contents_writer) {
- f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
- f.channel, CHECK_NOTNULL(f.timestamp_node));
- }
- }
-
- CHECK(node_state_.empty());
- node_state_.resize(configuration::MultiNode(configuration_)
- ? configuration_->nodes()->size()
- : 1u);
-
- for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
-
- node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
- }
-
- // Grab data from each channel right before we declare the log file started
- // so we can capture the latest message on each channel. This lets us have
- // non periodic messages with configuration that now get logged.
- for (FetcherStruct &f : fetchers_) {
- const auto start = event_loop_->monotonic_now();
- const bool got_new = f.fetcher->Fetch();
- const auto end = event_loop_->monotonic_now();
- RecordFetchResult(start, end, got_new, &f);
-
- // If there is a message, we want to write it.
- f.written = f.fetcher->context().data == nullptr;
- }
-
- // Clear out any old timestamps in case we are re-starting logging.
- for (size_t i = 0; i < node_state_.size(); ++i) {
- SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
- monotonic_clock::min_time, realtime_clock::min_time);
- }
-
- WriteHeader();
-
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
- << " start_time " << last_synchronized_time_;
-
- // Force logging up until the start of the log file now, so the messages at
- // the start are always ordered before the rest of the messages.
- // Note: this ship may have already sailed, but we don't have to make it
- // worse.
- // TODO(austin): Test...
- LogUntil(last_synchronized_time_);
-
- timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
- polling_period_);
-}
-
-std::unique_ptr<LogNamer> Logger::StopLogging(
- aos::monotonic_clock::time_point end_time) {
- CHECK(log_namer_) << ": Not logging right now";
-
- if (end_time != aos::monotonic_clock::min_time) {
- LogUntil(end_time);
- }
- timer_handler_->Disable();
-
- for (FetcherStruct &f : fetchers_) {
- f.writer = nullptr;
- f.timestamp_writer = nullptr;
- f.contents_writer = nullptr;
- }
- node_state_.clear();
-
- log_event_uuid_ = UUID::Zero();
- log_start_uuid_ = std::string();
-
- return std::move(log_namer_);
-}
-
-void Logger::WriteHeader() {
- if (configuration::MultiNode(configuration_)) {
- server_statistics_fetcher_.Fetch();
- }
-
- aos::monotonic_clock::time_point monotonic_start_time =
- event_loop_->monotonic_now();
- aos::realtime_clock::time_point realtime_start_time =
- event_loop_->realtime_now();
-
- // We need to pick a point in time to declare the log file "started". This
- // starts here. It needs to be after everything is fetched so that the
- // fetchers are all pointed at the most recent message before the start
- // time.
- last_synchronized_time_ = monotonic_start_time;
-
- for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
- MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
- realtime_start_time);
- MaybeWriteHeader(node_index, node);
- }
-}
-
-void Logger::MaybeWriteHeader(int node_index) {
- if (configuration::MultiNode(configuration_)) {
- return MaybeWriteHeader(node_index,
- configuration_->nodes()->Get(node_index));
- } else {
- return MaybeWriteHeader(node_index, nullptr);
- }
-}
-
-void Logger::MaybeWriteHeader(int node_index, const Node *node) {
- // This function is responsible for writing the header when the header both
- // has valid data, and when it needs to be written.
- if (node_state_[node_index].header_written &&
- node_state_[node_index].header_valid) {
- // The header has been written and is valid, nothing to do.
- return;
- }
- if (!node_state_[node_index].has_source_node_boot_uuid) {
- // Can't write a header if we don't have the boot UUID.
- return;
- }
-
- // WriteHeader writes the first header in a log file. We want to do this only
- // once.
- //
- // Rotate rewrites the same header with a new part ID, but keeps the same part
- // UUID. We don't want that when things reboot, because that implies that
- // parts go together across a reboot.
- //
- // Reboot resets the parts UUID. So, once we've written a header the first
- // time, we want to use Reboot to rotate the log and reset the parts UUID.
- //
- // header_valid is cleared whenever the remote reboots.
- if (node_state_[node_index].header_written) {
- log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
- } else {
- log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
-
- node_state_[node_index].header_written = true;
- }
- node_state_[node_index].header_valid = true;
-}
-
-void Logger::WriteMissingTimestamps() {
- if (configuration::MultiNode(configuration_)) {
- server_statistics_fetcher_.Fetch();
- } else {
- return;
- }
-
- if (server_statistics_fetcher_.get() == nullptr) {
- return;
- }
-
- for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
- if (MaybeUpdateTimestamp(
- node, node_index,
- server_statistics_fetcher_.context().monotonic_event_time,
- server_statistics_fetcher_.context().realtime_event_time)) {
- CHECK(node_state_[node_index].header_written);
- CHECK(node_state_[node_index].header_valid);
- log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
- } else {
- MaybeWriteHeader(node_index, node);
- }
- }
-}
-
-void Logger::SetStartTime(
- size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time,
- aos::monotonic_clock::time_point logger_monotonic_start_time,
- aos::realtime_clock::time_point logger_realtime_start_time) {
- node_state_[node_index].monotonic_start_time = monotonic_start_time;
- node_state_[node_index].realtime_start_time = realtime_start_time;
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_start_time.time_since_epoch())
- .count());
-
- // Add logger start times if they are available in the log file header.
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_logger_monotonic_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_logger_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- logger_monotonic_start_time.time_since_epoch())
- .count());
- }
-
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_logger_realtime_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_logger_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- logger_realtime_start_time.time_since_epoch())
- .count());
- }
-
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_realtime_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_start_time.time_since_epoch())
- .count());
- }
-}
-
-bool Logger::MaybeUpdateTimestamp(
- const Node *node, int node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time) {
- // Bail early if the start times are already set.
- if (node_state_[node_index].monotonic_start_time !=
- monotonic_clock::min_time) {
- return false;
- }
- if (event_loop_->node() == node ||
- !configuration::MultiNode(configuration_)) {
- // There are no offsets to compute for ourself, so always succeed.
- SetStartTime(node_index, monotonic_start_time, realtime_start_time,
- monotonic_start_time, realtime_start_time);
- node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
- return true;
- } else if (server_statistics_fetcher_.get() != nullptr) {
- // We must be a remote node now. Look for the connection and see if it is
- // connected.
-
- for (const message_bridge::ServerConnection *connection :
- *server_statistics_fetcher_->connections()) {
- if (connection->node()->name()->string_view() !=
- node->name()->string_view()) {
- continue;
- }
-
- if (connection->state() != message_bridge::State::CONNECTED) {
- VLOG(1) << node->name()->string_view()
- << " is not connected, can't start it yet.";
- break;
- }
-
- // Update the boot UUID as soon as we know we are connected.
- if (!connection->has_boot_uuid()) {
- VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
- break;
- }
-
- if (!node_state_[node_index].has_source_node_boot_uuid ||
- node_state_[node_index].source_node_boot_uuid !=
- connection->boot_uuid()->string_view()) {
- node_state_[node_index].SetBootUUID(
- connection->boot_uuid()->string_view());
- }
-
- if (!connection->has_monotonic_offset()) {
- VLOG(1) << "Missing monotonic offset for setting start time for node "
- << aos::FlatbufferToJson(node);
- break;
- }
-
- // Found it and it is connected. Compensate and go.
- SetStartTime(node_index,
- monotonic_start_time +
- std::chrono::nanoseconds(connection->monotonic_offset()),
- realtime_start_time, monotonic_start_time,
- realtime_start_time);
- return true;
- }
- }
- return false;
-}
-
-aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
- const Node *node, std::string_view config_sha256) {
- // Now write the header with this timestamp in it.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- flatbuffers::Offset<aos::Configuration> configuration_offset;
- if (!separate_config_) {
- configuration_offset = CopyFlatBuffer(configuration_, &fbb);
- } else {
- CHECK(!config_sha256.empty());
- }
-
- const flatbuffers::Offset<flatbuffers::String> name_offset =
- fbb.CreateString(name_);
-
- CHECK(log_event_uuid_ != UUID::Zero());
- const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
- fbb.CreateString(log_event_uuid_.string_view());
-
- const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
- fbb.CreateString(logger_instance_uuid_.string_view());
-
- flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
- if (!log_start_uuid_.empty()) {
- log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
- }
-
- flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
- if (!config_sha256.empty()) {
- config_sha256_offset = fbb.CreateString(config_sha256);
- }
-
- const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
- fbb.CreateString(event_loop_->boot_uuid().string_view());
-
- const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
- fbb.CreateString(event_loop_->boot_uuid().string_view());
-
- const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
- fbb.CreateString("00000000-0000-4000-8000-000000000000");
-
- flatbuffers::Offset<Node> node_offset;
- flatbuffers::Offset<Node> logger_node_offset;
-
- if (configuration::MultiNode(configuration_)) {
- node_offset = RecursiveCopyFlatBuffer(node, &fbb);
- logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
- }
-
- aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
-
- log_file_header_builder.add_name(name_offset);
-
- // Only add the node if we are running in a multinode configuration.
- if (node != nullptr) {
- log_file_header_builder.add_node(node_offset);
- log_file_header_builder.add_logger_node(logger_node_offset);
- }
-
- if (!configuration_offset.IsNull()) {
- log_file_header_builder.add_configuration(configuration_offset);
- }
- // The worst case theoretical out of order is the polling period times 2.
- // One message could get logged right after the boundary, but be for right
- // before the next boundary. And the reverse could happen for another
- // message. Report back 3x to be extra safe, and because the cost isn't
- // huge on the read side.
- log_file_header_builder.add_max_out_of_order_duration(
- std::chrono::nanoseconds(3 * polling_period_).count());
-
- log_file_header_builder.add_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_clock::min_time.time_since_epoch())
- .count());
- if (node == event_loop_->node()) {
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_clock::min_time.time_since_epoch())
- .count());
- } else {
- log_file_header_builder.add_logger_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_clock::min_time.time_since_epoch())
- .count());
- log_file_header_builder.add_logger_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_clock::min_time.time_since_epoch())
- .count());
- }
-
- log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
- log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
- if (!log_start_uuid_offset.IsNull()) {
- log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
- }
- log_file_header_builder.add_logger_node_boot_uuid(
- logger_node_boot_uuid_offset);
- log_file_header_builder.add_source_node_boot_uuid(
- source_node_boot_uuid_offset);
-
- log_file_header_builder.add_parts_uuid(parts_uuid_offset);
- log_file_header_builder.add_parts_index(0);
-
- log_file_header_builder.add_configuration_sha256(0);
-
- if (!config_sha256_offset.IsNull()) {
- log_file_header_builder.add_configuration_sha256(config_sha256_offset);
- }
-
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
- fbb.Release());
-
- CHECK(result.Verify()) << ": Built a corrupted header.";
-
- return result;
-}
-
-void Logger::ResetStatisics() {
- max_message_fetch_time_ = std::chrono::nanoseconds::zero();
- max_message_fetch_time_channel_ = -1;
- max_message_fetch_time_size_ = -1;
- total_message_fetch_time_ = std::chrono::nanoseconds::zero();
- total_message_fetch_count_ = 0;
- total_message_fetch_bytes_ = 0;
- total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
- total_nop_fetch_count_ = 0;
- max_copy_time_ = std::chrono::nanoseconds::zero();
- max_copy_time_channel_ = -1;
- max_copy_time_size_ = -1;
- total_copy_time_ = std::chrono::nanoseconds::zero();
- total_copy_count_ = 0;
- total_copy_bytes_ = 0;
-}
-
-void Logger::Rotate() {
- for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
- log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
- }
-}
-
-void Logger::LogUntil(monotonic_clock::time_point t) {
- // Grab the latest ServerStatistics message. This will always have the
- // oppertunity to be >= to the current time, so it will always represent any
- // reboots which may have happened.
- WriteMissingTimestamps();
-
- // Write each channel to disk, one at a time.
- for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.written) {
- const auto start = event_loop_->monotonic_now();
- const bool got_new = f.fetcher->FetchNext();
- const auto end = event_loop_->monotonic_now();
- RecordFetchResult(start, end, got_new, &f);
- if (!got_new) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- }
- f.written = false;
- }
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time >= t) {
- break;
- }
- if (f.writer != nullptr) {
- // Write!
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- VLOG(2) << "Writing data as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << f.writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- max_header_size_ = std::max(max_header_size_,
- fbb.GetSize() - f.fetcher->context().size);
- CHECK(node_state_[f.data_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.writer->QueueSizedFlatbuffer(&fbb);
- }
-
- if (f.timestamp_writer != nullptr) {
- // And now handle timestamps.
- const auto start = event_loop_->monotonic_now();
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- VLOG(2) << "Writing timestamps as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(f.fetcher->channel())
- << " to " << f.timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- CHECK(node_state_[f.timestamp_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
- }
-
- if (f.contents_writer != nullptr) {
- const auto start = event_loop_->monotonic_now();
- // And now handle the special message contents channel. Copy the
- // message into a FlatBufferBuilder and save it to disk.
- // TODO(austin): We can be more efficient here when we start to
- // care...
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- const RemoteMessage *msg =
- flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
-
- CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
- if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
- node_state_[f.contents_node_index].source_node_boot_uuid !=
- msg->boot_uuid()->string_view()) {
- node_state_[f.contents_node_index].SetBootUUID(
- msg->boot_uuid()->string_view());
-
- MaybeWriteHeader(f.contents_node_index);
- }
-
- logger::MessageHeader::Builder message_header_builder(fbb);
-
- // TODO(austin): This needs to check the channel_index and confirm
- // that it should be logged before squirreling away the timestamp to
- // disk. We don't want to log irrelevant timestamps.
-
- // Note: this must match the same order as MessageBridgeServer and
- // PackMessage. We want identical headers to have identical
- // on-the-wire formats to make comparing them easier.
-
- // Translate from the channel index that the event loop uses to the
- // channel index in the log file.
- message_header_builder.add_channel_index(
- event_loop_to_logged_channel_index_[msg->channel_index()]);
-
- message_header_builder.add_queue_index(msg->queue_index());
- message_header_builder.add_monotonic_sent_time(
- msg->monotonic_sent_time());
- message_header_builder.add_realtime_sent_time(
- msg->realtime_sent_time());
-
- message_header_builder.add_monotonic_remote_time(
- msg->monotonic_remote_time());
- message_header_builder.add_realtime_remote_time(
- msg->realtime_remote_time());
- message_header_builder.add_remote_queue_index(
- msg->remote_queue_index());
-
- message_header_builder.add_monotonic_timestamp_time(
- f.fetcher->context()
- .monotonic_event_time.time_since_epoch()
- .count());
-
- fbb.FinishSizePrefixed(message_header_builder.Finish());
- const auto end = event_loop_->monotonic_now();
- RecordCreateMessageTime(start, end, &f);
-
- CHECK(node_state_[f.contents_node_index].header_valid)
- << ": Can't write data before the header on channel "
- << configuration::CleanedChannelToString(f.fetcher->channel());
- f.contents_writer->QueueSizedFlatbuffer(&fbb);
- }
-
- f.written = true;
- }
- }
- last_synchronized_time_ = t;
-}
-
-void Logger::DoLogData(const monotonic_clock::time_point end_time) {
- // We want to guarantee that messages aren't out of order by more than
- // max_out_of_order_duration. To do this, we need sync points. Every write
- // cycle should be a sync point.
-
- do {
- // Move the sync point up by at most polling_period. This forces one sync
- // per iteration, even if it is small.
- LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
-
- on_logged_period_();
-
- // If we missed cycles, we could be pretty far behind. Spin until we are
- // caught up.
- } while (last_synchronized_time_ + polling_period_ < end_time);
-}
-
-void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
- aos::monotonic_clock::time_point end,
- bool got_new, FetcherStruct *fetcher) {
- const auto duration = end - start;
- if (!got_new) {
- ++total_nop_fetch_count_;
- total_nop_fetch_time_ += duration;
- return;
- }
- ++total_message_fetch_count_;
- total_message_fetch_bytes_ += fetcher->fetcher->context().size;
- total_message_fetch_time_ += duration;
- if (duration > max_message_fetch_time_) {
- max_message_fetch_time_ = duration;
- max_message_fetch_time_channel_ = fetcher->channel_index;
- max_message_fetch_time_size_ = fetcher->fetcher->context().size;
- }
-}
-
-void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
- aos::monotonic_clock::time_point end,
- FetcherStruct *fetcher) {
- const auto duration = end - start;
- total_copy_time_ += duration;
- ++total_copy_count_;
- total_copy_bytes_ += fetcher->fetcher->context().size;
- if (duration > max_copy_time_) {
- max_copy_time_ = duration;
- max_copy_time_channel_ = fetcher->channel_index;
- max_copy_time_size_ = fetcher->fetcher->context().size;
- }
-}
-
-std::vector<std::vector<std::string>> ToLogReaderVector(
- const std::vector<LogFile> &log_files) {
- std::vector<std::vector<std::string>> result;
- for (const LogFile &log_file : log_files) {
- for (const LogParts &log_parts : log_file.parts) {
- std::vector<std::string> parts;
- for (const std::string &part : log_parts.parts) {
- parts.emplace_back(part);
- }
- result.emplace_back(std::move(parts));
- }
- }
- return result;
-}
-
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
: LogReader(SortParts({std::string(filename)}), replay_configuration) {}
diff --git a/aos/events/logging/logger.h b/aos/events/logging/log_reader.h
similarity index 61%
rename from aos/events/logging/logger.h
rename to aos/events/logging/log_reader.h
index d142bd8..875c90d 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/log_reader.h
@@ -1,5 +1,5 @@
-#ifndef AOS_EVENTS_LOGGER_H_
-#define AOS_EVENTS_LOGGER_H_
+#ifndef AOS_EVENTS_LOGGING_LOG_READER_H_
+#define AOS_EVENTS_LOGGING_LOG_READER_H_
#include <chrono>
#include <deque>
@@ -8,7 +8,6 @@
#include <vector>
#include "aos/events/event_loop.h"
-#include "aos/events/logging/log_namer.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
@@ -24,307 +23,6 @@
namespace aos {
namespace logger {
-// Logs all channels available in the event loop to disk every 100 ms.
-// Start by logging one message per channel to capture any state and
-// configuration that is sent rately on a channel and would affect execution.
-class Logger {
- public:
- // Constructs a logger.
- // event_loop: The event loop used to read the messages.
- // configuration: When provided, this is the configuration to log, and the
- // configuration to use for the channel list to log. If not provided,
- // this becomes the configuration from the event loop.
- // should_log: When provided, a filter for channels to log. If not provided,
- // all available channels are logged.
- Logger(EventLoop *event_loop)
- : Logger(event_loop, event_loop->configuration()) {}
- Logger(EventLoop *event_loop, const Configuration *configuration)
- : Logger(event_loop, configuration,
- [](const Channel *) { return true; }) {}
- Logger(EventLoop *event_loop, const Configuration *configuration,
- std::function<bool(const Channel *)> should_log);
- ~Logger();
-
- // Overrides the name in the log file header.
- void set_name(std::string_view name) { name_ = name; }
-
- // Sets the callback to run after each period of data is logged. Defaults to
- // doing nothing.
- //
- // This callback may safely do things like call Rotate().
- void set_on_logged_period(std::function<void()> on_logged_period) {
- on_logged_period_ = std::move(on_logged_period);
- }
-
- void set_separate_config(bool separate_config) {
- separate_config_ = separate_config;
- }
-
- // Sets the period between polling the data. Defaults to 100ms.
- //
- // Changing this while a set of files is being written may result in
- // unreadable files.
- void set_polling_period(std::chrono::nanoseconds polling_period) {
- polling_period_ = polling_period;
- }
-
- std::string_view log_start_uuid() const { return log_start_uuid_; }
- UUID logger_instance_uuid() const { return logger_instance_uuid_; }
-
- // The maximum time for a single fetch which returned a message, or 0 if none
- // of those have happened.
- std::chrono::nanoseconds max_message_fetch_time() const {
- return max_message_fetch_time_;
- }
- // The channel for that longest fetch which returned a message, or -1 if none
- // of those have happened.
- int max_message_fetch_time_channel() const {
- return max_message_fetch_time_channel_;
- }
- // The size of the message returned by that longest fetch, or -1 if none of
- // those have happened.
- int max_message_fetch_time_size() const {
- return max_message_fetch_time_size_;
- }
- // The total time spent fetching messages.
- std::chrono::nanoseconds total_message_fetch_time() const {
- return total_message_fetch_time_;
- }
- // The total number of fetch calls which returned messages.
- int total_message_fetch_count() const { return total_message_fetch_count_; }
- // The total number of bytes fetched.
- int64_t total_message_fetch_bytes() const {
- return total_message_fetch_bytes_;
- }
-
- // The total time spent in fetches which did not return a message.
- std::chrono::nanoseconds total_nop_fetch_time() const {
- return total_nop_fetch_time_;
- }
- // The total number of fetches which did not return a message.
- int total_nop_fetch_count() const { return total_nop_fetch_count_; }
-
- // The maximum time for a single copy, or 0 if none of those have happened.
- std::chrono::nanoseconds max_copy_time() const { return max_copy_time_; }
- // The channel for that longest copy, or -1 if none of those have happened.
- int max_copy_time_channel() const { return max_copy_time_channel_; }
- // The size of the message for that longest copy, or -1 if none of those have
- // happened.
- int max_copy_time_size() const { return max_copy_time_size_; }
- // The total time spent copying messages.
- std::chrono::nanoseconds total_copy_time() const { return total_copy_time_; }
- // The total number of messages copied.
- int total_copy_count() const { return total_copy_count_; }
- // The total number of bytes copied.
- int64_t total_copy_bytes() const { return total_copy_bytes_; }
-
- void ResetStatisics();
-
- // Rotates the log file(s), triggering new part files to be written for each
- // log file.
- void Rotate();
-
- // Starts logging to files with the given naming scheme.
- //
- // log_start_uuid may be used to tie this log event to other log events across
- // multiple nodes. The default (empty string) indicates there isn't one
- // available.
- void StartLogging(std::unique_ptr<LogNamer> log_namer,
- std::string_view log_start_uuid = "");
-
- // Stops logging. Ensures any messages through end_time make it into the log.
- //
- // If you want to stop ASAP, pass min_time to avoid reading any more messages.
- //
- // Returns the LogNamer in case the caller wants to do anything else with it
- // before destroying it.
- std::unique_ptr<LogNamer> StopLogging(
- aos::monotonic_clock::time_point end_time);
-
- // Returns whether a log is currently being written.
- bool is_started() const { return static_cast<bool>(log_namer_); }
-
- // Shortcut to call StartLogging with a LocalLogNamer when event processing
- // starts.
- void StartLoggingLocalNamerOnRun(std::string base_name) {
- event_loop_->OnRun([this, base_name]() {
- StartLogging(
- std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
- });
- }
-
- private:
- // Structure to track both a fetcher, and if the data fetched has been
- // written. We may want to delay writing data to disk so that we don't let
- // data get too far out of order when written to disk so we can avoid making
- // it too hard to sort when reading.
- struct FetcherStruct {
- std::unique_ptr<RawFetcher> fetcher;
- bool written = false;
-
- // Channel index to log to.
- int channel_index = -1;
- const Channel *channel = nullptr;
- const Node *timestamp_node = nullptr;
-
- LogType log_type = LogType::kLogMessage;
-
- // We fill out the metadata at construction, but the actual writers have to
- // be updated each time we start logging. To avoid duplicating the complex
- // logic determining whether each writer should be initialized, we just
- // stash the answer in separate member variables.
- bool wants_writer = false;
- DetachedBufferWriter *writer = nullptr;
- bool wants_timestamp_writer = false;
- DetachedBufferWriter *timestamp_writer = nullptr;
- bool wants_contents_writer = false;
- DetachedBufferWriter *contents_writer = nullptr;
-
- // Node which this data is from, or -1 if it is unknown.
- int data_node_index = -1;
- // Node that this timestamp is for, or -1 if it is known.
- int timestamp_node_index = -1;
- // Node that the contents this contents_writer will log are from.
- int contents_node_index = -1;
- };
-
- // Vector mapping from the channel index from the event loop to the logged
- // channel index.
- std::vector<int> event_loop_to_logged_channel_index_;
-
- struct NodeState {
- aos::monotonic_clock::time_point monotonic_start_time =
- aos::monotonic_clock::min_time;
- aos::realtime_clock::time_point realtime_start_time =
- aos::realtime_clock::min_time;
-
- bool has_source_node_boot_uuid = false;
-
- // This is an initial UUID that is a valid UUID4 and is pretty obvious that
- // it isn't valid.
- std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
-
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
-
- // True if a header has been written to the start of a log file.
- bool header_written = false;
- // True if the current written header represents the contents which will
- // follow. This is cleared when boot_uuid is known to not match anymore.
- bool header_valid = false;
-
- // Sets the source_node_boot_uuid, properly updating everything.
- void SetBootUUID(std::string_view new_source_node_boot_uuid) {
- source_node_boot_uuid = new_source_node_boot_uuid;
- header_valid = false;
- has_source_node_boot_uuid = true;
-
- flatbuffers::String *source_node_boot_uuid_string =
- log_file_header.mutable_message()->mutable_source_node_boot_uuid();
- CHECK_EQ(source_node_boot_uuid.size(),
- source_node_boot_uuid_string->size());
- memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
- source_node_boot_uuid.size());
- }
- };
-
- void WriteHeader();
-
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
- const Node *node, std::string_view config_sha256);
-
- // Writes the header for the provided node if enough information is valid.
- void MaybeWriteHeader(int node_index);
- // Overload for when we already know node as well.
- void MaybeWriteHeader(int node_index, const Node *node);
-
- bool MaybeUpdateTimestamp(
- const Node *node, int node_index,
- aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time);
-
- void DoLogData(const monotonic_clock::time_point end_time);
-
- void WriteMissingTimestamps();
-
- // Fetches from each channel until all the data is logged.
- void LogUntil(monotonic_clock::time_point t);
-
- void RecordFetchResult(aos::monotonic_clock::time_point start,
- aos::monotonic_clock::time_point end, bool got_new,
- FetcherStruct *fetcher);
-
- void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
- aos::monotonic_clock::time_point end,
- FetcherStruct *fetcher);
-
- // Sets the start time for a specific node.
- void SetStartTime(
- size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time,
- aos::monotonic_clock::time_point logger_monotonic_start_time,
- aos::realtime_clock::time_point logger_realtime_start_time);
-
- EventLoop *const event_loop_;
- // The configuration to place at the top of the log file.
- const Configuration *const configuration_;
-
- UUID log_event_uuid_ = UUID::Zero();
- const UUID logger_instance_uuid_ = UUID::Random();
- std::unique_ptr<LogNamer> log_namer_;
- // Empty indicates there isn't one.
- std::string log_start_uuid_;
-
- // Name to save in the log file. Defaults to hostname.
- std::string name_;
-
- std::function<void()> on_logged_period_ = []() {};
-
- std::chrono::nanoseconds max_message_fetch_time_ =
- std::chrono::nanoseconds::zero();
- int max_message_fetch_time_channel_ = -1;
- int max_message_fetch_time_size_ = -1;
- std::chrono::nanoseconds total_message_fetch_time_ =
- std::chrono::nanoseconds::zero();
- int total_message_fetch_count_ = 0;
- int64_t total_message_fetch_bytes_ = 0;
-
- std::chrono::nanoseconds total_nop_fetch_time_ =
- std::chrono::nanoseconds::zero();
- int total_nop_fetch_count_ = 0;
-
- std::chrono::nanoseconds max_copy_time_ = std::chrono::nanoseconds::zero();
- int max_copy_time_channel_ = -1;
- int max_copy_time_size_ = -1;
- std::chrono::nanoseconds total_copy_time_ = std::chrono::nanoseconds::zero();
- int total_copy_count_ = 0;
- int64_t total_copy_bytes_ = 0;
-
- std::vector<FetcherStruct> fetchers_;
- TimerHandler *timer_handler_;
-
- // Period to poll the channels.
- std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
-
- // Last time that data was written for all channels to disk.
- monotonic_clock::time_point last_synchronized_time_;
-
- // Max size that the header has consumed. This much extra data will be
- // reserved in the builder to avoid reallocating.
- size_t max_header_size_ = 0;
-
- // If true, write the message header into a separate file.
- bool separate_config_ = true;
-
- // Fetcher for all the statistics from all the nodes.
- aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
-
- std::vector<NodeState> node_state_;
-};
-
-std::vector<std::vector<std::string>> ToLogReaderVector(
- const std::vector<LogFile> &log_files);
-
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
@@ -771,4 +469,4 @@
} // namespace logger
} // namespace aos
-#endif // AOS_EVENTS_LOGGER_H_
+#endif // AOS_EVENTS_LOGGING_LOG_READER_H_
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 47d0ced..d4f04c3 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -1,7 +1,7 @@
#include <iomanip>
#include <iostream>
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
new file mode 100644
index 0000000..cb0f7c6
--- /dev/null
+++ b/aos/events/logging/log_writer.cc
@@ -0,0 +1,837 @@
+#include "aos/events/logging/log_writer.h"
+
+#include <functional>
+#include <map>
+#include <vector>
+
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/team_number.h"
+
+namespace aos {
+namespace logger {
+namespace {
+using message_bridge::RemoteMessage;
+} // namespace
+
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+ std::function<bool(const Channel *)> should_log)
+ : event_loop_(event_loop),
+ configuration_(configuration),
+ name_(network::GetHostname()),
+ timer_handler_(event_loop_->AddTimer(
+ [this]() { DoLogData(event_loop_->monotonic_now()); })),
+ server_statistics_fetcher_(
+ configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+ "/aos")
+ : aos::Fetcher<message_bridge::ServerStatistics>()) {
+ VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
+
+ // Find all the nodes which are logging timestamps on our node. This may
+ // over-estimate if should_log is specified.
+ std::vector<const Node *> timestamp_logger_nodes =
+ configuration::TimestampNodes(configuration_, event_loop_->node());
+
+ std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+ // Now that we have all the nodes accumulated, make remote timestamp loggers
+ // for them.
+ for (const Node *node : timestamp_logger_nodes) {
+ // Note: since we are doing a find using the event loop channel, we need to
+ // make sure this channel pointer is part of the event loop configuration,
+ // not configuration_. This only matters when configuration_ !=
+ // event_loop->configuration();
+ const Channel *channel = configuration::GetChannel(
+ event_loop->configuration(),
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
+ event_loop_->node());
+
+ CHECK(channel != nullptr)
+ << ": Remote timestamps are logged on "
+ << event_loop_->node()->name()->string_view()
+ << " but can't find channel /aos/remote_timestamps/"
+ << node->name()->string_view();
+ if (!should_log(channel)) {
+ continue;
+ }
+ timestamp_logger_channels.insert(std::make_pair(channel, node));
+ }
+
+ const size_t our_node_index =
+ configuration::GetNodeIndex(configuration_, event_loop_->node());
+
+ for (size_t channel_index = 0;
+ channel_index < configuration_->channels()->size(); ++channel_index) {
+ const Channel *const config_channel =
+ configuration_->channels()->Get(channel_index);
+ // The MakeRawFetcher method needs a channel which is in the event loop
+ // configuration() object, not the configuration_ object. Go look that up
+ // from the config.
+ const Channel *channel = aos::configuration::GetChannel(
+ event_loop_->configuration(), config_channel->name()->string_view(),
+ config_channel->type()->string_view(), "", event_loop_->node());
+ CHECK(channel != nullptr)
+ << ": Failed to look up channel "
+ << aos::configuration::CleanedChannelToString(config_channel);
+ if (!should_log(channel)) {
+ continue;
+ }
+
+ FetcherStruct fs;
+ fs.channel_index = channel_index;
+ fs.channel = channel;
+
+ const bool is_local =
+ configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+ const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+ channel, event_loop_->node());
+ const bool log_message = is_logged && is_readable;
+
+ bool log_delivery_times = false;
+ if (event_loop_->node() != nullptr) {
+ log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop_->node(), event_loop_->node());
+ }
+
+ // Now, detect a RemoteMessage timestamp logger where we should just log the
+ // contents to a file directly.
+ const bool log_contents = timestamp_logger_channels.find(channel) !=
+ timestamp_logger_channels.end();
+
+ if (log_message || log_delivery_times || log_contents) {
+ fs.fetcher = event_loop->MakeRawFetcher(channel);
+ VLOG(1) << "Logging channel "
+ << configuration::CleanedChannelToString(channel);
+
+ if (log_delivery_times) {
+ VLOG(1) << " Delivery times";
+ fs.wants_timestamp_writer = true;
+ fs.timestamp_node_index = our_node_index;
+ }
+ if (log_message) {
+ VLOG(1) << " Data";
+ fs.wants_writer = true;
+ if (!is_local) {
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+ fs.data_node_index =
+ configuration::GetNodeIndex(configuration_, source_node);
+ fs.log_type = LogType::kLogRemoteMessage;
+ } else {
+ fs.data_node_index = our_node_index;
+ }
+ }
+ if (log_contents) {
+ VLOG(1) << "Timestamp logger channel "
+ << configuration::CleanedChannelToString(channel);
+ fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+ fs.wants_contents_writer = true;
+ fs.contents_node_index =
+ configuration::GetNodeIndex(configuration_, fs.timestamp_node);
+ }
+ fetchers_.emplace_back(std::move(fs));
+ }
+ }
+
+ // When we are logging remote timestamps, we need to be able to translate from
+ // the channel index that the event loop uses to the channel index in the
+ // config in the log file.
+ event_loop_to_logged_channel_index_.resize(
+ event_loop->configuration()->channels()->size(), -1);
+ for (size_t event_loop_channel_index = 0;
+ event_loop_channel_index <
+ event_loop->configuration()->channels()->size();
+ ++event_loop_channel_index) {
+ const Channel *event_loop_channel =
+ event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+ const Channel *logged_channel = aos::configuration::GetChannel(
+ configuration_, event_loop_channel->name()->string_view(),
+ event_loop_channel->type()->string_view(), "",
+ configuration::GetNode(configuration_, event_loop_->node()));
+
+ if (logged_channel != nullptr) {
+ event_loop_to_logged_channel_index_[event_loop_channel_index] =
+ configuration::ChannelIndex(configuration_, logged_channel);
+ }
+ }
+}
+
+Logger::~Logger() {
+ if (log_namer_) {
+ // If we are replaying a log file, or in simulation, we want to force the
+ // last bit of data to be logged. The easiest way to deal with this is to
+ // poll everything as we go to destroy the class, ie, shut down the logger,
+ // and write it to disk.
+ StopLogging(event_loop_->monotonic_now());
+ }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
+ std::string_view log_start_uuid) {
+ CHECK(!log_namer_) << ": Already logging";
+ log_namer_ = std::move(log_namer);
+
+ std::string config_sha256;
+ if (separate_config_) {
+ flatbuffers::FlatBufferBuilder fbb;
+ flatbuffers::Offset<aos::Configuration> configuration_offset =
+ CopyFlatBuffer(configuration_, &fbb);
+ LogFileHeader::Builder log_file_header_builder(fbb);
+ log_file_header_builder.add_configuration(configuration_offset);
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
+ fbb.Release());
+ config_sha256 = Sha256(config_header.span());
+ LOG(INFO) << "Config sha256 of " << config_sha256;
+ log_namer_->WriteConfiguration(&config_header, config_sha256);
+ }
+
+ log_event_uuid_ = UUID::Random();
+ log_start_uuid_ = log_start_uuid;
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+ // We want to do as much work as possible before the initial Fetch. Time
+ // between that and actually starting to log opens up the possibility of
+ // falling off the end of the queue during that time.
+
+ for (FetcherStruct &f : fetchers_) {
+ if (f.wants_writer) {
+ f.writer = log_namer_->MakeWriter(f.channel);
+ }
+ if (f.wants_timestamp_writer) {
+ f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+ }
+ if (f.wants_contents_writer) {
+ f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+ f.channel, CHECK_NOTNULL(f.timestamp_node));
+ }
+ }
+
+ CHECK(node_state_.empty());
+ node_state_.resize(configuration::MultiNode(configuration_)
+ ? configuration_->nodes()->size()
+ : 1u);
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
+
+ node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
+ }
+
+ // Grab data from each channel right before we declare the log file started
+ // so we can capture the latest message on each channel. This lets us have
+ // non periodic messages with configuration that now get logged.
+ for (FetcherStruct &f : fetchers_) {
+ const auto start = event_loop_->monotonic_now();
+ const bool got_new = f.fetcher->Fetch();
+ const auto end = event_loop_->monotonic_now();
+ RecordFetchResult(start, end, got_new, &f);
+
+ // If there is a message, we want to write it.
+ f.written = f.fetcher->context().data == nullptr;
+ }
+
+ // Clear out any old timestamps in case we are re-starting logging.
+ for (size_t i = 0; i < node_state_.size(); ++i) {
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+ monotonic_clock::min_time, realtime_clock::min_time);
+ }
+
+ WriteHeader();
+
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+ << " start_time " << last_synchronized_time_;
+
+ // Force logging up until the start of the log file now, so the messages at
+ // the start are always ordered before the rest of the messages.
+ // Note: this ship may have already sailed, but we don't have to make it
+ // worse.
+ // TODO(austin): Test...
+ LogUntil(last_synchronized_time_);
+
+ timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+ polling_period_);
+}
+
+std::unique_ptr<LogNamer> Logger::StopLogging(
+ aos::monotonic_clock::time_point end_time) {
+ CHECK(log_namer_) << ": Not logging right now";
+
+ if (end_time != aos::monotonic_clock::min_time) {
+ LogUntil(end_time);
+ }
+ timer_handler_->Disable();
+
+ for (FetcherStruct &f : fetchers_) {
+ f.writer = nullptr;
+ f.timestamp_writer = nullptr;
+ f.contents_writer = nullptr;
+ }
+ node_state_.clear();
+
+ log_event_uuid_ = UUID::Zero();
+ log_start_uuid_ = std::string();
+
+ return std::move(log_namer_);
+}
+
+void Logger::WriteHeader() {
+ if (configuration::MultiNode(configuration_)) {
+ server_statistics_fetcher_.Fetch();
+ }
+
+ aos::monotonic_clock::time_point monotonic_start_time =
+ event_loop_->monotonic_now();
+ aos::realtime_clock::time_point realtime_start_time =
+ event_loop_->realtime_now();
+
+ // We need to pick a point in time to declare the log file "started". This
+ // starts here. It needs to be after everything is fetched so that the
+ // fetchers are all pointed at the most recent message before the start
+ // time.
+ last_synchronized_time_ = monotonic_start_time;
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
+ MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+ realtime_start_time);
+ MaybeWriteHeader(node_index, node);
+ }
+}
+
+void Logger::MaybeWriteHeader(int node_index) {
+ if (configuration::MultiNode(configuration_)) {
+ return MaybeWriteHeader(node_index,
+ configuration_->nodes()->Get(node_index));
+ } else {
+ return MaybeWriteHeader(node_index, nullptr);
+ }
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+ // This function is responsible for writing the header when the header both
+ // has valid data, and when it needs to be written.
+ if (node_state_[node_index].header_written &&
+ node_state_[node_index].header_valid) {
+ // The header has been written and is valid, nothing to do.
+ return;
+ }
+ if (!node_state_[node_index].has_source_node_boot_uuid) {
+ // Can't write a header if we don't have the boot UUID.
+ return;
+ }
+
+ // WriteHeader writes the first header in a log file. We want to do this only
+ // once.
+ //
+ // Rotate rewrites the same header with a new part ID, but keeps the same part
+ // UUID. We don't want that when things reboot, because that implies that
+ // parts go together across a reboot.
+ //
+ // Reboot resets the parts UUID. So, once we've written a header the first
+ // time, we want to use Reboot to rotate the log and reset the parts UUID.
+ //
+ // header_valid is cleared whenever the remote reboots.
+ if (node_state_[node_index].header_written) {
+ log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+ } else {
+ log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+
+ node_state_[node_index].header_written = true;
+ }
+ node_state_[node_index].header_valid = true;
+}
+
+void Logger::WriteMissingTimestamps() {
+ if (configuration::MultiNode(configuration_)) {
+ server_statistics_fetcher_.Fetch();
+ } else {
+ return;
+ }
+
+ if (server_statistics_fetcher_.get() == nullptr) {
+ return;
+ }
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
+ if (MaybeUpdateTimestamp(
+ node, node_index,
+ server_statistics_fetcher_.context().monotonic_event_time,
+ server_statistics_fetcher_.context().realtime_event_time)) {
+ CHECK(node_state_[node_index].header_written);
+ CHECK(node_state_[node_index].header_valid);
+ log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ } else {
+ MaybeWriteHeader(node_index, node);
+ }
+ }
+}
+
+void Logger::SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time) {
+ node_state_[node_index].monotonic_start_time = monotonic_start_time;
+ node_state_[node_index].realtime_start_time = realtime_start_time;
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_start_time.time_since_epoch())
+ .count());
+
+ // Add logger start times if they are available in the log file header.
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_monotonic_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_monotonic_start_time.time_since_epoch())
+ .count());
+ }
+
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_logger_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ logger_realtime_start_time.time_since_epoch())
+ .count());
+ }
+
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_start_time.time_since_epoch())
+ .count());
+ }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ // Bail early if the start times are already set.
+ if (node_state_[node_index].monotonic_start_time !=
+ monotonic_clock::min_time) {
+ return false;
+ }
+ if (event_loop_->node() == node ||
+ !configuration::MultiNode(configuration_)) {
+ // There are no offsets to compute for ourself, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+ monotonic_start_time, realtime_start_time);
+ node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
+ return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ // Update the boot UUID as soon as we know we are connected.
+ if (!connection->has_boot_uuid()) {
+ VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ if (!node_state_[node_index].has_source_node_boot_uuid ||
+ node_state_[node_index].source_node_boot_uuid !=
+ connection->boot_uuid()->string_view()) {
+ node_state_[node_index].SetBootUUID(
+ connection->boot_uuid()->string_view());
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ // Found it and it is connected. Compensate and go.
+ SetStartTime(node_index,
+ monotonic_start_time +
+ std::chrono::nanoseconds(connection->monotonic_offset()),
+ realtime_start_time, monotonic_start_time,
+ realtime_start_time);
+ return true;
+ }
+ }
+ return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+ const Node *node, std::string_view config_sha256) {
+ // Now write the header with this timestamp in it.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ flatbuffers::Offset<aos::Configuration> configuration_offset;
+ if (!separate_config_) {
+ configuration_offset = CopyFlatBuffer(configuration_, &fbb);
+ } else {
+ CHECK(!config_sha256.empty());
+ }
+
+ const flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb.CreateString(name_);
+
+ CHECK(log_event_uuid_ != UUID::Zero());
+ const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
+ fbb.CreateString(log_event_uuid_.string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
+ fbb.CreateString(logger_instance_uuid_.string_view());
+
+ flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
+ if (!log_start_uuid_.empty()) {
+ log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
+ }
+
+ flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
+ if (!config_sha256.empty()) {
+ config_sha256_offset = fbb.CreateString(config_sha256);
+ }
+
+ const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+ const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+ fbb.CreateString("00000000-0000-4000-8000-000000000000");
+
+ flatbuffers::Offset<Node> node_offset;
+ flatbuffers::Offset<Node> logger_node_offset;
+
+ if (configuration::MultiNode(configuration_)) {
+ node_offset = RecursiveCopyFlatBuffer(node, &fbb);
+ logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
+ }
+
+ aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+ log_file_header_builder.add_name(name_offset);
+
+ // Only add the node if we are running in a multinode configuration.
+ if (node != nullptr) {
+ log_file_header_builder.add_node(node_offset);
+ log_file_header_builder.add_logger_node(logger_node_offset);
+ }
+
+ if (!configuration_offset.IsNull()) {
+ log_file_header_builder.add_configuration(configuration_offset);
+ }
+ // The worst case theoretical out of order is the polling period times 2.
+ // One message could get logged right after the boundary, but be for right
+ // before the next boundary. And the reverse could happen for another
+ // message. Report back 3x to be extra safe, and because the cost isn't
+ // huge on the read side.
+ log_file_header_builder.add_max_out_of_order_duration(
+ std::chrono::nanoseconds(3 * polling_period_).count());
+
+ log_file_header_builder.add_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_clock::min_time.time_since_epoch())
+ .count());
+ if (node == event_loop_->node()) {
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
+ } else {
+ log_file_header_builder.add_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_clock::min_time.time_since_epoch())
+ .count());
+ log_file_header_builder.add_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
+ }
+
+ log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
+ log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
+ if (!log_start_uuid_offset.IsNull()) {
+ log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
+ }
+ log_file_header_builder.add_logger_node_boot_uuid(
+ logger_node_boot_uuid_offset);
+ log_file_header_builder.add_source_node_boot_uuid(
+ source_node_boot_uuid_offset);
+
+ log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+ log_file_header_builder.add_parts_index(0);
+
+ log_file_header_builder.add_configuration_sha256(0);
+
+ if (!config_sha256_offset.IsNull()) {
+ log_file_header_builder.add_configuration_sha256(config_sha256_offset);
+ }
+
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
+ fbb.Release());
+
+ CHECK(result.Verify()) << ": Built a corrupted header.";
+
+ return result;
+}
+
+void Logger::ResetStatisics() {
+ max_message_fetch_time_ = std::chrono::nanoseconds::zero();
+ max_message_fetch_time_channel_ = -1;
+ max_message_fetch_time_size_ = -1;
+ total_message_fetch_time_ = std::chrono::nanoseconds::zero();
+ total_message_fetch_count_ = 0;
+ total_message_fetch_bytes_ = 0;
+ total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
+ total_nop_fetch_count_ = 0;
+ max_copy_time_ = std::chrono::nanoseconds::zero();
+ max_copy_time_channel_ = -1;
+ max_copy_time_size_ = -1;
+ total_copy_time_ = std::chrono::nanoseconds::zero();
+ total_copy_count_ = 0;
+ total_copy_bytes_ = 0;
+}
+
+void Logger::Rotate() {
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index = configuration::GetNodeIndex(configuration_, node);
+ log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+ // Grab the latest ServerStatistics message. This will always have the
+ // oppertunity to be >= to the current time, so it will always represent any
+ // reboots which may have happened.
+ WriteMissingTimestamps();
+
+ // Write each channel to disk, one at a time.
+ for (FetcherStruct &f : fetchers_) {
+ while (true) {
+ if (f.written) {
+ const auto start = event_loop_->monotonic_now();
+ const bool got_new = f.fetcher->FetchNext();
+ const auto end = event_loop_->monotonic_now();
+ RecordFetchResult(start, end, got_new, &f);
+ if (!got_new) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ }
+ f.written = false;
+ }
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time >= t) {
+ break;
+ }
+ if (f.writer != nullptr) {
+ // Write!
+ const auto start = event_loop_->monotonic_now();
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index, f.log_type));
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, &f);
+
+ VLOG(2) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.writer->filename() << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ max_header_size_ = std::max(max_header_size_,
+ fbb.GetSize() - f.fetcher->context().size);
+ CHECK(node_state_[f.data_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
+ f.writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ const auto start = event_loop_->monotonic_now();
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, &f);
+
+ VLOG(2) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename() << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ CHECK(node_state_[f.timestamp_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.contents_writer != nullptr) {
+ const auto start = event_loop_->monotonic_now();
+ // And now handle the special message contents channel. Copy the
+ // message into a FlatBufferBuilder and save it to disk.
+ // TODO(austin): We can be more efficient here when we start to
+ // care...
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ const RemoteMessage *msg =
+ flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+
+ CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+ if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+ node_state_[f.contents_node_index].source_node_boot_uuid !=
+ msg->boot_uuid()->string_view()) {
+ node_state_[f.contents_node_index].SetBootUUID(
+ msg->boot_uuid()->string_view());
+
+ MaybeWriteHeader(f.contents_node_index);
+ }
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ // TODO(austin): This needs to check the channel_index and confirm
+ // that it should be logged before squirreling away the timestamp to
+ // disk. We don't want to log irrelevant timestamps.
+
+ // Note: this must match the same order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+
+ // Translate from the channel index that the event loop uses to the
+ // channel index in the log file.
+ message_header_builder.add_channel_index(
+ event_loop_to_logged_channel_index_[msg->channel_index()]);
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(
+ msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(
+ msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(
+ msg->remote_queue_index());
+
+ message_header_builder.add_monotonic_timestamp_time(
+ f.fetcher->context()
+ .monotonic_event_time.time_since_epoch()
+ .count());
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+ const auto end = event_loop_->monotonic_now();
+ RecordCreateMessageTime(start, end, &f);
+
+ CHECK(node_state_[f.contents_node_index].header_valid)
+ << ": Can't write data before the header on channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel());
+ f.contents_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ }
+ }
+ last_synchronized_time_ = t;
+}
+
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+ // We want to guarantee that messages aren't out of order by more than
+ // max_out_of_order_duration. To do this, we need sync points. Every write
+ // cycle should be a sync point.
+
+ do {
+ // Move the sync point up by at most polling_period. This forces one sync
+ // per iteration, even if it is small.
+ LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+ on_logged_period_();
+
+ // If we missed cycles, we could be pretty far behind. Spin until we are
+ // caught up.
+ } while (last_synchronized_time_ + polling_period_ < end_time);
+}
+
+void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
+ aos::monotonic_clock::time_point end,
+ bool got_new, FetcherStruct *fetcher) {
+ const auto duration = end - start;
+ if (!got_new) {
+ ++total_nop_fetch_count_;
+ total_nop_fetch_time_ += duration;
+ return;
+ }
+ ++total_message_fetch_count_;
+ total_message_fetch_bytes_ += fetcher->fetcher->context().size;
+ total_message_fetch_time_ += duration;
+ if (duration > max_message_fetch_time_) {
+ max_message_fetch_time_ = duration;
+ max_message_fetch_time_channel_ = fetcher->channel_index;
+ max_message_fetch_time_size_ = fetcher->fetcher->context().size;
+ }
+}
+
+void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
+ aos::monotonic_clock::time_point end,
+ FetcherStruct *fetcher) {
+ const auto duration = end - start;
+ total_copy_time_ += duration;
+ ++total_copy_count_;
+ total_copy_bytes_ += fetcher->fetcher->context().size;
+ if (duration > max_copy_time_) {
+ max_copy_time_ = duration;
+ max_copy_time_channel_ = fetcher->channel_index;
+ max_copy_time_size_ = fetcher->fetcher->context().size;
+ }
+}
+
+} // namespace logger
+} // namespace aos
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
new file mode 100644
index 0000000..4c7d39f
--- /dev/null
+++ b/aos/events/logging/log_writer.h
@@ -0,0 +1,323 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_WRITER_H_
+#define AOS_EVENTS_LOGGING_LOG_WRITER_H_
+
+#include <chrono>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/log_namer.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Logs all channels available in the event loop to disk every 100 ms.
+// Start by logging one message per channel to capture any state and
+// configuration that is sent rately on a channel and would affect execution.
+class Logger {
+ public:
+ // Constructs a logger.
+ // event_loop: The event loop used to read the messages.
+ // configuration: When provided, this is the configuration to log, and the
+ // configuration to use for the channel list to log. If not provided,
+ // this becomes the configuration from the event loop.
+ // should_log: When provided, a filter for channels to log. If not provided,
+ // all available channels are logged.
+ Logger(EventLoop *event_loop)
+ : Logger(event_loop, event_loop->configuration()) {}
+ Logger(EventLoop *event_loop, const Configuration *configuration)
+ : Logger(event_loop, configuration,
+ [](const Channel *) { return true; }) {}
+ Logger(EventLoop *event_loop, const Configuration *configuration,
+ std::function<bool(const Channel *)> should_log);
+ ~Logger();
+
+ // Overrides the name in the log file header.
+ void set_name(std::string_view name) { name_ = name; }
+
+ // Sets the callback to run after each period of data is logged. Defaults to
+ // doing nothing.
+ //
+ // This callback may safely do things like call Rotate().
+ void set_on_logged_period(std::function<void()> on_logged_period) {
+ on_logged_period_ = std::move(on_logged_period);
+ }
+
+ void set_separate_config(bool separate_config) {
+ separate_config_ = separate_config;
+ }
+
+ // Sets the period between polling the data. Defaults to 100ms.
+ //
+ // Changing this while a set of files is being written may result in
+ // unreadable files.
+ void set_polling_period(std::chrono::nanoseconds polling_period) {
+ polling_period_ = polling_period;
+ }
+
+ std::string_view log_start_uuid() const { return log_start_uuid_; }
+ UUID logger_instance_uuid() const { return logger_instance_uuid_; }
+
+ // The maximum time for a single fetch which returned a message, or 0 if none
+ // of those have happened.
+ std::chrono::nanoseconds max_message_fetch_time() const {
+ return max_message_fetch_time_;
+ }
+ // The channel for that longest fetch which returned a message, or -1 if none
+ // of those have happened.
+ int max_message_fetch_time_channel() const {
+ return max_message_fetch_time_channel_;
+ }
+ // The size of the message returned by that longest fetch, or -1 if none of
+ // those have happened.
+ int max_message_fetch_time_size() const {
+ return max_message_fetch_time_size_;
+ }
+ // The total time spent fetching messages.
+ std::chrono::nanoseconds total_message_fetch_time() const {
+ return total_message_fetch_time_;
+ }
+ // The total number of fetch calls which returned messages.
+ int total_message_fetch_count() const { return total_message_fetch_count_; }
+ // The total number of bytes fetched.
+ int64_t total_message_fetch_bytes() const {
+ return total_message_fetch_bytes_;
+ }
+
+ // The total time spent in fetches which did not return a message.
+ std::chrono::nanoseconds total_nop_fetch_time() const {
+ return total_nop_fetch_time_;
+ }
+ // The total number of fetches which did not return a message.
+ int total_nop_fetch_count() const { return total_nop_fetch_count_; }
+
+ // The maximum time for a single copy, or 0 if none of those have happened.
+ std::chrono::nanoseconds max_copy_time() const { return max_copy_time_; }
+ // The channel for that longest copy, or -1 if none of those have happened.
+ int max_copy_time_channel() const { return max_copy_time_channel_; }
+ // The size of the message for that longest copy, or -1 if none of those have
+ // happened.
+ int max_copy_time_size() const { return max_copy_time_size_; }
+ // The total time spent copying messages.
+ std::chrono::nanoseconds total_copy_time() const { return total_copy_time_; }
+ // The total number of messages copied.
+ int total_copy_count() const { return total_copy_count_; }
+ // The total number of bytes copied.
+ int64_t total_copy_bytes() const { return total_copy_bytes_; }
+
+ void ResetStatisics();
+
+ // Rotates the log file(s), triggering new part files to be written for each
+ // log file.
+ void Rotate();
+
+ // Starts logging to files with the given naming scheme.
+ //
+ // log_start_uuid may be used to tie this log event to other log events across
+ // multiple nodes. The default (empty string) indicates there isn't one
+ // available.
+ void StartLogging(std::unique_ptr<LogNamer> log_namer,
+ std::string_view log_start_uuid = "");
+
+ // Stops logging. Ensures any messages through end_time make it into the log.
+ //
+ // If you want to stop ASAP, pass min_time to avoid reading any more messages.
+ //
+ // Returns the LogNamer in case the caller wants to do anything else with it
+ // before destroying it.
+ std::unique_ptr<LogNamer> StopLogging(
+ aos::monotonic_clock::time_point end_time);
+
+ // Returns whether a log is currently being written.
+ bool is_started() const { return static_cast<bool>(log_namer_); }
+
+ // Shortcut to call StartLogging with a LocalLogNamer when event processing
+ // starts.
+ void StartLoggingLocalNamerOnRun(std::string base_name) {
+ event_loop_->OnRun([this, base_name]() {
+ StartLogging(
+ std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+ });
+ }
+
+ private:
+ // Structure to track both a fetcher, and if the data fetched has been
+ // written. We may want to delay writing data to disk so that we don't let
+ // data get too far out of order when written to disk so we can avoid making
+ // it too hard to sort when reading.
+ struct FetcherStruct {
+ std::unique_ptr<RawFetcher> fetcher;
+ bool written = false;
+
+ // Channel index to log to.
+ int channel_index = -1;
+ const Channel *channel = nullptr;
+ const Node *timestamp_node = nullptr;
+
+ LogType log_type = LogType::kLogMessage;
+
+ // We fill out the metadata at construction, but the actual writers have to
+ // be updated each time we start logging. To avoid duplicating the complex
+ // logic determining whether each writer should be initialized, we just
+ // stash the answer in separate member variables.
+ bool wants_writer = false;
+ DetachedBufferWriter *writer = nullptr;
+ bool wants_timestamp_writer = false;
+ DetachedBufferWriter *timestamp_writer = nullptr;
+ bool wants_contents_writer = false;
+ DetachedBufferWriter *contents_writer = nullptr;
+
+ // Node which this data is from, or -1 if it is unknown.
+ int data_node_index = -1;
+ // Node that this timestamp is for, or -1 if it is known.
+ int timestamp_node_index = -1;
+ // Node that the contents this contents_writer will log are from.
+ int contents_node_index = -1;
+ };
+
+ // Vector mapping from the channel index from the event loop to the logged
+ // channel index.
+ std::vector<int> event_loop_to_logged_channel_index_;
+
+ struct NodeState {
+ aos::monotonic_clock::time_point monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point realtime_start_time =
+ aos::realtime_clock::min_time;
+
+ bool has_source_node_boot_uuid = false;
+
+ // This is an initial UUID that is a valid UUID4 and is pretty obvious that
+ // it isn't valid.
+ std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+
+ // True if a header has been written to the start of a log file.
+ bool header_written = false;
+ // True if the current written header represents the contents which will
+ // follow. This is cleared when boot_uuid is known to not match anymore.
+ bool header_valid = false;
+
+ // Sets the source_node_boot_uuid, properly updating everything.
+ void SetBootUUID(std::string_view new_source_node_boot_uuid) {
+ source_node_boot_uuid = new_source_node_boot_uuid;
+ header_valid = false;
+ has_source_node_boot_uuid = true;
+
+ flatbuffers::String *source_node_boot_uuid_string =
+ log_file_header.mutable_message()->mutable_source_node_boot_uuid();
+ CHECK_EQ(source_node_boot_uuid.size(),
+ source_node_boot_uuid_string->size());
+ memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
+ source_node_boot_uuid.size());
+ }
+ };
+
+ void WriteHeader();
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const Node *node, std::string_view config_sha256);
+
+ // Writes the header for the provided node if enough information is valid.
+ void MaybeWriteHeader(int node_index);
+ // Overload for when we already know node as well.
+ void MaybeWriteHeader(int node_index, const Node *node);
+
+ bool MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
+
+ void DoLogData(const monotonic_clock::time_point end_time);
+
+ void WriteMissingTimestamps();
+
+ // Fetches from each channel until all the data is logged.
+ void LogUntil(monotonic_clock::time_point t);
+
+ void RecordFetchResult(aos::monotonic_clock::time_point start,
+ aos::monotonic_clock::time_point end, bool got_new,
+ FetcherStruct *fetcher);
+
+ void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
+ aos::monotonic_clock::time_point end,
+ FetcherStruct *fetcher);
+
+ // Sets the start time for a specific node.
+ void SetStartTime(
+ size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time,
+ aos::monotonic_clock::time_point logger_monotonic_start_time,
+ aos::realtime_clock::time_point logger_realtime_start_time);
+
+ EventLoop *const event_loop_;
+ // The configuration to place at the top of the log file.
+ const Configuration *const configuration_;
+
+ UUID log_event_uuid_ = UUID::Zero();
+ const UUID logger_instance_uuid_ = UUID::Random();
+ std::unique_ptr<LogNamer> log_namer_;
+ // Empty indicates there isn't one.
+ std::string log_start_uuid_;
+
+ // Name to save in the log file. Defaults to hostname.
+ std::string name_;
+
+ std::function<void()> on_logged_period_ = []() {};
+
+ std::chrono::nanoseconds max_message_fetch_time_ =
+ std::chrono::nanoseconds::zero();
+ int max_message_fetch_time_channel_ = -1;
+ int max_message_fetch_time_size_ = -1;
+ std::chrono::nanoseconds total_message_fetch_time_ =
+ std::chrono::nanoseconds::zero();
+ int total_message_fetch_count_ = 0;
+ int64_t total_message_fetch_bytes_ = 0;
+
+ std::chrono::nanoseconds total_nop_fetch_time_ =
+ std::chrono::nanoseconds::zero();
+ int total_nop_fetch_count_ = 0;
+
+ std::chrono::nanoseconds max_copy_time_ = std::chrono::nanoseconds::zero();
+ int max_copy_time_channel_ = -1;
+ int max_copy_time_size_ = -1;
+ std::chrono::nanoseconds total_copy_time_ = std::chrono::nanoseconds::zero();
+ int total_copy_count_ = 0;
+ int64_t total_copy_bytes_ = 0;
+
+ std::vector<FetcherStruct> fetchers_;
+ TimerHandler *timer_handler_;
+
+ // Period to poll the channels.
+ std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
+
+ // Last time that data was written for all channels to disk.
+ monotonic_clock::time_point last_synchronized_time_;
+
+ // Max size that the header has consumed. This much extra data will be
+ // reserved in the builder to avoid reallocating.
+ size_t max_header_size_ = 0;
+
+ // If true, write the message header into a separate file.
+ bool separate_config_ = true;
+
+ // Fetcher for all the statistics from all the nodes.
+ aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+ std::vector<NodeState> node_state_;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_LOG_WRITER_H_
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index da84cc9..dd1d8a2 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -2,7 +2,7 @@
#include <sys/time.h>
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/logging/log_namer.h"
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 2929198..bd41663 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,7 +1,8 @@
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "absl/strings/str_format.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/log_writer.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
@@ -32,6 +33,21 @@
constexpr std::string_view kConfigSha1(
"47511a1906dbb59cf9f8ad98ad08e568c718a4deb204c8bbce81ff76cef9095c");
+std::vector<std::vector<std::string>> ToLogReaderVector(
+ const std::vector<LogFile> &log_files) {
+ std::vector<std::vector<std::string>> result;
+ for (const LogFile &log_file : log_files) {
+ for (const LogParts &log_parts : log_file.parts) {
+ std::vector<std::string> parts;
+ for (const std::string &part : log_parts.parts) {
+ parts.emplace_back(part);
+ }
+ result.emplace_back(std::move(parts));
+ }
+ }
+ return result;
+}
+
class LoggerTest : public ::testing::Test {
public:
LoggerTest()
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 3cb5c26..85bbf18 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -197,7 +197,7 @@
":timestamp_fbs",
"//aos:unique_malloc_ptr",
"//aos/events:shm_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
"//third_party/lksctp-tools:sctp",
],
)
@@ -275,7 +275,7 @@
":sctp_client",
":timestamp_fbs",
"//aos/events:shm_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
],
)
@@ -448,7 +448,7 @@
":web_proxy",
"//aos:init",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
"@com_github_google_flatbuffers//:flatbuffers",
],
)
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1942c57..5c099a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -4,7 +4,7 @@
// And then opening the plotting webpage at http://localhost:8080/graph.html
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/flatbuffer_merge.h"
#include "aos/init.h"
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 38c3831..8a0ec34 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -3,7 +3,7 @@
#include <chrono>
#include <string_view>
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
#include "aos/network/message_bridge_client_generated.h"
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 1ac6cb5..aaeecff 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -2,7 +2,7 @@
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 55f3fea..fea2d06 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -4,7 +4,7 @@
#include <deque>
#include "absl/types/span.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
diff --git a/frc971/analysis/BUILD b/frc971/analysis/BUILD
index 2901463..b72634a 100644
--- a/frc971/analysis/BUILD
+++ b/frc971/analysis/BUILD
@@ -23,7 +23,7 @@
"//aos:json_to_flatbuffer",
"//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
"@com_github_google_glog//:glog",
"@python_repo//:python3.7_lib",
],
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
index 8d3e2a6..5b32ed6 100644
--- a/frc971/analysis/py_log_reader.cc
+++ b/frc971/analysis/py_log_reader.cc
@@ -16,7 +16,7 @@
#include <errno.h>
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/flatbuffer_merge.h"
#include "aos/init.h"
diff --git a/frc971/control_loops/drivetrain/BUILD b/frc971/control_loops/drivetrain/BUILD
index ad39bcf..8c3274d 100644
--- a/frc971/control_loops/drivetrain/BUILD
+++ b/frc971/control_loops/drivetrain/BUILD
@@ -470,7 +470,7 @@
":drivetrain_output_fbs",
":drivetrain_test_lib",
"//aos/controls:control_loop_test",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_writer",
"//aos/testing:googletest",
"//frc971/queues:gyro_fbs",
"//frc971/wpilib:imu_fbs",
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index bfa3d26..b378484 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -6,7 +6,7 @@
#include "aos/controls/control_loop_test.h"
#include "aos/controls/polytope.h"
#include "aos/events/event_loop.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
#include "aos/time/time.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
diff --git a/y2019/control_loops/drivetrain/BUILD b/y2019/control_loops/drivetrain/BUILD
index ed6d832..c21433b 100644
--- a/y2019/control_loops/drivetrain/BUILD
+++ b/y2019/control_loops/drivetrain/BUILD
@@ -232,7 +232,8 @@
"//aos:init",
"//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
+ "//aos/events/logging:log_writer",
"//frc971/control_loops/drivetrain:drivetrain_lib",
"@com_github_gflags_gflags//:gflags",
"@com_github_google_glog//:glog",
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index 69cce4f..97b4c2b 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -1,7 +1,8 @@
#include <iostream>
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/BUILD b/y2020/control_loops/drivetrain/BUILD
index a74b18e..1b98592 100644
--- a/y2020/control_loops/drivetrain/BUILD
+++ b/y2020/control_loops/drivetrain/BUILD
@@ -137,7 +137,7 @@
":localizer",
"//aos/controls:control_loop_test",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_writer",
"//aos/network:team_number",
"//aos/network:testing_time_converter",
"//frc971/control_loops:team_number_test_environment",
@@ -160,7 +160,8 @@
"//aos:init",
"//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
+ "//aos/events/logging:log_writer",
"//aos/testing:googletest",
"//frc971/control_loops/drivetrain:drivetrain_lib",
"@com_github_gflags_gflags//:gflags",
@@ -180,7 +181,8 @@
"//aos:init",
"//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
+ "//aos/events/logging:log_writer",
"//frc971/control_loops/drivetrain:drivetrain_lib",
"@com_github_gflags_gflags//:gflags",
"@com_github_google_glog//:glog",
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 1533316..9b2375b 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -5,7 +5,8 @@
// in some other way. The original drivetrain status data will be on the
// /original/drivetrain channel.
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay_test.cc b/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
index 4b02923..2e6b327 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
@@ -11,7 +11,8 @@
#include "gtest/gtest.h"
#include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index d440aaf..d195b5d 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -3,7 +3,7 @@
#include "gtest/gtest.h"
#include "aos/controls/control_loop_test.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/testing_time_converter.h"
diff --git a/y2020/control_loops/superstructure/BUILD b/y2020/control_loops/superstructure/BUILD
index d659920..3ea3dbf 100644
--- a/y2020/control_loops/superstructure/BUILD
+++ b/y2020/control_loops/superstructure/BUILD
@@ -105,7 +105,7 @@
":superstructure_status_fbs",
"//aos:math",
"//aos/controls:control_loop_test",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_writer",
"//aos/testing:googletest",
"//aos/time",
"//frc971/control_loops:capped_test_plant",
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index ac1e4a0..c406d09 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -4,7 +4,7 @@
#include <memory>
#include "aos/controls/control_loop_test.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
#include "frc971/control_loops/capped_test_plant.h"
#include "frc971/control_loops/position_sensor_sim.h"
#include "frc971/control_loops/team_number_test_environment.h"
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index a58ed8c..a5d47fa 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -117,7 +117,7 @@
":vision_fbs",
"//aos:init",
"//aos/events:simulated_event_loop",
- "//aos/events/logging:logger",
+ "//aos/events/logging:log_reader",
"//third_party:opencv",
],
)
diff --git a/y2020/vision/viewer_replay.cc b/y2020/vision/viewer_replay.cc
index 82ab11a..234c445 100644
--- a/y2020/vision/viewer_replay.cc
+++ b/y2020/vision/viewer_replay.cc
@@ -3,7 +3,7 @@
#include <opencv2/highgui/highgui.hpp>
#include <opencv2/imgproc.hpp>
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/init.h"
#include "y2020/vision/vision_generated.h"