Read all timestamps into RAM where possible when reading logs
There are cases where we fail to buffer far enough into the future to
solve the timestamp problem, or where we end up choosing to buffer until
the end of time to solve it. Both of these result in log reading
failures, either through CHECKs or OOMs.
Timestamps are tiny. The existence of timestamp_extractor proves that
we can store the whole timestamp problem in memory relatively easily.
When timestamps are stored in separate files, let's just load them at
the start.
This also adds flags to force this behavior on or off. When the flag
forces it on for a log whose data and timestamps are mixed in the same
files, we read the data files twice: once up front to extract the
timestamps, and again to read the data.
I'd like to add more tests to logfile_utils_test to test this all
explicitly, but the multinode_logger tests appear to do a really good
job already.
Change-Id: I38e23836afa980e3e3a839125e78e132066e2c90
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index c8d2e70..fb3fcbf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -18,6 +18,7 @@
#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"
+#include "aos/configuration.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
@@ -528,6 +529,7 @@
std::shared_ptr<UnpackedMessageHeader> data;
bool operator<(const Message &m2) const;
+ bool operator<=(const Message &m2) const;
bool operator>=(const Message &m2) const;
bool operator==(const Message &m2) const;
};
@@ -610,8 +612,7 @@
// boot.
class PartsMerger {
public:
- PartsMerger(std::string_view node_name, size_t boot_count,
- const LogFilesContainer &log_files);
+ PartsMerger(SelectedLogParts &&selected_parts);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -623,11 +624,15 @@
// Node index in the configuration of this node.
int node() const { return node_; }
+ std::string_view node_name() const {
+ return configuration::NodeName(configuration().get(), node());
+ }
+
// List of parts being sorted together.
std::vector<const LogParts *> Parts() const;
- const Configuration *configuration() const {
- return message_sorters_[0].parts().config.get();
+ std::shared_ptr<const Configuration> configuration() const {
+ return message_sorters_[0].parts().config;
}
monotonic_clock::time_point monotonic_start_time() const {
@@ -636,7 +641,15 @@
realtime_clock::time_point realtime_start_time() const {
return realtime_start_time_;
}
- monotonic_clock::time_point monotonic_oldest_time() const {
+
+ // Returns the oldest message observed in this set of parts. This could be
+ // before the start time if we fetched it at the start of logging from long
+ // ago.
+ monotonic_clock::time_point monotonic_oldest_time() {
+ if (monotonic_oldest_time_ == monotonic_clock::max_time) {
+ VLOG(1) << "No oldest message time, fetching " << node_name();
+ (void)Front();
+ }
return monotonic_oldest_time_;
}
@@ -677,7 +690,8 @@
// stream.
class BootMerger {
public:
- BootMerger(std::string_view node_name, const LogFilesContainer &log_files);
+ BootMerger(std::string_view node_name, const LogFilesContainer &log_files,
+ const std::vector<StoredDataType> &types);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -687,33 +701,21 @@
void operator=(BootMerger &&) = delete;
// Node index in the configuration of this node.
- int node() const { return parts_mergers_[0]->node(); }
+ int node() const { return node_; }
+ std::string_view node_name() const;
// List of parts being sorted together.
std::vector<const LogParts *> Parts() const;
- const Configuration *configuration() const {
- return parts_mergers_[0]->configuration();
+ std::shared_ptr<const Configuration> configuration() const {
+ return configuration_;
+ }
- monotonic_clock::time_point monotonic_start_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->monotonic_start_time();
- }
- realtime_clock::time_point realtime_start_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->realtime_start_time();
- }
- monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
- CHECK_LT(boot, parts_mergers_.size());
- return parts_mergers_[boot]->monotonic_oldest_time();
- }
+ monotonic_clock::time_point monotonic_start_time(size_t boot) const;
+ realtime_clock::time_point realtime_start_time(size_t boot) const;
+ monotonic_clock::time_point monotonic_oldest_time(size_t boot) const;
- bool started() const {
- return parts_mergers_[index_]->sorted_until() !=
- monotonic_clock::min_time ||
- index_ != 0;
- }
+ bool started() const;
// Returns the next sorted message from the set of log files. It is safe to
// call std::move() on the result to move the data flatbuffer from it.
@@ -727,7 +729,101 @@
// TODO(austin): Sanjay points out this is pretty inefficient. Don't keep so
// many things open.
+ // A list of all the parts mergers. Only the boots with something to sort are
+ // instantiated.
std::vector<std::unique_ptr<PartsMerger>> parts_mergers_;
+
+ std::shared_ptr<const Configuration> configuration_;
+ int node_;
+};
+
+enum class TimestampQueueStrategy {
+ // Read the timestamps at the same time as all the other data.
+ kQueueTogether,
+ // Read all the timestamps into memory first, before any of the data.
+ kQueueTimestampsAtStartup,
+};
+
+// Wraps a BootMerger and manages queueing up timestamps from it (all up front
+// when using kQueueTimestampsAtStartup) and notifying TimestampMapper of them.
+class SplitTimestampBootMerger {
+ public:
+ SplitTimestampBootMerger(std::string_view node_name,
+ const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy);
+
+ // Copying and moving will mess up the internal raw pointers. Just don't do
+ // it.
+ SplitTimestampBootMerger(SplitTimestampBootMerger const &) = delete;
+ SplitTimestampBootMerger(SplitTimestampBootMerger &&) = delete;
+ void operator=(SplitTimestampBootMerger const &) = delete;
+ void operator=(SplitTimestampBootMerger &&) = delete;
+
+ // Reads all timestamps into a member variable queue, and calls the function
+ // on each timestamp. This only saves timestamps, which are defined as
+ // messages sent on this node, but not originally from this node. To make
+ // that distinction, source_node is provided which has a list of which node
+ // index is the source node for each channel, where the channel index is the
+ // array index.
+ void QueueTimestamps(std::function<void(TimestampedMessage *)> fn,
+ const std::vector<size_t> &source_node);
+
+ // Node index in the configuration of this node.
+ int node() const { return boot_merger_.node(); }
+ // Returns the name of the node this class is sorting for.
+ std::string_view node_name() const;
+
+ std::shared_ptr<const Configuration> configuration() const {
+ return boot_merger_.configuration();
+ }
+
+ monotonic_clock::time_point monotonic_start_time(size_t boot) const;
+ realtime_clock::time_point realtime_start_time(size_t boot) const;
+ monotonic_clock::time_point monotonic_oldest_time(size_t boot) const;
+
+ // Returns true if the log has been started.
+ bool started() const {
+ // Timestamps don't count, so only track boot_merger_.
+ return boot_merger_.started();
+ }
+
+ // Returns the next sorted message from the set of log files. It is safe to
+ // call std::move() on the result to move the data flatbuffer from it.
+ Message *Front();
+
+ // Pops the front message. This should only be called after a call to
+ // Front().
+ void PopFront();
+
+ private:
+ enum class MessageSource {
+ kTimestampMessage,
+ kBootMerger,
+ };
+
+ MessageSource message_source_ = MessageSource::kBootMerger;
+
+ // Boot merger for data and potentially timestamps.
+ BootMerger boot_merger_;
+
+ // Boot merger for just timestamps. Any data read from here is to be ignored.
+ std::unique_ptr<BootMerger> timestamp_boot_merger_;
+
+ // The callback requires us to convert each message to a TimestampedMessage.
+ std::deque<TimestampedMessage> timestamp_messages_;
+
+ // Storage for the next timestamp message to return. This is separate so we
+ // can convert them back to a Message.
+ //
+ // TODO(austin): It would be nice to not have to convert...
+ std::optional<Message> next_timestamp_;
+
+ // Start times for each boot.
+ std::vector<monotonic_clock::time_point> monotonic_start_time_;
+ std::vector<realtime_clock::time_point> realtime_start_time_;
+
+ // Tracks if QueueTimestamps loaded any timestamps.
+ bool queue_timestamps_ran_ = false;
};
// Class to match timestamps with the corresponding data from other nodes.
@@ -737,7 +833,8 @@
class TimestampMapper {
public:
TimestampMapper(std::string_view node_name,
- const LogFilesContainer &log_files);
+ const LogFilesContainer &log_files,
+ TimestampQueueStrategy timestamp_queue_strategy);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -784,6 +881,11 @@
// Returns debug information about this node.
std::string DebugString() const;
+ // Queues just the timestamps so that the timestamp callback gets called.
+ // Note, the timestamp callback will get called when they get returned too, so
+ // make sure to unset it if you don't want to be called twice.
+ void QueueTimestamps();
+
// Queues data the provided time.
void QueueUntil(BootTimestamp queue_time);
// Queues until we have time_estimation_buffer of data in the queue.
@@ -882,15 +984,11 @@
// Returns the name of the node this class is sorting for.
std::string_view node_name() const {
- return configuration_->has_nodes() ? configuration_->nodes()
- ->Get(boot_merger_.node())
- ->name()
- ->string_view()
- : "(single node)";
+ return configuration::NodeName(configuration(), node());
}
// The node merger to source messages from.
- BootMerger boot_merger_;
+ SplitTimestampBootMerger boot_merger_;
std::shared_ptr<const Configuration> configuration_;