Move log_namer to log_namer.{cc,h}
logger.{cc,h} is getting too big. Also move more implementation from
the header to the .cc, and add comments.
Change-Id: Ia342bfade98c71d4d42f769571256c28e2c826d5
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 5aba387..56bef43 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -36,11 +36,13 @@
cc_library(
name = "logger",
srcs = [
+ "log_namer.cc",
"logger.cc",
"logger_math.cc",
],
hdrs = [
"eigen_mpq.h",
+ "log_namer.h",
"logger.h",
],
visibility = ["//visibility:public"],
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
new file mode 100644
index 0000000..def0842
--- /dev/null
+++ b/aos/events/logging/log_namer.cc
@@ -0,0 +1,237 @@
+#include "aos/events/logging/log_namer.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace logger {
+
+void LogNamer::UpdateHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const UUID &uuid, int parts_index) const {
+ header->mutable_message()->mutate_parts_index(parts_index);
+ CHECK_EQ(uuid.string_view().size(),
+ header->mutable_message()->mutable_parts_uuid()->size());
+ std::copy(uuid.string_view().begin(), uuid.string_view().end(),
+ reinterpret_cast<char *>(
+ header->mutable_message()->mutable_parts_uuid()->Data()));
+}
+
+void LocalLogNamer::WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) {
+ CHECK_EQ(node, this->node());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
+ CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+ return data_writer_.get();
+}
+
+void LocalLogNamer::Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ CHECK(node == this->node());
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
+ const Channel *channel) {
+ CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+ << ": Message is not delivered to this node.";
+ CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+ CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+ node_))
+ << ": Delivery times aren't logged for this channel on this node.";
+ return data_writer_.get();
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
+ const Channel * /*channel*/, const Node * /*node*/) {
+ LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
+ return nullptr;
+}
+
+MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
+ const Configuration *configuration,
+ const Node *node)
+ : LogNamer(node),
+ base_name_(base_name),
+ configuration_(configuration),
+ uuid_(UUID::Random()),
+ data_writer_(OpenDataWriter()) {}
+
+void MultiNodeLogNamer::WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) {
+ if (node == this->node()) {
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ UpdateHeader(header, data_writer.second.uuid,
+ data_writer.second.part_number);
+ data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ }
+ }
+ }
+}
+
+void MultiNodeLogNamer::Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ if (node == this->node()) {
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ ++data_writer.second.part_number;
+ data_writer.second.rotate(data_writer.first, &data_writer.second);
+ UpdateHeader(header, data_writer.second.uuid,
+ data_writer.second.part_number);
+ data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ }
+ }
+ }
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ if (!is_readable) {
+ return nullptr;
+ }
+
+ // Then, see if we are supposed to log the data here.
+ const bool log_message =
+ configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+ if (!log_message) {
+ return nullptr;
+ }
+
+ // Now, sort out if this is data generated on this node, or not. It is
+ // generated if it is sendable on this node.
+ if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+ return data_writer_.get();
+ }
+
+ // Ok, we have data that is being forwarded to us that we are supposed to
+ // log. It needs to be logged with send timestamps, but be sorted enough
+ // to be able to be processed.
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ // Track that this node is being logged.
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+
+ if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+ nodes_.emplace_back(source_node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = source_node;
+ data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+ OpenWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
+
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+ nodes_.emplace_back(node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = node;
+ data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+ OpenForwardedTimestampWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
+ const Channel *channel) {
+ const bool log_delivery_times =
+ (this->node() == nullptr)
+ ? false
+ : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, this->node(), this->node());
+ if (!log_delivery_times) {
+ return nullptr;
+ }
+
+ return data_writer_.get();
+}
+
+void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
+ DataWriter *data_writer) {
+ std::string filename =
+ absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+ "/", channel->type()->string_view(), ".part",
+ data_writer->part_number, ".bfbs");
+
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+}
+
+void MultiNodeLogNamer::OpenWriter(const Channel *channel,
+ DataWriter *data_writer) {
+ const std::string filename = absl::StrCat(
+ base_name_, "_", channel->source_node()->string_view(), "_data",
+ channel->name()->string_view(), "/", channel->type()->string_view(),
+ ".part", data_writer->part_number, ".bfbs");
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+}
+
+std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
+ part_number_, ".bfbs"));
+}
+
+} // namespace logger
+} // namespace aos
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
new file mode 100644
index 0000000..631016b
--- /dev/null
+++ b/aos/events/logging/log_namer.h
@@ -0,0 +1,168 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_NAMER_H_
+#define AOS_EVENTS_LOGGING_LOG_NAMER_H_
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Interface describing how to name, track, and add headers to log file parts.
+class LogNamer {
+ public:
+ // Constructs a LogNamer with the primary node (ie the one the logger runs on)
+ // being node.
+ LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+ virtual ~LogNamer() {}
+
+ // Writes the header to all log files for a specific node. This function
+ // needs to be called after all the writers are created.
+ //
+ // Modifies header to contain the uuid and part number for each writer as it
+ // writes it. Since this is done unconditionally, it does not restore the
+ // previous value at the end.
+ virtual void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) = 0;
+
+ // Returns a writer for writing data logged on this channel (on the node
+ // provided in the constructor).
+ virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+ // Returns a writer for writing timestamps logged on this channel (on the node
+ // provided in the constructor).
+ virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+
+ // Returns a writer for writing timestamps delivered over the special
+ // /aos/remote_timestamps/* channels. node is the node that the timestamps
+ // are forwarded back from.
+ virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) = 0;
+
+ // Rotates all log files for the provided node. The provided header will be
+ // modified and written per WriteHeader above.
+ virtual void Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
+ // Returns all the nodes that data is being written for.
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ // Returns the node the logger is running on.
+ const Node *node() const { return node_; }
+
+ protected:
+ // Modifies the header to have the provided UUID and part id.
+ void UpdateHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const UUID &uuid, int part_id) const;
+
+ const Node *const node_;
+ std::vector<const Node *> nodes_;
+};
+
+// Local log namer is a simple version which only names things
+// "base_name.part#.bfbs" and increments the part number. It doesn't support
+// any other log type.
+class LocalLogNamer : public LogNamer {
+ public:
+ LocalLogNamer(std::string_view base_name, const Node *node)
+ : LogNamer(node),
+ base_name_(base_name),
+ uuid_(UUID::Random()),
+ data_writer_(OpenDataWriter()) {}
+
+ void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) override;
+
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+ void Rotate(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel * /*channel*/, const Node * /*node*/) override;
+
+ private:
+ // Creates a new data writer with the new part number.
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+ }
+
+ const std::string base_name_;
+ const UUID uuid_;
+ size_t part_number_ = 0;
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
+};
+
+// Log namer which uses a config and a base name to name a bunch of files.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+ MultiNodeLogNamer(std::string_view base_name,
+ const Configuration *configuration, const Node *node);
+
+ void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) override;
+
+ void Rotate(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) override;
+
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ private:
+ // Files to write remote data to. We want one per channel. Maps the channel
+ // to the writer, Node, and part number.
+ struct DataWriter {
+ std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+ const Node *node;
+ size_t part_number = 0;
+ UUID uuid = UUID::Random();
+ std::function<void(const Channel *, DataWriter *)> rotate;
+ };
+
+ // Opens up a writer for timestamps forwarded back.
+ void OpenForwardedTimestampWriter(const Channel *channel,
+ DataWriter *data_writer);
+
+ // Opens up a writer for remote data.
+ void OpenWriter(const Channel *channel, DataWriter *data_writer);
+
+ // Opens the main data writer file for this node responsible for data_writer_.
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
+
+ const std::string base_name_;
+ const Configuration *const configuration_;
+ const UUID uuid_;
+
+ size_t part_number_ = 0;
+
+ // File to write both delivery timestamps and local data to.
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
+
+ std::map<const Channel *, DataWriter> data_writers_;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_LOG_NAMER_H_
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 85809b1..c94cb3a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,56 +35,6 @@
namespace logger {
namespace chrono = std::chrono;
-void LogNamer::UpdateHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const UUID &uuid, int parts_index) {
- header->mutable_message()->mutate_parts_index(parts_index);
- CHECK_EQ(uuid.string_view().size(),
- header->mutable_message()->mutable_parts_uuid()->size());
- std::copy(uuid.string_view().begin(), uuid.string_view().end(),
- reinterpret_cast<char *>(
- header->mutable_message()->mutable_parts_uuid()->Data()));
-}
-
-void MultiNodeLogNamer::WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) {
- if (node == this->node()) {
- UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
- } else {
- for (std::pair<const Channel *const, DataWriter> &data_writer :
- data_writers_) {
- if (node == data_writer.second.node) {
- UpdateHeader(header, data_writer.second.uuid,
- data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
- }
- }
- }
-}
-
-void MultiNodeLogNamer::Rotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
- if (node == this->node()) {
- ++part_number_;
- *data_writer_ = std::move(*OpenDataWriter());
- UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
- } else {
- for (std::pair<const Channel *const, DataWriter> &data_writer :
- data_writers_) {
- if (node == data_writer.second.node) {
- ++data_writer.second.part_number;
- data_writer.second.rotate(data_writer.first, &data_writer.second);
- UpdateHeader(header, data_writer.second.uuid,
- data_writer.second.part_number);
- data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
- }
- }
- }
-}
Logger::Logger(std::string_view base_name, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d4ed786..6c67130 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -12,6 +12,7 @@
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/eigen_mpq.h"
+#include "aos/events/logging/log_namer.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
@@ -25,261 +26,6 @@
namespace aos {
namespace logger {
-class LogNamer {
- public:
- LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
- virtual ~LogNamer() {}
-
- virtual void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) = 0;
- virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
-
- virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
- virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
- const Channel *channel, const Node *node) = 0;
- virtual void Rotate(
- const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
- const std::vector<const Node *> &nodes() const { return nodes_; }
-
- const Node *node() const { return node_; }
-
- protected:
- void UpdateHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const UUID &uuid, int part_id);
-
- const Node *const node_;
- std::vector<const Node *> nodes_;
-};
-
-class LocalLogNamer : public LogNamer {
- public:
- LocalLogNamer(std::string_view base_name, const Node *node)
- : LogNamer(node),
- base_name_(base_name),
- uuid_(UUID::Random()),
- data_writer_(OpenDataWriter()) {}
-
- void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) override {
- CHECK_EQ(node, this->node());
- UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
- }
-
- DetachedBufferWriter *MakeWriter(const Channel *channel) override {
- CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
- return data_writer_.get();
- }
-
- void Rotate(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override {
- CHECK(node == this->node());
- ++part_number_;
- *data_writer_ = std::move(*OpenDataWriter());
- UpdateHeader(header, uuid_, part_number_);
- data_writer_->WriteSizedFlatbuffer(header->full_span());
- }
-
- DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
- CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
- << ": Message is not delivered to this node.";
- CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
- CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
- node_))
- << ": Delivery times aren't logged for this channel on this node.";
- return data_writer_.get();
- }
-
- DetachedBufferWriter *MakeForwardedTimestampWriter(
- const Channel * /*channel*/, const Node * /*node*/) override {
- LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
- return nullptr;
- }
-
- private:
- std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
- return std::make_unique<DetachedBufferWriter>(
- absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
- }
- const std::string base_name_;
- const UUID uuid_;
- size_t part_number_ = 0;
- std::unique_ptr<DetachedBufferWriter> data_writer_;
-};
-
-// TODO(austin): Split naming files from making files so we can re-use the
-// naming code to predict the log file names for a provided base name.
-class MultiNodeLogNamer : public LogNamer {
- public:
- MultiNodeLogNamer(std::string_view base_name,
- const Configuration *configuration, const Node *node)
- : LogNamer(node),
- base_name_(base_name),
- configuration_(configuration),
- uuid_(UUID::Random()),
- data_writer_(OpenDataWriter()) {}
-
- // Writes the header to all log files for a specific node. This function
- // needs to be called after all the writers are created.
- void WriteHeader(
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
- const Node *node) override;
-
- void Rotate(const Node *node,
- aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
- override;
-
- // Makes a data logger for a specific channel.
- DetachedBufferWriter *MakeWriter(const Channel *channel) override {
- // See if we can read the data on this node at all.
- const bool is_readable =
- configuration::ChannelIsReadableOnNode(channel, this->node());
- if (!is_readable) {
- return nullptr;
- }
-
- // Then, see if we are supposed to log the data here.
- const bool log_message =
- configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
-
- if (!log_message) {
- return nullptr;
- }
-
- // Now, sort out if this is data generated on this node, or not. It is
- // generated if it is sendable on this node.
- if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
- return data_writer_.get();
- }
-
- // Ok, we have data that is being forwarded to us that we are supposed to
- // log. It needs to be logged with send timestamps, but be sorted enough
- // to be able to be processed.
- CHECK(data_writers_.find(channel) == data_writers_.end());
-
- // Track that this node is being logged.
- const Node *source_node = configuration::GetNode(
- configuration_, channel->source_node()->string_view());
-
- if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
- nodes_.emplace_back(source_node);
- }
-
- DataWriter data_writer;
- data_writer.node = source_node;
- data_writer.rotate = [this](const Channel *channel,
- DataWriter *data_writer) {
- OpenWriter(channel, data_writer);
- };
- data_writer.rotate(channel, &data_writer);
-
- return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
- .first->second.writer.get();
- }
-
- DetachedBufferWriter *MakeForwardedTimestampWriter(
- const Channel *channel, const Node *node) override {
- // See if we can read the data on this node at all.
- const bool is_readable =
- configuration::ChannelIsReadableOnNode(channel, this->node());
- CHECK(is_readable) << ": "
- << configuration::CleanedChannelToString(channel);
-
- CHECK(data_writers_.find(channel) == data_writers_.end());
-
- if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
- nodes_.emplace_back(node);
- }
-
- DataWriter data_writer;
- data_writer.node = node;
- data_writer.rotate = [this](const Channel *channel,
- DataWriter *data_writer) {
- OpenForwardedTimestampWriter(channel, data_writer);
- };
- data_writer.rotate(channel, &data_writer);
-
- return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
- .first->second.writer.get();
- }
-
- // Makes a timestamp (or timestamp and data) logger for a channel and
- // forwarding connection.
- DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
- const bool log_delivery_times =
- (this->node() == nullptr)
- ? false
- : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, this->node(), this->node());
- if (!log_delivery_times) {
- return nullptr;
- }
-
- return data_writer_.get();
- }
-
- const std::vector<const Node *> &nodes() const { return nodes_; }
-
- private:
- // Files to write remote data to. We want one per channel. Maps the channel
- // to the writer, Node, and part number.
- struct DataWriter {
- std::unique_ptr<DetachedBufferWriter> writer = nullptr;
- const Node *node;
- size_t part_number = 0;
- UUID uuid = UUID::Random();
- std::function<void(const Channel *, DataWriter *)> rotate;
- };
-
- void OpenForwardedTimestampWriter(const Channel *channel,
- DataWriter *data_writer) {
- std::string filename =
- absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
- "/", channel->type()->string_view(), ".part",
- data_writer->part_number, ".bfbs");
-
- if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
- } else {
- *data_writer->writer = DetachedBufferWriter(filename);
- }
- }
-
- void OpenWriter(const Channel *channel, DataWriter *data_writer) {
- const std::string filename = absl::StrCat(
- base_name_, "_", channel->source_node()->string_view(), "_data",
- channel->name()->string_view(), "/", channel->type()->string_view(),
- ".part", data_writer->part_number, ".bfbs");
- if (!data_writer->writer) {
- data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
- } else {
- *data_writer->writer = DetachedBufferWriter(filename);
- }
- }
-
- std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
- return std::make_unique<DetachedBufferWriter>(
- absl::StrCat(base_name_, "_", node()->name()->string_view(),
- "_data.part", part_number_, ".bfbs"));
- }
-
- const std::string base_name_;
- const Configuration *const configuration_;
- const UUID uuid_;
-
- size_t part_number_ = 0;
-
- // File to write both delivery timestamps and local data to.
- std::unique_ptr<DetachedBufferWriter> data_writer_;
-
- std::map<const Channel *, DataWriter> data_writers_;
-};
-
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rately on a channel and would affect execution.