Really fix log sorting
We were queueing all the forward timestamps, but the reverse timestamps
were killing us. So, we now queue node_b up until the reverse direction
as well.
Change-Id: I94fab9bb2b49eb439d4b576b19177b3d13703fd2
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index aa580e7..bc11ffc 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -863,6 +863,15 @@
BitSet64 all_live_nodes(problem.size());
const BitSet64 all_nodes = ~BitSet64(problem.size());
+ for (size_t node_index = 0; node_index < timestamp_mappers_.size();
+ ++node_index) {
+ if (timestamp_mappers_[node_index] != nullptr) {
+ // Make sure we have enough data queued such that if we are going to
+ // have a timestamp on this filter, we do have a timestamp queued.
+ timestamp_mappers_[node_index]->QueueFor(time_estimation_buffer_seconds_);
+ }
+ }
+
{
size_t node_a_index = 0;
for (const auto &filters : filters_per_node_) {
@@ -879,10 +888,6 @@
problem.add_filter(node_a_index, filter.filter, filter.b_index);
if (timestamp_mappers_[node_a_index] != nullptr) {
- // Make sure we have enough data queued such that if we are going to
- // have a timestamp on this filter, we do have a timestamp queued.
- timestamp_mappers_[node_a_index]->QueueFor(
- time_estimation_buffer_seconds_);
// Now, we have cases at startup where we have a couple of points
// followed by a long gap, followed by the body of the data. We are
// extrapolating, then adding the new data, and finding that time
@@ -896,15 +901,34 @@
// to queue those until we hit 2, because that'll force us to read
// everything into memory. So, only queue for the filters which
// have data in them.
- if (filter.filter->timestamps_size() == 1u) {
- timestamp_mappers_[node_a_index]->QueueUntilCondition(
- [&filter]() {
- return filter.filter->timestamps_size() >= 2u;
- });
- }
- if (filter.filter->timestamps_size() >= 2u) {
+
+ size_t node_b_index = configuration::GetNodeIndex(
+ timestamp_mappers_[node_a_index]->configuration(),
+ configuration::GetNode(
+ timestamp_mappers_[node_a_index]->configuration(),
+ filter.filter->node_b()));
+
+ // Timestamps can come from either node. When a message is
+ // delivered to a node, it can have the timestamp time attached to
+ // that message as well. That means that we have to queue both
+ // nodes until we have 2 unobserved points from both nodes.
+ timestamp_mappers_[node_a_index]->QueueUntilCondition(
+ [&filter]() { return filter.filter->has_unobserved_line(); });
+
+ timestamp_mappers_[node_b_index]->QueueUntilCondition(
+ [&filter]() { return filter.filter->has_unobserved_line(); });
+
+ // If we actually found a line, make sure to buffer to the desired
+ // distance past that last point so the filter doesn't try to
+ // invalidate the point. Do this for both nodes to pick up all the
+ // timestamps.
+ if (filter.filter->has_unobserved_line()) {
timestamp_mappers_[node_a_index]->QueueUntil(
- std::get<0>(filter.filter->timestamp(1u)) +
+ filter.filter->unobserved_line_end() +
+ time_estimation_buffer_seconds_);
+
+ timestamp_mappers_[node_b_index]->QueueUntil(
+ filter.filter->unobserved_line_remote_end() +
time_estimation_buffer_seconds_);
}
}
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 5605246..b4455a5 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -1008,6 +1008,11 @@
}
}
+std::string NoncausalTimestampFilter::NodeNames() const {
+ return absl::StrCat(node_a_->name()->string_view(), " -> ",
+ node_b_->name()->string_view());
+}
+
bool NoncausalTimestampFilter::ValidateSolution(
aos::monotonic_clock::time_point ta,
aos::monotonic_clock::time_point tb) const {
@@ -1021,9 +1026,8 @@
const chrono::nanoseconds offset =
NoncausalTimestampFilter::ExtrapolateOffset(reference_timestamp, ta);
if (offset + ta > tb) {
- LOG(ERROR) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " "
- << TimeString(ta, offset) << " > solution time " << tb;
+ LOG(ERROR) << NodeNames() << " " << TimeString(ta, offset)
+ << " > solution time " << tb;
return false;
}
return true;
@@ -1036,9 +1040,8 @@
NoncausalTimestampFilter::InterpolateOffset(points.first, points.second,
ta);
if (offset + ta > tb) {
- LOG(ERROR) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " "
- << TimeString(ta, offset) << " > solution time " << tb;
+ LOG(ERROR) << NodeNames() << " " << TimeString(ta, offset)
+ << " > solution time " << tb;
LOG(ERROR) << "Bracketing times are " << TimeString(points.first) << " and "
<< TimeString(points.second);
return false;
@@ -1058,20 +1061,20 @@
// The first sample is easy. Just do it!
if (timestamps_.size() == 0) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Initial sample of "
+ VLOG(1) << NodeNames() << " Initial sample of "
<< TimeString(monotonic_now, sample_ns);
timestamps_.emplace_back(std::make_tuple(monotonic_now, sample_ns, false));
- CHECK(!fully_frozen_) << ": " << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
- << " Returned a horizontal line previously and then "
- "got a new sample at "
- << monotonic_now << ", "
- << chrono::duration<double>(
- monotonic_now - std::get<0>(timestamps_[0]))
- .count()
- << " seconds after the last sample at "
- << std::get<0>(timestamps_[0]) << ".";
+ CHECK(!fully_frozen_)
+ << ": " << NodeNames()
+ << " Returned a horizontal line previously and then "
+ "got a new sample at "
+ << monotonic_now << ", "
+ << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+ .count()
+ << " seconds after the last sample at " << std::get<0>(timestamps_[0])
+ << ". Increase --time_estimation_buffer_seconds to at least "
+ << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+ .count();
return;
}
@@ -1084,9 +1087,8 @@
aos::monotonic_clock::duration doffset = sample_ns - std::get<1>(back);
if (dt == chrono::nanoseconds(0) && doffset == chrono::nanoseconds(0)) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Duplicate sample of O("
- << monotonic_now << ") = " << sample_ns.count() << ", remote time "
+ VLOG(1) << NodeNames() << " Duplicate sample of O(" << monotonic_now
+ << ") = " << sample_ns.count() << ", remote time "
<< monotonic_now + sample_ns;
return;
@@ -1099,26 +1101,27 @@
// negative slope, the point violates our constraint and will never be worth
// considering. Ignore it.
if (doffset < -dt * kMaxVelocity()) {
- VLOG(1) << std::setprecision(1) << std::fixed
- << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Rejected sample of "
- << TimeString(monotonic_now, sample_ns) << " because "
- << doffset.count() << " < " << (-dt * kMaxVelocity()).count()
- << " len " << timestamps_.size();
+ VLOG(1) << std::setprecision(1) << std::fixed << NodeNames()
+ << " Rejected sample of " << TimeString(monotonic_now, sample_ns)
+ << " because " << doffset.count() << " < "
+ << (-dt * kMaxVelocity()).count() << " len "
+ << timestamps_.size();
return;
}
// Be overly conservative here. It either won't make a difference, or
// will give us an error with an actual useful time difference.
CHECK(!fully_frozen_)
- << ": " << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
+ << ": " << NodeNames()
<< " Returned a horizontal line previously and then got a new "
"sample at "
<< monotonic_now << ", "
<< chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
.count()
- << " seconds after the last sample at " << std::get<0>(timestamps_[0]);
+ << " seconds after the last sample at " << std::get<0>(timestamps_[0])
+ << ". Increase --time_estimation_buffer_seconds to at least "
+ << chrono::duration<double>(monotonic_now - std::get<0>(timestamps_[0]))
+ .count();
// Back propagate the max velocity and remove any elements violating the
// velocity constraint. This is to handle the case where the offsets were
@@ -1132,11 +1135,16 @@
// In this case, point 3 is now violating our constraint and we need to
// remove it. This is the non-causal part of the filter.
while (dt * kMaxVelocity() < doffset && timestamps_.size() > 1u) {
- CHECK(!std::get<2>(back)) << ": " << node_a_->name()->string_view()
- << " -> " << node_b_->name()->string_view()
- << " Can't pop an already frozen sample.";
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
+ CHECK(!std::get<2>(back))
+ << ": " << NodeNames() << " Can't pop an already frozen sample "
+ << TimeString(back) << " while inserting "
+ << TimeString(monotonic_now, sample_ns) << ", "
+ << chrono::duration<double>(monotonic_now - std::get<0>(back)).count()
+ << " seconds in the past. Increase --time_estimation_buffer_seconds "
+ "to at least "
+ << chrono::duration<double>(monotonic_now - std::get<0>(back))
+ .count();
+ VLOG(1) << NodeNames()
<< " Removing now invalid sample during back propegation of "
<< TimeString(back);
timestamps_.pop_back();
@@ -1163,9 +1171,11 @@
CHECK(it != timestamps_.end());
CHECK(!std::get<2>(*(it)))
- << ": " << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Tried to insert " << monotonic_now
- << " before " << std::get<0>(*it) << ", which is a frozen time.";
+ << ": " << NodeNames() << " Tried to insert " << monotonic_now
+ << " before " << std::get<0>(*it)
+ << ", which is frozen in the past. Increase "
+ "--time_estimation_buffer_seconds to at least "
+ << chrono::duration<double>(std::get<0>(*it) - monotonic_now).count();
if (it == timestamps_.begin()) {
// We are being asked to add at the beginning.
@@ -1175,8 +1185,7 @@
const chrono::nanoseconds doffset = original_offset - sample_ns;
if (dt == chrono::nanoseconds(0) && doffset >= chrono::nanoseconds(0)) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Redundant timestamp "
+ VLOG(1) << NodeNames() << " Redundant timestamp "
<< TimeString(monotonic_now, sample_ns) << " because "
<< TimeString(timestamps_.front())
<< " is at the same time and a better solution.";
@@ -1184,8 +1193,7 @@
}
}
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Added sample at beginning "
+ VLOG(1) << NodeNames() << " Added sample at beginning "
<< TimeString(monotonic_now, sample_ns);
timestamps_.insert(it, std::make_tuple(monotonic_now, sample_ns, false));
@@ -1198,10 +1206,9 @@
const chrono::nanoseconds doffset = std::get<1>(*second) - sample_ns;
if (doffset < -dt * kMaxVelocity()) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
- << " Removing redundant sample of " << TimeString(*second)
- << " because " << TimeString(timestamps_.front())
+ VLOG(1) << NodeNames() << " Removing redundant sample of "
+ << TimeString(*second) << " because "
+ << TimeString(timestamps_.front())
<< " would make the slope too negative.";
timestamps_.erase(second);
continue;
@@ -1226,10 +1233,8 @@
std::get<1>(*third) - std::get<1>(*second);
if (doffset > dt * kMaxVelocity()) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
- << " Removing invalid sample of " << TimeString(*second)
- << " because " << TimeString(*third)
+ VLOG(1) << NodeNames() << " Removing invalid sample of "
+ << TimeString(*second) << " because " << TimeString(*third)
<< " would make the slope too positive.";
timestamps_.erase(second);
continue;
@@ -1241,10 +1246,8 @@
}
return;
} else {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Found the next time "
- << std::get<0>(*(it - 1)) << " < " << monotonic_now << " < "
- << std::get<0>(*it);
+ VLOG(1) << NodeNames() << " Found the next time " << std::get<0>(*(it - 1))
+ << " < " << monotonic_now << " < " << std::get<0>(*it);
{
chrono::nanoseconds prior_dt = monotonic_now - std::get<0>(*(it - 1));
@@ -1254,16 +1257,14 @@
// If we are worse than either the previous or next point, discard.
if (prior_doffset < -prior_dt * kMaxVelocity()) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Ignoring timestamp "
+ VLOG(1) << NodeNames() << " Ignoring timestamp "
<< TimeString(monotonic_now, sample_ns) << " because "
<< TimeString(*(it - 1))
<< " is before and the slope would be too negative.";
return;
}
if (next_doffset > next_dt * kMaxVelocity()) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Ignoring timestamp "
+ VLOG(1) << NodeNames() << " Ignoring timestamp "
<< TimeString(monotonic_now, sample_ns) << " because "
<< TimeString(*it)
<< " is following and the slope would be too positive.";
@@ -1276,9 +1277,7 @@
// new.
auto middle_it = timestamps_.insert(
it, std::make_tuple(monotonic_now, sample_ns, false));
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Inserted "
- << TimeString(*middle_it);
+ VLOG(1) << NodeNames() << " Inserted " << TimeString(*middle_it);
while (middle_it != timestamps_.end() && middle_it != timestamps_.begin()) {
auto next_it =
@@ -1294,8 +1293,7 @@
std::get<1>(*next_it) - std::get<1>(*middle_it);
if (next_doffset < -next_dt * kMaxVelocity()) {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
+ VLOG(1) << NodeNames()
<< " Next slope is too negative, removing next point "
<< TimeString(*next_it);
next_it = timestamps_.erase(next_it);
@@ -1315,9 +1313,11 @@
if (prior_doffset > prior_dt * kMaxVelocity()) {
CHECK(!std::get<2>(*prior_it))
- << ": " << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view()
- << " Can't pop an already frozen sample.";
+ << ": " << NodeNames()
+ << " Can't pop an already frozen sample. Increase "
+ "--time_estimation_buffer_seconds to at least "
+ << chrono::duration<double>(prior_dt).count();
+
VLOG(1) << "Prior slope is too positive, removing prior point "
<< TimeString(*prior_it);
prior_it = timestamps_.erase(prior_it);
@@ -1342,6 +1342,28 @@
return removed;
}
+monotonic_clock::time_point NoncausalTimestampFilter::unobserved_line_end()
+ const {
+ if (has_unobserved_line()) {
+ return std::get<0>(timestamp(next_to_consume_ + 1));
+ }
+ return monotonic_clock::min_time;
+}
+
+monotonic_clock::time_point
+NoncausalTimestampFilter::unobserved_line_remote_end() const {
+ if (has_unobserved_line()) {
+ const std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> t =
+ timestamp(next_to_consume_ + 1);
+ return std::get<0>(t) + std::get<1>(t);
+ }
+ return monotonic_clock::min_time;
+}
+
+bool NoncausalTimestampFilter::has_unobserved_line() const {
+ return next_to_consume_ + 1 < timestamps_.size();
+}
+
std::optional<std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>>
NoncausalTimestampFilter::Observe() const {
if (timestamps_.empty() || next_to_consume_ >= timestamps_.size()) {
@@ -1372,10 +1394,12 @@
}
if (timestamps_.empty()) {
+ VLOG(1) << NodeNames() << " fully_frozen_";
fully_frozen_ = true;
} else if (node_monotonic_now > std::get<0>(timestamps_.back())) {
// We've been asked to freeze past the last point. It isn't safe to add any
// more points or we will change this region.
+ VLOG(1) << NodeNames() << " fully_frozen_";
fully_frozen_ = true;
}
}
@@ -1392,11 +1416,13 @@
}
if (timestamps_.empty()) {
+ VLOG(1) << NodeNames() << " fully_frozen_";
fully_frozen_ = true;
} else if (remote_monotonic_now > std::get<0>(timestamps_.back()) +
std::get<1>(timestamps_.back())) {
// We've been asked to freeze past the last point. It isn't safe to add any
// more points or we will change this region.
+ VLOG(1) << NodeNames() << " fully_frozen_";
fully_frozen_ = true;
}
}
@@ -1417,16 +1443,13 @@
void NoncausalTimestampFilter::SetCsvFileName(std::string_view name) {
fp_ = fopen(absl::StrCat(name, ".csv").c_str(), "w");
- samples_fp_ =
- fopen(absl::StrCat(name, "_samples.csv").c_str(), "w");
+ samples_fp_ = fopen(absl::StrCat(name, "_samples.csv").c_str(), "w");
PrintNoncausalTimestampFilterHeader(fp_);
PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
}
void NoncausalTimestampFilter::PopFront() {
- VLOG(1) << node_a_->name()->string_view() << " -> "
- << node_b_->name()->string_view() << " Popped sample of "
- << TimeString(timestamp(0));
+ VLOG(1) << NodeNames() << " Popped sample of " << TimeString(timestamp(0));
MaybeWriteTimestamp(timestamp(0));
timestamps_.pop_front();
has_popped_ = true;
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 7a9b7fa..9f01efc 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -343,6 +343,9 @@
size_t timestamps_size() const { return timestamps_.size(); }
+ // Returns a debug string with the nodes this filter represents.
+ std::string NodeNames() const;
+
void Debug() {
size_t count = 0;
for (std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds,
@@ -366,6 +369,15 @@
void FreezeUntil(aos::monotonic_clock::time_point node_monotonic_now);
void FreezeUntilRemote(aos::monotonic_clock::time_point remote_monotonic_now);
+ // Returns true if there is a full line which hasn't been observed.
+ bool has_unobserved_line() const;
+ // Returns the time of the second point in the unobserved line, or min_time if
+ // there is no line.
+ monotonic_clock::time_point unobserved_line_end() const;
+ // Returns the time of the second point in the unobserved line on the remote
+ // node, or min_time if there is no line.
+ monotonic_clock::time_point unobserved_line_remote_end() const;
+
// Returns the next timestamp in the queue if available without incrementing
// the pointer. This, Consume, and FreezeUntil work together to allow
// tracking and freezing timestamps which have been combined externally.
@@ -415,6 +427,9 @@
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
monotonic_clock::time_point ta_base, double ta);
+ const Node *node_a() const { return node_a_; }
+ const Node *node_b() const { return node_b_; }
+
private:
// Removes the oldest timestamp.
void PopFront();
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index caecccd..0e47cc6 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -625,9 +625,11 @@
ASSERT_EQ(filter.timestamps_size(), 2u);
filter.FreezeUntil(tb);
- EXPECT_DEATH({ filter.Sample(tb, oa); },
- "Tried to insert 0.100000000sec before 0.100000000sec, which "
- "is a frozen time");
+ EXPECT_DEATH(
+ { filter.Sample(tb, oa); },
+ "test_a -> test_b Tried to insert 0.100000000sec before "
+ "0.100000000sec, which is frozen "
+ "in the past. Increase --time_estimation_buffer_seconds to at least");
}
{
@@ -641,7 +643,8 @@
EXPECT_DEATH(
{ filter.Sample(tc, oc); },
- "Returned a horizontal line previously and then got a new sample at "
+ "test_a -> test_b Returned a horizontal line previously and then got a "
+ "new sample at "
"0.200000000sec, 0.2 seconds after the last sample at 0.000000000sec");
}
@@ -655,8 +658,9 @@
filter.FreezeUntil(tc);
EXPECT_DEATH({ filter.Sample(tb, ob); },
- "Tried to insert 0.100000000sec before 0.200000000sec, which "
- "is a frozen time");
+ "test_a -> test_b Tried to insert 0.100000000sec before "
+ "0.200000000sec, which "
+ "is frozen");
}
{
@@ -670,11 +674,13 @@
ASSERT_EQ(filter.timestamps_size(), 3u);
filter.FreezeUntil(tb);
- EXPECT_DEATH({ filter.Sample(tb, oa); },
- "Tried to insert 0.100000000sec before 0.100000000sec, which "
- "is a frozen time");
+ EXPECT_DEATH(
+ { filter.Sample(tb, oa); },
+ "test_a -> test_b Tried to insert 0.100000000sec before "
+ "0.100000000sec, which "
+ "is frozen in the past. Increase --time_estimation_buffer_seconds");
EXPECT_DEATH({ filter.Sample(tb + chrono::nanoseconds(1), oa); },
- "Can't pop an already frozen sample");
+ "test_a -> test_b Can't pop an already frozen sample");
}
}