Read all timestamps into RAM where possible when reading logs
There are cases where we fail to buffer far enough into the future to
solve the timestamp problem, or where we end up buffering until the end
of time to solve it. Both of these result in log reading failures,
either through CHECKs or OOMs.
Timestamps are tiny. The existence of timestamp_extractor proves that
we can store the whole timestamp problem in memory relatively easily.
When timestamps are stored in separate files, let's just load them at
the start.
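
As a rough sanity check (the sizes and rates below are assumptions for
illustration, not measurements), holding every timestamp in RAM is cheap
compared to buffering the serialized data:

  #include <cstddef>

  // Back-of-envelope sketch; all constants are assumed values.
  constexpr std::size_t kBytesPerTimestamp = 64;   // queue index + clocks
  constexpr std::size_t kMessagesPerSecond = 1000; // forwarded-message rate
  constexpr std::size_t kLogSeconds = 60 * 60;     // a one-hour log
  constexpr std::size_t kTotalBytes =
      kBytesPerTimestamp * kMessagesPerSecond * kLogSeconds;
  // ~230 MB for an hour at 1 kHz, versus the full serialized messages,
  // which are typically orders of magnitude larger.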
This also adds flags to force this behavior on and off. When a log with
data and timestamps mixed together is found and the flag forces it on,
we read the data files twice, extracting just the timestamps on the
first pass.
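
A minimal sketch of driving these flags from C++ (the helper is
hypothetical; the flag names are the ones added in this change, and the
DECLARE_bool/FLAGS_ usage is standard gflags):

  #include "gflags/gflags.h"

  DECLARE_bool(force_timestamp_loading);
  DECLARE_bool(enable_timestamp_loading);

  // Hypothetical helper: pick the timestamp loading behavior before
  // constructing a LogReader.
  void ConfigureTimestampLoading(bool force) {
    // When true, timestamps are pre-loaded even for mixed data/timestamp
    // logs, at the cost of reading each data file twice.
    FLAGS_force_timestamp_loading = force;
    // Default is true; set to false only for very long logs where holding
    // all the timestamps in RAM is a problem.
    FLAGS_enable_timestamp_loading = true;
  }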
I'd like to add more tests to logfile_utils_test to test this all
explicitly, but the multinode_logger tests appear to do a really good
job already.
Change-Id: I38e23836afa980e3e3a839125e78e132066e2c90
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 70e7b48..954bf38 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1144,6 +1144,13 @@
return nullptr;
}
+std::string_view NodeName(const Configuration *config, size_t node_index) {
+ if (!configuration::MultiNode(config)) {
+ return "(singlenode)";
+ }
+ return config->nodes()->Get(node_index)->name()->string_view();
+}
+
const Node *GetMyNode(const Configuration *config) {
const std::string hostname = (FLAGS_override_hostname.size() > 0)
? FLAGS_override_hostname
diff --git a/aos/configuration.h b/aos/configuration.h
index e35c0bf..01b9e7c 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -128,6 +128,10 @@
const Node *GetNodeFromHostname(const Configuration *config,
std::string_view name);
+// Returns a printable name for the node: "(singlenode)" on a single-node
+// system, and the node's name otherwise.
+std::string_view NodeName(const Configuration *config, size_t node_index);
+
// Returns a vector of the nodes in the config. (nullptr is considered the node
// in a single node world.)
std::vector<const Node *> GetNodes(const Configuration *config);
diff --git a/aos/events/logging/config_remapper_test.cc b/aos/events/logging/config_remapper_test.cc
index 89e1c96..a1fedb2 100644
--- a/aos/events/logging/config_remapper_test.cc
+++ b/aos/events/logging/config_remapper_test.cc
@@ -28,10 +28,12 @@
::testing::Values(
ConfigParams{"multinode_pingpong_combined_config.json", true,
kCombinedConfigSha1(), kCombinedConfigSha1(),
- FileStrategy::kCombine},
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kForceBufferTimestamps},
ConfigParams{"multinode_pingpong_split_config.json", false,
kSplitConfigSha1(), kReloggedSplitConfigSha1(),
- FileStrategy::kCombine}),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kForceBufferTimestamps}),
::testing::ValuesIn(SupportedCompressionAlgorithms())));
// Tests that we can read a config and remap a channel
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index b18873e..850a564 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -35,6 +35,19 @@
"false, CHECK.");
DECLARE_bool(timestamps_to_csv);
+DEFINE_bool(
+ enable_timestamp_loading, true,
+ "Enable loading all the timestamps into RAM at startup if they are in "
+ "separate files. This fixes any timestamp queueing problems for the cost "
+ "of storing timestamps in RAM only on logs with timestamps logged in "
+ "separate files from data. Only disable this if you are reading a really "
+ "long log file and are experiencing memory problems due to loading all the "
+ "timestamps into RAM.");
+DEFINE_bool(
+ force_timestamp_loading, false,
+ "Force loading all the timestamps into RAM at startup. This fixes any "
+ "timestamp queueing problems for the cost of storing timestamps in RAM and "
+ "potentially reading each log twice.");
DEFINE_bool(skip_order_validation, false,
"If true, ignore any out of orderness in replay");
@@ -175,14 +188,14 @@
: log_files_(std::move(log_files)),
replay_configuration_(replay_configuration),
replay_channels_(replay_channels),
- config_remapper_(log_files_.config(), replay_configuration_,
+ config_remapper_(log_files_.config().get(), replay_configuration_,
replay_channels_) {
SetStartTime(FLAGS_start_time);
SetEndTime(FLAGS_end_time);
{
// Log files container validates that log files share the same config.
- const Configuration *config = log_files_.config();
+ const Configuration *config = log_files_.config().get();
CHECK_NOTNULL(config);
}
@@ -298,7 +311,7 @@
BootTimestamp>::PushResult result{
*message, queue_until >= last_queued_message_, false};
timestamp_mapper_->PopFront();
- SeedSortedMessages();
+ MaybeSeedSortedMessages();
return result;
},
time);
@@ -425,12 +438,20 @@
SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
config_remapper_.set_configuration(event_loop_factory_->configuration());
+
+ const TimestampQueueStrategy timestamp_queue_strategy =
+ ComputeTimestampQueueStrategy();
+
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
log_files_.boots(), FLAGS_skip_order_validation,
- chrono::duration_cast<chrono::nanoseconds>(
- chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+ timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup
+ ? chrono::seconds(0)
+ : chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(
+ FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
@@ -444,8 +465,10 @@
states_[node_index] = std::make_unique<State>(
!log_files_.ContainsPartsForNode(node_name)
? nullptr
- : std::make_unique<TimestampMapper>(node_name, log_files_),
- filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ : std::make_unique<TimestampMapper>(node_name, log_files_,
+ timestamp_queue_strategy),
+ timestamp_queue_strategy, filters_.get(),
+ std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
State *state = states_[node_index].get();
@@ -476,6 +499,14 @@
}
}
+ // Now that every state has a peer, load all the timestamps into RAM.
+ if (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup) {
+ for (std::unique_ptr<State> &state : states_) {
+ state->ReadTimestamps();
+ }
+ }
+
// Register after making all the State objects so we can build references
// between them.
for (const Node *node : configuration::GetNodes(configuration())) {
@@ -511,7 +542,7 @@
filters_->CheckGraph();
for (std::unique_ptr<State> &state : states_) {
- state->SeedSortedMessages();
+ state->MaybeSeedSortedMessages();
}
// Forwarding is tracked per channel. If it is enabled, we want to turn it
@@ -561,7 +592,8 @@
CHECK(state);
VLOG(1) << "Start time is " << state->monotonic_start_time(0)
<< " for node '" << MaybeNodeName(state->node()) << "' now "
- << state->monotonic_now();
+ << (state->event_loop() != nullptr ? state->monotonic_now()
+ : monotonic_clock::min_time);
if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
continue;
}
@@ -608,16 +640,33 @@
return nullptr;
}
+TimestampQueueStrategy LogReader::ComputeTimestampQueueStrategy() const {
+ if ((log_files_.TimestampsStoredSeparately() &&
+ FLAGS_enable_timestamp_loading) ||
+ FLAGS_force_timestamp_loading) {
+ return TimestampQueueStrategy::kQueueTimestampsAtStartup;
+ } else {
+ return TimestampQueueStrategy::kQueueTogether;
+ }
+}
+
// TODO(jkuszmaul): Make in-line modifications to
// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
// avoid messing up anything that depends on them having valid offsets.
void LogReader::Register(EventLoop *event_loop) {
+ const TimestampQueueStrategy timestamp_queue_strategy =
+ ComputeTimestampQueueStrategy();
+
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop->configuration(), logged_configuration(),
log_files_.boots(), FLAGS_skip_order_validation,
- chrono::duration_cast<chrono::nanoseconds>(
- chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+ timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup
+ ? chrono::seconds(0)
+ : chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(
+ FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
@@ -628,8 +677,10 @@
states_[node_index] = std::make_unique<State>(
!log_files_.ContainsPartsForNode(node_name)
? nullptr
- : std::make_unique<TimestampMapper>(node_name, log_files_),
- filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
+ : std::make_unique<TimestampMapper>(node_name, log_files_,
+ timestamp_queue_strategy),
+ timestamp_queue_strategy, filters_.get(),
+ std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
State *state = states_[node_index].get();
@@ -653,6 +704,16 @@
}
}
}
+
+ // Now that all the peers are added, we can buffer up all the timestamps if
+ // needed.
+ if (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup) {
+ for (std::unique_ptr<State> &state : states_) {
+ state->ReadTimestamps();
+ }
+ }
+
for (const Node *node : configuration::GetNodes(configuration())) {
if (node == nullptr || node->name()->string_view() ==
event_loop->node()->name()->string_view()) {
@@ -1048,7 +1109,7 @@
<< state->monotonic_now();
}));
- state->SeedSortedMessages();
+ state->MaybeSeedSortedMessages();
if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
@@ -1232,6 +1293,7 @@
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
+ TimestampQueueStrategy timestamp_queue_strategy,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
std::function<void()> notice_realtime_end, const Node *node,
LogReader::State::ThreadedBuffering threading,
@@ -1239,6 +1301,7 @@
const std::vector<std::function<void(void *message)>>
&before_send_callbacks)
: timestamp_mapper_(std::move(timestamp_mapper)),
+ timestamp_queue_strategy_(timestamp_queue_strategy),
notice_realtime_end_(notice_realtime_end),
node_(node),
multinode_filters_(multinode_filters),
@@ -1703,6 +1766,10 @@
message.value().monotonic_event_time +
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+ VLOG(1) << "Popped " << message.value()
+ << configuration::CleanedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ factory_channel_index_[message->channel_index]));
return message.value();
} else { // single threaded
CHECK(timestamp_mapper_ != nullptr);
@@ -1714,7 +1781,7 @@
VLOG(2) << "Node '" << MaybeNodeName(event_loop_->node())
<< "': PopOldest Popping " << result.monotonic_event_time;
timestamp_mapper_->PopFront();
- SeedSortedMessages();
+ MaybeSeedSortedMessages();
CHECK_EQ(result.monotonic_event_time.boot, boot_count());
@@ -1761,8 +1828,27 @@
return result_ptr->monotonic_event_time;
}
-void LogReader::State::SeedSortedMessages() {
+void LogReader::State::ReadTimestamps() {
if (!timestamp_mapper_) return;
+ timestamp_mapper_->QueueTimestamps();
+
+ // TODO(austin): Maybe make timestamp mapper do this so we don't need to clear
+ // it out?
+ //
+ // If we don't clear the timestamp callback, we end up getting a callback both
+ // when SplitTimestampBootMerger sees the timestamp, and when TimestampMapper
+ // sees it.
+ timestamp_mapper_->set_timestamp_callback([](TimestampedMessage *) {});
+}
+
+void LogReader::State::MaybeSeedSortedMessages() {
+ if (!timestamp_mapper_) return;
+
+ // The whole purpose of seeding is to make timestamp_mapper load timestamps.
+ // So if we have already loaded them, skip seeding.
+ if (timestamp_queue_strategy_ ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup)
+ return;
timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::seconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 228341d..bb3588a 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -459,6 +459,7 @@
// details.
enum class ThreadedBuffering { kYes, kNo };
State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ TimestampQueueStrategy timestamp_queue_strategy,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
std::function<void()> notice_realtime_end, const Node *node,
ThreadedBuffering threading,
@@ -498,9 +499,19 @@
return node_event_loop_factory_->boot_count();
}
+ // Reads all the timestamps into RAM so we don't need to manage buffering
+ // them. For logs where the timestamps are in separate files, this
+ // minimizes RAM usage in the cases where the log reader decides to buffer
+ // to the end of the file, or where the time estimation buffer needs to be
+ // set high to sort. This means we devote our RAM to holding many small
+ // timestamps for the whole log instead of holding both timestamps and the
+ // much larger data messages for a shorter window. For logs where
+ // timestamps are stored with the data, this triggers those files to be
+ // read twice.
+ void ReadTimestamps();
+
// Primes the queues inside State. Should be called before calling
// OldestMessageTime.
- void SeedSortedMessages();
+ void MaybeSeedSortedMessages();
void SetUpStartupTimer() {
const monotonic_clock::time_point start_time =
@@ -741,6 +752,7 @@
void SendMessageTimings();
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
+ const TimestampQueueStrategy timestamp_queue_strategy_;
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
@@ -891,6 +903,9 @@
message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
const Node *node_b);
+ // Returns the timestamp queueing strategy to use.
+ TimestampQueueStrategy ComputeTimestampQueueStrategy() const;
+
// List of filters for a connection. The pointer to the first node will be
// less than the second node.
std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
diff --git a/aos/events/logging/log_reader_utils_test.cc b/aos/events/logging/log_reader_utils_test.cc
index 047c7b9..9cd64f2 100644
--- a/aos/events/logging/log_reader_utils_test.cc
+++ b/aos/events/logging/log_reader_utils_test.cc
@@ -18,7 +18,8 @@
::testing::Combine(::testing::Values(ConfigParams{
"multinode_pingpong_combined_config.json", true,
kCombinedConfigSha1(), kCombinedConfigSha1(),
- FileStrategy::kCombine}),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kForceBufferTimestamps}),
::testing::ValuesIn(SupportedCompressionAlgorithms())));
// This test is to check if we are able to get the right channels from a log
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 2157d98..41dc20e 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -2210,6 +2210,12 @@
return log_file.config.get();
}
+// Provides unified access to the config field stored in LogPartsAccess. It
+// is used in CheckMatchingConfigs.
+inline const Configuration *GetConfig(const LogPartsAccess &log_parts_access) {
+ return log_parts_access.config().get();
+}
+
// Output of LogPartsAccess for debug purposes.
std::ostream &operator<<(std::ostream &stream,
const LogPartsAccess &log_parts_access) {
@@ -2223,11 +2229,18 @@
: node_name_(node_name),
boot_index_(boot_index),
log_parts_(std::move(log_parts)) {
- CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
- << node_name_ << " boot " << boot_index_;
+ if (log_parts_.empty()) {
+ VLOG(1) << "Nothing was selected for node " << node_name_ << " boot "
+ << boot_index_;
+ return;
+ }
CHECK(CheckMatchingConfigs(log_parts_));
config_ = log_parts_.front().config();
+ for (LogPartsAccess &part : log_parts_) {
+ CHECK_EQ(config_.get(), part.config().get());
+ }
+
// Enforce that we are sorting things only from a single node and a single
// boot.
const std::string_view part0_source_boot_uuid =
@@ -2245,13 +2258,14 @@
: log_source_(log_source), log_files_(std::move(log_files)) {
CHECK_GT(log_files_.size(), 0u);
CHECK(CheckMatchingConfigs(log_files_));
- config_ = log_files_.front().config.get();
+ config_ = log_files_.front().config;
boots_ = log_files_.front().boots;
std::unordered_set<std::string> logger_nodes;
// Scan and collect all related nodes and number of reboots per node.
for (const LogFile &log_file : log_files_) {
+ CHECK_EQ(config_.get(), log_file.config.get());
for (const LogParts &part : log_file.parts) {
auto node_item = nodes_boots_.find(part.node);
if (node_item != nodes_boots_.end()) {
@@ -2276,12 +2290,28 @@
return node_item->second;
}
-SelectedLogParts LogFilesContainer::SelectParts(std::string_view node_name,
- size_t boot_index) const {
+SelectedLogParts LogFilesContainer::SelectParts(
+ std::string_view node_name, size_t boot_index,
+ const std::vector<StoredDataType> &types) const {
std::vector<LogPartsAccess> result;
for (const LogFile &log_file : log_files_) {
for (const LogParts &part : log_file.parts) {
if (part.node == node_name && part.boot_count == boot_index) {
+ bool found = false;
+ for (const StoredDataType type : types) {
+ for (const StoredDataType part_type : part.data_stored) {
+ if (type == part_type) {
+ found = true;
+ break;
+ }
+ }
+ if (found) {
+ break;
+ }
+ }
+ if (!found) {
+ continue;
+ }
result.emplace_back(log_source_, part);
}
}
@@ -2317,5 +2347,26 @@
return true;
}
+bool LogFilesContainer::HasTimestamps(std::string_view node_name) const {
+ for (const LogFile &log_file : log_files_) {
+ for (const LogParts &part : log_file.parts) {
+ if (part.node != node_name) {
+ continue;
+ }
+
+ for (StoredDataType type : part.data_stored) {
+ switch (type) {
+ case StoredDataType::DATA:
+ break;
+ case StoredDataType::TIMESTAMPS:
+ case StoredDataType::REMOTE_TIMESTAMPS:
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 840c3d6..2275673 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -180,7 +180,9 @@
size_t boot_count() const { return log_parts_.boot_count; }
- const Configuration *config() const { return log_parts_.config.get(); }
+ std::shared_ptr<const Configuration> config() const {
+ return log_parts_.config;
+ }
std::optional<const LogSource *> log_source() const { return log_source_; }
@@ -203,12 +205,6 @@
LogParts log_parts_;
};
-// Provides unified access to config field stored in LogPartsAccess. It is used
-// in CheckMatchingConfigs.
-inline const Configuration *GetConfig(const LogPartsAccess &log_parts_access) {
- return log_parts_access.config();
-}
-
// Output of LogPartsAccess for debug purposes.
std::ostream &operator<<(std::ostream &stream,
const LogPartsAccess &log_parts_access);
@@ -223,20 +219,22 @@
auto begin() const { return log_parts_.begin(); }
auto end() const { return log_parts_.end(); }
+ bool empty() const { return log_parts_.empty(); }
+
// Config that is shared across all log parts.
- const Configuration *config() const { return config_; }
+ std::shared_ptr<const Configuration> config() const { return config_; }
const std::string &node_name() const { return node_name_; }
- // Number of boots found in the log parts.
- size_t boot_count() const { return boot_index_; }
+ // Returns the boot index found in the log parts.
+ size_t boot_index() const { return boot_index_; }
private:
std::string node_name_;
- size_t boot_index_;
+ size_t boot_index_ = 0u;
std::vector<LogPartsAccess> log_parts_;
- const Configuration *config_;
+ std::shared_ptr<const Configuration> config_ = nullptr;
};
// Container that keeps a sorted list of log files and provides functions that
@@ -262,15 +260,16 @@
// Returns the number of reboots found in log files associated with the node.
size_t BootsForNode(std::string_view node_name) const;
- // Get only log parts that associated with node and boot number.
- SelectedLogParts SelectParts(std::string_view node_name,
- size_t boot_index) const;
+ // Get only log parts that are associated with the node and boot number, and
+ // that contain data of any of the types provided.
+ SelectedLogParts SelectParts(std::string_view node_name, size_t boot_index,
+ const std::vector<StoredDataType> &types) const;
// It provides access to boots logged by all log files in the container.
const std::shared_ptr<const Boots> &boots() const { return boots_; }
// Access the configuration shared with all log files in the container.
- const Configuration *config() const { return config_; }
+ std::shared_ptr<const Configuration> config() const { return config_; }
// List of logger nodes for given set of log files.
const auto &logger_nodes() const { return logger_nodes_; }
@@ -283,6 +282,9 @@
// distinct from the data.
bool TimestampsStoredSeparately() const;
+ // Returns true if any of the files for this node contain timestamps.
+ bool HasTimestamps(std::string_view node_name) const;
+
private:
LogFilesContainer(std::optional<const LogSource *> log_source,
std::vector<LogFile> log_files);
@@ -290,7 +292,7 @@
std::optional<const LogSource *> log_source_;
std::vector<LogFile> log_files_;
- const Configuration *config_;
+ std::shared_ptr<const Configuration> config_;
std::shared_ptr<const Boots> boots_;
// Keeps information about nodes and number of reboots per node.
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 133bb79..02cff6d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1608,8 +1608,9 @@
}
void PartsMessageReader::ComputeBootCounts() {
- boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
- std::nullopt);
+ boot_counts_.assign(
+ configuration::NodesCount(log_parts_access_.config().get()),
+ std::nullopt);
const auto boots = log_parts_access_.parts().boots;
@@ -1636,7 +1637,7 @@
// Older multi-node logs which are guaranteed to have UUIDs logged, or
// single node log files with boot UUIDs in the header. We only know how to
// order certain boots in certain circumstances.
- if (configuration::MultiNode(log_parts_access_.config()) || boots) {
+ if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
for (size_t node_index = 0; node_index < boot_counts_.size();
++node_index) {
if (boots->boots[node_index].size() == 1u) {
@@ -1704,11 +1705,9 @@
}
bool Message::operator<(const Message &m2) const {
- CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
-
- if (this->timestamp.time < m2.timestamp.time) {
+ if (this->timestamp < m2.timestamp) {
return true;
- } else if (this->timestamp.time > m2.timestamp.time) {
+ } else if (this->timestamp > m2.timestamp) {
return false;
}
@@ -1723,66 +1722,69 @@
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
- CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
-
- return timestamp.time == m2.timestamp.time &&
- channel_index == m2.channel_index && queue_index == m2.queue_index;
+ return timestamp == m2.timestamp && channel_index == m2.channel_index &&
+ queue_index == m2.queue_index;
}
-std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .monotonic_sent_time=" << m.monotonic_sent_time
- << ", .realtime_sent_time=" << m.realtime_sent_time
- << ", .queue_index=" << m.queue_index;
- if (m.monotonic_remote_time) {
- os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
+bool Message::operator<=(const Message &m2) const {
+ return *this == m2 || *this < m2;
+}
+
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .monotonic_sent_time=" << msg.monotonic_sent_time
+ << ", .realtime_sent_time=" << msg.realtime_sent_time
+ << ", .queue_index=" << msg.queue_index;
+ if (msg.monotonic_remote_time) {
+ os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
}
os << ", .realtime_remote_time=";
- PrintOptionalOrNull(&os, m.realtime_remote_time);
+ PrintOptionalOrNull(&os, msg.realtime_remote_time);
os << ", .remote_queue_index=";
- PrintOptionalOrNull(&os, m.remote_queue_index);
- if (m.has_monotonic_timestamp_time) {
- os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ PrintOptionalOrNull(&os, msg.remote_queue_index);
+ if (msg.has_monotonic_timestamp_time) {
+ os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
os << "}";
return os;
}
-std::ostream &operator<<(std::ostream &os, const Message &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
- if (m.data != nullptr) {
- if (m.data->remote_queue_index.has_value()) {
- os << ", .remote_queue_index=" << *m.data->remote_queue_index;
+std::ostream &operator<<(std::ostream &os, const Message &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .queue_index=" << msg.queue_index
+ << ", .timestamp=" << msg.timestamp;
+ if (msg.data != nullptr) {
+ if (msg.data->remote_queue_index.has_value()) {
+ os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
}
- if (m.data->monotonic_remote_time.has_value()) {
- os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
+ if (msg.data->monotonic_remote_time.has_value()) {
+ os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
}
- os << ", .data=" << m.data;
+ os << ", .data=" << msg.data;
}
os << "}";
return os;
}
-std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
- os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index
- << ", .monotonic_event_time=" << m.monotonic_event_time
- << ", .realtime_event_time=" << m.realtime_event_time;
- if (m.remote_queue_index != BootQueueIndex::Invalid()) {
- os << ", .remote_queue_index=" << m.remote_queue_index;
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
+ os << "{.channel_index=" << msg.channel_index
+ << ", .queue_index=" << msg.queue_index
+ << ", .monotonic_event_time=" << msg.monotonic_event_time
+ << ", .realtime_event_time=" << msg.realtime_event_time;
+ if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
+ os << ", .remote_queue_index=" << msg.remote_queue_index;
}
- if (m.monotonic_remote_time != BootTimestamp::min_time()) {
- os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
+ if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
+ os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
}
- if (m.realtime_remote_time != realtime_clock::min_time) {
- os << ", .realtime_remote_time=" << m.realtime_remote_time;
+ if (msg.realtime_remote_time != realtime_clock::min_time) {
+ os << ", .realtime_remote_time=" << msg.realtime_remote_time;
}
- if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
- os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
+ os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
- if (m.data != nullptr) {
- os << ", .data=" << *m.data;
+ if (msg.data != nullptr) {
+ os << ", .data=" << *msg.data;
} else {
os << ", .data=nullptr";
}
@@ -1807,40 +1809,41 @@
break;
}
- std::shared_ptr<UnpackedMessageHeader> m =
+ std::shared_ptr<UnpackedMessageHeader> msg =
parts_message_reader_.ReadMessage();
// No data left, sorted forever, work through what is left.
- if (!m) {
+ if (!msg) {
sorted_until_ = monotonic_clock::max_time;
break;
}
size_t monotonic_timestamp_boot = 0;
- if (m->has_monotonic_timestamp_time) {
+ if (msg->has_monotonic_timestamp_time) {
monotonic_timestamp_boot = parts().logger_boot_count;
}
size_t monotonic_remote_boot = 0xffffff;
- if (m->monotonic_remote_time.has_value()) {
- const Node *node =
- parts().config->nodes()->Get(source_node_index_[m->channel_index]);
+ if (msg->monotonic_remote_time.has_value()) {
+ const Node *node = parts().config->nodes()->Get(
+ source_node_index_[msg->channel_index]);
std::optional<size_t> boot = parts_message_reader_.boot_count(
- source_node_index_[m->channel_index]);
+ source_node_index_[msg->channel_index]);
CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
- << "', with index " << source_node_index_[m->channel_index];
+ << "', with index "
+ << source_node_index_[msg->channel_index];
monotonic_remote_boot = *boot;
}
messages_.insert(
- Message{.channel_index = m->channel_index,
+ Message{.channel_index = msg->channel_index,
.queue_index = BootQueueIndex{.boot = parts().boot_count,
- .index = m->queue_index},
+ .index = msg->queue_index},
.timestamp = BootTimestamp{.boot = parts().boot_count,
- .time = m->monotonic_sent_time},
+ .time = msg->monotonic_sent_time},
.monotonic_remote_boot = monotonic_remote_boot,
.monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(m)});
+ .data = std::move(msg)});
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -1864,6 +1867,8 @@
CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
<< DebugString() << " reading " << parts_message_reader_.filename();
last_message_time_ = messages_.begin()->timestamp.time;
+ VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
+ << (*messages_.begin()) << " on " << parts_message_reader_.filename();
return &(*messages_.begin());
}
@@ -1874,9 +1879,9 @@
ss << "messages: [\n";
int count = 0;
bool no_dots = true;
- for (const Message &m : messages_) {
+ for (const Message &msg : messages_) {
if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
- ss << m << "\n";
+ ss << msg << "\n";
} else if (no_dots) {
ss << "...\n";
no_dots = false;
@@ -1887,31 +1892,25 @@
return ss.str();
}
-PartsMerger::PartsMerger(std::string_view node_name, size_t boot_count,
- const LogFilesContainer &log_files) {
- const auto parts = log_files.SelectParts(node_name, boot_count);
- node_ = configuration::GetNodeIndex(parts.config(), node_name);
-
- for (LogPartsAccess part : parts) {
- message_sorters_.emplace_back(std::move(part));
- }
-
- monotonic_start_time_ = monotonic_clock::max_time;
- realtime_start_time_ = realtime_clock::min_time;
- for (const MessageSorter &message_sorter : message_sorters_) {
+// Class to merge start times cleanly, reusably, and incrementally.
+class StartTimes {
+ public:
+ void Update(monotonic_clock::time_point new_monotonic_start_time,
+ realtime_clock::time_point new_realtime_start_time) {
// We want to capture the earliest meaningful start time here. The start
// time defaults to min_time when there's no meaningful value to report, so
// let's ignore those.
- if (message_sorter.monotonic_start_time() != monotonic_clock::min_time) {
+ if (new_monotonic_start_time != monotonic_clock::min_time) {
bool accept = false;
// We want to prioritize start times from the logger node. Really, we
// want to prioritize start times with a valid realtime_clock time. So,
// if we have a start time without a RT clock, prefer a start time with a
// RT clock, even if it is later.
- if (message_sorter.realtime_start_time() != realtime_clock::min_time) {
+ if (new_realtime_start_time != realtime_clock::min_time) {
// We've got a good one. See if the current start time has a good RT
// clock, or if we should use this one instead.
- if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (new_monotonic_start_time < monotonic_start_time_ ||
+ monotonic_start_time_ == monotonic_clock::min_time) {
accept = true;
} else if (realtime_start_time_ == realtime_clock::min_time) {
// The previous start time doesn't have a good RT time, so it is very
@@ -1921,23 +1920,45 @@
}
} else if (realtime_start_time_ == realtime_clock::min_time) {
// We don't have a RT time, so take the oldest.
- if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (new_monotonic_start_time < monotonic_start_time_ ||
+ monotonic_start_time_ == monotonic_clock::min_time) {
accept = true;
}
}
if (accept) {
- monotonic_start_time_ = message_sorter.monotonic_start_time();
- realtime_start_time_ = message_sorter.realtime_start_time();
+ monotonic_start_time_ = new_monotonic_start_time;
+ realtime_start_time_ = new_realtime_start_time;
}
}
}
- // If there was no meaningful start time reported, just use min_time.
- if (monotonic_start_time_ == monotonic_clock::max_time) {
- monotonic_start_time_ = monotonic_clock::min_time;
- realtime_start_time_ = realtime_clock::min_time;
+ monotonic_clock::time_point monotonic_start_time() const {
+ return monotonic_start_time_;
}
+ realtime_clock::time_point realtime_start_time() const {
+ return realtime_start_time_;
+ }
+
+ private:
+ monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time;
+};
+
+PartsMerger::PartsMerger(SelectedLogParts &&parts) {
+ node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name());
+
+ for (LogPartsAccess part : parts) {
+ message_sorters_.emplace_back(std::move(part));
+ }
+
+ StartTimes start_times;
+ for (const MessageSorter &message_sorter : message_sorters_) {
+ start_times.Update(message_sorter.monotonic_start_time(),
+ message_sorter.realtime_start_time());
+ }
+ monotonic_start_time_ = start_times.monotonic_start_time();
+ realtime_start_time_ = start_times.realtime_start_time();
}
std::vector<const LogParts *> PartsMerger::Parts() const {
@@ -1954,6 +1975,8 @@
if (current_ != nullptr) {
Message *result = current_->Front();
CHECK_GE(result->timestamp.time, last_message_time_);
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
+ << *result;
return result;
}
@@ -1962,25 +1985,25 @@
Message *oldest = nullptr;
sorted_until_ = monotonic_clock::max_time;
for (MessageSorter &message_sorter : message_sorters_) {
- Message *m = message_sorter.Front();
- if (!m) {
+ Message *msg = message_sorter.Front();
+ if (!msg) {
sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
continue;
}
- if (oldest == nullptr || *m < *oldest) {
- oldest = m;
+ if (oldest == nullptr || *msg < *oldest) {
+ oldest = msg;
current_ = &message_sorter;
- } else if (*m == *oldest) {
+ } else if (*msg == *oldest) {
// Found a duplicate. If there is a choice, we want the one which has
// the timestamp time.
- if (!m->data->has_monotonic_timestamp_time) {
+ if (!msg->data->has_monotonic_timestamp_time) {
message_sorter.PopFront();
} else if (!oldest->data->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &message_sorter;
- oldest = m;
+ oldest = msg;
} else {
- CHECK_EQ(m->data->monotonic_timestamp_time,
+ CHECK_EQ(msg->data->monotonic_timestamp_time,
oldest->data->monotonic_timestamp_time);
message_sorter.PopFront();
}
@@ -1993,6 +2016,11 @@
if (oldest) {
CHECK_GE(oldest->timestamp.time, last_message_time_);
last_message_time_ = oldest->timestamp.time;
+ if (monotonic_oldest_time_ > oldest->timestamp.time) {
+ VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
+ << " for node " << node_name() << " with a start time of "
+ << monotonic_start_time_ << " " << *oldest;
+ }
monotonic_oldest_time_ =
std::min(monotonic_oldest_time_, oldest->timestamp.time);
} else {
@@ -2001,6 +2029,12 @@
// Return the oldest message found. This will be nullptr if nothing was
// found, indicating there is nothing left.
+ if (oldest) {
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
+ << *oldest;
+ } else {
+ VLOG(1) << this << " PartsMerger::Front for node " << node_name();
+ }
return oldest;
}
@@ -2011,29 +2045,52 @@
}
BootMerger::BootMerger(std::string_view node_name,
- const LogFilesContainer &log_files) {
+ const LogFilesContainer &log_files,
+ const std::vector<StoredDataType> &types)
+ : configuration_(log_files.config()),
+ node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
size_t number_of_boots = log_files.BootsForNode(node_name);
parts_mergers_.reserve(number_of_boots);
for (size_t i = 0; i < number_of_boots; ++i) {
VLOG(2) << "Boot " << i;
- parts_mergers_.emplace_back(
- std::make_unique<PartsMerger>(node_name, i, log_files));
+ SelectedLogParts selected_parts =
+ log_files.SelectParts(node_name, i, types);
+ // We are guaranteed to have something each boot, but not guaranteed to have
+ // both timestamps and data for each boot. If we don't have anything, don't
+ // create a parts merger. The rest of this class will detect that and
+ // ignore it as required.
+ if (selected_parts.empty()) {
+ parts_mergers_.emplace_back(nullptr);
+ } else {
+ parts_mergers_.emplace_back(
+ std::make_unique<PartsMerger>(std::move(selected_parts)));
+ }
}
}
-Message *BootMerger::Front() {
- Message *result = parts_mergers_[index_]->Front();
+std::string_view BootMerger::node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+}
- if (result != nullptr) {
- return result;
+Message *BootMerger::Front() {
+ if (parts_mergers_[index_].get() != nullptr) {
+ Message *result = parts_mergers_[index_]->Front();
+
+ if (result != nullptr) {
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
+ return result;
+ }
}
if (index_ + 1u == parts_mergers_.size()) {
// At the end of the last node merger, just return.
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr";
return nullptr;
} else {
++index_;
- return Front();
+ Message *result = Front();
+ VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
+ return result;
}
}
@@ -2042,6 +2099,8 @@
std::vector<const LogParts *> BootMerger::Parts() const {
std::vector<const LogParts *> results;
for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
+ if (!parts_merger) continue;
+
std::vector<const LogParts *> node_parts = parts_merger->Parts();
results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
@@ -2051,17 +2110,241 @@
return results;
}
-TimestampMapper::TimestampMapper(std::string_view node_name,
- const LogFilesContainer &log_files)
- : boot_merger_(node_name, log_files),
- timestamp_callback_([](TimestampedMessage *) {}) {
- for (const LogParts *part : boot_merger_.Parts()) {
- if (!configuration_) {
- configuration_ = part->config;
- } else {
- CHECK_EQ(configuration_.get(), part->config.get());
- }
+monotonic_clock::time_point BootMerger::monotonic_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->monotonic_start_time();
}
+ return monotonic_clock::min_time;
+}
+
+realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->realtime_start_time();
+ }
+ return realtime_clock::min_time;
+}
+
+monotonic_clock::time_point BootMerger::monotonic_oldest_time(
+ size_t boot) const {
+ CHECK_LT(boot, parts_mergers_.size());
+ if (parts_mergers_[boot]) {
+ return parts_mergers_[boot]->monotonic_oldest_time();
+ }
+ return monotonic_clock::max_time;
+}
+
+bool BootMerger::started() const {
+ if (index_ == 0) {
+ if (!parts_mergers_[0]) {
+ return false;
+ }
+ return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time;
+ }
+ return true;
+}
+
+SplitTimestampBootMerger::SplitTimestampBootMerger(
+ std::string_view node_name, const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy)
+ : boot_merger_(node_name, log_files,
+ (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup)
+ ? std::vector<StoredDataType>{StoredDataType::DATA}
+ : std::vector<StoredDataType>{
+ StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS}) {
+ // Make the timestamp_boot_merger_ only if we are asked to, and if there are
+ // files to put in it. We don't need it for a data-only log.
+ if (timestamp_queue_strategy ==
+ TimestampQueueStrategy::kQueueTimestampsAtStartup &&
+ log_files.HasTimestamps(node_name)) {
+ timestamp_boot_merger_ = std::make_unique<BootMerger>(
+ node_name, log_files,
+ std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS});
+ }
+
+ size_t number_of_boots = log_files.BootsForNode(node_name);
+ monotonic_start_time_.reserve(number_of_boots);
+ realtime_start_time_.reserve(number_of_boots);
+
+ // Start times are split across the timestamp boot merger and the data boot
+ // merger. Pull from both and combine them to get the same answer as before.
+ for (size_t i = 0u; i < number_of_boots; ++i) {
+ StartTimes start_times;
+
+ if (timestamp_boot_merger_) {
+ start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
+ timestamp_boot_merger_->realtime_start_time(i));
+ }
+
+ start_times.Update(boot_merger_.monotonic_start_time(i),
+ boot_merger_.realtime_start_time(i));
+
+ monotonic_start_time_.push_back(start_times.monotonic_start_time());
+ realtime_start_time_.push_back(start_times.realtime_start_time());
+ }
+}
+
+void SplitTimestampBootMerger::QueueTimestamps(
+ std::function<void(TimestampedMessage *)> fn,
+ const std::vector<size_t> &source_node) {
+ if (!timestamp_boot_merger_) {
+ return;
+ }
+
+ while (true) {
+ // Load all the timestamps. If we find data, ignore it and drop it on the
+ // floor. It will be read when boot_merger_ is used.
+ Message *msg = timestamp_boot_merger_->Front();
+ if (!msg) {
+ queue_timestamps_ran_ = true;
+ return;
+ }
+ if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
+ timestamp_messages_.emplace_back(TimestampedMessage{
+ .channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
+ .remote_queue_index =
+ BootQueueIndex{.boot = msg->monotonic_remote_boot,
+ .index = msg->data->remote_queue_index.value()},
+ .monotonic_remote_time = {msg->monotonic_remote_boot,
+ msg->data->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
+ msg->data->monotonic_timestamp_time},
+ .data = std::move(msg->data)});
+
+ VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
+ fn(&timestamp_messages_.back());
+ } else {
+ VLOG(2) << this << " Dropped data";
+ }
+ timestamp_boot_merger_->PopFront();
+ }
+
+ // TODO(austin): Push the queue into TimestampMapper instead. Have it pull
+ // all the timestamps. That will also make it so we don't have to clear the
+ // function.
+}
+
+std::string_view SplitTimestampBootMerger::node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+}
+
+monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, monotonic_start_time_.size());
+ return monotonic_start_time_[boot];
+}
+
+realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
+ size_t boot) const {
+ CHECK_LT(boot, realtime_start_time_.size());
+ return realtime_start_time_[boot];
+}
+
+monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time(
+ size_t boot) const {
+ if (!timestamp_boot_merger_) {
+ return boot_merger_.monotonic_oldest_time(boot);
+ }
+ return std::min(boot_merger_.monotonic_oldest_time(boot),
+ timestamp_boot_merger_->monotonic_oldest_time(boot));
+}
+
+Message *SplitTimestampBootMerger::Front() {
+ Message *boot_merger_front = boot_merger_.Front();
+
+ if (timestamp_boot_merger_) {
+ CHECK(queue_timestamps_ran_);
+ }
+
+ // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
+ // to return a Message. We need to convert the first message in the list
+ // before returning it (and comparing, honestly). Fill next_timestamp_ in if
+ // it is empty so the rest of the logic here can just look at next_timestamp_
+ // and use that instead.
+ if (!next_timestamp_ && !timestamp_messages_.empty()) {
+ auto &front = timestamp_messages_.front();
+ next_timestamp_ = Message{
+ .channel_index = front.channel_index,
+ .queue_index = front.queue_index,
+ .timestamp = front.monotonic_event_time,
+ .monotonic_remote_boot = front.remote_queue_index.boot,
+ .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
+ .data = std::move(front.data),
+ };
+ timestamp_messages_.pop_front();
+ }
+
+ if (!next_timestamp_) {
+ message_source_ = MessageSource::kBootMerger;
+ if (boot_merger_front != nullptr) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " " << *boot_merger_front;
+ } else {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " nullptr";
+ }
+ return boot_merger_front;
+ }
+
+ if (boot_merger_front == nullptr) {
+ message_source_ = MessageSource::kTimestampMessage;
+
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
+ << next_timestamp_.value();
+ return &next_timestamp_.value();
+ }
+
+ if (*boot_merger_front <= next_timestamp_.value()) {
+ if (*boot_merger_front == next_timestamp_.value()) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " Dropping duplicate timestamp.";
+ next_timestamp_.reset();
+ }
+ message_source_ = MessageSource::kBootMerger;
+ if (boot_merger_front != nullptr) {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " " << *boot_merger_front;
+ } else {
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
+ << " nullptr";
+ }
+ return boot_merger_front;
+ } else {
+ message_source_ = MessageSource::kTimestampMessage;
+ VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
+ << next_timestamp_.value();
+ return &next_timestamp_.value();
+ }
+}
+
+void SplitTimestampBootMerger::PopFront() {
+ switch (message_source_) {
+ case MessageSource::kTimestampMessage:
+ CHECK(next_timestamp_.has_value());
+ next_timestamp_.reset();
+ break;
+ case MessageSource::kBootMerger:
+ boot_merger_.PopFront();
+ break;
+ }
+}
+
+TimestampMapper::TimestampMapper(
+ std::string_view node_name, const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy)
+ : boot_merger_(node_name, log_files, timestamp_queue_strategy),
+ timestamp_callback_([](TimestampedMessage *) {}) {
+ configuration_ = boot_merger_.configuration();
+
const Configuration *config = configuration_.get();
// Only fill out nodes_data_ if there are nodes. Otherwise, everything is
// pretty simple.
@@ -2120,17 +2403,18 @@
}
}
-void TimestampMapper::QueueMessage(Message *m) {
+void TimestampMapper::QueueMessage(Message *msg) {
matched_messages_.emplace_back(
- TimestampedMessage{.channel_index = m->channel_index,
- .queue_index = m->queue_index,
- .monotonic_event_time = m->timestamp,
- .realtime_event_time = m->data->realtime_sent_time,
+ TimestampedMessage{.channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
.monotonic_timestamp_time = BootTimestamp::min_time(),
- .data = std::move(m->data)});
+ .data = std::move(msg->data)});
+ VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
}
TimestampedMessage *TimestampMapper::Front() {
@@ -2139,18 +2423,26 @@
case FirstMessage::kNeedsUpdate:
break;
case FirstMessage::kInMessage:
+ VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
+ << matched_messages_.front();
return &matched_messages_.front();
case FirstMessage::kNullptr:
+ VLOG(1) << this << " TimestampMapper::Front " << node_name()
+ << " nullptr";
return nullptr;
}
if (matched_messages_.empty()) {
if (!QueueMatched()) {
first_message_ = FirstMessage::kNullptr;
+ VLOG(1) << this << " TimestampMapper::Front " << node_name()
+ << " nullptr";
return nullptr;
}
}
first_message_ = FirstMessage::kInMessage;
+ VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
+ << matched_messages_.front();
return &matched_messages_.front();
}
@@ -2166,6 +2458,7 @@
const TimestampedMessage & /*message*/) {
if (replay_channels_callback_ &&
!replay_channels_callback_(matched_messages_.back())) {
+ VLOG(1) << node_name() << " Popped " << matched_messages_.back();
matched_messages_.pop_back();
return true;
}
@@ -2176,15 +2469,16 @@
if (nodes_data_.empty()) {
// Simple path. We are single node, so there are no timestamps to match!
CHECK_EQ(messages_.size(), 0u);
- Message *m = boot_merger_.Front();
- if (!m) {
+ Message *msg = boot_merger_.Front();
+ if (!msg) {
return MatchResult::kEndOfFile;
}
// Enqueue this message into matched_messages_ so we have a place to
// associate remote timestamps, and return it.
- QueueMessage(m);
+ QueueMessage(msg);
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name();
last_message_time_ = matched_messages_.back().monotonic_event_time;
// We are a thin wrapper around parts_merger. Call it directly.
@@ -2210,12 +2504,13 @@
boot_merger_.PopFront();
}
- Message *m = &(messages_.front());
+ Message *msg = &(messages_.front());
- if (source_node_[m->channel_index] == node()) {
+ if (source_node_[msg->channel_index] == node()) {
// From us, just forward it on, filling the remote data in as invalid.
- QueueMessage(m);
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ QueueMessage(msg);
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name();
last_message_time_ = matched_messages_.back().monotonic_event_time;
messages_.pop_front();
timestamp_callback_(&matched_messages_.back());
@@ -2226,25 +2521,28 @@
} else {
// Got a timestamp, find the matching remote data, match it, and return
// it.
- Message data = MatchingMessageFor(*m);
+ Message data = MatchingMessageFor(*msg);
// Return the data from the remote. The local message only has timestamp
// info which isn't relevant anymore once extracted.
matched_messages_.emplace_back(TimestampedMessage{
- .channel_index = m->channel_index,
- .queue_index = m->queue_index,
- .monotonic_event_time = m->timestamp,
- .realtime_event_time = m->data->realtime_sent_time,
+ .channel_index = msg->channel_index,
+ .queue_index = msg->queue_index,
+ .monotonic_event_time = msg->timestamp,
+ .realtime_event_time = msg->data->realtime_sent_time,
.remote_queue_index =
- BootQueueIndex{.boot = m->monotonic_remote_boot,
- .index = m->data->remote_queue_index.value()},
- .monotonic_remote_time = {m->monotonic_remote_boot,
- m->data->monotonic_remote_time.value()},
- .realtime_remote_time = m->data->realtime_remote_time.value(),
- .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
- m->data->monotonic_timestamp_time},
+ BootQueueIndex{.boot = msg->monotonic_remote_boot,
+ .index = msg->data->remote_queue_index.value()},
+ .monotonic_remote_time = {msg->monotonic_remote_boot,
+ msg->data->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
+ msg->data->monotonic_timestamp_time},
.data = std::move(data.data)});
- CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
+ VLOG(1) << node_name() << " Inserted timestamp "
+ << matched_messages_.back();
+ CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
+ << " on " << node_name() << " " << matched_messages_.back();
last_message_time_ = matched_messages_.back().monotonic_event_time;
// Since messages_ holds the data, drop it.
messages_.pop_front();
@@ -2300,6 +2598,7 @@
last_popped_message_time_ = Front()->monotonic_event_time;
first_message_ = FirstMessage::kNeedsUpdate;
+ VLOG(1) << node_name() << " Popped " << matched_messages_.back();
matched_messages_.pop_front();
}
@@ -2393,9 +2692,9 @@
auto it = std::find_if(
data_queue->begin(), data_queue->end(),
[remote_queue_index,
- remote_boot = monotonic_remote_time.boot](const Message &m) {
- return m.queue_index == remote_queue_index &&
- m.timestamp.boot == remote_boot;
+ remote_boot = monotonic_remote_time.boot](const Message &msg) {
+ return msg.queue_index == remote_queue_index &&
+ msg.timestamp.boot == remote_boot;
});
if (it == data_queue->end()) {
return Message{.channel_index = message.channel_index,
@@ -2446,14 +2745,14 @@
}
bool TimestampMapper::Queue() {
- Message *m = boot_merger_.Front();
- if (m == nullptr) {
+ Message *msg = boot_merger_.Front();
+ if (msg == nullptr) {
return false;
}
for (NodeData &node_data : nodes_data_) {
if (!node_data.any_delivered) continue;
if (!node_data.save_for_peer) continue;
- if (node_data.channels[m->channel_index].delivered) {
+ if (node_data.channels[msg->channel_index].delivered) {
// If we have data but no timestamps (logs where the timestamps didn't get
// logged are classic), we can grow this indefinitely. We don't need to
// keep anything that is older than the last message returned.
@@ -2461,11 +2760,11 @@
// We have the time on the source node.
// We care to wait until we have the time on the destination node.
std::deque<Message> &messages =
- node_data.channels[m->channel_index].messages;
+ node_data.channels[msg->channel_index].messages;
// Max delay over the network is the TTL, so let's take the queue time and
// add TTL to it. Don't forget any messages which are reliable until
// someone can come up with a good reason to forget those too.
- if (node_data.channels[m->channel_index].time_to_live >
+ if (node_data.channels[msg->channel_index].time_to_live >
chrono::nanoseconds(0)) {
// We need to make *some* assumptions about network delay for this to
// work. We want to only look at the RX side. This means we need to
@@ -2479,21 +2778,25 @@
// messages getting sent twice.
while (messages.size() > 1u &&
messages.begin()->timestamp +
- node_data.channels[m->channel_index].time_to_live +
+ node_data.channels[msg->channel_index].time_to_live +
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_max_network_delay)) <
last_popped_message_time_) {
messages.pop_front();
}
}
- node_data.channels[m->channel_index].messages.emplace_back(*m);
+ node_data.channels[msg->channel_index].messages.emplace_back(*msg);
}
}
- messages_.emplace_back(std::move(*m));
+ messages_.emplace_back(std::move(*msg));
return true;
}
+void TimestampMapper::QueueTimestamps() {
+ boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
+}
+
std::string TimestampMapper::DebugString() const {
std::stringstream ss;
ss << "node " << node() << " (" << node_name() << ") [\n";
@@ -2512,8 +2815,8 @@
}
ss << " channel " << channel_index << " [\n";
- for (const Message &m : channel_data.messages) {
- ss << " " << m << "\n";
+ for (const Message &msg : channel_data.messages) {
+ ss << " " << msg << "\n";
}
ss << " ]\n";
++channel_index;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index c8d2e70..fb3fcbf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -18,6 +18,7 @@
#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"
+#include "aos/configuration.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
@@ -528,6 +529,7 @@
std::shared_ptr<UnpackedMessageHeader> data;
bool operator<(const Message &m2) const;
+ bool operator<=(const Message &m2) const;
bool operator>=(const Message &m2) const;
bool operator==(const Message &m2) const;
};
@@ -610,8 +612,7 @@
// boot.
class PartsMerger {
public:
- PartsMerger(std::string_view node_name, size_t boot_count,
- const LogFilesContainer &log_files);
+ PartsMerger(SelectedLogParts &&selected_parts);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -623,11 +624,15 @@
// Node index in the configuration of this node.
int node() const { return node_; }
+ std::string_view node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+ }
+
// List of parts being sorted together.
std::vector<const LogParts *> Parts() const;
- const Configuration *configuration() const {
- return message_sorters_[0].parts().config.get();
+ const std::shared_ptr<const Configuration> configuration() const {
+ return message_sorters_[0].parts().config;
}
monotonic_clock::time_point monotonic_start_time() const {
@@ -636,7 +641,15 @@
realtime_clock::time_point realtime_start_time() const {
return realtime_start_time_;
}
- monotonic_clock::time_point monotonic_oldest_time() const {
+
+ // Returns the oldest message observed in this set of parts. This can be
+ // before the start time if a message from long ago was fetched at the
+ // start of logging.
+ monotonic_clock::time_point monotonic_oldest_time() {
+ if (monotonic_oldest_time_ == monotonic_clock::max_time) {
+ VLOG(1) << "No oldest message time, fetching " << node_name();
+ (void)Front();
+ }
return monotonic_oldest_time_;
}
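
This turns the oldest-time accessor into a lazy fetch:
monotonic_clock::max_time doubles as the "nothing observed yet" sentinel,
and peeking at Front() records the real value as a side effect. The same
pattern in a self-contained sketch (hypothetical types, not the AOS
implementation):

    #include <chrono>

    using time_point = std::chrono::steady_clock::time_point;

    class OldestTracker {
     public:
      time_point oldest() {
        if (oldest_ == time_point::max()) {
          Peek();  // the (possibly expensive) peek fills in oldest_
        }
        return oldest_;
      }

     private:
      void Peek() { oldest_ = std::chrono::steady_clock::now(); }  // stand-in
      time_point oldest_ = time_point::max();  // sentinel: nothing observed
    };
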
@@ -677,7 +690,8 @@
// stream.
class BootMerger {
public:
- BootMerger(std::string_view node_name, const LogFilesContainer &log_files);
+ BootMerger(std::string_view node_name, const LogFilesContainer &log_files,
+ const std::vector<StoredDataType> &types);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -687,33 +701,21 @@
void operator=(BootMerger &&) = delete;
// Node index in the configuration of this node.
- int node() const { return parts_mergers_[0]->node(); }
+ int node() const { return node_; }
+ std::string_view node_name() const;
// List of parts being sorted together.
std::vector<const LogParts *> Parts() const;
- const Configuration *configuration() const {
- return parts_mergers_[0]->configuration();
+ const std::shared_ptr<const Configuration> configuration() const {
+ return configuration_;
}
- monotonic_clock::time_point monotonic_start_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->monotonic_start_time();
- }
- realtime_clock::time_point realtime_start_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->realtime_start_time();
- }
- monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->monotonic_oldest_time();
- }
+ monotonic_clock::time_point monotonic_start_time(size_t boot) const;
+ realtime_clock::time_point realtime_start_time(size_t boot) const;
+ monotonic_clock::time_point monotonic_oldest_time(size_t boot) const;
- bool started() const {
- return parts_mergers_[index_]->sorted_until() !=
- monotonic_clock::min_time ||
- index_ != 0;
- }
+ bool started() const;
// Returns the next sorted message from the set of log files. It is safe to
// call std::move() on the result to move the data flatbuffer from it.
@@ -727,7 +729,101 @@
// TODO(austin): Sanjay points out this is pretty inefficient. Don't keep so
// many things open.
+ // A list of all the parts mergers. Only the boots with something to sort are
+ // instantiated.
std::vector<std::unique_ptr<PartsMerger>> parts_mergers_;
+
+ std::shared_ptr<const Configuration> configuration_;
+ int node_;
+};
+
+enum class TimestampQueueStrategy {
+ // Read the timestamps at the same time as all the other data.
+ kQueueTogether,
+ // Read the timestamps first.
+ kQueueTimestampsAtStartup,
+};
+
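Nothing in this hunk shows how a reader picks between the two strategies; a
plausible selection based on the flags this change adds
(--enable_timestamp_loading, and --force_timestamp_loading declared in the
test lib below) might look like the following sketch. The real decision
lives in log_reader.cc and may differ:

    TimestampQueueStrategy ChooseStrategy(bool timestamps_in_separate_files) {
      if (FLAGS_force_timestamp_loading) {
        return TimestampQueueStrategy::kQueueTimestampsAtStartup;
      }
      if (FLAGS_enable_timestamp_loading && timestamps_in_separate_files) {
        return TimestampQueueStrategy::kQueueTimestampsAtStartup;
      }
      return TimestampQueueStrategy::kQueueTogether;
    }
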
+// Class to manage queueing up timestamps from BootMerger and notifying
+// TimestampMapper of them.
+class SplitTimestampBootMerger {
+ public:
+ SplitTimestampBootMerger(std::string_view node_name,
+ const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy);
+
+ // Copying and moving will mess up the internal raw pointers. Just don't do
+ // it.
+ SplitTimestampBootMerger(SplitTimestampBootMerger const &) = delete;
+ SplitTimestampBootMerger(SplitTimestampBootMerger &&) = delete;
+ void operator=(SplitTimestampBootMerger const &) = delete;
+ void operator=(SplitTimestampBootMerger &&) = delete;
+
+ // Reads all timestamps into a member queue and calls the provided function
+ // on each one. Only timestamps are saved, where a timestamp is a message
+ // sent on this node that did not originate on this node. To make that
+ // distinction, source_node maps each channel index to the node index of
+ // that channel's source node.
+ void QueueTimestamps(std::function<void(TimestampedMessage *)> fn,
+ const std::vector<size_t> &source_node);
+
+ // Node index in the configuration of this node.
+ int node() const { return boot_merger_.node(); }
+ // Returns the name of the node this class is sorting for.
+ std::string_view node_name() const;
+
+ std::shared_ptr<const Configuration> configuration() const {
+ return boot_merger_.configuration();
+ }
+
+ monotonic_clock::time_point monotonic_start_time(size_t boot) const;
+ realtime_clock::time_point realtime_start_time(size_t boot) const;
+ monotonic_clock::time_point monotonic_oldest_time(size_t boot) const;
+
+ // Returns true if the log has been started.
+ bool started() const {
+ // Timestamps don't count, so only track boot_merger_.
+ return boot_merger_.started();
+ }
+
+ // Returns the next sorted message from the set of log files. It is safe to
+ // call std::move() on the result to move the data flatbuffer from it.
+ Message *Front();
+
+ // Pops the front message. This should only be called after a call to
+ // Front().
+ void PopFront();
+
+ private:
+ enum class MessageSource {
+ kTimestampMessage,
+ kBootMerger,
+ };
+
+ MessageSource message_source_ = MessageSource::kBootMerger;
+
+ // Boot merger for data and potentially timestamps.
+ BootMerger boot_merger_;
+
+ // Boot merger for just timestamps. Any data read from here is to be ignored.
+ std::unique_ptr<BootMerger> timestamp_boot_merger_;
+
+ // The callback requires us to convert each message to a TimestampedMessage.
+ std::deque<TimestampedMessage> timestamp_messages_;
+
+ // Storage for the next timestamp message to return. This is kept separate
+ // so we can convert it back into a Message.
+ //
+ // TODO(austin): It would be nice to not have to convert...
+ std::optional<Message> next_timestamp_;
+
+ // Start times for each boot.
+ std::vector<monotonic_clock::time_point> monotonic_start_time_;
+ std::vector<realtime_clock::time_point> realtime_start_time_;
+
+ // Tracks whether QueueTimestamps() has already run.
+ bool queue_timestamps_ran_ = false;
};
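
A hedged usage sketch of the QueueTimestamps contract above: source_node is
indexed by channel, and the callback fires once per timestamp as it is
loaded into RAM (DumpTimestamps and its logging are illustrative only):

    void DumpTimestamps(SplitTimestampBootMerger *merger,
                        const std::vector<size_t> &source_node) {
      merger->QueueTimestamps(
          [](TimestampedMessage *msg) {
            VLOG(1) << "Timestamp on channel " << msg->channel_index;
          },
          source_node);
    }
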
// Class to match timestamps with the corresponding data from other nodes.
@@ -737,7 +833,8 @@
class TimestampMapper {
public:
TimestampMapper(std::string_view node_name,
- const LogFilesContainer &log_files);
+ const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -784,6 +881,11 @@
// Returns debug information about this node.
std::string DebugString() const;
+ // Queues just the timestamps so that the timestamp callback gets called.
+ // Note that the callback also fires when the timestamps are returned later,
+ // so unset it if you don't want to be called twice.
+ void QueueTimestamps();
+
// Queues data until the provided time.
void QueueUntil(BootTimestamp queue_time);
// Queues until we have time_estimation_buffer of data in the queue.
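
A sketch of that double-callback caveat, mirroring the test usage later in
this change (resetting to a no-op lambda is an assumption about how "unset"
is spelled):

    size_t count = 0;
    TimestampMapper mapper("pi1", log_files,
                           TimestampQueueStrategy::kQueueTimestampsAtStartup);
    mapper.set_timestamp_callback(
        [&](TimestampedMessage *) { ++count; });
    mapper.QueueTimestamps();  // the callback fires once per timestamp here...
    // ...and would fire again as messages are returned, so unset it:
    mapper.set_timestamp_callback([](TimestampedMessage *) {});
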
@@ -882,15 +984,11 @@
// Returns the name of the node this class is sorting for.
std::string_view node_name() const {
- return configuration_->has_nodes() ? configuration_->nodes()
- ->Get(boot_merger_.node())
- ->name()
- ->string_view()
- : "(single node)";
+ return configuration::NodeName(configuration(), node());
}
// The node merger to source messages from.
- BootMerger boot_merger_;
+ SplitTimestampBootMerger boot_merger_;
std::shared_ptr<const Configuration> configuration_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index a0abcd8..ea56d8d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -779,7 +779,10 @@
LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger("pi1", 0, log_files);
+ PartsMerger merger(
+ log_files.SelectParts("pi1", 0,
+ {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS}));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -881,7 +884,10 @@
LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger("pi1", 0, log_files);
+ PartsMerger merger(
+ log_files.SelectParts("pi1", 0,
+ {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS}));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -949,11 +955,13 @@
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1094,13 +1102,15 @@
// messages due to the channel filter callbacks used
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
mapper0.set_replay_channels_callback(
[&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
mapper1.set_replay_channels_callback(
@@ -1231,11 +1241,13 @@
ASSERT_EQ(parts.size(), 1u);
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1353,11 +1365,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1473,11 +1487,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1557,11 +1573,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1642,11 +1660,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1717,11 +1737,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1781,7 +1803,8 @@
LogFilesContainer log_files(parts);
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
@@ -1823,7 +1846,8 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1897,11 +1921,13 @@
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2154,7 +2180,9 @@
ASSERT_EQ(parts.size(), 1u);
ASSERT_EQ(parts[0].parts.size(), 2u);
- BootMerger merger("pi2", log_files);
+ BootMerger merger("pi2", log_files,
+ {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+ StoredDataType::REMOTE_TIMESTAMPS});
EXPECT_EQ(merger.node(), 1u);
@@ -2338,11 +2366,13 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2557,11 +2587,13 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0("pi1", log_files);
+ TimestampMapper mapper0("pi1", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1("pi2", log_files);
+ TimestampMapper mapper1("pi2", log_files,
+ TimestampQueueStrategy::kQueueTogether);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index b843e3e..a055006 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -27,13 +27,28 @@
::testing::Values(
ConfigParams{"multinode_pingpong_combined_config.json", true,
kCombinedConfigSha1(), kCombinedConfigSha1(),
- FileStrategy::kKeepSeparate},
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kForceBufferTimestamps},
+ ConfigParams{"multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1(), kCombinedConfigSha1(),
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kAutoBuffer},
ConfigParams{"multinode_pingpong_split_config.json", false,
kSplitConfigSha1(), kReloggedSplitConfigSha1(),
- FileStrategy::kKeepSeparate},
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kForceBufferTimestamps},
ConfigParams{"multinode_pingpong_split_config.json", false,
kSplitConfigSha1(), kReloggedSplitConfigSha1(),
- FileStrategy::kCombine}),
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kAutoBuffer},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1(), kReloggedSplitConfigSha1(),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kForceBufferTimestamps},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1(), kReloggedSplitConfigSha1(),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kAutoBuffer}),
::testing::ValuesIn(SupportedCompressionAlgorithms())));
INSTANTIATE_TEST_SUITE_P(
@@ -42,13 +57,28 @@
::testing::Values(
ConfigParams{"multinode_pingpong_combined_config.json", true,
kCombinedConfigSha1(), kCombinedConfigSha1(),
- FileStrategy::kKeepSeparate},
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kForceBufferTimestamps},
+ ConfigParams{"multinode_pingpong_combined_config.json", true,
+ kCombinedConfigSha1(), kCombinedConfigSha1(),
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kAutoBuffer},
ConfigParams{"multinode_pingpong_split_config.json", false,
kSplitConfigSha1(), kReloggedSplitConfigSha1(),
- FileStrategy::kKeepSeparate},
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kForceBufferTimestamps},
ConfigParams{"multinode_pingpong_split_config.json", false,
kSplitConfigSha1(), kReloggedSplitConfigSha1(),
- FileStrategy::kCombine}),
+ FileStrategy::kKeepSeparate,
+ ForceTimestampBuffering::kAutoBuffer},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1(), kReloggedSplitConfigSha1(),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kForceBufferTimestamps},
+ ConfigParams{"multinode_pingpong_split_config.json", false,
+ kSplitConfigSha1(), kReloggedSplitConfigSha1(),
+ FileStrategy::kCombine,
+ ForceTimestampBuffering::kAutoBuffer}),
::testing::ValuesIn(SupportedCompressionAlgorithms())));
// Tests that we can write and read simple multi-node log files.
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index f62ddf5..5574c6e 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -8,6 +8,8 @@
#include "aos/events/simulated_event_loop.h"
#include "aos/testing/tmpdir.h"
+DECLARE_bool(force_timestamp_loading);
+
namespace aos {
namespace logger {
namespace testing {
@@ -104,6 +106,10 @@
pi1_reboot_logfiles_(MakePi1RebootLogfiles()),
logfiles_(MakeLogFiles(logfile_base1_, logfile_base2_)),
structured_logfiles_(StructureLogFiles()) {
+ FLAGS_force_timestamp_loading =
+ std::get<0>(GetParam()).timestamp_buffering ==
+ ForceTimestampBuffering::kForceBufferTimestamps;
+
util::UnlinkRecursive(tmp_dir_ + "/logs");
std::filesystem::create_directory(tmp_dir_ + "/logs");
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index 63d947a..4321cc3 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -34,6 +34,11 @@
kKeepSeparate,
};
+enum class ForceTimestampBuffering {
+ kForceBufferTimestamps,
+ kAutoBuffer,
+};
+
// Parameters to run all the tests with.
struct ConfigParams {
// The config file to use.
@@ -48,6 +53,9 @@
std::string_view relogged_sha256;
// If kCombine, use MinimalFileMultiNodeLogNamer.
FileStrategy file_strategy;
+ // If kForceBufferTimestamps, set --force_timestamp_loading to force buffering
+ // timestamps at the start.
+ ForceTimestampBuffering timestamp_buffering;
};
struct LoggerState {
@@ -165,6 +173,8 @@
void AddExtension(std::string_view extension);
+ gflags::FlagSaver flag_saver_;
+
// Config and factory.
aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
message_bridge::TestingTimeConverter time_converter_;
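
The gflags::FlagSaver member is what keeps the per-parameter
--force_timestamp_loading assignment from leaking between tests: it
snapshots all flag values when constructed and restores them when
destroyed. In isolation:

    {
      gflags::FlagSaver saver;               // snapshot every flag's value
      FLAGS_force_timestamp_loading = true;  // test-local override
      // ... run the parameterized test ...
    }                                        // all flags restored here
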
diff --git a/aos/events/logging/single_node_merge.cc b/aos/events/logging/single_node_merge.cc
index aee2c5e..0fa3567 100644
--- a/aos/events/logging/single_node_merge.cc
+++ b/aos/events/logging/single_node_merge.cc
@@ -21,7 +21,7 @@
int Main(int argc, char **argv) {
const LogFilesContainer log_files(SortParts(FindLogs(argc, argv)));
- const Configuration *config = log_files.config();
+ const Configuration *config = log_files.config().get();
// Haven't tested this on a single node log, and don't really see a need to
// right now. The higher layers just work.
@@ -37,8 +37,8 @@
// parts to not be from the same boot.
if (log_files.ContainsPartsForNode(node_name)) {
// Filter the parts relevant to each node when building the mapper.
- mappers.emplace_back(
- std::make_unique<TimestampMapper>(node_name, log_files));
+ mappers.emplace_back(std::make_unique<TimestampMapper>(
+ node_name, log_files, TimestampQueueStrategy::kQueueTogether));
if (node_name == FLAGS_node) {
node_mapper = mappers.back().get();
}
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 0b22f93..ca7f231 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -19,7 +19,7 @@
int Main(int argc, char **argv) {
const LogFilesContainer log_files(SortParts(FindLogs(argc, argv)));
- const Configuration *config = log_files.config();
+ const Configuration *config = log_files.config().get();
CHECK(configuration::MultiNode(config))
<< ": Timestamps only make sense in a multi-node world.";
@@ -33,8 +33,8 @@
// parts to not be from the same boot.
if (log_files.ContainsPartsForNode(node_name)) {
// Filter the parts relevant to each node when building the mapper.
- mappers.emplace_back(
- std::make_unique<TimestampMapper>(node_name, log_files));
+ mappers.emplace_back(std::make_unique<TimestampMapper>(
+ node_name, log_files, TimestampQueueStrategy::kQueueTogether));
} else {
mappers.emplace_back(nullptr);
}
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 417845b..65a049f 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -1380,6 +1380,14 @@
}
if (rebooted) {
CHECK(reboot_found_);
+ if (VLOG_IS_ON(2)) {
+ VLOG(2) << "Notified reboot of";
+ size_t node_index = 0;
+ for (logger::BootTimestamp t : std::get<1>(*next_time)) {
+ VLOG(2) << " Node " << node_index << " " << t;
+ ++node_index;
+ }
+ }
reboot_found_(std::get<0>(*next_time), std::get<1>(*next_time));
}
}
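
The VLOG_IS_ON(2) guard is the usual glog idiom for skipping loop work when
verbose logging is off; without it the loop would still run even though
each VLOG(2) statement is individually gated. Minimal form:

    #include <glog/logging.h>
    #include <vector>

    // `int` stands in for logger::BootTimestamp in this sketch.
    void LogBootTimes(const std::vector<int> &times) {
      if (VLOG_IS_ON(2)) {  // skip the whole loop unless run with -v=2+
        for (size_t i = 0; i < times.size(); ++i) {
          VLOG(2) << "  Node " << i << " " << times[i];
        }
      }
    }
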
@@ -2163,12 +2171,15 @@
BitSet64 all_live_nodes(problem.size());
const BitSet64 all_nodes = ~BitSet64(problem.size());
- for (size_t node_index = 0; node_index < timestamp_mappers_.size();
- ++node_index) {
- if (timestamp_mappers_[node_index] != nullptr) {
- // Make sure we have enough data queued such that if we are going to
- // have a timestamp on this filter, we do have a timestamp queued.
- timestamp_mappers_[node_index]->QueueFor(time_estimation_buffer_seconds_);
+ if (time_estimation_buffer_seconds_ != chrono::seconds(0)) {
+ for (size_t node_index = 0; node_index < timestamp_mappers_.size();
+ ++node_index) {
+ if (timestamp_mappers_[node_index] != nullptr) {
+ // Make sure we have enough data queued such that if we are going to
+ // have a timestamp on this filter, we do have a timestamp queued.
+ timestamp_mappers_[node_index]->QueueFor(
+ time_estimation_buffer_seconds_);
+ }
}
}
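
The zero-second guard reads as: when the buffer is zero there is nothing to
queue incrementally, presumably because the timestamps were already loaded
into RAM at startup; that wiring is not in this hunk, so the following is
an assumption:

    // Assumed wiring (not shown in this change's hunks): a preloading reader
    // passes a zero buffer, which disables incremental queueing above.
    std::chrono::seconds EstimationBuffer(TimestampQueueStrategy strategy) {
      return strategy == TimestampQueueStrategy::kQueueTimestampsAtStartup
                 ? std::chrono::seconds(0)   // everything is queued already
                 : std::chrono::seconds(2);  // illustrative default
    }
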
@@ -2188,6 +2199,10 @@
problem.add_clock_offset_filter(node_a_index, filter.filter,
filter.b_index, filter.b_filter);
+ if (time_estimation_buffer_seconds_ == chrono::seconds(0)) {
+ continue;
+ }
+
if (timestamp_mappers_[node_a_index] != nullptr) {
// Now, we have cases at startup where we have a couple of points
// followed by a long gap, followed by the body of the data. We are