Make LogNamer copy a template header rather than modify an input
This sets us up much better for making it so LogNamer can decided when
to rotate to handle boot UUIDs changes. We also have a bug hiding in
log_writer where we get duplicate headers written. This will make that
a lot harder to do by accident once we've got all the tracking
simplified inside LogNamer.
Change-Id: Ib4a2eb5085e12f74c2c61c58d1f511fc2aeb85fa
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 166e505..7ff3bc4 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -9,6 +9,7 @@
#include "absl/strings/str_cat.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "aos/uuid.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
@@ -16,22 +17,141 @@
namespace aos {
namespace logger {
-void LogNamer::UpdateHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const UUID &uuid, int parts_index) const {
- header->mutable_message()->mutate_parts_index(parts_index);
- CHECK_EQ(UUID::kStringSize,
- header->mutable_message()->mutable_parts_uuid()->size());
- uuid.CopyTo(reinterpret_cast<char *>(
- header->mutable_message()->mutable_parts_uuid()->Data()));
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
+ size_t node_index, const UUID &source_node_boot_uuid,
+ const UUID &parts_uuid, int parts_index) const {
+ const Node *const source_node =
+ configuration::GetNode(configuration_, node_index);
+ CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
+ flatbuffers::Offset<aos::Configuration> configuration_offset;
+ if (header_.message().has_configuration()) {
+ CHECK(!header_.message().has_configuration_sha256());
+ configuration_offset =
+ CopyFlatBuffer(header_.message().configuration(), &fbb);
+ } else {
+ CHECK(!header_.message().has_configuration());
+ CHECK(header_.message().has_configuration_sha256());
+ config_sha256_offset = fbb.CreateString(
+ header_.message().configuration_sha256()->string_view());
+ }
+
+ CHECK(header_.message().has_name());
+ const flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb.CreateString(header_.message().name()->string_view());
+
+ CHECK(header_.message().has_log_event_uuid());
+ const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
+ fbb.CreateString(header_.message().log_event_uuid()->string_view());
+
+ CHECK(header_.message().has_logger_instance_uuid());
+ const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
+ fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
+
+ flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
+ if (header_.message().has_log_start_uuid()) {
+ log_start_uuid_offset =
+ fbb.CreateString(header_.message().log_start_uuid()->string_view());
+ }
+
+ CHECK(header_.message().has_logger_node_boot_uuid());
+ const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+ fbb.CreateString(
+ header_.message().logger_node_boot_uuid()->string_view());
+
+ CHECK_NE(source_node_boot_uuid, UUID::Zero());
+ const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+ source_node_boot_uuid.PackString(&fbb);
+
+ const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+ parts_uuid.PackString(&fbb);
+
+ flatbuffers::Offset<Node> node_offset;
+ flatbuffers::Offset<Node> logger_node_offset;
+
+ if (configuration::MultiNode(configuration_)) {
+ node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
+ logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
+ }
+
+ aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+ log_file_header_builder.add_name(name_offset);
+
+ // Only add the node if we are running in a multinode configuration.
+ if (!logger_node_offset.IsNull()) {
+ log_file_header_builder.add_node(node_offset);
+ log_file_header_builder.add_logger_node(logger_node_offset);
+ }
+
+ if (!configuration_offset.IsNull()) {
+ log_file_header_builder.add_configuration(configuration_offset);
+ }
+ log_file_header_builder.add_max_out_of_order_duration(
+ header_.message().max_out_of_order_duration());
+
+ log_file_header_builder.add_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ node_states_[node_index].monotonic_start_time.time_since_epoch())
+ .count());
+ if (source_node == node_) {
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ node_states_[node_index].realtime_start_time.time_since_epoch())
+ .count());
+ } else {
+ // Fill out the legacy start times. Since these were implemented to never
+ // change on reboot, they aren't very helpful in tracking what happened.
+ log_file_header_builder.add_logger_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ node_states_[node_index]
+ .logger_monotonic_start_time.time_since_epoch())
+ .count());
+ log_file_header_builder.add_logger_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ node_states_[node_index]
+ .logger_realtime_start_time.time_since_epoch())
+ .count());
+ }
+
+ // TODO(austin): Add more useful times. When was this part started? What do
+ // we know about both the logger and remote then?
+
+ log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
+ log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
+ if (!log_start_uuid_offset.IsNull()) {
+ log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
+ }
+ log_file_header_builder.add_logger_node_boot_uuid(
+ logger_node_boot_uuid_offset);
+ log_file_header_builder.add_source_node_boot_uuid(
+ source_node_boot_uuid_offset);
+
+ log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+ log_file_header_builder.add_parts_index(parts_index);
+
+ if (!config_sha256_offset.IsNull()) {
+ log_file_header_builder.add_configuration_sha256(config_sha256_offset);
+ }
+
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
+ fbb.Release());
+
+ CHECK(result.Verify()) << ": Built a corrupted header.";
+
+ return result;
}
-void LocalLogNamer::WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) {
+void LocalLogNamer::WriteHeader(const Node *node) {
CHECK_EQ(node, this->node());
- UpdateHeader(header, data_writer_.uuid(), data_writer_.part_number);
- data_writer_.writer->QueueSpan(header->span());
+ const size_t node_index = configuration::GetNodeIndex(configuration_, node);
+ data_writer_.QueueHeader(
+ MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer_.uuid(), data_writer_.part_number));
}
NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -40,13 +160,14 @@
return &data_writer_;
}
-void LocalLogNamer::Rotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+void LocalLogNamer::Rotate(const Node *node) {
CHECK(node == this->node());
+ const size_t node_index = configuration::GetNodeIndex(configuration_, node);
data_writer_.Rotate();
- UpdateHeader(header, data_writer_.uuid(), data_writer_.part_number);
- data_writer_.writer->QueueSpan(header->span());
+
+ data_writer_.QueueHeader(
+ MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer_.uuid(), data_writer_.part_number));
}
void LocalLogNamer::WriteConfiguration(
@@ -60,9 +181,8 @@
writer->QueueSizedFlatbuffer(header->Release());
}
-void LocalLogNamer::Reboot(
- const Node * /*node*/,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> * /*header*/) {
+void LocalLogNamer::Reboot(const Node * /*node*/
+) {
LOG(FATAL) << "Can't reboot a single node.";
}
@@ -85,10 +205,7 @@
MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
const Configuration *configuration,
const Node *node)
- : LogNamer(node),
- base_name_(base_name),
- old_base_name_(),
- configuration_(configuration) {}
+ : LogNamer(configuration, node), base_name_(base_name), old_base_name_() {}
MultiNodeLogNamer::~MultiNodeLogNamer() {
if (!ran_out_of_space_) {
@@ -97,43 +214,34 @@
}
}
-void MultiNodeLogNamer::WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) {
+void MultiNodeLogNamer::WriteHeader(const Node *node) {
if (node == this->node()) {
if (!data_writer_) {
OpenDataWriter();
}
- UpdateHeader(header, data_writer_->uuid(), data_writer_->part_number);
- data_writer_->writer->QueueSpan(header->span());
+
+ const size_t node_index = configuration::GetNodeIndex(configuration_, node);
+ data_writer_->QueueHeader(
+ MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer_->uuid(), data_writer_->part_number));
} else {
+ const size_t node_index = configuration::GetNodeIndex(configuration_, node);
for (std::pair<const Channel *const, NewDataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
- UpdateHeader(header, data_writer.second.uuid(),
- data_writer.second.part_number);
- data_writer.second.writer->QueueSpan(header->span());
+ data_writer.second.QueueHeader(MakeHeader(
+ node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer.second.uuid(), data_writer.second.part_number));
}
}
}
}
-void MultiNodeLogNamer::Rotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
- DoRotate(node, header, false);
-}
+void MultiNodeLogNamer::Rotate(const Node *node) { DoRotate(node, false); }
-void MultiNodeLogNamer::Reboot(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
- DoRotate(node, header, true);
-}
+void MultiNodeLogNamer::Reboot(const Node *node) { DoRotate(node, true); }
-void MultiNodeLogNamer::DoRotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- bool reboot) {
+void MultiNodeLogNamer::DoRotate(const Node *node, bool reboot) {
if (node == this->node()) {
if (data_writer_) {
if (reboot) {
@@ -143,10 +251,15 @@
}
// TODO(austin): Move this logic down once we have a better ownership
// model for the header.
- UpdateHeader(header, data_writer_->uuid(), data_writer_->part_number);
- data_writer_->writer->QueueSpan(header->span());
+
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration_, node);
+ data_writer_->QueueHeader(
+ MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer_->uuid(), data_writer_->part_number));
}
} else {
+ const size_t node_index = configuration::GetNodeIndex(configuration_, node);
for (std::pair<const Channel *const, NewDataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
@@ -155,9 +268,9 @@
} else {
data_writer.second.Rotate();
}
- UpdateHeader(header, data_writer.second.uuid(),
- data_writer.second.part_number);
- data_writer.second.writer->QueueSpan(header->span());
+ data_writer.second.QueueHeader(MakeHeader(
+ node_index, node_states_[node_index].source_node_boot_uuid,
+ data_writer.second.uuid(), data_writer.second.part_number));
}
}
}
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index f192dd3..8349a09 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -46,8 +46,6 @@
reopen_(this);
}
- // TODO(austin): Store common header in LogNamer.
- //
// TODO(austin): Copy header and add all UUIDs and such when available
// whenever data is written.
//
@@ -62,6 +60,13 @@
writer->QueueSizedFlatbuffer(fbb, now);
}
+ void QueueHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
+ // TODO(austin): This triggers a dummy allocation that we don't need as part
+ // of releasing. Can we skip it?
+ writer->QueueSizedFlatbuffer(header.Release());
+ }
+
std::string_view filename() const { return writer->filename(); }
void Reboot() {
@@ -91,7 +96,11 @@
public:
// Constructs a LogNamer with the primary node (ie the one the logger runs on)
// being node.
- LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+ LogNamer(const Configuration *configuration, const Node *node)
+ : configuration_(configuration), node_(node) {
+ nodes_.emplace_back(node_);
+ node_states_.resize(configuration::NodesCount(configuration_));
+ }
virtual ~LogNamer() {}
virtual std::string_view base_name() const = 0;
@@ -109,9 +118,7 @@
// Modifies header to contain the uuid and part number for each writer as it
// writes it. Since this is done unconditionally, it does not restore the
// previous value at the end.
- virtual void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) = 0;
+ virtual void WriteHeader(const Node *node) = 0;
// Returns a writer for writing data from messages on this channel (on the
// primary node).
@@ -133,20 +140,14 @@
//
// The returned pointer will stay valid across rotations, but the object it
// points to will be assigned to.
- virtual NewDataWriter *MakeForwardedTimestampWriter(
- const Channel *channel, const Node *node) = 0;
+ virtual NewDataWriter *MakeForwardedTimestampWriter(const Channel *channel,
+ const Node *node) = 0;
- // Rotates all log files for the provided node. The provided header will be
- // modified and written per WriteHeader above.
- virtual void Rotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+ // Rotates all log files for the provided node.
+ virtual void Rotate(const Node *node) = 0;
- // Reboots all log files for the provided node. The provided header will be
- // modified and written per WriteHeader above. Resets any parts UUIDs.
- virtual void Reboot(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+ // Reboots all log files for the provided node. Resets any parts UUIDs.
+ virtual void Reboot(const Node *node) = 0;
// Returns all the nodes that data is being written for.
const std::vector<const Node *> &nodes() const { return nodes_; }
@@ -159,14 +160,72 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
std::string_view config_sha256) = 0;
- protected:
- // Modifies the header to have the provided UUID and part id.
- void UpdateHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const UUID &uuid, int part_id) const;
+ void SetHeaderTemplate(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header) {
+ header_ = std::move(header);
+ }
+ void SetStartTimes(size_t node_index,
+ monotonic_clock::time_point monotonic_start_time,
+ realtime_clock::time_point realtime_start_time,
+ monotonic_clock::time_point logger_monotonic_start_time,
+ realtime_clock::time_point logger_realtime_start_time) {
+ node_states_[node_index].monotonic_start_time = monotonic_start_time;
+ node_states_[node_index].realtime_start_time = realtime_start_time;
+ node_states_[node_index].logger_monotonic_start_time =
+ logger_monotonic_start_time;
+ node_states_[node_index].logger_realtime_start_time =
+ logger_realtime_start_time;
+ // TODO(austin): Track that the header has changed and needs to be
+ // rewritten.
+ }
+
+ monotonic_clock::time_point monotonic_start_time(size_t node_index) const {
+ return node_states_[node_index].monotonic_start_time;
+ }
+
+ // TODO(austin): I need to move header writing fully inside NewDataWriter and
+ // delete this method.
+ bool SetBootUUID(size_t node_index, const UUID &uuid) {
+ if (node_states_[node_index].source_node_boot_uuid != uuid) {
+ node_states_[node_index].source_node_boot_uuid = uuid;
+ return true;
+ }
+ return false;
+ }
+
+ protected:
+ // Creates a new header by copying fields out of the template and combining
+ // them with the arguments provided.
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ size_t node_index, const UUID &source_node_boot_uuid,
+ const UUID &parts_uuid, int parts_index) const;
+
+ const Configuration *const configuration_;
const Node *const node_;
std::vector<const Node *> nodes_;
+
+ // Structure with state per node about times and such.
+ // TODO(austin): some of this lives better in NewDataWriter once we move
+ // ownership of deciding when to write headers into LogNamer.
+ struct NodeState {
+ // Time when this node started logging.
+ monotonic_clock::time_point monotonic_start_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_start_time = realtime_clock::min_time;
+
+ // Corresponding time on the logger node when it started logging.
+ monotonic_clock::time_point logger_monotonic_start_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point logger_realtime_start_time =
+ realtime_clock::min_time;
+
+ UUID source_node_boot_uuid = UUID::Zero();
+ };
+ std::vector<NodeState> node_states_;
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header_ =
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
};
// Local log namer is a simple version which only names things
@@ -174,8 +233,9 @@
// any other log type.
class LocalLogNamer : public LogNamer {
public:
- LocalLogNamer(std::string_view base_name, const Node *node)
- : LogNamer(node),
+ LocalLogNamer(std::string_view base_name, const Configuration *configuration,
+ const Node *node)
+ : LogNamer(configuration, node),
base_name_(base_name),
data_writer_(
[this](NewDataWriter *writer) {
@@ -199,24 +259,18 @@
base_name_ = base_name;
}
- void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) override;
+ void WriteHeader(const Node *node) override;
NewDataWriter *MakeWriter(const Channel *channel) override;
- void Rotate(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override;
+ void Rotate(const Node *node) override;
- void Reboot(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override;
+ void Reboot(const Node *node) override;
NewDataWriter *MakeTimestampWriter(const Channel *channel) override;
- NewDataWriter *MakeForwardedTimestampWriter(
- const Channel * /*channel*/, const Node * /*node*/) override;
+ NewDataWriter *MakeForwardedTimestampWriter(const Channel * /*channel*/,
+ const Node * /*node*/) override;
void WriteConfiguration(
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -273,17 +327,11 @@
return all_filenames_;
}
- void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) override;
+ void WriteHeader(const Node *node) override;
- void Rotate(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override;
+ void Rotate(const Node *node) override;
- void Reboot(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override;
+ void Reboot(const Node *node) override;
void WriteConfiguration(
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
@@ -292,7 +340,7 @@
NewDataWriter *MakeWriter(const Channel *channel) override;
NewDataWriter *MakeForwardedTimestampWriter(const Channel *channel,
- const Node *node) override;
+ const Node *node) override;
NewDataWriter *MakeTimestampWriter(const Channel *channel) override;
@@ -393,10 +441,7 @@
private:
// Implements Rotate and Reboot, controlled by the 'reboot' flag. The only
// difference between the two is if DataWriter::uuid is reset or not.
- void DoRotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- bool reboot);
+ void DoRotate(const Node *node, bool reboot);
// Opens up a writer for timestamps forwarded back.
void OpenForwardedTimestampWriter(const Channel *channel,
@@ -431,7 +476,6 @@
std::string base_name_;
std::string old_base_name_;
- const Configuration *const configuration_;
bool ran_out_of_space_ = false;
std::vector<std::string> all_filenames_;
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index abae9a1..71c5750 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -259,11 +259,7 @@
? configuration_->nodes()->size()
: 1u);
- for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
-
- node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
- }
+ log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
const aos::monotonic_clock::time_point beginning_time =
event_loop_->monotonic_now();
@@ -283,8 +279,9 @@
// Clear out any old timestamps in case we are re-starting logging.
for (size_t i = 0; i < node_state_.size(); ++i) {
- SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
- monotonic_clock::min_time, realtime_clock::min_time);
+ log_namer_->SetStartTimes(
+ i, monotonic_clock::min_time, realtime_clock::min_time,
+ monotonic_clock::min_time, realtime_clock::min_time);
}
const aos::monotonic_clock::time_point fetch_time =
@@ -346,9 +343,9 @@
server_statistics_fetcher_.Fetch();
}
- aos::monotonic_clock::time_point monotonic_start_time =
+ const aos::monotonic_clock::time_point monotonic_start_time =
event_loop_->monotonic_now();
- aos::realtime_clock::time_point realtime_start_time =
+ const aos::realtime_clock::time_point realtime_start_time =
event_loop_->realtime_now();
// We need to pick a point in time to declare the log file "started". This
@@ -399,9 +396,10 @@
//
// header_valid is cleared whenever the remote reboots.
if (node_state_[node_index].header_written) {
- log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+ VLOG(1) << "Rebooting";
+ log_namer_->Reboot(node);
} else {
- log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+ log_namer_->WriteHeader(node);
node_state_[node_index].header_written = true;
}
@@ -427,77 +425,32 @@
server_statistics_fetcher_.context().realtime_event_time)) {
CHECK(node_state_[node_index].header_written);
CHECK(node_state_[node_index].header_valid);
- log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ VLOG(1) << "Rotating because timestamps changed";
+ log_namer_->Rotate(node);
} else {
MaybeWriteHeader(node_index, node);
}
}
}
-void Logger::SetStartTime(
- size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time,
- aos::monotonic_clock::time_point logger_monotonic_start_time,
- aos::realtime_clock::time_point logger_realtime_start_time) {
- node_state_[node_index].monotonic_start_time = monotonic_start_time;
- node_state_[node_index].realtime_start_time = realtime_start_time;
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_start_time.time_since_epoch())
- .count());
-
- // Add logger start times if they are available in the log file header.
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_logger_monotonic_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_logger_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- logger_monotonic_start_time.time_since_epoch())
- .count());
- }
-
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_logger_realtime_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_logger_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- logger_realtime_start_time.time_since_epoch())
- .count());
- }
-
- if (node_state_[node_index]
- .log_file_header.mutable_message()
- ->has_realtime_start_time()) {
- node_state_[node_index]
- .log_file_header.mutable_message()
- ->mutate_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_start_time.time_since_epoch())
- .count());
- }
-}
-
bool Logger::MaybeUpdateTimestamp(
const Node *node, int node_index,
aos::monotonic_clock::time_point monotonic_start_time,
aos::realtime_clock::time_point realtime_start_time) {
// Bail early if the start times are already set.
- if (node_state_[node_index].monotonic_start_time !=
+ if (log_namer_->monotonic_start_time(node_index) !=
monotonic_clock::min_time) {
return false;
}
if (event_loop_->node() == node ||
!configuration::MultiNode(configuration_)) {
// There are no offsets to compute for ourself, so always succeed.
- SetStartTime(node_index, monotonic_start_time, realtime_start_time,
- monotonic_start_time, realtime_start_time);
- node_state_[node_index].SetBootUUID(event_loop_->boot_uuid());
+ log_namer_->SetStartTimes(node_index, monotonic_start_time,
+ realtime_start_time, monotonic_start_time,
+ realtime_start_time);
+ log_namer_->SetBootUUID(node_index, event_loop_->boot_uuid());
+ node_state_[node_index].header_valid = false;
+ node_state_[node_index].has_source_node_boot_uuid = true;
return true;
} else if (server_statistics_fetcher_.get() != nullptr) {
// We must be a remote node now. Look for the connection and see if it is
@@ -523,11 +476,11 @@
}
// Found it and it is connected. Compensate and go.
- SetStartTime(node_index,
- monotonic_start_time +
- std::chrono::nanoseconds(connection->monotonic_offset()),
- realtime_start_time, monotonic_start_time,
- realtime_start_time);
+ log_namer_->SetStartTimes(
+ node_index,
+ monotonic_start_time +
+ std::chrono::nanoseconds(connection->monotonic_offset()),
+ realtime_start_time, monotonic_start_time, realtime_start_time);
return true;
}
}
@@ -535,8 +488,7 @@
}
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
- const Node *node, std::string_view config_sha256) {
- // Now write the header with this timestamp in it.
+ std::string_view config_sha256) {
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
@@ -570,17 +522,9 @@
const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
event_loop_->boot_uuid().PackString(&fbb);
- const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
- event_loop_->boot_uuid().PackString(&fbb);
-
- const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
- fbb.CreateString("00000000-0000-4000-8000-000000000000");
-
- flatbuffers::Offset<Node> node_offset;
flatbuffers::Offset<Node> logger_node_offset;
if (configuration::MultiNode(configuration_)) {
- node_offset = RecursiveCopyFlatBuffer(node, &fbb);
logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
}
@@ -589,8 +533,7 @@
log_file_header_builder.add_name(name_offset);
// Only add the node if we are running in a multinode configuration.
- if (node != nullptr) {
- log_file_header_builder.add_node(node_offset);
+ if (configuration::MultiNode(configuration_)) {
log_file_header_builder.add_logger_node(logger_node_offset);
}
@@ -605,26 +548,6 @@
log_file_header_builder.add_max_out_of_order_duration(
std::chrono::nanoseconds(3 * polling_period_).count());
- log_file_header_builder.add_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_clock::min_time.time_since_epoch())
- .count());
- if (node == event_loop_->node()) {
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_clock::min_time.time_since_epoch())
- .count());
- } else {
- log_file_header_builder.add_logger_monotonic_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_clock::min_time.time_since_epoch())
- .count());
- log_file_header_builder.add_logger_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_clock::min_time.time_since_epoch())
- .count());
- }
-
log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
if (!log_start_uuid_offset.IsNull()) {
@@ -632,13 +555,6 @@
}
log_file_header_builder.add_logger_node_boot_uuid(
logger_node_boot_uuid_offset);
- log_file_header_builder.add_source_node_boot_uuid(
- source_node_boot_uuid_offset);
-
- log_file_header_builder.add_parts_uuid(parts_uuid_offset);
- log_file_header_builder.add_parts_index(0);
-
- log_file_header_builder.add_configuration_sha256(0);
if (!config_sha256_offset.IsNull()) {
log_file_header_builder.add_configuration_sha256(config_sha256_offset);
@@ -672,8 +588,7 @@
void Logger::Rotate() {
for (const Node *node : log_namer_->nodes()) {
- const int node_index = configuration::GetNodeIndex(configuration_, node);
- log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ log_namer_->Rotate(node);
}
}
@@ -712,8 +627,10 @@
// node. Our UUID can't change without restarting the application.
if (our_node_index != f.data_node_index) {
// And update our boot UUID if the UUID has changed.
- if (node_state_[f.data_node_index].SetBootUUID(
- f.fetcher->context().source_boot_uuid)) {
+ if (log_namer_->SetBootUUID(f.data_node_index,
+ f.fetcher->context().source_boot_uuid)) {
+ node_state_[f.data_node_index].header_valid = false;
+ node_state_[f.data_node_index].has_source_node_boot_uuid = true;
MaybeWriteHeader(f.data_node_index);
}
}
@@ -784,8 +701,10 @@
flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
- if (node_state_[f.contents_node_index].SetBootUUID(
- UUID::FromVector(msg->boot_uuid()))) {
+ if (log_namer_->SetBootUUID(f.contents_node_index,
+ UUID::FromVector(msg->boot_uuid()))) {
+ node_state_[f.contents_node_index].header_valid = false;
+ node_state_[f.contents_node_index].has_source_node_boot_uuid = true;
MaybeWriteHeader(f.contents_node_index);
}
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 3d975a3..fa76b15 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -149,8 +149,8 @@
// starts.
void StartLoggingLocalNamerOnRun(std::string base_name) {
event_loop_->OnRun([this, base_name]() {
- StartLogging(
- std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+ StartLogging(std::make_unique<LocalLogNamer>(
+ base_name, event_loop_->configuration(), event_loop_->node()));
});
}
@@ -203,50 +203,21 @@
std::vector<int> event_loop_to_logged_channel_index_;
struct NodeState {
- aos::monotonic_clock::time_point monotonic_start_time =
- aos::monotonic_clock::min_time;
- aos::realtime_clock::time_point realtime_start_time =
- aos::realtime_clock::min_time;
-
+ // Tracks if LogNamer has a source boot UUID set or not.
bool has_source_node_boot_uuid = false;
- // This is an initial UUID that is a valid UUID4 and is pretty obvious that
- // it isn't valid.
- UUID source_node_boot_uuid = UUID::Zero();
-
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
-
// True if a header has been written to the start of a log file.
bool header_written = false;
// True if the current written header represents the contents which will
// follow. This is cleared when boot_uuid is known to not match anymore.
bool header_valid = false;
-
- // Sets the source_node_boot_uuid, properly updating everything. Returns
- // true if it changed, false otherwise.
- bool SetBootUUID(const UUID &new_source_node_boot_uuid) {
- if (has_source_node_boot_uuid &&
- source_node_boot_uuid == new_source_node_boot_uuid) {
- return false;
- }
- source_node_boot_uuid = new_source_node_boot_uuid;
- header_valid = false;
- has_source_node_boot_uuid = true;
-
- flatbuffers::String *source_node_boot_uuid_string =
- log_file_header.mutable_message()->mutable_source_node_boot_uuid();
- CHECK_EQ(UUID::kStringSize, source_node_boot_uuid_string->size());
- source_node_boot_uuid.CopyTo(source_node_boot_uuid_string->data());
-
- return true;
- }
};
void WriteHeader();
+ // Makes a template header for all the follower nodes.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
- const Node *node, std::string_view config_sha256);
+ std::string_view config_sha256);
// Writes the header for the provided node if enough information is valid.
void MaybeWriteHeader(int node_index);
@@ -276,13 +247,6 @@
aos::monotonic_clock::time_point end,
FetcherStruct *fetcher);
- // Sets the start time for a specific node.
- void SetStartTime(
- size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
- aos::realtime_clock::time_point realtime_start_time,
- aos::monotonic_clock::time_point logger_monotonic_start_time,
- aos::realtime_clock::time_point logger_realtime_start_time);
-
EventLoop *const event_loop_;
// The configuration to place at the top of the log file.
const Configuration *const configuration_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 7c89b15..a7d627d 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -171,9 +171,11 @@
logger_event_loop->OnRun(
[base_name1, base_name2, &logger_event_loop, &logger]() {
logger.StartLogging(std::make_unique<LocalLogNamer>(
- base_name1, logger_event_loop->node()));
+ base_name1, logger_event_loop->configuration(),
+ logger_event_loop->node()));
EXPECT_DEATH(logger.StartLogging(std::make_unique<LocalLogNamer>(
- base_name2, logger_event_loop->node())),
+ base_name2, logger_event_loop->configuration(),
+ logger_event_loop->node())),
"Already logging");
});
event_loop_factory_.RunFor(chrono::milliseconds(20000));
@@ -204,7 +206,8 @@
logger.set_polling_period(std::chrono::milliseconds(100));
logger_event_loop->OnRun([base_name, &logger_event_loop, &logger]() {
logger.StartLogging(std::make_unique<LocalLogNamer>(
- base_name, logger_event_loop->node()));
+ base_name, logger_event_loop->configuration(),
+ logger_event_loop->node()));
logger.StopLogging(aos::monotonic_clock::min_time);
EXPECT_DEATH(logger.StopLogging(aos::monotonic_clock::min_time),
"Not logging right now");
@@ -240,13 +243,15 @@
Logger logger(logger_event_loop.get());
logger.set_separate_config(false);
logger.set_polling_period(std::chrono::milliseconds(100));
- logger.StartLogging(
- std::make_unique<LocalLogNamer>(base_name1, logger_event_loop->node()));
+ logger.StartLogging(std::make_unique<LocalLogNamer>(
+ base_name1, logger_event_loop->configuration(),
+ logger_event_loop->node()));
event_loop_factory_.RunFor(chrono::milliseconds(10000));
logger.StopLogging(logger_event_loop->monotonic_now());
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- logger.StartLogging(
- std::make_unique<LocalLogNamer>(base_name2, logger_event_loop->node()));
+ logger.StartLogging(std::make_unique<LocalLogNamer>(
+ base_name2, logger_event_loop->configuration(),
+ logger_event_loop->node()));
event_loop_factory_.RunFor(chrono::milliseconds(10000));
}