Read all timestamps into RAM where possible when reading logs
There are cases where we fail to buffer far enough into the future to
solve the timestamp problem, or where we end up choosing to buffer
until the end of time to solve it. Both of these result in log reading
failures, either through CHECKs or OOMs.
Timestamps are tiny. The existence of timestamp_extractor proves that
we can store the whole timestamp problem in memory relatively easily.
When timestamps are stored in separate files, let's just load them at
the start.
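As a rough sketch of the resulting read path (the node name and the
LogFilesContainer setup are illustrative, not part of this change, and
multi-node peer wiring is omitted):

    // Queue every timestamp into RAM before streaming any data.
    TimestampMapper mapper(
        "pi1", log_files,
        TimestampQueueStrategy::kQueueTimestampsAtStartup);
    mapper.QueueTimestamps();
    // Replay then proceeds as before.
    while (TimestampedMessage *msg = mapper.Front()) {
      // ... hand msg to the replay machinery ...
      mapper.PopFront();
    }
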
This also adds flags to force this behavior on and off. When a log
with data and timestamps mixed in the same files is found and the flag
forces it on, we read the data files twice, extracting the timestamps
on the first pass.
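Conceptually, the two passes just select different stored data types;
the types and the BootMerger constructor below are the ones added in
this change, while the surrounding setup is illustrative:

    // Pass 1: timestamps only; small enough to hold fully in RAM.
    BootMerger timestamp_merger(
        node_name, log_files,
        {StoredDataType::TIMESTAMPS, StoredDataType::REMOTE_TIMESTAMPS});
    // Pass 2: the data, streamed incrementally as before.
    BootMerger data_merger(node_name, log_files, {StoredDataType::DATA});
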
I'd like to add more tests to logfile_utils_test to cover this
explicitly, but the multinode_logger tests already appear to do a
really good job.
Change-Id: I38e23836afa980e3e3a839125e78e132066e2c90
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 133bb79..02cff6d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1608,8 +1608,9 @@
}
void PartsMessageReader::ComputeBootCounts() {
- boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
- std::nullopt);
+ boot_counts_.assign(
+ configuration::NodesCount(log_parts_access_.config().get()),
+ std::nullopt);
const auto boots = log_parts_access_.parts().boots;
@@ -1636,7 +1637,7 @@
// Older multi-node logs which are guaranteed to have UUIDs logged, or
// single node log files with boot UUIDs in the header. We only know how to
// order certain boots in certain circumstances.
- if (configuration::MultiNode(log_parts_access_.config()) || boots) {
+ if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
for (size_t node_index = 0; node_index < boot_counts_.size();
++node_index) {
if (boots->boots[node_index].size() == 1u) {
@@ -1704,11 +1705,9 @@
}
bool Message::operator<(const Message &m2) const {
- CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
-
- if (this->timestamp.time < m2.timestamp.time) {
+ if (this->timestamp < m2.timestamp) {
return true;
- } else if (this->timestamp.time > m2.timestamp.time) {
+ } else if (this->timestamp > m2.timestamp) {
return false;
}
@@ -1723,66 +1722,69 @@
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
- CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
-
- return timestamp.time == m2.timestamp.time &&
- channel_index == m2.channel_index && queue_index == m2.queue_index;
+ return timestamp == m2.timestamp && channel_index == m2.channel_index &&
+ queue_index == m2.queue_index;
}
-std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .monotonic_sent_time=" << m.monotonic_sent_time
- << ", .realtime_sent_time=" << m.realtime_sent_time
- << ", .queue_index=" << m.queue_index;
- if (m.monotonic_remote_time) {
- os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
+bool Message::operator<=(const Message &m2) const {
+ return *this == m2 || *this < m2;
+}
+
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .monotonic_sent_time=" << msg.monotonic_sent_time
+ << ", .realtime_sent_time=" << msg.realtime_sent_time
+ << ", .queue_index=" << msg.queue_index;
+ if (msg.monotonic_remote_time) {
+ os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
}
os << ", .realtime_remote_time=";
- PrintOptionalOrNull(&os, m.realtime_remote_time);
+ PrintOptionalOrNull(&os, msg.realtime_remote_time);
os << ", .remote_queue_index=";
- PrintOptionalOrNull(&os, m.remote_queue_index);
- if (m.has_monotonic_timestamp_time) {
- os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ PrintOptionalOrNull(&os, msg.remote_queue_index);
+ if (msg.has_monotonic_timestamp_time) {
+ os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
os << "}";
return os;
}
-std::ostream &operator<<(std::ostream &os, const Message &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
- if (m.data != nullptr) {
- if (m.data->remote_queue_index.has_value()) {
- os << ", .remote_queue_index=" << *m.data->remote_queue_index;
+std::ostream &operator<<(std::ostream &os, const Message &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .queue_index=" << msg.queue_index
+ << ", .timestamp=" << msg.timestamp;
+ if (msg.data != nullptr) {
+ if (msg.data->remote_queue_index.has_value()) {
+ os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
}
- if (m.data->monotonic_remote_time.has_value()) {
- os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
+ if (msg.data->monotonic_remote_time.has_value()) {
+ os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
}
- os << ", .data=" << m.data;
+ os << ", .data=" << msg.data;
}
os << "}";
return os;
}
-std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index
- << ", .monotonic_event_time=" << m.monotonic_event_time
- << ", .realtime_event_time=" << m.realtime_event_time;
- if (m.remote_queue_index != BootQueueIndex::Invalid()) {
- os << ", .remote_queue_index=" << m.remote_queue_index;
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .queue_index=" << msg.queue_index
+ << ", .monotonic_event_time=" << msg.monotonic_event_time
+ << ", .realtime_event_time=" << msg.realtime_event_time;
+ if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
+ os << ", .remote_queue_index=" << msg.remote_queue_index;
}
- if (m.monotonic_remote_time != BootTimestamp::min_time()) {
- os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
+ if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
+ os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
}
- if (m.realtime_remote_time != realtime_clock::min_time) {
- os << ", .realtime_remote_time=" << m.realtime_remote_time;
+ if (msg.realtime_remote_time != realtime_clock::min_time) {
+ os << ", .realtime_remote_time=" << msg.realtime_remote_time;
}
- if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
- os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
+ os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
- if (m.data != nullptr) {
- os << ", .data=" << *m.data;
+ if (msg.data != nullptr) {
+ os << ", .data=" << *msg.data;
} else {
os << ", .data=nullptr";
}
@@ -1807,40 +1809,41 @@
break;
}
- std::shared_ptr<UnpackedMessageHeader> m =
+ std::shared_ptr<UnpackedMessageHeader> msg =
parts_message_reader_.ReadMessage();
// No data left, sorted forever, work through what is left.
- if (!m) {
+ if (!msg) {
sorted_until_ = monotonic_clock::max_time;
break;
}
size_t monotonic_timestamp_boot = 0;
- if (m->has_monotonic_timestamp_time) {
+ if (msg->has_monotonic_timestamp_time) {
monotonic_timestamp_boot = parts().logger_boot_count;
}
size_t monotonic_remote_boot = 0xffffff;
- if (m->monotonic_remote_time.has_value()) {
- const Node *node =
- parts().config->nodes()->Get(source_node_index_[m->channel_index]);
+ if (msg->monotonic_remote_time.has_value()) {
+ const Node *node = parts().config->nodes()->Get(
+ source_node_index_[msg->channel_index]);
std::optional<size_t> boot = parts_message_reader_.boot_count(
- source_node_index_[m->channel_index]);
+ source_node_index_[msg->channel_index]);
CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
- << "', with index " << source_node_index_[m->channel_index];
+ << "', with index "
+ << source_node_index_[msg->channel_index];
monotonic_remote_boot = *boot;
}
messages_.insert(
- Message{.channel_index = m->channel_index,
+ Message{.channel_index = msg->channel_index,
.queue_index = BootQueueIndex{.boot = parts().boot_count,
- .index = m->queue_index},
+ .index = msg->queue_index},
.timestamp = BootTimestamp{.boot = parts().boot_count,
- .time = m->monotonic_sent_time},
+ .time = msg->monotonic_sent_time},
.monotonic_remote_boot = monotonic_remote_boot,
.monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(m)});
+ .data = std::move(msg)});
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -1864,6 +1867,8 @@
CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
<< DebugString() << " reading " << parts_message_reader_.filename();
last_message_time_ = messages_.begin()->timestamp.time;
+ VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
+ << (*messages_.begin()) << " on " << parts_message_reader_.filename();
return &(*messages_.begin());
}
@@ -1874,9 +1879,9 @@
ss << "messages: [\n";
int count = 0;
bool no_dots = true;
- for (const Message &m : messages_) {
+ for (const Message &msg : messages_) {
if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
- ss << m << "\n";
+ ss << msg << "\n";
} else if (no_dots) {
ss << "...\n";
no_dots = false;
@@ -1887,31 +1892,25 @@
return ss.str();
}
-PartsMerger::PartsMerger(std::string_view node_name, size_t boot_count,
- const LogFilesContainer &log_files) {
- const auto parts = log_files.SelectParts(node_name, boot_count);
- node_ = configuration::GetNodeIndex(parts.config(), node_name);
-
- for (LogPartsAccess part : parts) {
- message_sorters_.emplace_back(std::move(part));
- }
-
- monotonic_start_time_ = monotonic_clock::max_time;
- realtime_start_time_ = realtime_clock::min_time;
- for (const MessageSorter &message_sorter : message_sorters_) {
+// Class to merge start times cleanly, reusably, and incrementally.
+class StartTimes {
+ public:
+ void Update(monotonic_clock::time_point new_monotonic_start_time,
+ realtime_clock::time_point new_realtime_start_time) {
// We want to capture the earliest meaningful start time here. The start
// time defaults to min_time when there's no meaningful value to report, so
// let's ignore those.
- if (message_sorter.monotonic_start_time() != monotonic_clock::min_time) {
+ if (new_monotonic_start_time != monotonic_clock::min_time) {
bool accept = false;
// We want to prioritize start times from the logger node. Really, we
// want to prioritize start times with a valid realtime_clock time. So,
// if we have a start time without a RT clock, prefer a start time with a
// RT clock, even if it is later.
- if (message_sorter.realtime_start_time() != realtime_clock::min_time) {
+ if (new_realtime_start_time != realtime_clock::min_time) {
// We've got a good one. See if the current start time has a good RT
// clock, or if we should use this one instead.
- if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (new_monotonic_start_time < monotonic_start_time_ ||
+ monotonic_start_time_ == monotonic_clock::min_time) {
accept = true;
} else if (realtime_start_time_ == realtime_clock::min_time) {
// The previous start time doesn't have a good RT time, so it is very
@@ -1921,23 +1920,45 @@
}
} else if (realtime_start_time_ == realtime_clock::min_time) {
// We don't have a RT time, so take the oldest.
- if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (new_monotonic_start_time < monotonic_start_time_ ||
+ monotonic_start_time_ == monotonic_clock::min_time) {
accept = true;
}
}
if (accept) {
- monotonic_start_time_ = message_sorter.monotonic_start_time();
- realtime_start_time_ = message_sorter.realtime_start_time();
+ monotonic_start_time_ = new_monotonic_start_time;
+ realtime_start_time_ = new_realtime_start_time;
}
}
}
- // If there was no meaningful start time reported, just use min_time.
- if (monotonic_start_time_ == monotonic_clock::max_time) {
- monotonic_start_time_ = monotonic_clock::min_time;
- realtime_start_time_ = realtime_clock::min_time;
+ monotonic_clock::time_point monotonic_start_time() const {
+ return monotonic_start_time_;
}
+ realtime_clock::time_point realtime_start_time() const {
+ return realtime_start_time_;
+ }
+
+ private:
+ monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time;
+};
+
+PartsMerger::PartsMerger(SelectedLogParts &&parts) {
+ node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name());
+
+ for (LogPartsAccess part : parts) {
+ message_sorters_.emplace_back(std::move(part));
+ }
+
+ StartTimes start_times;
+ for (const MessageSorter &message_sorter : message_sorters_) {
+ start_times.Update(message_sorter.monotonic_start_time(),
+ message_sorter.realtime_start_time());
+ }
+ monotonic_start_time_ = start_times.monotonic_start_time();
+ realtime_start_time_ = start_times.realtime_start_time();
}
std::vector<const LogParts *> PartsMerger::Parts() const {
@@ -1954,6 +1975,8 @@
if (current_ != nullptr) {
Message *result = current_->Front();
CHECK_GE(result->timestamp.time, last_message_time_);
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
+ << *result;
return result;
}
@@ -1962,25 +1985,25 @@
Message *oldest = nullptr;
sorted_until_ = monotonic_clock::max_time;
for (MessageSorter &message_sorter : message_sorters_) {
- Message *m = message_sorter.Front();
- if (!m) {
+ Message *msg = message_sorter.Front();
+ if (!msg) {
sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
continue;
}
- if (oldest == nullptr || *m < *oldest) {
- oldest = m;
+ if (oldest == nullptr || *msg < *oldest) {
+ oldest = msg;
current_ = &message_sorter;
- } else if (*m == *oldest) {
+ } else if (*msg == *oldest) {
// Found a duplicate. If there is a choice, we want the one which has
// the timestamp time.
- if (!m->data->has_monotonic_timestamp_time) {
+ if (!msg->data->has_monotonic_timestamp_time) {
message_sorter.PopFront();
} else if (!oldest->data->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &message_sorter;
- oldest = m;
+ oldest = msg;
} else {
- CHECK_EQ(m->data->monotonic_timestamp_time,
+ CHECK_EQ(msg->data->monotonic_timestamp_time,
oldest->data->monotonic_timestamp_time);
message_sorter.PopFront();
}
@@ -1993,6 +2016,11 @@
if (oldest) {
CHECK_GE(oldest->timestamp.time, last_message_time_);
last_message_time_ = oldest->timestamp.time;
+ if (monotonic_oldest_time_ > oldest->timestamp.time) {
+ VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
+ << " for node " << node_name() << " with a start time of "
+ << monotonic_start_time_ << " " << *oldest;
+ }
monotonic_oldest_time_ =
std::min(monotonic_oldest_time_, oldest->timestamp.time);
} else {
@@ -2001,6 +2029,12 @@
// Return the oldest message found. This will be nullptr if nothing was
// found, indicating there is nothing left.
+ if (oldest) {
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
+ << *oldest;
+ } else {
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name();
+ }
return oldest;
}
@@ -2011,29 +2045,52 @@
}
BootMerger::BootMerger(std::string_view node_name,
- const LogFilesContainer &log_files) {
+ const LogFilesContainer &log_files,
+ const std::vector<StoredDataType> &types)
+ : configuration_(log_files.config()),
+ node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
size_t number_of_boots = log_files.BootsForNode(node_name);
parts_mergers_.reserve(number_of_boots);
for (size_t i = 0; i < number_of_boots; ++i) {
VLOG(2) << "Boot " << i;
- parts_mergers_.emplace_back(
- std::make_unique<PartsMerger>(node_name, i, log_files));
+ SelectedLogParts selected_parts =
+ log_files.SelectParts(node_name, i, types);
+ // We are guaranteed to have something each boot, but not guaranteed to have
+ // both timestamps and data for each boot. If we don't have anything, don't
+ // create a parts merger. The rest of this class will detect that and
+ // ignore it as required.
+ if (selected_parts.empty()) {
+ parts_mergers_.emplace_back(nullptr);
+ } else {
+ parts_mergers_.emplace_back(
+ std::make_unique<PartsMerger>(std::move(selected_parts)));
+ }
}
}
-Message *BootMerger::Front() {
- Message *result = parts_mergers_[index_]->Front();
+std::string_view BootMerger::node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+}
- if (result != nullptr) {
- return result;
+Message *BootMerger::Front() {
+ if (parts_mergers_[index_].get() != nullptr) {
+ Message *result = parts_mergers_[index_]->Front();
+
+ if (result != nullptr) {
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
+ return result;
+ }
}
if (index_ + 1u == parts_mergers_.size()) {
// At the end of the last node merger, just return.
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr";
return nullptr;
} else {
++index_;
- return Front();
+ Message *result = Front();
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
+ return result;
}
}
@@ -2042,6 +2099,8 @@
std::vector<const LogParts *> BootMerger::Parts() const {
std::vector<const LogParts *> results;
for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
+ if (!parts_merger) continue;
+
std::vector<const LogParts *> node_parts = parts_merger->Parts();
results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
@@ -2051,17 +2110,241 @@
return results;
}
-TimestampMapper::TimestampMapper(std::string_view node_name,
- const LogFilesContainer &log_files)
- : boot_merger_(node_name, log_files),
- timestamp_callback_([](TimestampedMessage *) {}) {
- for (const LogParts *part : boot_merger_.Parts()) {
- if (!configuration_) {
- configuration_ = part->config;
- } else {
- CHECK_EQ(configuration_.get(), part->config.get());
- }
+monotonic_clock::time_point BootMerger::monotonic_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->monotonic_start_time();
}
+ return monotonic_clock::min_time;
+}
+
+realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->realtime_start_time();
+ }
+ return realtime_clock::min_time;
+}
+
+monotonic_clock::time_point BootMerger::monotonic_oldest_time(
+ size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->monotonic_oldest_time();
+ }
+ return monotonic_clock::max_time;
+}
+
+bool BootMerger::started() const {
+ if (index_ == 0) {
+ if (!parts_mergers_[0]) {
+ return false;
+ }
+ return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time;
+ }
+ return true;
+}
+
+SplitTimestampBootMerger::SplitTimestampBootMerger(
+ std::string_view node_name, const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy)
+ : boot_merger_(node_name, log_files,
+ (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup)
+ ? std::vector<StoredDataType>{StoredDataType::DATA}
+ : std::vector<StoredDataType>{
+ StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS}) {
+ // Make the timestamp_boot_merger_ only if we are asked to, and if there are
+ // files to put in it. We don't need it for a data-only log.
+ if (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup &&
+ log_files.HasTimestamps(node_name)) {
+ timestamp_boot_merger_ = std::make_unique<BootMerger>(
+ node_name, log_files,
+ std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS});
+ }
+
+ size_t number_of_boots = log_files.BootsForNode(node_name);
+ monotonic_start_time_.reserve(number_of_boots);
+ realtime_start_time_.reserve(number_of_boots);
+
+ // Start times are split across the timestamp boot merger and the data boot
+ // merger. Pull from both and combine them to get the same answer as before.
+ for (size_t i = 0u; i < number_of_boots; ++i) {
+ StartTimes start_times;
+
+ if (timestamp_boot_merger_) {
+ start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
+ timestamp_boot_merger_->realtime_start_time(i));
+ }
+
+ start_times.Update(boot_merger_.monotonic_start_time(i),
+ boot_merger_.realtime_start_time(i));
+
+ monotonic_start_time_.push_back(start_times.monotonic_start_time());
+ realtime_start_time_.push_back(start_times.realtime_start_time());
+ }
+}
+
+void SplitTimestampBootMerger::QueueTimestamps(
+ std::function<void(TimestampedMessage *)> fn,
+ const std::vector<size_t> &source_node) {
+ if (!timestamp_boot_merger_) {
+ return;
+ }
+
+ while (true) {
+ // Load all the timestamps. If we find data, ignore it and drop it on the
+ // floor. It will be read when boot_merger_ is used.
+ Message *msg = timestamp_boot_merger_->Front();
+ if (!msg) {
+ queue_timestamps_ran_ = true;
+ return;
+ }
+ if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
+ timestamp_messages_.emplace_back(TimestampedMessage{
+ .channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
+ .remote_queue_index =
+ BootQueueIndex{.boot = msg->monotonic_remote_boot,
+ .index = msg->data->remote_queue_index.value()},
+ .monotonic_remote_time = {msg->monotonic_remote_boot,
+ msg->data->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
+ msg->data->monotonic_timestamp_time},
+ .data = std::move(msg->data)});
+
+ VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
+ fn(&timestamp_messages_.back());
+ } else {
+ VLOG(2) << this << " Dropped data";
+ }
+ timestamp_boot_merger_->PopFront();
+ }
+
+ // TODO(austin): Push the queue into TimestampMapper instead. Have it pull
+ // all the timestamps. That will also make it so we don't have to clear the
+ // function.
+}
+
+std::string_view SplitTimestampBootMerger::node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+}
+
+monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, monotonic_start_time_.size());
+ return monotonic_start_time_[boot];
+}
+
+realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, realtime_start_time_.size());
+ return realtime_start_time_[boot];
+}
+
+monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time(
+ size_t boot) const {
+ if (!timestamp_boot_merger_) {
+ return boot_merger_.monotonic_oldest_time(boot);
+ }
+ return std::min(boot_merger_.monotonic_oldest_time(boot),
+ timestamp_boot_merger_->monotonic_oldest_time(boot));
+}
+
+Message *SplitTimestampBootMerger::Front() {
+ Message *boot_merger_front = boot_merger_.Front();
+
+ if (timestamp_boot_merger_) {
+ CHECK(queue_timestamps_ran_);
+ }
+
+ // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
+ // to return a Message. We need to convert the first message in the list
+ // before returning it (and comparing, honestly). Fill next_timestamp_ in if
+ // it is empty so the rest of the logic here can just look at next_timestamp_
+ // and use that instead.
+ if (!next_timestamp_ && !timestamp_messages_.empty()) {
+ auto &front = timestamp_messages_.front();
+ next_timestamp_ = Message{
+ .channel_index = front.channel_index,
+ .queue_index = front.queue_index,
+ .timestamp = front.monotonic_event_time,
+ .monotonic_remote_boot = front.remote_queue_index.boot,
+ .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
+ .data = std::move(front.data),
+ };
+ timestamp_messages_.pop_front();
+ }
+
+ if (!next_timestamp_) {
+ message_source_ = MessageSource::kBootMerger;
+ if (boot_merger_front != nullptr) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " " << *boot_merger_front;
+ } else {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " nullptr";
+ }
+ return boot_merger_front;
+ }
+
+ if (boot_merger_front == nullptr) {
+ message_source_ = MessageSource::kTimestampMessage;
+
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
+ << next_timestamp_.value();
+ return &next_timestamp_.value();
+ }
+
+ if (*boot_merger_front <= next_timestamp_.value()) {
+ if (*boot_merger_front == next_timestamp_.value()) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " Dropping duplicate timestamp.";
+ next_timestamp_.reset();
+ }
+ message_source_ = MessageSource::kBootMerger;
+ if (boot_merger_front != nullptr) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " " << *boot_merger_front;
+ } else {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " nullptr";
+ }
+ return boot_merger_front;
+ } else {
+ message_source_ = MessageSource::kTimestampMessage;
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
+ << next_timestamp_.value();
+ return &next_timestamp_.value();
+ }
+}
+
+void SplitTimestampBootMerger::PopFront() {
+ switch (message_source_) {
+ case MessageSource::kTimestampMessage:
+ CHECK(next_timestamp_.has_value());
+ next_timestamp_.reset();
+ break;
+ case MessageSource::kBootMerger:
+ boot_merger_.PopFront();
+ break;
+ }
+}
+
+TimestampMapper::TimestampMapper(
+ std::string_view node_name, const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy)
+ : boot_merger_(node_name, log_files, timestamp_queue_strategy),
+ timestamp_callback_([](TimestampedMessage *) {}) {
+ configuration_ = boot_merger_.configuration();
+
const Configuration *config = configuration_.get();
// Only fill out nodes_data_ if there are nodes. Otherwise, everything is
// pretty simple.
@@ -2120,17 +2403,18 @@
}
}
-void TimestampMapper::QueueMessage(Message *m) {
+void TimestampMapper::QueueMessage(Message *msg) {
matched_messages_.emplace_back(
- TimestampedMessage{.channel_index = m->channel_index,
- .queue_index = m->queue_index,
- .monotonic_event_time = m->timestamp,
- .realtime_event_time = m->data->realtime_sent_time,
+ TimestampedMessage{.channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
.monotonic_timestamp_time = BootTimestamp::min_time(),
- .data = std::move(m->data)});
+ .data = std::move(msg->data)});
+ VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
}
TimestampedMessage *TimestampMapper::Front() {
@@ -2139,18 +2423,26 @@
case FirstMessage::kNeedsUpdate:
break;
case FirstMessage::kInMessage:
+ VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
+ << matched_messages_.front();
return &matched_messages_.front();
case FirstMessage::kNullptr:
+ VLOG(1) << this << " TimestampMapper::Front " << node_name()
+ << " nullptr";
return nullptr;
}
if (matched_messages_.empty()) {
if (!QueueMatched()) {
first_message_ = FirstMessage::kNullptr;
+ VLOG(1) << this << " TimestampMapper::Front " << node_name()
+ << " nullptr";
return nullptr;
}
}
first_message_ = FirstMessage::kInMessage;
+ VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
+ << matched_messages_.front();
return &matched_messages_.front();
}
@@ -2166,6 +2458,7 @@
const TimestampedMessage & /*message*/) {
if (replay_channels_callback_ &&
!replay_channels_callback_(matched_messages_.back())) {
+ VLOG(1) << node_name() << " Popped " << matched_messages_.back();
matched_messages_.pop_back();
return true;
}
@@ -2176,15 +2469,16 @@
if (nodes_data_.empty()) {
// Simple path. We are single node, so there are no timestamps to match!
CHECK_EQ(messages_.size(), 0u);
- Message *m = boot_merger_.Front();
- if (!m) {
+ Message *msg = boot_merger_.Front();
+ if (!msg) {
return MatchResult::kEndOfFile;
}
// Enqueue this message into matched_messages_ so we have a place to
// associate remote timestamps, and return it.
- QueueMessage(m);
+ QueueMessage(msg);
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name();
last_message_time_ = matched_messages_.back().monotonic_event_time;
// We are thin wrapper around parts_merger. Call it directly.
@@ -2210,12 +2504,13 @@
boot_merger_.PopFront();
}
- Message *m = &(messages_.front());
+ Message *msg = &(messages_.front());
- if (source_node_[m->channel_index] == node()) {
+ if (source_node_[msg->channel_index] == node()) {
// From us, just forward it on, filling the remote data in as invalid.
- QueueMessage(m);
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ QueueMessage(msg);
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name();
last_message_time_ = matched_messages_.back().monotonic_event_time;
messages_.pop_front();
timestamp_callback_(&matched_messages_.back());
@@ -2226,25 +2521,28 @@
} else {
// Got a timestamp, find the matching remote data, match it, and return
// it.
- Message data = MatchingMessageFor(*m);
+ Message data = MatchingMessageFor(*msg);
// Return the data from the remote. The local message only has timestamp
// info which isn't relevant anymore once extracted.
matched_messages_.emplace_back(TimestampedMessage{
- .channel_index = m->channel_index,
- .queue_index = m->queue_index,
- .monotonic_event_time = m->timestamp,
- .realtime_event_time = m->data->realtime_sent_time,
+ .channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
.remote_queue_index =
- BootQueueIndex{.boot = m->monotonic_remote_boot,
- .index = m->data->remote_queue_index.value()},
- .monotonic_remote_time = {m->monotonic_remote_boot,
- m->data->monotonic_remote_time.value()},
- .realtime_remote_time = m->data->realtime_remote_time.value(),
- .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
- m->data->monotonic_timestamp_time},
+ BootQueueIndex{.boot = msg->monotonic_remote_boot,
+ .index = msg->data->remote_queue_index.value()},
+ .monotonic_remote_time = {msg->monotonic_remote_boot,
+ msg->data->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
+ msg->data->monotonic_timestamp_time},
.data = std::move(data.data)});
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ VLOG(1) << node_name() << " Inserted timestamp "
+ << matched_messages_.back();
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name() << " " << matched_messages_.back();
last_message_time_ = matched_messages_.back().monotonic_event_time;
// Since messages_ holds the data, drop it.
messages_.pop_front();
@@ -2300,6 +2598,7 @@
last_popped_message_time_ = Front()->monotonic_event_time;
first_message_ = FirstMessage::kNeedsUpdate;
+ VLOG(1) << node_name() << " Popped " << matched_messages_.back();
matched_messages_.pop_front();
}
@@ -2393,9 +2692,9 @@
auto it = std::find_if(
data_queue->begin(), data_queue->end(),
[remote_queue_index,
- remote_boot = monotonic_remote_time.boot](const Message &m) {
- return m.queue_index == remote_queue_index &&
- m.timestamp.boot == remote_boot;
+ remote_boot = monotonic_remote_time.boot](const Message &msg) {
+ return msg.queue_index == remote_queue_index &&
+ msg.timestamp.boot == remote_boot;
});
if (it == data_queue->end()) {
return Message{.channel_index = message.channel_index,
@@ -2446,14 +2745,14 @@
}
bool TimestampMapper::Queue() {
- Message *m = boot_merger_.Front();
- if (m == nullptr) {
+ Message *msg = boot_merger_.Front();
+ if (msg == nullptr) {
return false;
}
for (NodeData &node_data : nodes_data_) {
if (!node_data.any_delivered) continue;
if (!node_data.save_for_peer) continue;
- if (node_data.channels[m->channel_index].delivered) {
+ if (node_data.channels[msg->channel_index].delivered) {
// If we have data but no timestamps (logs where the timestamps didn't get
// logged are classic), we can grow this indefinitely. We don't need to
// keep anything that is older than the last message returned.
@@ -2461,11 +2760,11 @@
// We have the time on the source node.
// We care to wait until we have the time on the destination node.
std::deque<Message> &messages =
- node_data.channels[m->channel_index].messages;
+ node_data.channels[msg->channel_index].messages;
// Max delay over the network is the TTL, so let's take the queue time and
// add TTL to it. Don't forget any messages which are reliable until
// someone can come up with a good reason to forget those too.
- if (node_data.channels[m->channel_index].time_to_live >
+ if (node_data.channels[msg->channel_index].time_to_live >
chrono::nanoseconds(0)) {
// We need to make *some* assumptions about network delay for this to
// work. We want to only look at the RX side. This means we need to
@@ -2479,21 +2778,25 @@
// messages getting sent twice.
while (messages.size() > 1u &&
messages.begin()->timestamp +
- node_data.channels[m->channel_index].time_to_live +
+ node_data.channels[msg->channel_index].time_to_live +
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_max_network_delay)) <
last_popped_message_time_) {
messages.pop_front();
}
}
- node_data.channels[m->channel_index].messages.emplace_back(*m);
+ node_data.channels[msg->channel_index].messages.emplace_back(*msg);
}
}
- messages_.emplace_back(std::move(*m));
+ messages_.emplace_back(std::move(*msg));
return true;
}
+void TimestampMapper::QueueTimestamps() {
+ boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
+}
+
std::string TimestampMapper::DebugString() const {
std::stringstream ss;
ss << "node " << node() << " (" << node_name() << ") [\n";
@@ -2512,8 +2815,8 @@
}
ss << " channel " << channel_index << " [\n";
- for (const Message &m : channel_data.messages) {
- ss << " " << m << "\n";
+ for (const Message &msg : channel_data.messages) {
+ ss << " " << msg << "\n";
}
ss << " ]\n";
++channel_index;