Add multi-node log file reading
This handles timestamps, sorting, and merging with data.
For simplicity, we read the log files once per node. Once benchmarks
show if this is a bad idea, we can fix it.
Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/BUILD b/aos/BUILD
index 80ba32a..2ae0fec 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -462,6 +462,7 @@
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/strings",
+ "@com_google_absl//absl/types:span",
],
)
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index f2427b8..7fb2378 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -7,16 +7,20 @@
#include "aos/json_to_flatbuffer.h"
#include "gflags/gflags.h"
-DEFINE_string(logfile, "/tmp/logfile.bfbs",
- "Name of the logfile to read from.");
DEFINE_string(
name, "",
"Name to match for printing out channels. Empty means no name filter.");
DEFINE_string(type, "",
"Channel type to match for printing out channels. Empty means no "
"type filter.");
+DEFINE_bool(raw, false,
+ "If true, just print the data out unsorted and unparsed");
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(
+ "Usage:\n"
+ " log_cat [args] logfile1 logfile2 ...\n"
+ "\n"
"This program provides a basic interface to dump data from a logfile to "
"stdout. Given a logfile, channel name filter, and type filter, it will "
"print all the messages in the logfile matching the filters. The message "
@@ -27,64 +31,103 @@
"the logged data.");
aos::InitGoogle(&argc, &argv);
- aos::logger::LogReader reader(FLAGS_logfile);
- reader.Register();
-
- std::unique_ptr<aos::EventLoop> printer_event_loop =
- reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
- printer_event_loop->SkipTimingReport();
- printer_event_loop->SkipAosLog();
-
- bool found_channel = false;
- const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
- reader.configuration()->channels();
- for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
- const aos::Channel *channel = channels->Get(i);
- const flatbuffers::string_view name = channel->name()->string_view();
- const flatbuffers::string_view type = channel->type()->string_view();
- if (name.find(FLAGS_name) != std::string::npos &&
- type.find(FLAGS_type) != std::string::npos) {
- if (!aos::configuration::ChannelIsReadableOnNode(
- channel, printer_event_loop->node())) {
- continue;
- }
- LOG(INFO) << "Listening on " << name << " " << type;
-
- CHECK_NOTNULL(channel->schema());
- printer_event_loop->MakeRawWatcher(
- channel, [channel](const aos::Context &context, const void *message) {
- // Print the flatbuffer out to stdout, both to remove the
- // unnecessary cruft from glog and to allow the user to readily
- // redirect just the logged output independent of any debugging
- // information on stderr.
- if (context.monotonic_remote_time != context.monotonic_event_time) {
- std::cout << context.realtime_remote_time << " ("
- << context.monotonic_remote_time << ") delivered "
- << context.realtime_event_time << " ("
- << context.monotonic_event_time << ") "
- << channel->name()->c_str() << ' '
- << channel->type()->c_str() << ": "
- << aos::FlatbufferToJson(
- channel->schema(),
- static_cast<const uint8_t *>(message))
- << std::endl;
- } else {
- std::cout << context.realtime_event_time << " ("
- << context.monotonic_event_time << ") "
- << channel->name()->c_str() << ' '
- << channel->type()->c_str() << ": "
- << aos::FlatbufferToJson(
- channel->schema(),
- static_cast<const uint8_t *>(message))
- << std::endl;
- }
- });
- found_channel = true;
+ if (FLAGS_raw) {
+ if (argc != 2) {
+ LOG(FATAL) << "Expected 1 logfile as an argument.";
}
+ aos::logger::MessageReader reader(argv[1]);
+
+ while (true) {
+ std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
+ reader.ReadMessage();
+ if (!message) {
+ break;
+ }
+
+ std::cout << aos::FlatbufferToJson(message.value()) << std::endl;
+ }
+ return 0;
}
- if (!found_channel) {
- LOG(FATAL) << "Could not find any channels";
+ if (argc < 2) {
+ LOG(FATAL) << "Expected at least 1 logfile as an argument.";
+ }
+
+ std::vector<std::vector<std::string>> logfiles;
+
+ for (int i = 1; i < argc; ++i) {
+ logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+ }
+
+ aos::logger::LogReader reader(logfiles);
+ reader.Register();
+
+ std::vector<std::unique_ptr<aos::EventLoop>> printer_event_loops;
+
+ for (const aos::Node *node : reader.Nodes()) {
+ std::unique_ptr<aos::EventLoop> printer_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("printer", node);
+ printer_event_loop->SkipTimingReport();
+ printer_event_loop->SkipAosLog();
+
+ bool found_channel = false;
+ const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+ reader.configuration()->channels();
+ for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+ const aos::Channel *channel = channels->Get(i);
+ const flatbuffers::string_view name = channel->name()->string_view();
+ const flatbuffers::string_view type = channel->type()->string_view();
+ if (name.find(FLAGS_name) != std::string::npos &&
+ type.find(FLAGS_type) != std::string::npos) {
+ if (!aos::configuration::ChannelIsReadableOnNode(
+ channel, printer_event_loop->node())) {
+ continue;
+ }
+ VLOG(1) << "Listening on " << name << " " << type;
+
+ std::string node_name =
+ node == nullptr ? ""
+ : std::string(node->name()->string_view()) + " ";
+
+ CHECK_NOTNULL(channel->schema());
+ printer_event_loop->MakeRawWatcher(
+ channel, [channel, node_name](const aos::Context &context,
+ const void *message) {
+ // Print the flatbuffer out to stdout, both to remove the
+ // unnecessary cruft from glog and to allow the user to readily
+ // redirect just the logged output independent of any debugging
+ // information on stderr.
+ if (context.monotonic_remote_time !=
+ context.monotonic_event_time) {
+ std::cout << node_name << context.realtime_event_time << " ("
+ << context.monotonic_event_time << ") sent "
+ << context.realtime_remote_time << " ("
+ << context.monotonic_remote_time << ") "
+ << channel->name()->c_str() << ' '
+ << channel->type()->c_str() << ": "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << std::endl;
+ } else {
+ std::cout << node_name << context.realtime_event_time << " ("
+ << context.monotonic_event_time << ") "
+ << channel->name()->c_str() << ' '
+ << channel->type()->c_str() << ": "
+ << aos::FlatbufferToJson(
+ channel->schema(),
+ static_cast<const uint8_t *>(message))
+ << std::endl;
+ }
+ });
+ found_channel = true;
+ }
+ }
+
+ if (!found_channel) {
+ LOG(FATAL) << "Could not find any channels";
+ }
+ printer_event_loops.emplace_back(std::move(printer_event_loop));
}
reader.event_loop_factory()->Run();
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 65da871..88bf15c 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -14,6 +14,8 @@
name, "",
"Name to match for printing out channels. Empty means no name filter.");
+DEFINE_string(node, "", "Node to print stats out for.");
+
// define struct to hold all information
struct ChannelStats {
// pointer to the channel for which stats are collected
@@ -63,9 +65,23 @@
aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
reader.Register(&log_reader_factory);
+ const aos::Node *node = nullptr;
+
+ if (aos::configuration::MultiNode(reader.configuration())) {
+ if (FLAGS_node.empty()) {
+ LOG(INFO) << "Need a --node specified. The log file has:";
+ for (const aos::Node *node : reader.Nodes()) {
+ LOG(INFO) << " " << node->name()->string_view();
+ }
+ return 1;
+ } else {
+ node = aos::configuration::GetNode(reader.configuration(), FLAGS_node);
+ }
+ }
+
// Make an eventloop for retrieving stats
std::unique_ptr<aos::EventLoop> stats_event_loop =
- log_reader_factory.MakeEventLoop("logstats", reader.node());
+ log_reader_factory.MakeEventLoop("logstats", node);
stats_event_loop->SkipTimingReport();
stats_event_loop->SkipAosLog();
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index a5ca084..144890a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -11,6 +11,7 @@
#include "aos/configuration.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -24,9 +25,12 @@
namespace chrono = std::chrono;
DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
- : fd_(open(std::string(filename).c_str(),
- O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
+ : filename_(filename) {
+ util::MkdirP(filename, 0777);
+ fd_ = open(std::string(filename).c_str(),
+ O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
+ VLOG(1) << "Opened " << filename << " for writing";
+ PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
}
DetachedBufferWriter::~DetachedBufferWriter() {
@@ -105,6 +109,7 @@
switch (log_type) {
case LogType::kLogMessage:
case LogType::kLogMessageAndDeliveryTime:
+ case LogType::kLogRemoteMessage:
data_offset =
fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
break;
@@ -115,14 +120,30 @@
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
- message_header_builder.add_queue_index(context.queue_index);
- message_header_builder.add_monotonic_sent_time(
- context.monotonic_event_time.time_since_epoch().count());
- message_header_builder.add_realtime_sent_time(
- context.realtime_event_time.time_since_epoch().count());
+
+ switch (log_type) {
+ case LogType::kLogRemoteMessage:
+ message_header_builder.add_queue_index(context.remote_queue_index);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ break;
+
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ case LogType::kLogDeliveryTimeOnly:
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_monotonic_sent_time(
+ context.monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ context.realtime_event_time.time_since_epoch().count());
+ break;
+ }
switch (log_type) {
case LogType::kLogMessage:
+ case LogType::kLogRemoteMessage:
message_header_builder.add_data(data_offset);
break;
@@ -143,7 +164,8 @@
}
SpanReader::SpanReader(std::string_view filename)
- : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+ : filename_(filename),
+ fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
PCHECK(fd_ != -1) << ": Failed to open " << filename;
}
@@ -224,6 +246,20 @@
return true;
}
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
+ SpanReader span_reader(filename);
+ // Make sure we have enough to read the size.
+ absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
+
+ // Make sure something was read.
+ CHECK(config_data != absl::Span<const uint8_t>());
+
+ // And copy the config so we have it forever.
+ std::vector<uint8_t> data(
+ config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
+ return FlatbufferVector<LogFileHeader>(std::move(data));
+}
+
MessageReader::MessageReader(std::string_view filename)
: span_reader_(filename) {
// Make sure we have enough to read the size.
@@ -253,23 +289,57 @@
chrono::nanoseconds(result.message().monotonic_sent_time()));
newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- return result;
+ VLOG(1) << "Read from " << filename().substr(130) << " data "
+ << FlatbufferToJson(result);
+ return std::move(result);
}
-SortedMessageReader::SortedMessageReader(
+SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),
log_file_header_(FlatbufferDetachedBuffer<LogFileHeader>::Empty()) {
CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
+ // Grab any log file header. They should all match (and we will check as we
+ // open more of them).
log_file_header_ = CopyFlatBuffer(message_reader_->log_file_header());
+ // Setup per channel state.
channels_.resize(configuration()->channels()->size());
+ for (ChannelData &channel_data : channels_) {
+ channel_data.data.split_reader = this;
+ // Build up the timestamp list.
+ if (configuration::MultiNode(configuration())) {
+ channel_data.timestamps.resize(configuration()->nodes()->size());
+ for (MessageHeaderQueue &queue : channel_data.timestamps) {
+ queue.timestamps = true;
+ queue.split_reader = this;
+ }
+ }
+ }
- QueueMessages();
+ // Build up channels_to_write_ as an optimization to make it fast to figure
+ // out which datastructure to place any new data from a channel on.
+ for (const Channel *channel : *configuration()->channels()) {
+ // This is the main case. We will only see data on this node.
+ if (configuration::ChannelIsSendableOnNode(channel, node())) {
+ channels_to_write_.emplace_back(
+ &channels_[channels_to_write_.size()].data);
+ } else
+ // If we can't send, but can receive, we should be able to see
+ // timestamps here.
+ if (configuration::ChannelIsReadableOnNode(channel, node())) {
+ channels_to_write_.emplace_back(
+ &(channels_[channels_to_write_.size()]
+ .timestamps[configuration::GetNodeIndex(configuration(),
+ node())]));
+ } else {
+ channels_to_write_.emplace_back(nullptr);
+ }
+ }
}
-bool SortedMessageReader::NextLogFile() {
+bool SplitMessageReader::NextLogFile() {
if (next_filename_index_ == filenames_.size()) {
return false;
}
@@ -279,13 +349,8 @@
// We can't support the config diverging between two log file headers. See if
// they are the same.
if (next_filename_index_ != 0) {
- // Since we copied before, we need to copy again to guarantee that things
- // didn't get re-ordered.
- const FlatbufferDetachedBuffer<LogFileHeader> new_log_file_header =
- CopyFlatBuffer(message_reader_->log_file_header());
- CHECK_EQ(new_log_file_header.size(), log_file_header_.size());
- CHECK(memcmp(new_log_file_header.data(), log_file_header_.data(),
- log_file_header_.size()) == 0)
+ CHECK(CompareFlatBuffer(&log_file_header_.message(),
+ message_reader_->log_file_header()))
<< ": Header is different between log file chunks "
<< filenames_[next_filename_index_] << " and "
<< filenames_[next_filename_index_ - 1] << ", this is not supported.";
@@ -295,22 +360,170 @@
return true;
}
-void SortedMessageReader::EmplaceDataBack(
- FlatbufferVector<MessageHeader> &&new_data) {
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(new_data.message().monotonic_sent_time()));
- const size_t channel_index = new_data.message().channel_index();
- CHECK_LT(channel_index, channels_.size());
+bool SplitMessageReader::QueueMessages(
+ monotonic_clock::time_point oldest_message_time) {
+ // TODO(austin): Once we are happy that everything works, read a 256kb chunk
+ // to reduce the need to re-heap down below.
+ while (true) {
+ // Don't queue if we have enough data already.
+ // When a log file starts, there should be a message from each channel.
+ // Those messages might be very old. Make sure to read a chunk past the
+ // starting time.
+ if (queued_messages_ > 0 &&
+ message_reader_->queue_data_time() > oldest_message_time) {
+ return true;
+ }
- if (channels_[channel_index].data.size() == 0) {
- channels_[channel_index].oldest_timestamp = timestamp;
- PushChannelHeap(timestamp, channel_index);
+ if (std::optional<FlatbufferVector<MessageHeader>> msg =
+ message_reader_->ReadMessage()) {
+ const MessageHeader &header = msg.value().message();
+
+ const int channel_index = header.channel_index();
+ channels_to_write_[channel_index]->emplace_back(std::move(msg.value()));
+
+ ++queued_messages_;
+ } else {
+ if (!NextLogFile()) {
+ return false;
+ }
+ }
}
- channels_[channel_index].data.emplace_back(std::move(new_data));
+}
+
+void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
+ int channel_index,
+ const Node *target_node) {
+ const Node *reinterpreted_target_node =
+ configuration::GetNodeOrDie(configuration(), target_node);
+ const Channel *const channel =
+ configuration()->channels()->Get(channel_index);
+
+ MessageHeaderQueue *message_header_queue = nullptr;
+
+ // Figure out if this log file is from our point of view, or the other node's
+ // point of view.
+ if (node() == reinterpreted_target_node) {
+ if (channels_to_write_[channel_index] != nullptr) {
+ // We already have deduced which is the right channel. Use
+ // channels_to_write_ here.
+ message_header_queue = channels_to_write_[channel_index];
+ } else {
+ // This means this is data from another node, and will be ignored.
+ }
+ } else {
+ // We are replaying from another node's point of view. The only interesting
+ // data is data that is forwarded to our node, ie was sent on the other
+ // node.
+ if (configuration::ChannelIsSendableOnNode(channel, node())) {
+ // Data from another node.
+ message_header_queue = &(channels_[channel_index].data);
+ } else {
+ // This is either not sendable on the other node, or is a timestamp and
+ // therefore not interesting.
+ }
+ }
+
+ // If we found one, write it down. This will be nullptr when there is nothing
+ // relevant on this channel on this node for the target node. In that case,
+ // we want to drop the message instead of queueing it.
+ if (message_header_queue != nullptr) {
+ message_header_queue->timestamp_merger = timestamp_merger;
+ }
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+SplitMessageReader::PopOldest(int channel_index) {
+ CHECK_GT(channels_[channel_index].data.size(), 0u);
+ const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
+ channels_[channel_index].data.front_timestamp();
+ FlatbufferVector<MessageHeader> front =
+ std::move(channels_[channel_index].data.front());
+ channels_[channel_index].data.pop_front();
+ --queued_messages_;
+
+ return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
+ std::move(front));
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+SplitMessageReader::PopOldest(int channel, int node_index) {
+ CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
+ const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
+ channels_[channel].timestamps[node_index].front_timestamp();
+ FlatbufferVector<MessageHeader> front =
+ std::move(channels_[channel].timestamps[node_index].front());
+ channels_[channel].timestamps[node_index].pop_front();
+ --queued_messages_;
+
+ return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
+ std::move(front));
+}
+
+void SplitMessageReader::MessageHeaderQueue::emplace_back(
+ FlatbufferVector<MessageHeader> &&msg) {
+ CHECK(split_reader != nullptr);
+
+ // If there is no timestamp merger for this queue, nobody is listening. Drop
+ // the message. This happens when a log file from another node is replayed,
+ // and the timestamp mergers down stream just don't care.
+ if (timestamp_merger == nullptr) {
+ return;
+ }
+
+ CHECK(timestamps != msg.message().has_data())
+ << ": Got timestamps and data mixed up on a node. "
+ << FlatbufferToJson(msg);
+
+ data_.emplace_back(std::move(msg));
+
+ if (data_.size() == 1u) {
+ // Yup, new data. Notify.
+ if (timestamps) {
+ timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
+ } else {
+ timestamp_merger->Update(split_reader, front_timestamp());
+ }
+ }
+}
+
+void SplitMessageReader::MessageHeaderQueue::pop_front() {
+ data_.pop_front();
+ if (data_.size() != 0u) {
+ // Yup, new data.
+ if (timestamps) {
+ timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
+ } else {
+ timestamp_merger->Update(split_reader, front_timestamp());
+ }
+ }
}
namespace {
+bool SplitMessageReaderHeapCompare(
+ const std::tuple<monotonic_clock::time_point, uint32_t,
+ SplitMessageReader *>
+ first,
+ const std::tuple<monotonic_clock::time_point, uint32_t,
+ SplitMessageReader *>
+ second) {
+ if (std::get<0>(first) > std::get<0>(second)) {
+ return true;
+ } else if (std::get<0>(first) == std::get<0>(second)) {
+ if (std::get<1>(first) > std::get<1>(second)) {
+ return true;
+ } else if (std::get<1>(first) == std::get<1>(second)) {
+ return std::get<2>(first) > std::get<2>(second);
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+}
+
bool ChannelHeapCompare(
const std::pair<monotonic_clock::time_point, int> first,
const std::pair<monotonic_clock::time_point, int> second) {
@@ -325,8 +538,366 @@
} // namespace
-void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index) {
+TimestampMerger::TimestampMerger(
+ const Configuration *configuration,
+ std::vector<SplitMessageReader *> split_message_readers, int channel_index,
+ const Node *target_node, ChannelMerger *channel_merger)
+ : configuration_(configuration),
+ split_message_readers_(std::move(split_message_readers)),
+ channel_index_(channel_index),
+ node_index_(configuration::MultiNode(configuration)
+ ? configuration::GetNodeIndex(configuration, target_node)
+ : -1),
+ channel_merger_(channel_merger) {
+ // Tell the readers we care so they know who to notify.
+ for (SplitMessageReader *reader : split_message_readers_) {
+ reader->SetTimestampMerger(this, channel_index, target_node);
+ }
+
+ // And then determine if we need to track timestamps.
+ const Channel *channel = configuration->channels()->Get(channel_index);
+ if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
+ configuration::ChannelIsReadableOnNode(channel, target_node)) {
+ has_timestamps_ = true;
+ }
+}
+
+void TimestampMerger::PushMessageHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader) {
+ DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == message_heap_.end())
+ << ": Pushing message when it is already in the heap.";
+
+ message_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+
+ std::push_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+
+ // If we are just a data merger, don't wait for timestamps.
+ if (!has_timestamps_) {
+ channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ pushed_ = true;
+ }
+}
+
+void TimestampMerger::PushTimestampHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader) {
+ DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == timestamp_heap_.end())
+ << ": Pushing timestamp when it is already in the heap.";
+
+ timestamp_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+
+ std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ SplitMessageReaderHeapCompare);
+
+ // If we are a timestamp merger, don't wait for data. Missing data will be
+ // caught at read time.
+ if (has_timestamps_) {
+ channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ pushed_ = true;
+ }
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+TimestampMerger::PopMessageHeap() {
+ // Pop the oldest message reader pointer off the heap.
+ CHECK_GT(message_heap_.size(), 0u);
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_message_reader = message_heap_.front();
+
+ std::pop_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ message_heap_.pop_back();
+
+ // Pop the oldest message. This re-pushes any messages from the reader to the
+ // message heap.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ oldest_message =
+ std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
+
+ // Confirm that the time and queue_index we have recorded matches.
+ CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
+ CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
+
+ // Now, keep reading until we have found all duplicates.
+ while (message_heap_.size() > 0u) {
+ // See if it is a duplicate.
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ next_oldest_message_reader = message_heap_.front();
+
+ std::tuple<monotonic_clock::time_point, uint32_t> next_oldest_message_time =
+ std::get<2>(next_oldest_message_reader)->oldest_message(channel_index_);
+
+ if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
+ std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
+ // Pop the message reader pointer.
+ std::pop_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ message_heap_.pop_back();
+
+ // Pop the next oldest message. This re-pushes any messages from the
+ // reader.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ next_oldest_message = std::get<2>(next_oldest_message_reader)
+ ->PopOldest(channel_index_);
+
+ // And make sure the message matches in it's entirety.
+ CHECK(std::get<2>(oldest_message).span() ==
+ std::get<2>(next_oldest_message).span())
+ << ": Data at the same timestamp doesn't match.";
+ } else {
+ break;
+ }
+ }
+
+ return oldest_message;
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+TimestampMerger::PopTimestampHeap() {
+ // Pop the oldest message reader pointer off the heap.
+ CHECK_GT(timestamp_heap_.size(), 0u);
+
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_timestamp_reader = timestamp_heap_.front();
+
+ std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ timestamp_heap_.pop_back();
+
+ CHECK(node_index_ != -1) << ": Timestamps in a single node environment";
+
+ // Pop the oldest message. This re-pushes any timestamps from the reader to
+ // the timestamp heap.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ oldest_timestamp = std::get<2>(oldest_timestamp_reader)
+ ->PopOldest(channel_index_, node_index_);
+
+ // Confirm that the time we have recorded matches.
+ CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
+ CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
+
+ // TODO(austin): What if we get duplicate timestamps?
+
+ return oldest_timestamp;
+}
+
+std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+TimestampMerger::PopOldest() {
+ if (has_timestamps_) {
+ CHECK_GT(message_heap_.size(), 0u)
+ << ": Missing data from source node, no data available to match "
+ "timestamp on "
+ << configuration::CleanedChannelToString(
+ configuration_->channels()->Get(channel_index_));
+
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ oldest_timestamp = PopTimestampHeap();
+
+ TimestampMerger::DeliveryTimestamp timestamp;
+ timestamp.monotonic_event_time =
+ monotonic_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
+ timestamp.realtime_event_time =
+ realtime_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_timestamp).message().realtime_sent_time()));
+
+ // Consistency check.
+ CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
+ CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
+ std::get<1>(oldest_timestamp));
+
+ monotonic_clock::time_point remote_timestamp_monotonic_time(
+ chrono::nanoseconds(
+ std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
+
+ while (true) {
+ // Ok, now try grabbing data until we find one which matches.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ oldest_message = PopMessageHeap();
+
+ // Time at which the message was sent (this message is written from the
+ // sending node's perspective.
+ monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
+ std::get<2>(oldest_message).message().monotonic_sent_time()));
+
+ if (remote_monotonic_time < remote_timestamp_monotonic_time) {
+ LOG(INFO) << "Undelivered message, skipping. Remote time is "
+ << remote_monotonic_time << " timestamp is "
+ << remote_timestamp_monotonic_time << " on channel "
+ << channel_index_;
+ continue;
+ }
+
+ timestamp.monotonic_remote_time = remote_monotonic_time;
+ timestamp.realtime_remote_time =
+ realtime_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_message).message().realtime_sent_time()));
+ timestamp.remote_queue_index =
+ std::get<2>(oldest_message).message().queue_index();
+
+ CHECK_EQ(remote_monotonic_time, remote_timestamp_monotonic_time);
+ CHECK_EQ(timestamp.remote_queue_index, std::get<1>(oldest_timestamp));
+
+ return std::make_tuple(timestamp, std::get<2>(oldest_message));
+ }
+ } else {
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ oldest_message = PopMessageHeap();
+
+ TimestampMerger::DeliveryTimestamp timestamp;
+ timestamp.monotonic_event_time =
+ monotonic_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_message).message().monotonic_sent_time()));
+ timestamp.realtime_event_time =
+ realtime_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_message).message().realtime_sent_time()));
+ timestamp.remote_queue_index = 0xffffffff;
+
+ CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
+ CHECK_EQ(std::get<1>(oldest_message),
+ std::get<2>(oldest_message).message().queue_index());
+
+ return std::make_tuple(timestamp, std::get<2>(oldest_message));
+ }
+}
+
+namespace {
+std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
+ const std::vector<std::vector<std::string>> &filenames) {
+ CHECK_GT(filenames.size(), 0u);
+ // Build up all the SplitMessageReaders.
+ std::vector<std::unique_ptr<SplitMessageReader>> result;
+ for (const std::vector<std::string> &filenames : filenames) {
+ result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
+ }
+ return result;
+}
+} // namespace
+
+ChannelMerger::ChannelMerger(
+ const std::vector<std::vector<std::string>> &filenames)
+ : split_message_readers_(MakeSplitMessageReaders(filenames)),
+ log_file_header_(
+ CopyFlatBuffer(split_message_readers_[0]->log_file_header())) {
+ // Now, confirm that the configuration matches for each and pick a start time.
+ // Also return the list of possible nodes.
+ for (const std::unique_ptr<SplitMessageReader> &reader :
+ split_message_readers_) {
+ CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
+ reader->log_file_header()->configuration()))
+ << ": Replaying log files with different configurations isn't "
+ "supported";
+ }
+
+ nodes_ = configuration::GetNodes(configuration());
+}
+
+bool ChannelMerger::SetNode(const Node *target_node) {
+ std::vector<SplitMessageReader *> split_message_readers;
+ for (const std::unique_ptr<SplitMessageReader> &reader :
+ split_message_readers_) {
+ split_message_readers.emplace_back(reader.get());
+ }
+
+ // Go find a log_file_header for this node.
+ {
+ bool found_node = false;
+
+ for (const std::unique_ptr<SplitMessageReader> &reader :
+ split_message_readers_) {
+ if (CompareFlatBuffer(reader->node(), target_node)) {
+ if (!found_node) {
+ found_node = true;
+ log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+ } else {
+ // And then make sure all the other files have matching headers.
+ CHECK(
+ CompareFlatBuffer(log_file_header(), reader->log_file_header()));
+ }
+ }
+ }
+
+ if (!found_node) {
+ LOG(WARNING) << "Failed to find log file for node "
+ << FlatbufferToJson(target_node);
+ return false;
+ }
+ }
+
+ // Build up all the timestamp mergers. This connects up all the
+ // SplitMessageReaders.
+ timestamp_mergers_.reserve(configuration()->channels()->size());
+ for (size_t channel_index = 0;
+ channel_index < configuration()->channels()->size(); ++channel_index) {
+ timestamp_mergers_.emplace_back(
+ configuration(), split_message_readers, channel_index,
+ configuration::GetNode(configuration(), target_node), this);
+ }
+
+ // And prime everything.
+ size_t split_message_reader_index = 0;
+ for (std::unique_ptr<SplitMessageReader> &split_message_reader :
+ split_message_readers_) {
+ if (split_message_reader->QueueMessages(
+ split_message_reader->monotonic_start_time())) {
+ split_message_reader_heap_.push_back(std::make_pair(
+ split_message_reader->queue_data_time(), split_message_reader_index));
+
+ std::push_heap(split_message_reader_heap_.begin(),
+ split_message_reader_heap_.end(), ChannelHeapCompare);
+ }
+ ++split_message_reader_index;
+ }
+
+ node_ = configuration::GetNodeOrDie(configuration(), target_node);
+ return true;
+}
+
+monotonic_clock::time_point ChannelMerger::OldestMessage() const {
+ if (channel_heap_.size() == 0u) {
+ return monotonic_clock::max_time;
+ }
+ return channel_heap_.front().first;
+}
+
+void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index) {
+ // Pop and recreate the heap if it has already been pushed. And since we are
+ // pushing again, we don't need to clear pushed.
+ if (timestamp_mergers_[channel_index].pushed()) {
+ channel_heap_.erase(std::find_if(
+ channel_heap_.begin(), channel_heap_.end(),
+ [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
+ return x.second == channel_index;
+ }));
+ std::make_heap(channel_heap_.begin(), channel_heap_.end(),
+ ChannelHeapCompare);
+ }
+
channel_heap_.push_back(std::make_pair(timestamp, channel_index));
// The default sort puts the newest message first. Use a custom comparator to
@@ -335,60 +906,65 @@
ChannelHeapCompare);
}
-void SortedMessageReader::QueueMessages() {
- while (true) {
- // Don't queue if we have enough data already.
- // When a log file starts, there should be a message from each channel.
- // Those messages might be very old. Make sure to read a chunk past the
- // starting time.
- if (channel_heap_.size() > 0 &&
- message_reader_->newest_timestamp() >
- std::max(oldest_message().first, monotonic_start_time()) +
- message_reader_->max_out_of_order_duration()) {
- break;
- }
-
- if (std::optional<FlatbufferVector<MessageHeader>> msg =
- message_reader_->ReadMessage()) {
- EmplaceDataBack(std::move(msg.value()));
- } else {
- if (!NextLogFile()) {
- break;
- }
- }
- }
-}
-
-std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
-SortedMessageReader::PopOldestChannel() {
+std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ChannelMerger::PopOldest() {
+ CHECK(channel_heap_.size() > 0);
std::pair<monotonic_clock::time_point, int> oldest_channel_data =
channel_heap_.front();
+ int channel_index = oldest_channel_data.second;
std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
&ChannelHeapCompare);
channel_heap_.pop_back();
+ timestamp_mergers_[channel_index].set_pushed(false);
- struct ChannelData &channel = channels_[oldest_channel_data.second];
+ TimestampMerger *merger = ×tamp_mergers_[channel_index];
- FlatbufferVector<MessageHeader> front = std::move(channel.front());
+ // Merger auto-pushes from here, but doesn't fetch anything new from the log
+ // file.
+ std::tuple<TimestampMerger::DeliveryTimestamp,
+ FlatbufferVector<MessageHeader>>
+ message = merger->PopOldest();
- channel.data.pop_front();
+ QueueMessages(OldestMessage());
- // Re-push it and update the oldest timestamp.
- if (channel.data.size() != 0) {
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
- PushChannelHeap(timestamp, oldest_channel_data.second);
- channel.oldest_timestamp = timestamp;
- } else {
- channel.oldest_timestamp = monotonic_clock::min_time;
+ return std::make_tuple(std::get<0>(message), channel_index,
+ std::move(std::get<1>(message)));
+}
+
+void ChannelMerger::QueueMessages(
+ monotonic_clock::time_point oldest_message_time) {
+ // Pop and re-queue readers until they are all caught up.
+ while (true) {
+ if (split_message_reader_heap_.size() == 0) {
+ return;
+ }
+ std::pair<monotonic_clock::time_point, int> oldest_channel_data =
+ split_message_reader_heap_.front();
+
+ // No work to do, bail.
+ if (oldest_channel_data.first > oldest_message_time) {
+ return;
+ }
+
+ // Drop it off the heap.
+ std::pop_heap(split_message_reader_heap_.begin(),
+ split_message_reader_heap_.end(), &ChannelHeapCompare);
+ split_message_reader_heap_.pop_back();
+
+ // And if there is data left in the log file, push it back on the heap with
+ // the updated time.
+ const int split_message_reader_index = oldest_channel_data.second;
+ if (split_message_readers_[split_message_reader_index]->QueueMessages(
+ oldest_message_time)) {
+ split_message_reader_heap_.push_back(std::make_pair(
+ split_message_readers_[split_message_reader_index]->queue_data_time(),
+ split_message_reader_index));
+
+ std::push_heap(split_message_reader_heap_.begin(),
+ split_message_reader_heap_.end(), ChannelHeapCompare);
+ }
}
-
- if (oldest_channel_data.first > message_reader_->queue_data_time()) {
- QueueMessages();
- }
-
- return std::make_tuple(oldest_channel_data.first, oldest_channel_data.second,
- std::move(front));
}
} // namespace logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5b2bfa6..9a849b2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -26,10 +26,11 @@
// The message originated on another node. Log it and the delivery times
// together. The message_gateway is responsible for logging any messages
// which didn't get delivered.
- kLogMessageAndDeliveryTime
+ kLogMessageAndDeliveryTime,
+ // The message originated on the other node and should be logged on this node.
+ kLogRemoteMessage
};
-
// This class manages efficiently writing a sequence of detached buffers to a
// file. It queues them up and batches the write operation.
class DetachedBufferWriter {
@@ -37,6 +38,8 @@
DetachedBufferWriter(std::string_view filename);
~DetachedBufferWriter();
+ std::string_view filename() const { return filename_; }
+
// TODO(austin): Snappy compress the log file if it ends with .snappy!
// Queues up a finished FlatBufferBuilder to be written. Steals the detached
@@ -51,6 +54,8 @@
void Flush();
private:
+ const std::string filename_;
+
int fd_ = -1;
// Size of all the data in the queue.
@@ -68,6 +73,8 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+
// Class to read chunks out of a log file.
class SpanReader {
public:
@@ -75,6 +82,8 @@
~SpanReader() { close(fd_); }
+ std::string_view filename() const { return filename_; }
+
// Returns a span with the data for a message from the log file, excluding
// the size.
absl::Span<const uint8_t> ReadMessage();
@@ -92,6 +101,8 @@
// Reads a chunk of data into data_. Returns false if no data was read.
bool ReadBlock();
+ const std::string filename_;
+
// File descriptor for the log file.
int fd_ = -1;
@@ -138,6 +149,8 @@
public:
MessageReader(std::string_view filename);
+ std::string_view filename() const { return span_reader_.filename(); }
+
// Returns the header from the log file.
const LogFileHeader *log_file_header() const {
return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
@@ -177,52 +190,64 @@
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
};
-// We need to read a large chunk at a time, then kit it up into parts and
-// sort.
-//
-// We want to read 256 KB chunks at a time. This is the fastest read size.
-// This leaves us with a fragmentation problem though.
-//
-// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
-// chunks into single flatbuffer messages and manage them in a sorted queue.
-// Everything is copied three times (into 256 kb buffer, then into separate
-// buffer, then into sender), but none of it is all that expensive. We can
-// optimize if it is slow later.
-//
-// As we place the elements in the sorted list of times, keep doing this
-// until we read a message that is newer than the threshold.
-//
-// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
-// small state machine so we can resume), and keep pulling messages back out
-// and sending.
-//
-// For sorting, we want to use the fact that each channel is sorted, and
-// then merge sort the channels. Have a vector of deques, and then hold a
-// sorted list of pointers to those.
-class SortedMessageReader {
- public:
- SortedMessageReader(const std::vector<std::string> &filenames);
+class TimestampMerger;
- // Returns the header from the log file.
+// A design requirement is that the relevant data for a channel is not more than
+// max_out_of_order_duration out of order. We approach sorting in layers.
+//
+// 1) Split each (maybe chunked) log file into one queue per channel. Read this
+// log file looking for data pertaining to a specific node.
+// (SplitMessageReader)
+// 2) Merge all the data per channel from the different log files into a sorted
+// list of timestamps and messages. (TimestampMerger)
+// 3) Combine the timestamps and messages. (TimestampMerger)
+// 4) Merge all the channels to produce the next message on a node.
+// (ChannelMerger)
+// 5) Duplicate this entire stack per node.
+
+// This class splits messages and timestamps up into a queue per channel, and
+// handles reading data from multiple chunks.
+class SplitMessageReader {
+ public:
+ SplitMessageReader(const std::vector<std::string> &filenames);
+
+ // Sets the TimestampMerger that gets notified for each channel. The node
+ // that the TimestampMerger is merging as needs to be passed in.
+ void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
+ const Node *target_node);
+
+ // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
+ // max_time if there is nothing in the channel.
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+ int channel) {
+ return channels_[channel].data.front_timestamp();
+ }
+
+ // Returns the (timestamp, queue_index) for the oldest delivery time in a
+ // channel, or max_time if there is nothing in the channel.
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+ int channel, int destination_node) {
+ return channels_[channel].timestamps[destination_node].front_timestamp();
+ }
+
+ // Returns the timestamp, queue_index, and message for the oldest data on a
+ // channel. Requeues data as needed.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopOldest(int channel_index);
+
+ // Returns the timestamp, queue_index, and message for the oldest timestamp on
+ // a channel delivered to a node. Requeues data as needed.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopOldest(int channel, int node_index);
+
+ // Returns the header for the log files.
const LogFileHeader *log_file_header() const {
return &log_file_header_.message();
}
- // Returns a pointer to the channel with the oldest message in it, and the
- // timestamp.
- const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
- return channel_heap_.front();
- }
-
- // Returns the number of channels with data still in them.
- size_t active_channel_count() const { return channel_heap_.size(); }
-
- // Returns the configuration from the log file header.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
- // Returns the start time on both the monotonic and realtime clocks.
+ // Returns the starting time for this set of log files.
monotonic_clock::time_point monotonic_start_time() {
return monotonic_clock::time_point(
std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
@@ -232,74 +257,304 @@
std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
}
+ // Returns the configuration from the log file header.
+ const Configuration *configuration() const {
+ return log_file_header()->configuration();
+ }
+
// Returns the node who's point of view this log file is from. Make sure this
// is a pointer in the configuration() nodes list so it can be consumed
// elsewhere.
const Node *node() const {
if (configuration()->has_nodes()) {
- CHECK(log_file_header()->has_node());
- CHECK(log_file_header()->node()->has_name());
- return configuration::GetNode(
- configuration(), log_file_header()->node()->name()->string_view());
+ return configuration::GetNodeOrDie(configuration(),
+ log_file_header()->node());
} else {
CHECK(!log_file_header()->has_node());
return nullptr;
}
}
- // Pops a pointer to the channel with the oldest message in it, and the
- // timestamp.
- std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
- PopOldestChannel();
+ // Returns the timestamp of the newest message read from the log file, and the
+ // timestamp that we need to re-queue data.
+ monotonic_clock::time_point newest_timestamp() const {
+ return message_reader_->newest_timestamp();
+ }
+ monotonic_clock::time_point queue_data_time() const {
+ return message_reader_->queue_data_time();
+ }
+
+
+ // Adds more messages to the sorted list. This reads enough data such that
+ // oldest_message_time can be replayed safely. Returns false if the log file
+ // has all been read.
+ bool QueueMessages(monotonic_clock::time_point oldest_message_time);
private:
+ // TODO(austin): Need to copy or refcount the message instead of running
+ // multiple copies of the reader. Or maybe have a "as_node" index and hide it
+ // inside.
+
// Moves to the next log file in the list.
bool NextLogFile();
- // Adds more messages to the sorted list.
- void QueueMessages();
+ // Filenames of the log files.
+ std::vector<std::string> filenames_;
+ // And the index of the next file to open.
+ size_t next_filename_index_ = 0;
- // Moves the message to the correct channel queue.
- void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
- // Pushes a pointer to the channel for the given timestamp to the sorted
- // channel list.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
-
+ // Log file header to report. This is a copy.
+ FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+ // Current log file being read.
+ std::unique_ptr<MessageReader> message_reader_;
// Datastructure to hold the list of messages, cached timestamp for the
// oldest message, and sender to send with.
- struct ChannelData {
- monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
- std::deque<FlatbufferVector<MessageHeader>> data;
- std::unique_ptr<RawSender> raw_sender;
+ struct MessageHeaderQueue {
+ // If true, this is a timestamp queue.
+ bool timestamps = false;
- // Returns the oldest message.
- const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
- // Returns the timestamp for the oldest message.
- const monotonic_clock::time_point front_timestamp() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+ // Returns a reference to the the oldest message.
+ FlatbufferVector<MessageHeader> &front() {
+ CHECK_GT(data_.size(), 0u);
+ return data_.front();
}
+
+ // Adds a message to the back of the queue.
+ void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+
+ // Drops the front message. Invalidates the front() reference.
+ void pop_front();
+
+ // The size of the queue.
+ size_t size() { return data_.size(); }
+
+ // Returns the (timestamp, queue_index) for the oldest message.
+ const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+ CHECK_GT(data_.size(), 0u);
+ return std::make_tuple(
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ front().message().monotonic_sent_time())),
+ front().message().queue_index());
+ }
+
+ // Pointer to the timestamp merger for this queue if available.
+ TimestampMerger *timestamp_merger = nullptr;
+ // Pointer to the reader which feeds this queue.
+ SplitMessageReader *split_reader = nullptr;
+
+ private:
+ // The data.
+ std::deque<FlatbufferVector<MessageHeader>> data_;
};
- std::vector<std::string> filenames_;
- size_t next_filename_index_ = 0;
+ // All the queues needed for a channel. There isn't going to be data in all
+ // of these.
+ struct ChannelData {
+ // The data queue for the channel.
+ MessageHeaderQueue data;
+ // Queues for timestamps for each node.
+ std::vector<MessageHeaderQueue> timestamps;
+ };
- FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
- std::unique_ptr<MessageReader> message_reader_;
-
- // TODO(austin): Multithreaded read at some point. Gotta go faster!
- // Especially if we start compressing.
-
- // List of channels and messages for them.
+ // Data for all the channels.
std::vector<ChannelData> channels_;
- // Heap of channels so we can track which channel to send next.
+ // Once we know the node that this SplitMessageReader will be writing as,
+ // there will be only one MessageHeaderQueue that a specific channel matches.
+ // Precompute this here for efficiency.
+ std::vector<MessageHeaderQueue *> channels_to_write_;
+
+ // Number of messages queued.
+ size_t queued_messages_ = 0;
+};
+
+class ChannelMerger;
+
+// Sorts channels (and timestamps) from multiple log files for a single channel.
+class TimestampMerger {
+ public:
+ TimestampMerger(const Configuration *configuration,
+ std::vector<SplitMessageReader *> split_message_readers,
+ int channel_index, const Node *target_node,
+ ChannelMerger *channel_merger);
+
+ // Metadata used to schedule the message.
+ struct DeliveryTimestamp {
+ monotonic_clock::time_point monotonic_event_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+ monotonic_clock::time_point monotonic_remote_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+ uint32_t remote_queue_index = 0xffffffff;
+ };
+
+ // Pushes SplitMessageReader onto the timestamp heap. This should only be
+ // called when timestamps are placed in the channel this class is merging for
+ // the reader.
+ void UpdateTimestamp(
+ SplitMessageReader *split_message_reader,
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+ PushTimestampHeap(oldest_message_time, split_message_reader);
+ }
+ // Pushes SplitMessageReader onto the message heap. This should only be
+ // called when data is placed in the channel this class is merging for the
+ // reader.
+ void Update(
+ SplitMessageReader *split_message_reader,
+ std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+ PushMessageHeap(oldest_message_time, split_message_reader);
+ }
+
+ // Returns the oldest combined timestamp and data for this channel.
+ std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+
+ // Tracks if the channel merger has pushed this onto it's heap or not.
+ bool pushed() { return pushed_; }
+ // Sets if this has been pushed to the channel merger heap. Should only be
+ // called by the channel merger.
+ void set_pushed(bool pushed) { pushed_ = pushed; }
+
+ private:
+ // Pushes messages and timestamps to the corresponding heaps.
+ void PushMessageHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader);
+ void PushTimestampHeap(
+ std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ SplitMessageReader *split_message_reader);
+
+ // Pops a message from the message heap. This automatically triggers the
+ // split message reader to re-fetch any new data.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopMessageHeap();
+ // Pops a message from the timestamp heap. This automatically triggers the
+ // split message reader to re-fetch any new data.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ PopTimestampHeap();
+
+ const Configuration *configuration_;
+
+ // If true, this is a forwarded channel and timestamps should be matched.
+ bool has_timestamps_ = false;
+
+ // Tracks if the ChannelMerger has pushed this onto it's queue.
+ bool pushed_ = false;
+
+ // The split message readers used for source data.
+ std::vector<SplitMessageReader *> split_message_readers_;
+
+ // The channel to merge.
+ int channel_index_;
+
+ // Our node.
+ int node_index_;
+
+ // Heaps for messages and timestamps.
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ message_heap_;
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ timestamp_heap_;
+
+ // Parent channel merger.
+ ChannelMerger *channel_merger_;
+};
+
+// This class handles constructing all the split message readers, channel
+// mergers, and combining the results.
+class ChannelMerger {
+ public:
+ // Builds a ChannelMerger around a set of log files. These are of the format:
+ // {
+ // {log1_part0, log1_part1, ...},
+ // {log2}
+ // }
+ // The inner vector is a list of log file chunks which form up a log file.
+ // The outer vector is a list of log files with subsets of the messages, or
+ // messages from different nodes.
+ ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+
+ // Returns the nodes that we know how to merge.
+ const std::vector<const Node *> nodes() const;
+ // Sets the node that we will return messages as. Returns true if the node
+ // has log files and will produce data. This can only be called once, and
+ // will likely corrupt state if called a second time.
+ bool SetNode(const Node *target_node);
+
+ // Everything else needs the node set before it works.
+
+ // Returns a timestamp for the oldest message in this group of logfiles.
+ monotonic_clock::time_point OldestMessage() const;
+ // Pops the oldest message.
+ std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ PopOldest();
+
+ // Returns the config for this set of log files.
+ const Configuration *configuration() const {
+ return log_file_header()->configuration();
+ }
+
+ const LogFileHeader *log_file_header() const {
+ return &log_file_header_.message();
+ }
+
+ // Returns the start times for the configured node's log files.
+ monotonic_clock::time_point monotonic_start_time() {
+ return monotonic_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+ }
+ realtime_clock::time_point realtime_start_time() {
+ return realtime_clock::time_point(
+ std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+ }
+
+ // Returns the node set by SetNode above.
+ const Node *node() const { return node_; }
+
+ // Called by the TimestampMerger when new data is available with the provided
+ // timestamp and channel_index.
+ void Update(monotonic_clock::time_point timestamp, int channel_index) {
+ PushChannelHeap(timestamp, channel_index);
+ }
+
+ private:
+ // Queues messages from each SplitMessageReader until enough data is queued
+ // such that we can guarentee all sorting has happened.
+ void QueueMessages(monotonic_clock::time_point oldest_message_time);
+
+ // Pushes the timestamp for new data on the provided channel.
+ void PushChannelHeap(monotonic_clock::time_point timestamp,
+ int channel_index);
+
+ // All the message readers.
+ std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+
+ // The log header we are claiming to be.
+ FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+
+ // The timestamp mergers which combine data from the split message readers.
+ std::vector<TimestampMerger> timestamp_mergers_;
+
+ // A heap of the channel readers and timestamps for the oldest data in each.
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+ // This holds a heap of split_message_readers sorted by the time at which they
+ // need to have QueueMessages called on them.
+ std::vector<std::pair<monotonic_clock::time_point, int>>
+ split_message_reader_heap_;
+
+ // Configured node.
+ const Node *node_;
+
+ // Cached copy of the list of nodes.
+ std::vector<const Node *> nodes_;
};
} // namespace logger
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c501d7b..84e1f43 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -26,12 +26,22 @@
Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
+ : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+ event_loop, polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period)
: event_loop_(event_loop),
- writer_(writer),
+ log_namer_(std::move(log_namer)),
timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
polling_period_(polling_period) {
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+ int channel_index = 0;
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
+ const bool is_local =
+ configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
@@ -50,28 +60,21 @@
<< configuration::CleanedChannelToString(channel);
if (log_delivery_times) {
- if (log_message) {
- VLOG(1) << " Logging message and delivery times";
- fs.log_type = LogType::kLogMessageAndDeliveryTime;
- } else {
- VLOG(1) << " Logging delivery times only";
- fs.log_type = LogType::kLogDeliveryTimeOnly;
- }
- } else {
- // We don't have a particularly great use case right now for logging a
- // forwarded message, but either not logging the delivery times, or
- // logging them on another node. Fail rather than produce bad results.
- CHECK(configuration::ChannelIsSendableOnNode(channel,
- event_loop_->node()))
- << ": Logger only knows how to log remote messages with "
- "forwarding timestamps.";
- VLOG(1) << " Logging message only";
- fs.log_type = LogType::kLogMessage;
+ VLOG(1) << " Delivery times";
+ fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
}
+ if (log_message) {
+ VLOG(1) << " Data";
+ fs.writer = log_namer_->MakeWriter(channel);
+ if (!is_local) {
+ fs.log_type = LogType::kLogRemoteMessage;
+ }
+ }
+ fs.channel_index = channel_index;
+ fs.written = false;
+ fetchers_.emplace_back(std::move(fs));
}
-
- fs.written = false;
- fetchers_.emplace_back(std::move(fs));
+ ++channel_index;
}
// When things start, we want to log the header, then the most recent messages
@@ -82,9 +85,7 @@
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
for (FetcherStruct &f : fetchers_) {
- if (f.fetcher.get() != nullptr) {
- f.written = !f.fetcher->Fetch();
- }
+ f.written = !f.fetcher->Fetch();
}
// We need to pick a point in time to declare the log file "started". This
@@ -105,6 +106,11 @@
}
void Logger::WriteHeader() {
+ for (const Node *node : log_namer_->nodes()) {
+ WriteHeader(node);
+ }
+}
+void Logger::WriteHeader(const Node *node) {
// Now write the header with this timestamp in it.
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(1);
@@ -117,7 +123,7 @@
flatbuffers::Offset<Node> node_offset;
if (event_loop_->node() != nullptr) {
- node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+ node_offset = CopyFlatBuffer(node, &fbb);
}
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
@@ -125,7 +131,7 @@
log_file_header_builder.add_name(string_offset);
// Only add the node if we are running in a multinode configuration.
- if (event_loop_->node() != nullptr) {
+ if (node != nullptr) {
log_file_header_builder.add_node(node_offset);
}
@@ -149,15 +155,32 @@
.count());
fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- writer_->QueueSizedFlatbuffer(&fbb);
+ log_namer_->WriteHeader(&fbb, node);
}
void Logger::Rotate(DetachedBufferWriter *writer) {
+ Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
+}
+
+void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
// Force data up until now to be written.
DoLogData();
// Swap the writer out, and re-write the header.
- writer_ = writer;
+ log_namer_ = std::move(log_namer);
+
+ // And then update the writers.
+ for (FetcherStruct &f : fetchers_) {
+ const Channel *channel =
+ event_loop_->configuration()->channels()->Get(f.channel_index);
+ if (f.timestamp_writer != nullptr) {
+ f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+ }
+ if (f.writer != nullptr) {
+ f.writer = log_namer_->MakeWriter(channel);
+ }
+ }
+
WriteHeader();
}
@@ -173,61 +196,81 @@
// per iteration, even if it is small.
last_synchronized_time_ =
std::min(last_synchronized_time_ + polling_period_, monotonic_now);
- size_t channel_index = 0;
// Write each channel to disk, one at a time.
for (FetcherStruct &f : fetchers_) {
- // Skip any channels which we aren't supposed to log.
- if (f.fetcher.get() != nullptr) {
- while (true) {
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- } else {
- f.written = false;
- }
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
}
+ }
- CHECK(!f.written);
+ CHECK(!f.written);
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time <
+ last_synchronized_time_) {
+ if (f.writer != nullptr) {
// Write!
flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
max_header_size_);
fbb.ForceDefaults(1);
fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- channel_index, f.log_type));
+ f.channel_index, f.log_type));
- VLOG(2) << "Writing data for channel "
+ VLOG(1) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
<< configuration::CleanedChannelToString(
- f.fetcher->channel());
+ f.fetcher->channel())
+ << " to " << f.writer->filename()
+ << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
max_header_size_ = std::max(
max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer_->QueueSizedFlatbuffer(&fbb);
-
- f.written = true;
- } else {
- break;
+ f.writer->QueueSizedFlatbuffer(&fbb);
}
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(1);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+
+ VLOG(1) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename()
+ << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ } else {
+ break;
}
}
-
- ++channel_index;
}
- CHECK_EQ(channel_index, fetchers_.size());
-
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
} while (last_synchronized_time_ + polling_period_ < monotonic_now);
-
- writer_->Flush();
}
LogReader::LogReader(std::string_view filename,
@@ -237,41 +280,58 @@
LogReader::LogReader(const std::vector<std::string> &filenames,
const Configuration *replay_configuration)
- : sorted_message_reader_(filenames),
+ : LogReader(std::vector<std::vector<std::string>>{filenames},
+ replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
+ const Configuration *replay_configuration)
+ : filenames_(filenames),
+ log_file_header_(ReadHeader(filenames[0][0])),
replay_configuration_(replay_configuration) {
- channels_.resize(logged_configuration()->channels()->size());
MakeRemappedConfig();
+
+ if (!configuration::MultiNode(configuration())) {
+ auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
+ State *state = &(it.first->second);
+
+ state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+ }
}
LogReader::~LogReader() { Deregister(); }
const Configuration *LogReader::logged_configuration() const {
- return sorted_message_reader_.configuration();
+ return log_file_header_.message().configuration();
}
const Configuration *LogReader::configuration() const {
return remapped_configuration_;
}
-const Node *LogReader::node() const {
+std::vector<const Node *> LogReader::Nodes() const {
// Because the Node pointer will only be valid if it actually points to memory
// owned by remapped_configuration_, we need to wait for the
// remapped_configuration_ to be populated before accessing it.
+ //
+ // Also, note, that when ever a map is changed, the nodes in here are
+ // invalidated.
CHECK(remapped_configuration_ != nullptr)
<< ": Need to call Register before the node() pointer will be valid.";
- if (sorted_message_reader_.node() == nullptr) {
- return nullptr;
- }
- return configuration::GetNode(
- configuration(), sorted_message_reader_.node()->name()->string_view());
+ return configuration::GetNodes(remapped_configuration_);
}
-monotonic_clock::time_point LogReader::monotonic_start_time() {
- return sorted_message_reader_.monotonic_start_time();
+monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+ auto it = channel_mergers_.find(node);
+ CHECK(it != channel_mergers_.end())
+ << ": Unknown node " << FlatbufferToJson(node);
+ return it->second.channel_merger->monotonic_start_time();
}
-realtime_clock::time_point LogReader::realtime_start_time() {
- return sorted_message_reader_.realtime_start_time();
+realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+ auto it = channel_mergers_.find(node);
+ CHECK(it != channel_mergers_.end())
+ << ": Unknown node " << FlatbufferToJson(node);
+ return it->second.channel_merger->realtime_start_time();
}
void LogReader::Register() {
@@ -282,126 +342,156 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- node_event_loop_factory_ =
- event_loop_factory_->GetNodeEventLoopFactory(node());
- event_loop_unique_ptr_ =
- event_loop_factory->MakeEventLoop("log_reader", node());
- // We don't run timing reports when trying to print out logged data, because
- // otherwise we would end up printing out the timing reports themselves...
- // This is only really relevant when we are replaying into a simulation.
- event_loop_unique_ptr_->SkipTimingReport();
+ // We want to start the log file at the last start time of the log files from
+ // all the nodes. Compute how long each node's simulation needs to run to
+ // move time to this point.
+ monotonic_clock::duration run_time = monotonic_clock::duration(0);
- Register(event_loop_unique_ptr_.get());
- event_loop_factory_->RunFor(monotonic_start_time() -
- event_loop_->monotonic_now());
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ auto it = channel_mergers_.insert(std::make_pair(node, State{}));
+
+ State *state = &(it.first->second);
+
+ state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
+
+ state->node_event_loop_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node);
+ state->event_loop_unique_ptr =
+ event_loop_factory->MakeEventLoop("log_reader", node);
+
+ Register(state->event_loop_unique_ptr.get());
+
+ const monotonic_clock::duration startup_time =
+ state->channel_merger->monotonic_start_time() -
+ state->event_loop->monotonic_now();
+ if (startup_time > run_time) {
+ run_time = startup_time;
+ }
+ }
+
+ // Forwarding is tracked per channel. If it is enabled, we want to turn it
+ // off. Otherwise messages replayed will get forwarded across to the other
+ // nodes, and also replayed on the other nodes. This may not satisfy all our
+ // users, but it'll start the discussion.
+ if (configuration::MultiNode(event_loop_factory_->configuration())) {
+ for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+ const Channel *channel = logged_configuration()->channels()->Get(i);
+ const Node *node = configuration::GetNode(
+ configuration(), channel->source_node()->string_view());
+
+ auto state_pair = channel_mergers_.find(node);
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+
+ const Channel *remapped_channel =
+ RemapChannel(state->event_loop, channel);
+
+ event_loop_factory_->DisableForwarding(remapped_channel);
+ }
+ }
+
+ event_loop_factory_->RunFor(run_time);
}
void LogReader::Register(EventLoop *event_loop) {
- event_loop_ = event_loop;
+ auto state_pair = channel_mergers_.find(event_loop->node());
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+
+ state->event_loop = event_loop;
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- // Otherwise we replay the timing report and try to resend it...
- event_loop_->SkipTimingReport();
- event_loop_->SkipAosLog();
+ event_loop->SkipTimingReport();
+ event_loop->SkipAosLog();
- for (size_t i = 0; i < channels_.size(); ++i) {
- const Channel *const original_channel =
- logged_configuration()->channels()->Get(i);
+ state->channel_merger->SetNode(event_loop->node());
- std::string_view channel_name = original_channel->name()->string_view();
- std::string_view channel_type = original_channel->type()->string_view();
- // If the channel is remapped, find the correct channel name to use.
- if (remapped_channels_.count(i) > 0) {
- VLOG(2) << "Got remapped channel on "
- << configuration::CleanedChannelToString(original_channel);
- channel_name = remapped_channels_[i];
- }
+ state->channels.resize(logged_configuration()->channels()->size());
- VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
- const Channel *channel = configuration::GetChannel(
- event_loop_->configuration(), channel_name, channel_type,
- event_loop_->name(), event_loop_->node());
+ for (size_t i = 0; i < state->channels.size(); ++i) {
+ const Channel *channel =
+ RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- CHECK(channel != nullptr)
- << ": Unable to send {\"name\": \"" << channel_name
- << "\", \"type\": \"" << channel_type
- << "\"} because it is not in the provided configuration.";
-
- channels_[i] = event_loop_->MakeRawSender(channel);
+ state->channels[i] = event_loop->MakeRawSender(channel);
}
- timer_handler_ = event_loop_->AddTimer([this]() {
- if (sorted_message_reader_.active_channel_count() == 0u) {
- event_loop_factory_->Exit();
+ state->timer_handler = event_loop->AddTimer([this, state]() {
+ if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+ --live_nodes_;
+ if (live_nodes_ == 0) {
+ event_loop_factory_->Exit();
+ }
return;
}
- monotonic_clock::time_point channel_timestamp;
+ TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
std::tie(channel_timestamp, channel_index, channel_data) =
- sorted_message_reader_.PopOldestChannel();
+ state->channel_merger->PopOldest();
const monotonic_clock::time_point monotonic_now =
- event_loop_->context().monotonic_event_time;
- CHECK(monotonic_now == channel_timestamp)
+ state->event_loop->context().monotonic_event_time;
+ CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
<< ": Now " << monotonic_now.time_since_epoch().count()
- << " trying to send " << channel_timestamp.time_since_epoch().count();
+ << " trying to send "
+ << channel_timestamp.monotonic_event_time.time_since_epoch().count();
- if (channel_timestamp > monotonic_start_time() ||
+ if (channel_timestamp.monotonic_event_time >
+ state->channel_merger->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if (!FLAGS_skip_missing_forwarding_entries ||
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
- "not "
- "matched? Use --skip_missing_forwarding_entries to ignore "
+ "not matched? Use --skip_missing_forwarding_entries to ignore "
"this.";
// If we have access to the factory, use it to fix the realtime time.
- if (node_event_loop_factory_ != nullptr) {
- node_event_loop_factory_->SetRealtimeOffset(
- monotonic_clock::time_point(chrono::nanoseconds(
- channel_data.message().monotonic_sent_time())),
- realtime_clock::time_point(chrono::nanoseconds(
- channel_data.message().realtime_sent_time())));
+ if (state->node_event_loop_factory != nullptr) {
+ state->node_event_loop_factory->SetRealtimeOffset(
+ channel_timestamp.monotonic_event_time,
+ channel_timestamp.realtime_event_time);
}
- channels_[channel_index]->Send(
+ state->channels[channel_index]->Send(
channel_data.message().data()->Data(),
channel_data.message().data()->size(),
- monotonic_clock::time_point(chrono::nanoseconds(
- channel_data.message().monotonic_remote_time())),
- realtime_clock::time_point(chrono::nanoseconds(
- channel_data.message().realtime_remote_time())),
- channel_data.message().remote_queue_index());
+ channel_timestamp.monotonic_remote_time,
+ channel_timestamp.realtime_remote_time,
+ channel_timestamp.remote_queue_index);
}
} else {
- LOG(WARNING) << "Not sending data from before the start of the log file. "
- << channel_timestamp.time_since_epoch().count() << " start "
- << monotonic_start_time().time_since_epoch().count() << " "
- << FlatbufferToJson(channel_data);
+ LOG(WARNING)
+ << "Not sending data from before the start of the log file. "
+ << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+ << " start " << monotonic_start_time().time_since_epoch().count()
+ << " " << FlatbufferToJson(channel_data);
}
- if (sorted_message_reader_.active_channel_count() > 0u) {
- timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ const monotonic_clock::time_point next_time =
+ state->channel_merger->OldestMessage();
+ if (next_time != monotonic_clock::max_time) {
+ state->timer_handler->Setup(next_time);
} else {
// Set a timer up immediately after now to die. If we don't do this, then
// the senders waiting on the message we just read will never get called.
if (event_loop_factory_ != nullptr) {
- timer_handler_->Setup(monotonic_now +
- event_loop_factory_->send_delay() +
- std::chrono::nanoseconds(1));
+ state->timer_handler->Setup(monotonic_now +
+ event_loop_factory_->send_delay() +
+ std::chrono::nanoseconds(1));
}
}
});
- if (sorted_message_reader_.active_channel_count() > 0u) {
- event_loop_->OnRun([this]() {
- timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ ++live_nodes_;
+
+ if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
+ event_loop->OnRun([state]() {
+ state->timer_handler->Setup(state->channel_merger->OldestMessage());
});
}
}
@@ -409,15 +499,20 @@
void LogReader::Deregister() {
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
- for (size_t i = 0; i < channels_.size(); ++i) {
- channels_[i].reset();
+ for (const Node *node : Nodes()) {
+ auto state_pair = channel_mergers_.find(node);
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+ for (size_t i = 0; i < state->channels.size(); ++i) {
+ state->channels[i].reset();
+ }
+ state->event_loop_unique_ptr.reset();
+ state->event_loop = nullptr;
+ state->node_event_loop_factory = nullptr;
}
- event_loop_unique_ptr_.reset();
- event_loop_ = nullptr;
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
- node_event_loop_factory_ = nullptr;
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -442,13 +537,15 @@
}
void LogReader::MakeRemappedConfig() {
- CHECK(!event_loop_)
- << ": Can't change the mapping after the events are scheduled.";
+ for (std::pair<const Node *const, State> &state : channel_mergers_) {
+ CHECK(!state.second.event_loop)
+ << ": Can't change the mapping after the events are scheduled.";
+ }
// If no remapping occurred and we are using the original config, then there
// is nothing interesting to do here.
if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
- remapped_configuration_ = sorted_message_reader_.configuration();
+ remapped_configuration_ = logged_configuration();
return;
}
// Config to copy Channel definitions from. Use the specified
@@ -526,5 +623,30 @@
remapped_configuration_ = &remapped_configuration_buffer_->message();
}
+const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+ const Channel *channel) {
+ std::string_view channel_name = channel->name()->string_view();
+ std::string_view channel_type = channel->type()->string_view();
+ const int channel_index =
+ configuration::ChannelIndex(logged_configuration(), channel);
+ // If the channel is remapped, find the correct channel name to use.
+ if (remapped_channels_.count(channel_index) > 0) {
+ VLOG(2) << "Got remapped channel on "
+ << configuration::CleanedChannelToString(channel);
+ channel_name = remapped_channels_[channel_index];
+ }
+
+ VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+ const Channel *remapped_channel = configuration::GetChannel(
+ event_loop->configuration(), channel_name, channel_type,
+ event_loop->name(), event_loop->node());
+
+ CHECK(remapped_channel != nullptr)
+ << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
+ << channel_type << "\"} because it is not in the provided configuration.";
+
+ return remapped_channel;
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 54b55d8..5e3ca52 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -2,8 +2,8 @@
#define AOS_EVENTS_LOGGER_H_
#include <deque>
-#include <vector>
#include <string_view>
+#include <vector>
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
@@ -16,6 +16,164 @@
namespace aos {
namespace logger {
+class LogNamer {
+ public:
+ LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+ virtual ~LogNamer() {}
+
+ virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
+ const Node *node) = 0;
+ virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+ virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ const Node *node() const { return node_; }
+
+ protected:
+ const Node *const node_;
+ std::vector<const Node *> nodes_;
+};
+
+class LocalLogNamer : public LogNamer {
+ public:
+ LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
+ : LogNamer(node), writer_(writer) {}
+
+ ~LocalLogNamer() override { writer_->Flush(); }
+
+ void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
+ const Node *node) override {
+ CHECK_EQ(node, this->node());
+ writer_->WriteSizedFlatbuffer(
+ absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+ }
+
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override {
+ CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+ return writer_;
+ }
+
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
+ CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+ << ": Message is not delivered to this node.";
+ CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+ CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+ node_))
+ << ": Delivery times aren't logged for this channel on this node.";
+ return writer_;
+ }
+
+ private:
+ DetachedBufferWriter *writer_;
+};
+
+// TODO(austin): Split naming files from making files so we can re-use the
+// naming code to predict the log file names for a provided base name.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+ MultiNodeLogNamer(std::string_view base_name,
+ const Configuration *configuration, const Node *node)
+ : LogNamer(node),
+ base_name_(base_name),
+ configuration_(configuration),
+ data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
+ base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
+
+ // Writes the header to all log files for a specific node. This function
+ // needs to be called after all the writers are created.
+ void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
+ if (node == this->node()) {
+ data_writer_->WriteSizedFlatbuffer(
+ absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+ } else {
+ for (std::pair<const Channel *const,
+ std::unique_ptr<DetachedBufferWriter>> &data_writer :
+ data_writers_) {
+ if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
+ data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
+ fbb->GetBufferPointer(), fbb->GetSize()));
+ }
+ }
+ }
+ }
+
+ // Makes a data logger for a specific channel.
+ DetachedBufferWriter *MakeWriter(const Channel *channel) {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ if (!is_readable) {
+ return nullptr;
+ }
+
+ // Then, see if we are supposed to log the data here.
+ const bool log_message =
+ configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+ if (!log_message) {
+ return nullptr;
+ }
+
+ // Now, sort out if this is data generated on this node, or not. It is
+ // generated if it is sendable on this node.
+ if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+ return data_writer_.get();
+ } else {
+ // Ok, we have data that is being forwarded to us that we are supposed to
+ // log. It needs to be logged with send timestamps, but be sorted enough
+ // to be able to be processed.
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ // Track that this node is being logged.
+ if (configuration::MultiNode(configuration_)) {
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+ if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
+ nodes_.end()) {
+ nodes_.emplace_back(source_node);
+ }
+ }
+
+ return data_writers_
+ .insert(std::make_pair(
+ channel,
+ std::make_unique<DetachedBufferWriter>(absl::StrCat(
+ base_name_, "_", channel->source_node()->string_view(),
+ "_data", channel->name()->string_view(), "/",
+ channel->type()->string_view(), ".bfbs"))))
+ .first->second.get();
+ }
+ }
+
+ // Makes a timestamp (or timestamp and data) logger for a channel and
+ // forwarding connection.
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
+ const bool log_delivery_times =
+ (this->node() == nullptr)
+ ? false
+ : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, this->node(), this->node());
+ if (!log_delivery_times) {
+ return nullptr;
+ }
+
+ return data_writer_.get();
+ }
+
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ private:
+ const std::string base_name_;
+ const Configuration *const configuration_;
+
+ // File to write both delivery timestamps and local data to.
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
+ // Files to write remote data to. We want one per channel.
+ std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
+ data_writers_;
+};
+
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rately on a channel and would affect execution.
@@ -24,18 +182,23 @@
Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
+ Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period =
+ std::chrono::milliseconds(100));
// Rotates the log file with the new writer. This writes out the header
// again, but keeps going as if nothing else happened.
void Rotate(DetachedBufferWriter *writer);
+ void Rotate(std::unique_ptr<LogNamer> log_namer);
private:
void WriteHeader();
+ void WriteHeader(const Node *node);
void DoLogData();
EventLoop *event_loop_;
- DetachedBufferWriter *writer_;
+ std::unique_ptr<LogNamer> log_namer_;
// Structure to track both a fetcher, and if the data fetched has been
// written. We may want to delay writing data to disk so that we don't let
@@ -45,7 +208,12 @@
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
- LogType log_type;
+ int channel_index = -1;
+
+ LogType log_type = LogType::kLogMessage;
+
+ DetachedBufferWriter *writer = nullptr;
+ DetachedBufferWriter *timestamp_writer = nullptr;
};
std::vector<FetcherStruct> fetchers_;
@@ -65,6 +233,34 @@
size_t max_header_size_ = 0;
};
+// We end up with one of the following 3 log file types.
+//
+// Single node logged as the source node.
+// -> Replayed just on the source node.
+//
+// Forwarding timestamps only logged from the perspective of the destination
+// node.
+// -> Matched with data on source node and logged.
+//
+// Forwarding timestamps with data logged as the destination node.
+// -> Replayed just as the destination
+// -> Replayed as the source (Much harder, ordering is not defined)
+//
+// Duplicate data logged. -> CHECK that it matches and explode otherwise.
+//
+// This can be boiled down to a set of constraints and tools.
+//
+// 1) Forwarding timestamps and data need to be logged separately.
+// 2) Any forwarded data logged on the destination node needs to be logged
+// separately such that it can be sorted.
+//
+// 1) Log reader needs to be able to sort a list of log files.
+// 2) Log reader needs to be able to merge sorted lists of log files.
+// 3) Log reader needs to be able to match timestamps with messages.
+//
+// We also need to be able to generate multiple views of a log file depending on
+// the target.
+
// Replays all the channels in the logfile to the event loop.
class LogReader {
public:
@@ -72,9 +268,23 @@
// (e.g., to change message rates, or to populate an updated schema), then
// pass it in here. It must provide all the channels that the original logged
// config did.
+ //
+ // Log filenames are in the following format:
+ //
+ // {
+ // {log1_part0, log1_part1, ...},
+ // {log2}
+ // }
+ // The inner vector is a list of log file chunks which form up a log file.
+ // The outer vector is a list of log files with subsets of the messages, or
+ // messages from different nodes.
+ //
+ // If the outer vector isn't provided, it is assumed to be of size 1.
LogReader(std::string_view filename,
const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<std::string> &filename,
+ LogReader(const std::vector<std::string> &filenames,
+ const Configuration *replay_configuration = nullptr);
+ LogReader(const std::vector<std::vector<std::string>> &filenames,
const Configuration *replay_configuration = nullptr);
~LogReader();
@@ -101,19 +311,17 @@
// Returns the configuration from the log file.
const Configuration *logged_configuration() const;
// Returns the configuration being used for replay.
+ // The pointer is invalidated whenever RemapLoggedChannel is called.
const Configuration *configuration() const;
- const LogFileHeader *log_file_header() const {
- return sorted_message_reader_.log_file_header();
- }
-
- // Returns the node that this log file was created on. This is a pointer to a
- // node in the nodes() list inside configuration().
- const Node *node() const;
+ // Returns the nodes that this log file was created on. This is a list of
+ // pointers to a node in the nodes() list inside configuration(). The
+ // pointers here are invalidated whenever RemapLoggedChannel is called.
+ std::vector<const Node *> Nodes() const;
// Returns the starting timestamp for the log file.
- monotonic_clock::time_point monotonic_start_time();
- realtime_clock::time_point realtime_start_time();
+ monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
+ realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
// Causes the logger to publish the provided channel on a different name so
// that replayed applications can publish on the proper channel name without
@@ -131,31 +339,48 @@
return event_loop_factory_;
}
- // TODO(austin): Add the ability to re-publish the fetched messages. Add 2
- // options, one which publishes them *now*, and another which publishes them
- // to the simulated event loop factory back in time where they actually
- // happened.
-
private:
+ const Channel *RemapChannel(const EventLoop *event_loop,
+ const Channel *channel);
+
+ const LogFileHeader *log_file_header() const {
+ return &log_file_header_.message();
+ }
+
// Queues at least max_out_of_order_duration_ messages into channels_.
void QueueMessages();
// Handle constructing a configuration with all the additional remapped
// channels from calls to RemapLoggedChannel.
void MakeRemappedConfig();
- // Log chunk reader.
- SortedMessageReader sorted_message_reader_;
+ const std::vector<std::vector<std::string>> filenames_;
+
+ // This is *a* log file header used to provide the logged config. The rest of
+ // the header is likely distracting.
+ FlatbufferVector<LogFileHeader> log_file_header_;
+
+ // State per node.
+ struct State {
+ // Log file.
+ std::unique_ptr<ChannelMerger> channel_merger;
+ // Senders.
+ std::vector<std::unique_ptr<RawSender>> channels;
+
+ // Factory (if we are in sim) that this loop was created on.
+ NodeEventLoopFactory *node_event_loop_factory = nullptr;
+ std::unique_ptr<EventLoop> event_loop_unique_ptr;
+ // Event loop.
+ EventLoop *event_loop = nullptr;
+ // And timer used to send messages.
+ TimerHandler *timer_handler;
+ };
+
+ // Map of nodes to States used to hold all the state for all the nodes.
+ std::map<const Node *, State> channel_mergers_;
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;
- std::vector<std::unique_ptr<RawSender>> channels_;
-
- std::unique_ptr<EventLoop> event_loop_unique_ptr_;
- NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
- EventLoop *event_loop_ = nullptr;
- TimerHandler *timer_handler_;
-
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
@@ -164,6 +389,10 @@
// to send on instead of the logged channel name.
std::map<size_t, std::string> remapped_channels_;
+ // Number of nodes which still have data to send. This is used to figure out
+ // when to exit.
+ size_t live_nodes_ = 0;
+
const Configuration *remapped_configuration_ = nullptr;
const Configuration *replay_configuration_ = nullptr;
};
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 55d0ecc..3a969d4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
#include "glog/logging.h"
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
namespace aos {
@@ -70,7 +71,7 @@
// log file.
reader.Register();
- EXPECT_EQ(reader.node(), nullptr);
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
std::unique_ptr<EventLoop> test_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_reader");
@@ -135,7 +136,7 @@
// log file.
reader.Register();
- EXPECT_EQ(reader.node(), nullptr);
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
std::unique_ptr<EventLoop> test_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_reader");
@@ -212,7 +213,11 @@
ping_event_loop_(event_loop_factory_.MakeEventLoop(
"ping", configuration::GetNode(event_loop_factory_.configuration(),
"pi1"))),
- ping_(ping_event_loop_.get()) {}
+ ping_(ping_event_loop_.get()),
+ pong_event_loop_(event_loop_factory_.MakeEventLoop(
+ "pong", configuration::GetNode(event_loop_factory_.configuration(),
+ "pi2"))),
+ pong_(pong_event_loop_.get()) {}
// Config and factory.
aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
@@ -221,65 +226,154 @@
// Event loop and app for Ping
std::unique_ptr<EventLoop> ping_event_loop_;
Ping ping_;
+
+ // Event loop and app for Pong
+ std::unique_ptr<EventLoop> pong_event_loop_;
+ Pong pong_;
};
-// Tests that we can startup at all in a multinode configuration.
-TEST_F(MultinodeLoggerTest, MultiNode) {
- constexpr chrono::seconds kTimeOffset = chrono::seconds(10000);
- constexpr uint32_t kQueueIndexOffset = 1024;
- const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/multi_logfile.bfbs";
- // Remove it.
- unlink(logfile.c_str());
+// Counts the number of messages on a channel (returns channel, count) for every
+// message matching matcher()
+std::vector<std::pair<int, int>> CountChannelsMatching(
+ std::string_view filename,
+ std::function<bool(const MessageHeader *)> matcher) {
+ MessageReader message_reader(filename);
+ std::vector<int> counts(
+ message_reader.log_file_header()->configuration()->channels()->size(), 0);
- LOG(INFO) << "Logging data to " << logfile;
+ while (true) {
+ std::optional<FlatbufferVector<MessageHeader>> msg =
+ message_reader.ReadMessage();
+ if (!msg) {
+ break;
+ }
+
+ if (matcher(&msg.value().message())) {
+ counts[msg.value().message().channel_index()]++;
+ }
+ }
+
+ std::vector<std::pair<int, int>> result;
+ int channel = 0;
+ for (size_t i = 0; i < counts.size(); ++i) {
+ if (counts[i] != 0) {
+ result.push_back(std::make_pair(channel, counts[i]));
+ }
+ ++channel;
+ }
+
+ return result;
+}
+
+// Counts the number of messages (channel, count) for all data messages.
+std::vector<std::pair<int, int>> CountChannelsData(std::string_view filename) {
+ return CountChannelsMatching(filename, [](const MessageHeader *msg) {
+ if (msg->has_data()) {
+ CHECK(!msg->has_monotonic_remote_time());
+ CHECK(!msg->has_realtime_remote_time());
+ CHECK(!msg->has_remote_queue_index());
+ return true;
+ }
+ return false;
+ });
+}
+
+// Counts the number of messages (channel, count) for all timestamp messages.
+std::vector<std::pair<int, int>> CountChannelsTimestamp(
+ std::string_view filename) {
+ return CountChannelsMatching(filename, [](const MessageHeader *msg) {
+ if (!msg->has_data()) {
+ CHECK(msg->has_monotonic_remote_time());
+ CHECK(msg->has_realtime_remote_time());
+ CHECK(msg->has_remote_queue_index());
+ return true;
+ }
+ return false;
+ });
+}
+
+// Tests that we can write and read multi-node log files correctly.
+TEST_F(MultinodeLoggerTest, MultiNode) {
+ const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+ const ::std::string logfile_base = tmpdir + "/multi_logfile";
+ const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
+ const ::std::string logfile2 =
+ logfile_base + "_pi2_data/test/aos.examples.Pong.bfbs";
+ const ::std::string logfile3 = logfile_base + "_pi2_data.bfbs";
+
+ // Remove them.
+ unlink(logfile1.c_str());
+ unlink(logfile2.c_str());
+ unlink(logfile3.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile1 << ", " << logfile2 << " and "
+ << logfile3;
{
const Node *pi1 =
configuration::GetNode(event_loop_factory_.configuration(), "pi1");
- std::unique_ptr<EventLoop> pong_event_loop =
- event_loop_factory_.MakeEventLoop("pong", pi1);
+ const Node *pi2 =
+ configuration::GetNode(event_loop_factory_.configuration(), "pi2");
- std::unique_ptr<aos::RawSender> pong_sender(
- pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
- pong_event_loop->configuration(), "/test", "aos.examples.Pong",
- pong_event_loop->name(), pong_event_loop->node())));
-
- // Ok, let's fake a remote node. We use the fancy raw sender Send
- // method that message_gateway will use to do that.
- int pong_count = 0;
- pong_event_loop->MakeWatcher(
- "/test", [&pong_event_loop, &pong_count, &pong_sender,
- kTimeOffset](const examples::Ping &ping) {
- flatbuffers::FlatBufferBuilder fbb;
- examples::Pong::Builder pong_builder(fbb);
- pong_builder.add_value(ping.value());
- pong_builder.add_initial_send_time(ping.send_time());
- fbb.Finish(pong_builder.Finish());
-
- pong_sender->Send(fbb.GetBufferPointer(), fbb.GetSize(),
- pong_event_loop->monotonic_now() + kTimeOffset,
- pong_event_loop->realtime_now() + kTimeOffset,
- kQueueIndexOffset + pong_count);
- ++pong_count;
- });
-
- DetachedBufferWriter writer(logfile);
- std::unique_ptr<EventLoop> logger_event_loop =
+ std::unique_ptr<EventLoop> pi1_logger_event_loop =
event_loop_factory_.MakeEventLoop("logger", pi1);
+ std::unique_ptr<LogNamer> pi1_log_namer =
+ std::make_unique<MultiNodeLogNamer>(
+ logfile_base, pi1_logger_event_loop->configuration(),
+ pi1_logger_event_loop->node());
+
+ std::unique_ptr<EventLoop> pi2_logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger", pi2);
+ std::unique_ptr<LogNamer> pi2_log_namer =
+ std::make_unique<MultiNodeLogNamer>(
+ logfile_base, pi2_logger_event_loop->configuration(),
+ pi2_logger_event_loop->node());
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+ std::chrono::milliseconds(100));
+ Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+ std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(logfile);
+ {
+ // Confirm that the headers are all for the correct nodes.
+ FlatbufferVector<LogFileHeader> logheader1 = ReadHeader(logfile1);
+ EXPECT_EQ(logheader1.message().node()->name()->string_view(), "pi1");
+ FlatbufferVector<LogFileHeader> logheader2 = ReadHeader(logfile2);
+ EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
+ FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfile3);
+ EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
- // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
- // messages. This won't work today yet until the log reading code gets
- // significantly better.
+ // Timing reports, pings
+ EXPECT_THAT(CountChannelsData(logfile1),
+ ::testing::ElementsAre(::testing::Pair(1, 60),
+ ::testing::Pair(4, 2001)));
+ // Timestamps for pong
+ EXPECT_THAT(CountChannelsTimestamp(logfile1),
+ ::testing::ElementsAre(::testing::Pair(5, 2001)));
+
+ // Pong data.
+ EXPECT_THAT(CountChannelsData(logfile2),
+ ::testing::ElementsAre(::testing::Pair(5, 2001)));
+ // No timestamps
+ EXPECT_THAT(CountChannelsTimestamp(logfile2), ::testing::ElementsAre());
+
+ // Timing reports and pongs.
+ EXPECT_THAT(CountChannelsData(logfile3),
+ ::testing::ElementsAre(::testing::Pair(3, 60),
+ ::testing::Pair(5, 2001)));
+ // And ping timestamps.
+ EXPECT_THAT(CountChannelsTimestamp(logfile3),
+ ::testing::ElementsAre(::testing::Pair(4, 2001)));
+ }
+
+ LogReader reader({std::vector<std::string>{logfile1},
+ std::vector<std::string>{logfile2},
+ std::vector<std::string>{logfile3}});
+
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -289,48 +383,138 @@
const Node *pi1 =
configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
- ASSERT_NE(reader.node(), nullptr);
- EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
- std::unique_ptr<EventLoop> test_event_loop =
+ std::unique_ptr<EventLoop> pi1_event_loop =
log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
- int ping_count = 10;
- int pong_count = 10;
+ int pi1_ping_count = 10;
+ int pi2_ping_count = 10;
+ int pi1_pong_count = 10;
+ int pi2_pong_count = 10;
// Confirm that the ping value matches.
- test_event_loop->MakeWatcher("/test",
- [&ping_count](const examples::Ping &ping) {
- EXPECT_EQ(ping.value(), ping_count + 1);
- ++ping_count;
- });
- // Confirm that the ping and pong counts both match, and the value also
- // matches.
- test_event_loop->MakeWatcher(
- "/test", [&test_event_loop, &ping_count, &pong_count,
- kTimeOffset](const examples::Pong &pong) {
- EXPECT_EQ(test_event_loop->context().remote_queue_index,
- pong_count + kQueueIndexOffset);
- EXPECT_EQ(test_event_loop->context().monotonic_remote_time,
- test_event_loop->monotonic_now() + kTimeOffset);
- EXPECT_EQ(test_event_loop->context().realtime_remote_time,
- test_event_loop->realtime_now() + kTimeOffset);
+ pi1_event_loop->MakeWatcher(
+ "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
+ VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+ << pi1_event_loop->context().monotonic_remote_time << " -> "
+ << pi1_event_loop->context().monotonic_event_time;
+ EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+ pi1_ping_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+ pi1_ping_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+ pi1_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+ pi1_event_loop->context().realtime_event_time);
- EXPECT_EQ(pong.value(), pong_count + 1);
- ++pong_count;
- EXPECT_EQ(ping_count, pong_count);
+ ++pi1_ping_count;
+ });
+ pi2_event_loop->MakeWatcher(
+ "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
+ VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+ << pi2_event_loop->context().monotonic_remote_time << " -> "
+ << pi2_event_loop->context().monotonic_event_time;
+ EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+ pi2_ping_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+ pi2_ping_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
+ chrono::microseconds(150),
+ pi2_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
+ chrono::microseconds(150),
+ pi2_event_loop->context().realtime_event_time);
+ ++pi2_ping_count;
});
- log_reader_factory.RunFor(std::chrono::seconds(100));
- EXPECT_EQ(ping_count, 2010);
- EXPECT_EQ(pong_count, 2010);
+ constexpr ssize_t kQueueIndexOffset = 0;
+ // Confirm that the ping and pong counts both match, and the value also
+ // matches.
+ pi1_event_loop->MakeWatcher(
+ "/test", [&pi1_event_loop, &pi1_ping_count,
+ &pi1_pong_count](const examples::Pong &pong) {
+ VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+ << pi1_event_loop->context().monotonic_remote_time << " -> "
+ << pi1_event_loop->context().monotonic_event_time;
+
+ EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
+ pi1_pong_count + kQueueIndexOffset);
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+ chrono::microseconds(200) +
+ pi1_pong_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+ chrono::microseconds(200) +
+ pi1_pong_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
+
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
+ chrono::microseconds(150),
+ pi1_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
+ chrono::microseconds(150),
+ pi1_event_loop->context().realtime_event_time);
+
+ EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+ ++pi1_pong_count;
+ EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+ });
+ pi2_event_loop->MakeWatcher(
+ "/test", [&pi2_event_loop, &pi2_ping_count,
+ &pi2_pong_count](const examples::Pong &pong) {
+ VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+ << pi2_event_loop->context().monotonic_remote_time << " -> "
+ << pi2_event_loop->context().monotonic_event_time;
+
+ EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
+ pi2_pong_count + kQueueIndexOffset - 9);
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+ chrono::microseconds(200) +
+ pi2_pong_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+ chrono::microseconds(200) +
+ pi2_pong_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+ pi2_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+ pi2_event_loop->context().realtime_event_time);
+
+ EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+ ++pi2_pong_count;
+ EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+ });
+
+ log_reader_factory.Run();
+ EXPECT_EQ(pi1_ping_count, 2010);
+ EXPECT_EQ(pi2_ping_count, 2010);
+ EXPECT_EQ(pi1_pong_count, 2010);
+ EXPECT_EQ(pi2_pong_count, 2010);
reader.Deregister();
}
+// TODO(austin): We can write a test which recreates a logfile and confirms that
+// we get it back. That is the ultimate test.
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index be8a402..957cc2b 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -33,9 +33,7 @@
"num_senders": 20,
"max_size": 2048
},
- /* Forwarded to pi2.
- * Doesn't matter where timestamps are logged for the test.
- */
+ /* Forwarded to pi2 */
{
"name": "/test",
"type": "aos.examples.Ping",
@@ -44,7 +42,7 @@
{
"name": "pi2",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
"timestamp_logger_node": "pi1",
"time_to_live": 5000000
}
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 01574fa..faa9fe9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -755,15 +755,8 @@
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
- : configuration_(CHECK_NOTNULL(configuration)) {
- if (configuration::MultiNode(configuration_)) {
- for (const Node *node : *configuration->nodes()) {
- nodes_.emplace_back(node);
- }
- } else {
- nodes_.emplace_back(nullptr);
- }
-
+ : configuration_(CHECK_NOTNULL(configuration)),
+ nodes_(configuration::GetNodes(configuration_)) {
for (const Node *node : nodes_) {
node_factories_.emplace_back(
new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
@@ -838,4 +831,8 @@
}
}
+void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
+ bridge_->DisableForwarding(channel);
+}
+
} // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 8cff0d7..21b241a 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -98,6 +98,10 @@
// Returns the configuration used for everything.
const Configuration *configuration() const { return configuration_; }
+ // Disables forwarding for this channel. This should be used very rarely only
+ // for things like the logger.
+ void DisableForwarding(const Channel *channel);
+
private:
const Configuration *const configuration_;
EventScheduler scheduler_;
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 57f2efd..10ce37d 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -28,6 +28,8 @@
Schedule();
}
+ const Channel *channel() const { return fetcher_->channel(); }
+
// Kicks us to re-fetch and schedule the timer.
void Schedule() {
if (fetcher_->context().data == nullptr || sent_) {
@@ -161,5 +163,22 @@
SimulatedMessageBridge::~SimulatedMessageBridge() {}
+void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
+ for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
+ &delayers : delayers_list_) {
+ if (delayers->size() > 0) {
+ if ((*delayers)[0]->channel() == channel) {
+ for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
+ CHECK(delayer->channel() == channel);
+ }
+
+ // If we clear the delayers list, nothing will be scheduled. Which is a
+ // success!
+ delayers->clear();
+ }
+ }
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5d613ab..7aeef64 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -19,12 +19,15 @@
SimulatedEventLoopFactory *simulated_event_loop_factory);
~SimulatedMessageBridge();
+ // Disables forwarding for this channel. This should be used very rarely only
+ // for things like the logger.
+ void DisableForwarding(const Channel *channel);
+
private:
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
-
// List of delayers used to resend the messages.
using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index d9fcab6..e556c0f 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -4,6 +4,7 @@
#include <array>
#include <string_view>
+#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
@@ -106,6 +107,11 @@
virtual const uint8_t *data() const = 0;
virtual uint8_t *data() = 0;
virtual size_t size() const = 0;
+
+ absl::Span<uint8_t> span() { return absl::Span<uint8_t>(data(), size()); }
+ absl::Span<const uint8_t> span() const {
+ return absl::Span<const uint8_t>(data(), size());
+ }
};
// String backed flatbuffer.