Support queueing data in TimestampMapper and viewing timestamps

When solving for the starting time of a log, we don't always have enough
data queued on all the nodes. We can either do gymnastics to queue
enough data up front, or we can teach MultiNodeNoncausalOffsetEstimator
to enqueue more data on each node when asked to evaluate times after the
pre-loaded range.

Teaching MultiNodeNoncausalOffsetEstimator to enqueue more data on
demand will be more robust in the long term, so let's go that route.

This commit adds enough hooks to remove the std::deque from inside
LogReader::State and use TimestampMapper directly. That in turn should
let us enqueue more data when needed and add it to the filters
automatically.
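
As a rough illustration of how the two new hooks (QueueUntil and the
timestamp callback) are meant to fit together, here is a minimal,
self-contained sketch. ToyMapper, ToyMessage, TimePoint, and
DemoQueueUntil are hypothetical stand-ins, not the real AOS types, and
the stopping rule for QueueUntil (queue until the newest queued message
is at or past the requested time) is an assumption made for this sketch.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for monotonic_clock::time_point.
using TimePoint = std::chrono::nanoseconds;

// Hypothetical stand-in for TimestampedMessage.
struct ToyMessage {
  TimePoint monotonic_event_time;
  int payload;
};

// Toy version of the buffering behavior: messages are pulled from a sorted
// source into a deque, and a callback fires for each one as it is queued.
class ToyMapper {
 public:
  // source must already be sorted by monotonic_event_time.
  explicit ToyMapper(std::vector<ToyMessage> source)
      : source_(std::move(source)) {}

  // Sets a callback to be called whenever a message is queued.
  void set_timestamp_callback(std::function<void(ToyMessage *)> fn) {
    timestamp_callback_ = std::move(fn);
  }

  // Assumed semantics: queue until the newest queued message is at or past
  // queue_time, or until the source runs out.
  void QueueUntil(TimePoint queue_time) {
    while (matched_messages_.empty() ||
           matched_messages_.back().monotonic_event_time < queue_time) {
      if (!QueueMatched()) return;
    }
  }

  // Returns the oldest queued message, or nullptr if nothing is queued.
  ToyMessage *Front() {
    return matched_messages_.empty() ? nullptr : &matched_messages_.front();
  }

  // Drops the oldest queued message, if any.
  void PopFront() {
    if (!matched_messages_.empty()) matched_messages_.pop_front();
  }

  std::size_t queued_size() const { return matched_messages_.size(); }

 private:
  // Queues a single message. Returns true if one was queued.
  bool QueueMatched() {
    if (next_ >= source_.size()) return false;
    matched_messages_.push_back(source_[next_++]);
    if (timestamp_callback_) timestamp_callback_(&matched_messages_.back());
    return true;
  }

  std::vector<ToyMessage> source_;
  std::size_t next_ = 0;
  std::deque<ToyMessage> matched_messages_;
  std::function<void(ToyMessage *)> timestamp_callback_;
};

// Returns the payloads the callback observed while queueing until t = 25ns.
std::vector<int> DemoQueueUntil() {
  std::vector<ToyMessage> source = {{TimePoint(10), 1},
                                    {TimePoint(20), 2},
                                    {TimePoint(30), 3},
                                    {TimePoint(40), 4}};
  ToyMapper mapper(std::move(source));
  std::vector<int> seen;
  mapper.set_timestamp_callback(
      [&seen](ToyMessage *m) { seen.push_back(m->payload); });
  mapper.QueueUntil(TimePoint(25));
  return seen;
}
```

In this sketch a consumer such as the offset estimator would register the
callback once to feed its filters, then call QueueUntil whenever it is
asked about a time beyond what is already buffered.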
Change-Id: Ife1248d0b515d9ea9088ace84fa020a351289d0c
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 94525cf..070a2de 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -463,6 +463,9 @@
};
// Class to match timestamps with the corresponding data from other nodes.
+//
+// This class also buffers data for the node it represents, and supports
+// notifying when new data is queued as well as queueing until a point in time.
class TimestampMapper {
public:
TimestampMapper(std::vector<LogParts> file);
@@ -512,6 +515,14 @@
// Returns debug information about this node.
std::string DebugString() const;
+ // Queues data until the provided time.
+ void QueueUntil(monotonic_clock::time_point queue_time);
+
+ // Sets a callback to be called whenever a full message is queued.
+ void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
+ timestamp_callback_ = fn;
+ }
+
private:
// The state for a remote node. This holds the data that needs to be matched
// with the remote node's timestamps.
@@ -546,13 +557,17 @@
// otherwise.
bool Queue();
+ // Queues up a single matched message into our matched message queue. Returns
+ // true if one was queued, and false otherwise.
+ bool QueueMatched();
+
// Queues up data until we have at least one message >= to time t.
// Useful for triggering a remote node to read enough data to have the
// timestamp you care about available.
- void QueueUntil(monotonic_clock::time_point t);
+ void QueueUnmatchedUntil(monotonic_clock::time_point t);
- // Fills message_ with the contents of m.
- void FillMessage(Message *m);
+ // Queues m into matched_messages_.
+ void QueueMessage(Message *m);
// The node merger to source messages from.
NodeMerger node_merger_;
@@ -569,10 +584,10 @@
std::vector<NodeData> nodes_data_;
// Latest message to return.
- TimestampedMessage message_;
+ std::deque<TimestampedMessage> matched_messages_;
- // Tracks if the first message points to message_, nullptr (all done), or is
- // invalid.
+ // Tracks the state of the first message in matched_messages_. Do we need to
+ // update it, is it valid, or should we return nullptr?
enum class FirstMessage {
kNeedsUpdate,
kInMessage,
@@ -585,6 +600,8 @@
monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
// Time this node is queued up until. Used for caching.
monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
+
+ std::function<void(TimestampedMessage *)> timestamp_callback_;
};
// Returns the node name with a trailing space, or an empty string if we are on