Queue filters until there are at least 2 points each
We were seeing issues where each channel had 1 message at start, then a
*long* time gap, then the body of the messages. We would queue the 1
message up, do time estimation, freeze past that message, then find the
body of data. That was resulting in us not being able to read
reasonably phrased logs.
The fix is to make sure to queue 2 points for each active filter (plus a
bit after so we don't immediately try to modify it).
Change-Id: I41455bf3c2426233a72f40035430f1bc0c420fdc
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 632b1df..d251772 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -520,6 +520,19 @@
// Queues until we have time_estimation_buffer of data in the queue.
void QueueFor(std::chrono::nanoseconds time_estimation_buffer);
+ // Queues until the condition is met.
+ template <typename T>
+ void QueueUntilCondition(T fn) {
+ while (true) {
+ if (fn()) {
+ break;
+ }
+ if (!QueueMatched()) {
+ break;
+ }
+ }
+ }
+
// Sets a callback to be called whenever a full message is queued.
void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
timestamp_callback_ = fn;
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index b3c1110..08dab79 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -630,6 +630,8 @@
}
++node_index;
}
+
+ timestamp_mappers_ = std::move(timestamp_mappers);
}
TimeComparison CompareTimes(const std::vector<monotonic_clock::time_point> &ta,
@@ -867,8 +869,40 @@
all_live_nodes.Set(node_a_index, true);
all_live_nodes.Set(filter.b_index, true);
problem.add_filter(node_a_index, filter.filter, filter.b_index);
+
+ if (timestamp_mappers_[node_a_index] != nullptr) {
+ // Make sure we have enough data queued such that if we are going to
+ // have a timestamp on this filter, we do have a timestamp queued.
+ timestamp_mappers_[node_a_index]->QueueFor(
+ time_estimation_buffer_seconds_);
+ // Now, we have cases at startup where we have a couple of points
+ // followed by a long gap, followed by the body of the data. We are
+ // extrapolating, then adding the new data, and finding that time
+ // was frozen already.
+ //
+ // The fix is to make sure we queue a couple of seconds past the
+ // second point in the line segment, and we always make sure to load
+ // the line segment.
+ //
+ // But, there are filters which have no data in them. We don't want
+ // to queue those until we hit 2, because that'll force us to read
+ // everything into memory. So, only queue for the filters which
+ // have data in them.
+ if (filter.filter->timestamps_size() == 1u) {
+ timestamp_mappers_[node_a_index]->QueueUntilCondition(
+ [&filter]() {
+ return filter.filter->timestamps_size() >= 2u;
+ });
+ }
+ if (filter.filter->timestamps_size() >= 2u) {
+ timestamp_mappers_[node_a_index]->QueueUntil(
+ std::get<0>(filter.filter->timestamp(1u)) +
+ time_estimation_buffer_seconds_);
+ }
+ }
}
}
+
++node_a_index;
}
}
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 523b384..1ceb25c 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -239,13 +239,13 @@
// unknown.
bool have_popped_ = false;
+ protected:
// The amount of time to buffer when estimating. We care so we don't throw
// data out of our queue too soon. This time is indicative of how much to
// buffer everywhere, so let's latch onto it as well until proven that there
// is a different metric.
const std::chrono::nanoseconds time_estimation_buffer_seconds_;
- protected:
// If true, NextTimestamp returned nothing, so don't bother checking again.
// (This also enforces that we don't find more time data after calling it
// quits.)
@@ -369,6 +369,8 @@
// A mapping from node and channel to the relevant estimator.
std::vector<std::vector<NoncausalOffsetEstimator *>> filters_per_channel_;
+ std::vector<logger::TimestampMapper *> timestamp_mappers_;
+
bool first_solution_ = true;
bool all_done_ = false;