Add velocity to timestamp filter
This lets us track the offset better, but doesn't fix our out-of-order
problem at runtime.
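
The core idea, as a minimal standalone sketch (the function name and the hard
coded bound are illustrative; the bound matches the new kMaxVelocity()):

    #include <algorithm>

    // Propagate the offset estimate forward between samples using the
    // filtered clock-skew velocity. offset is in seconds, velocity in
    // seconds/second, dt in seconds.
    double PropagateOffset(double offset, double filtered_velocity, double dt) {
      const double v = std::clamp(filtered_velocity, -0.001, 0.001);
      return offset + v * dt;
    }
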
Change-Id: I1b586258616f8f3534a09713e221cb6c73f9ee96
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index f82ac04..d630e98 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -641,12 +641,12 @@
if (FLAGS_timestamps_to_csv) {
std::string fwd_name =
absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
- node_b->name()->string_view(), ".csv");
- x.fwd_fp = fopen(fwd_name.c_str(), "w");
+ node_b->name()->string_view());
+ x.SetFwdCsvFileName(fwd_name);
std::string rev_name =
absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
- node_a->name()->string_view(), ".csv");
- x.rev_fp = fopen(rev_name.c_str(), "w");
+ node_a->name()->string_view());
+ x.SetRevCsvFileName(rev_name);
}
return std::make_tuple(&x, true);
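
The ".csv" suffix now gets appended inside the setters, which also write the
column header; keeping the bare stem around lets the filter reopen the same
file after a base offset change or reset. A hedged usage sketch (node names
are hypothetical):

    #include "aos/network/timestamp_filter.h"

    // For nodes "pi1" and "pi2" this opens /tmp/timestamp_pi1_pi2.csv and
    // /tmp/timestamp_pi2_pi1.csv, each starting with the CSV header.
    void OpenDebugCsvs(aos::message_bridge::ClippedAverageFilter *filter) {
      filter->SetFwdCsvFileName("/tmp/timestamp_pi1_pi2");
      filter->SetRevCsvFileName("/tmp/timestamp_pi2_pi1");
    }
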
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 65dac75..e3014fc 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -361,6 +361,7 @@
hdrs = ["timestamp_filter.h"],
deps = [
"//aos/time",
+ "@com_google_absl//absl/strings",
],
)
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 8fb3b0f..cda013a 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -3,12 +3,24 @@
#include <chrono>
#include <tuple>
+#include "absl/strings/str_cat.h"
#include "aos/time/time.h"
namespace aos {
namespace message_bridge {
+namespace {
+
namespace chrono = std::chrono;
+void ClippedAverageFilterPrintHeader(FILE *fp) {
+ fprintf(fp,
+ "# time_since_start, sample_ns, filtered_offset, offset, "
+ "velocity, filtered_velocity, velocity_contribution, "
+ "sample_contribution, time_contribution\n");
+}
+
+} // namespace
+
void TimestampFilter::Set(aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
const double sample =
@@ -20,53 +32,86 @@
void TimestampFilter::Sample(aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
+ VLOG(1) << " " << this << " Sample at " << monotonic_now << " is "
+ << sample_ns.count() << "ns, Base is " << base_offset_.count();
+ CHECK_GE(monotonic_now, last_time_)
+ << ": Being asked to filter backwards in time!";
// Compute the sample offset as a double (seconds), taking into account the
// base offset.
const double sample =
chrono::duration_cast<chrono::duration<double>>(sample_ns - base_offset_)
.count();
+ VelocitySample(monotonic_now, sample_ns);
+
// This is our first sample. Just use it.
if (last_time_ == aos::monotonic_clock::min_time) {
+ VLOG(1) << " " << this << " First, setting offset to sample.";
offset_ = sample;
+ velocity_contribution_ = 0.0;
+ sample_contribution_ = 0.0;
+ time_contribution_ = 0.0;
} else {
+ const double dt = chrono::duration_cast<chrono::duration<double>>(
+ monotonic_now - last_time_)
+ .count();
+ const double velocity_contribution = dt * filtered_velocity();
+ velocity_contribution_ = velocity_contribution;
+ offset_ += velocity_contribution;
// Took less time to transmit, so clamp to it.
if (sample < offset_) {
offset_ = sample;
+ velocity_contribution_ = 0.0;
+ sample_contribution_ = 0.0;
+ time_contribution_ = 0.0;
} else {
// We split things up into 2 portions.
// 1) Each sample has information. Correct some using it.
// 2) We want to keep a decent time constant if the sample rate slows.
// Take time since the last sample into account.
+ //
+ // There are other challenges. If every long sample is followed by a
+ // short sample, we will over-weight the long samples.
+ //
+ // In the end, this algorithm does OK for reasonably well-conditioned
+ // data, but occasionally violates causality when the network delay
+ // suddenly drops from large to small. Good enough for a real-time
+ // estimate, but not great where we need 100% accuracy replaying logs.
// Time constant for the low pass filter in seconds.
- constexpr double kTau = 0.5;
+ constexpr double kTau = 1.0;
- constexpr double kClampNegative = -0.0003;
+ constexpr double kClampPositive = 0.0005;
+ // Scale the slew rate clamp by dt. This helps when there is an outage.
+ const double clamp_positive = kClampPositive * dt;
+ clamp_ = clamp_positive;
{
// 1)
- constexpr double kAlpha = 0.005;
+ constexpr double kAlpha = 0.0005;
// This formulation is more numerically precise.
- // Clamp to kClampNegative ms to reduce the effect of wildly large
- // samples.
- offset_ = offset_ - kAlpha * std::max(offset_ - sample, kClampNegative);
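+ // Cap the correction from any single sample at 1 us to limit the
+ // influence of wildly large samples.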
+ const double sample_contribution =
+ std::min(kAlpha * (sample - offset_), 0.000001);
+ offset_ = offset_ + sample_contribution;
+ sample_contribution_ = sample_contribution;
}
{
// 2)
//
- // 1-e^(t/tau) -> alpha
- const double alpha =
- -std::expm1(-chrono::duration_cast<chrono::duration<double>>(
- monotonic_now - last_time_)
- .count() /
- kTau);
+ // 1-e^(-t/tau) -> alpha
+ const double alpha = -std::expm1(-dt / kTau);
- // Clamp to kClampNegative ms to reduce the effect of wildly large
+ // Clamp to clamp_positive seconds to reduce the effect of wildly large
// samples.
- offset_ = offset_ - alpha * std::max(offset_ - sample, kClampNegative);
+ const double time_contribution =
+ std::min(alpha * (sample - offset_), clamp_positive);
+ offset_ = offset_ + time_contribution;
+
+ time_contribution_ = time_contribution;
}
+
+ VLOG(1) << " " << this << " filter sample is " << offset_;
}
}
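
To make the three logged contributions concrete, here is a condensed,
self-contained sketch of one Sample() update. The constants match the diff;
dt, the velocity, and the function name are illustrative assumptions.

    #include <algorithm>
    #include <cmath>

    // One velocity-compensated filter step (illustrative, not the real API).
    double OneUpdate(double offset, double sample) {
      const double dt = 0.1;                   // seconds since the last sample
      const double filtered_velocity = 50e-6;  // seconds/second
      // Velocity contribution: extrapolate the offset forward by dt.
      offset += dt * filtered_velocity;
      // 1) Per-sample correction with kAlpha = 0.0005, capped at 1 us.
      offset += std::min(0.0005 * (sample - offset), 0.000001);
      // 2) Time-constant correction: alpha = 1 - e^(-dt/kTau) with kTau = 1.0,
      //    slew-limited to kClampPositive * dt = 50 us here.
      const double alpha = -std::expm1(-dt / 1.0);
      offset += std::min(alpha * (sample - offset), 0.0005 * dt);
      return offset;
    }

Because both corrections only clamp the positive direction, a sample showing
less network delay still pulls the offset down immediately through the
"sample < offset_" branch above.
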
@@ -80,6 +125,9 @@
base_offset_ = base_offset;
// Clear everything out to avoid any numerical precision problems.
last_time_ = aos::monotonic_clock::min_time;
+ last_velocity_sample_time_ = aos::monotonic_clock::min_time;
+ velocity_ = 0;
+ filtered_velocity_ = 0;
}
void TimestampFilter::Reset() {
@@ -87,11 +135,101 @@
last_time_ = aos::monotonic_clock::min_time;
base_offset_ = chrono::nanoseconds(0);
+
+ last_velocity_sample_time_ = aos::monotonic_clock::min_time;
+}
+
+void TimestampFilter::VelocitySample(
+ aos::monotonic_clock::time_point monotonic_now,
+ chrono::nanoseconds sample_ns) {
+ if (last_velocity_sample_time_ == aos::monotonic_clock::min_time) {
+ last_velocity_sample_time_ = monotonic_now;
+ last_velocity_sample_ns_ = sample_ns;
+ velocity_ = 0.0;
+ state_velocity_ = 0.0;
+ filtered_velocity_ = 0.0;
+ last_sample_ns_ = sample_ns;
+ filtered_velocity_time_ = 0.5;
+ } else {
+ chrono::duration<double> elapsed_time =
+ chrono::duration_cast<chrono::duration<double>>(
+ monotonic_now - last_velocity_sample_time_);
+
+ velocity_ = chrono::duration_cast<chrono::duration<double>>(sample_ns -
+ last_sample_ns_)
+ .count() /
+ chrono::duration_cast<chrono::duration<double>>(monotonic_now -
+ last_time_)
+ .count();
+ if (sample_ns - last_velocity_sample_ns_ <
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(elapsed_time.count() * kMaxVelocity()))) {
+ state_velocity_ = chrono::duration_cast<chrono::duration<double>>(
+ sample_ns - last_velocity_sample_ns_)
+ .count() /
+ elapsed_time.count();
+ last_velocity_sample_ns_ = sample_ns;
+ last_velocity_sample_time_ = monotonic_now;
+
+ constexpr double kSampleTime = 1.0;
+
+ // Limit the weight of historical time. This makes it so we slow down
+ // over time, but don't slow down forever.
+ const double clamped_time =
+ std::min(kSampleTime, filtered_velocity_time_);
+
+ // Compute a weighted average of the previous velocity and the new
+ // velocity. The filtered velocity is weighted by the time it has been
+ // accumulated over, and the sample velocity is weighted by the elapsed
+ // time.
+ const double unclamped_velocity =
+ (filtered_velocity_ * clamped_time +
+ std::clamp(state_velocity_, -0.1, kMaxVelocity()) *
+ elapsed_time.count()) /
+ (clamped_time + elapsed_time.count());
+
+ filtered_velocity_ =
+ std::clamp(unclamped_velocity, -kMaxVelocity(), kMaxVelocity());
+ filtered_velocity_time_ += elapsed_time.count();
+ }
+ }
+ last_sample_ns_ = sample_ns;
+}
+
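
The weighted average above behaves like a low pass filter whose gain shrinks
as history accumulates, up to the kSampleTime cap. A standalone sketch of
just that arithmetic (function name and parameter layout are illustrative):

    #include <algorithm>

    // Blend the accumulated velocity estimate with a new instantaneous one.
    // filtered_time is how long the estimate has been accumulating; elapsed
    // is the time spanned by the new sample; both in seconds.
    double AverageVelocity(double filtered_velocity, double filtered_time,
                           double state_velocity, double elapsed) {
      // Cap the weight of history at kSampleTime = 1.0 seconds.
      const double clamped_time = std::min(1.0, filtered_time);
      const double unclamped =
          (filtered_velocity * clamped_time +
           std::clamp(state_velocity, -0.1, 0.001) * elapsed) /
          (clamped_time + elapsed);
      // Never report more than +/- kMaxVelocity() of skew.
      return std::clamp(unclamped, -0.001, 0.001);
    }
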
+void ClippedAverageFilter::SetFwdCsvFileName(std::string_view name) {
+ fwd_csv_file_name_ = name;
+ fwd_fp_ = fopen(absl::StrCat(fwd_csv_file_name_, ".csv").c_str(), "w");
+ ClippedAverageFilterPrintHeader(fwd_fp_);
+}
+
+void ClippedAverageFilter::SetRevCsvFileName(std::string_view name) {
+ rev_csv_file_name_ = name;
+ rev_fp_ = fopen(absl::StrCat(rev_csv_file_name_, ".csv").c_str(), "w");
+ ClippedAverageFilterPrintHeader(rev_fp_);
+}
+
+void ClippedAverageFilter::set_first_fwd_time(
+ aos::monotonic_clock::time_point time) {
+ first_fwd_time_ = time;
+ if (fwd_fp_) {
+ fwd_fp_ = freopen(nullptr, "wb", fwd_fp_);
+ ClippedAverageFilterPrintHeader(fwd_fp_);
+ }
+}
+
+void ClippedAverageFilter::set_first_rev_time(
+ aos::monotonic_clock::time_point time) {
+ first_rev_time_ = time;
+ if (rev_fp_) {
+ rev_fp_ = freopen(nullptr, "wb", rev_fp_);
+ ClippedAverageFilterPrintHeader(rev_fp_);
+ }
}
void ClippedAverageFilter::FwdSet(
aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
+ VLOG(1) << "Fwd Set";
fwd_.Set(monotonic_now, sample_ns);
Update(monotonic_now, &last_fwd_time_);
}
@@ -99,26 +237,40 @@
void ClippedAverageFilter::FwdSample(
aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
+ VLOG(1) << &fwd_ << " Fwd sample now " << monotonic_now << " sample "
+ << sample_ns.count();
fwd_.Sample(monotonic_now, sample_ns);
Update(monotonic_now, &last_fwd_time_);
- if (fwd_fp != nullptr) {
+ if (fwd_fp_ != nullptr) {
if (first_fwd_time_ == aos::monotonic_clock::min_time) {
first_fwd_time_ = monotonic_now;
}
- fprintf(fwd_fp, "%f, %f, %f, %f\n",
- chrono::duration_cast<chrono::duration<double>>(monotonic_now -
- first_fwd_time_)
- .count(),
- chrono::duration_cast<chrono::duration<double>>(sample_ns).count(),
- fwd_.offset() + fwd_.base_offset_double(),
- chrono::duration_cast<chrono::duration<double>>(offset()).count());
+ fprintf(
+ fwd_fp_,
+ "%.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %9.9f, %9.9f, "
+ "%9.9f, %.9f\n",
+ chrono::duration_cast<chrono::duration<double>>(monotonic_now -
+ first_fwd_time_)
+ .count(),
+ chrono::duration_cast<chrono::duration<double>>(sample_ns).count(),
+ fwd_.offset() + fwd_.base_offset_double(),
+ chrono::duration_cast<chrono::duration<double>>(offset()).count(),
+ fwd_.velocity(), fwd_.filtered_velocity(),
+ chrono::duration_cast<chrono::duration<double>>(
+ monotonic_now.time_since_epoch())
+ .count(),
+ offset_velocity_, fwd_.last_velocity_sample(),
+ fwd_.velocity_contribution(), fwd_.sample_contribution(),
+ fwd_.time_contribution(), fwd_.clamp());
+ fflush(fwd_fp_);
}
}
void ClippedAverageFilter::RevSet(
aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
+ VLOG(1) << "Rev set";
rev_.Set(monotonic_now, sample_ns);
Update(monotonic_now, &last_rev_time_);
}
@@ -126,20 +278,32 @@
void ClippedAverageFilter::RevSample(
aos::monotonic_clock::time_point monotonic_now,
chrono::nanoseconds sample_ns) {
+ VLOG(1) << "Rev sample";
rev_.Sample(monotonic_now, sample_ns);
Update(monotonic_now, &last_rev_time_);
- if (rev_fp != nullptr) {
+ if (rev_fp_ != nullptr) {
if (first_rev_time_ == aos::monotonic_clock::min_time) {
first_rev_time_ = monotonic_now;
}
- fprintf(rev_fp, "%f, %f, %f, %f\n",
- chrono::duration_cast<chrono::duration<double>>(monotonic_now -
- first_rev_time_)
- .count(),
- chrono::duration_cast<chrono::duration<double>>(sample_ns).count(),
- rev_.offset() + rev_.base_offset_double(),
- chrono::duration_cast<chrono::duration<double>>(offset()).count());
+ fprintf(
+ rev_fp_,
+ "%.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %.9f, %9.9f, %9.9f, "
+ "%9.9f, %.9f\n",
+ chrono::duration_cast<chrono::duration<double>>(monotonic_now -
+ first_rev_time_)
+ .count(),
+ chrono::duration_cast<chrono::duration<double>>(sample_ns).count(),
+ rev_.offset() + rev_.base_offset_double(),
+ chrono::duration_cast<chrono::duration<double>>(offset()).count(),
+ rev_.velocity(), rev_.filtered_velocity(),
+ chrono::duration_cast<chrono::duration<double>>(
+ monotonic_now.time_since_epoch())
+ .count(),
+ offset_velocity_, rev_.last_velocity_sample(),
+ rev_.velocity_contribution(), rev_.sample_contribution(),
+ rev_.time_contribution(), rev_.clamp());
+ fflush(rev_fp_);
}
}
@@ -152,11 +316,25 @@
base_offset_ = base_offset;
last_fwd_time_ = aos::monotonic_clock::min_time;
last_rev_time_ = aos::monotonic_clock::min_time;
+
+ if (fwd_fp_) {
+ fprintf(fwd_fp_, "# Closing and opening\n");
+ fclose(fwd_fp_);
+ fwd_fp_ = nullptr;
+ SetFwdCsvFileName(fwd_csv_file_name_);
+ }
+ if (rev_fp_) {
+ fprintf(rev_fp_, "# Closing and opening\n");
+ fclose(rev_fp_);
+ rev_fp_ = nullptr;
+ SetRevCsvFileName(rev_csv_file_name_);
+ }
}
void ClippedAverageFilter::Reset() {
base_offset_ = chrono::nanoseconds(0);
offset_ = 0;
+ offset_velocity_ = 0;
last_fwd_time_ = aos::monotonic_clock::min_time;
last_rev_time_ = aos::monotonic_clock::min_time;
@@ -165,6 +343,17 @@
fwd_.Reset();
rev_.Reset();
+
+ if (fwd_fp_) {
+ fclose(fwd_fp_);
+ fwd_fp_ = nullptr;
+ SetFwdCsvFileName(fwd_csv_file_name_);
+ }
+ if (rev_fp_) {
+ fclose(rev_fp_);
+ rev_fp_ = nullptr;
+ SetRevCsvFileName(rev_csv_file_name_);
+ }
}
void ClippedAverageFilter::Update(
@@ -177,28 +366,42 @@
const double hard_max = fwd_.offset();
const double hard_min = -rev_.offset();
const double average = (hard_max + hard_min) / 2.0;
+ VLOG(1) << " Max(fwd) " << hard_max << " min(rev) " << hard_min;
// We don't want to clip the offset to the hard min/max. We really want to
- // keep it within a band around the middle. ratio of 0.5 means stay within
- // +- 0.25 of the middle of the hard min and max.
- constexpr double kBand = 0.5;
+ // keep it within a band around the middle. A ratio of 0.3 means stay
+ // within +- 0.15 of the middle of the hard min and max.
+ constexpr double kBand = 0.3;
const double max = average + kBand / 2.0 * (hard_max - hard_min);
const double min = average - kBand / 2.0 * (hard_max - hard_min);
// Update regardless for the first sample from both the min and max.
if (*last_time == aos::monotonic_clock::min_time) {
+ VLOG(1) << " No last time " << average;
offset_ = average;
+ offset_velocity_ = 0.0;
} else {
// Do just a time constant based update. We can afford to be slow here
// for smoothness.
- constexpr double kTau = 10.0;
- const double alpha =
- -std::expm1(-chrono::duration_cast<chrono::duration<double>>(
- monotonic_now - *last_time)
- .count() /
- kTau);
+ constexpr double kTau = 5.0;
+ constexpr double kTauVelocity = 0.75;
+ const double dt = chrono::duration_cast<chrono::duration<double>>(
+ monotonic_now - *last_time)
+ .count();
+ const double alpha = -std::expm1(-dt / kTau);
+ const double velocity_alpha = -std::expm1(-dt / kTauVelocity);
// Clamp it such that it remains in the min/max bounds.
+ offset_ += std::clamp(offset_velocity_, -kMaxVelocity(), kMaxVelocity()) *
+ dt / 2.0;
offset_ = std::clamp(offset_ - alpha * (offset_ - average), min, max);
+
+ offset_velocity_ =
+ offset_velocity_ -
+ velocity_alpha *
+ (offset_velocity_ -
+ (fwd_.filtered_velocity() - rev_.filtered_velocity()) / 2.0);
+
+ VLOG(1) << " last time " << offset_;
}
*last_time = monotonic_now;
@@ -209,7 +412,13 @@
*sample_pointer_ = offset_;
VLOG(1) << "Updating sample to " << offset_;
} else {
- LOG(WARNING) << "Don't have both samples.";
+ VLOG(1) << "Don't have both samples.";
+ if (last_fwd_time_ == aos::monotonic_clock::min_time) {
+ VLOG(1) << " Missing forward";
+ }
+ if (last_rev_time_ == aos::monotonic_clock::min_time) {
+ VLOG(1) << " Missing reverse";
+ }
}
}
}
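
For reference, the band logic in Update() keeps the combined estimate inside
the middle kBand fraction of the range bracketed by the forward and reverse
filters. A standalone sketch of that computation (function name is
illustrative):

    #include <algorithm>

    // fwd_offset and -rev_offset bound the true offset from above and below.
    double ClampToBand(double offset, double fwd_offset, double rev_offset) {
      const double hard_max = fwd_offset;
      const double hard_min = -rev_offset;
      const double average = (hard_max + hard_min) / 2.0;
      // kBand = 0.3: stay within +/- 0.15 of the middle of the hard range.
      constexpr double kBand = 0.3;
      const double max = average + kBand / 2.0 * (hard_max - hard_min);
      const double min = average - kBand / 2.0 * (hard_max - hard_min);
      return std::clamp(offset, min, max);
    }
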
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 7165488..3940f9c 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -12,6 +12,9 @@
namespace aos {
namespace message_bridge {
+// Max velocity to clamp the filter to in seconds/second.
+inline constexpr double kMaxVelocity() { return 0.001; }
+
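
For scale: 0.001 s/s is 1000 ppm, so the filter tolerates up to 1 ms of
relative drift per second of elapsed time; typical crystal oscillators are
specified in the tens of ppm, leaving a wide margin.
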
// This class handles filtering differences between clocks across a network.
//
// The basic concept is that network latencies are highly asymmetric. They will
@@ -29,6 +32,11 @@
void Set(aos::monotonic_clock::time_point monotonic_now,
std::chrono::nanoseconds sample_ns);
+ double velocity_contribution() const { return velocity_contribution_; }
+ double sample_contribution() const { return sample_contribution_; }
+ double time_contribution() const { return time_contribution_; }
+ double clamp() const { return clamp_; }
+
// Updates with a new sample. monotonic_now is the timestamp of the sample on
// the destination node, and sample_ns is destination_time - source_time.
void Sample(aos::monotonic_clock::time_point monotonic_now,
@@ -37,7 +45,10 @@
// Updates the base_offset, and compensates offset while we are here.
void set_base_offset(std::chrono::nanoseconds base_offset);
- double offset() const { return offset_; }
+ double offset() const {
+ VLOG(1) << " " << this << " offset " << offset_;
+ return offset_;
+ }
std::chrono::nanoseconds base_offset() const { return base_offset_; }
@@ -51,29 +62,53 @@
return last_time_ != aos::monotonic_clock::min_time;
}
+ double velocity() const { return state_velocity_; }
+ double filtered_velocity() const { return filtered_velocity_; }
+ double last_velocity_sample() const {
+ return std::chrono::duration_cast<std::chrono::duration<double>>(
+ last_velocity_sample_ns_)
+ .count();
+ }
+
+ aos::monotonic_clock::time_point last_time() const { return last_time_; }
+
void Reset();
private:
+ void VelocitySample(aos::monotonic_clock::time_point monotonic_now,
+ std::chrono::nanoseconds sample_ns);
+
double offset_ = 0;
+ double state_velocity_ = 0.0;
+ double velocity_ = 0;
+ double filtered_velocity_ = 0;
+ double filtered_velocity_time_ = 0;
+
+ double velocity_contribution_ = 0.0;
+ double clamp_ = 0.0;
+ double sample_contribution_ = 0.0;
+ double time_contribution_ = 0.0;
+
+ std::chrono::nanoseconds last_sample_ns_{0};
aos::monotonic_clock::time_point last_time_ = aos::monotonic_clock::min_time;
std::chrono::nanoseconds base_offset_{0};
+
+ aos::monotonic_clock::time_point last_velocity_sample_time_ =
+ aos::monotonic_clock::min_time;
+ std::chrono::nanoseconds last_velocity_sample_ns_{0};
};
// This class combines the a -> b offsets with the b -> a offsets and
// aggressively filters the results.
-struct ClippedAverageFilter {
- // If not nullptr, timestamps will get written to these two files for
- // debugging.
- FILE *fwd_fp = nullptr;
- FILE *rev_fp = nullptr;
-
+class ClippedAverageFilter {
+ public:
~ClippedAverageFilter() {
- if (fwd_fp != nullptr) {
- fclose(fwd_fp);
+ if (fwd_fp_ != nullptr) {
+ fclose(fwd_fp_);
}
- if (rev_fp != nullptr) {
- fclose(rev_fp);
+ if (rev_fp_ != nullptr) {
+ fclose(rev_fp_);
}
}
@@ -127,6 +162,21 @@
void Reset();
+ aos::monotonic_clock::time_point fwd_last_time() const {
+ return fwd_.last_time();
+ }
+ aos::monotonic_clock::time_point rev_last_time() const {
+ return rev_.last_time();
+ }
+
+ // If set, timestamps will get written to the named CSV files for
+ // debugging.
+ void SetFwdCsvFileName(std::string_view name);
+ void SetRevCsvFileName(std::string_view name);
+
+ void set_first_fwd_time(aos::monotonic_clock::time_point time);
+ void set_first_rev_time(aos::monotonic_clock::time_point time);
+
private:
// Updates the offset estimate given the current time, and a pointer to the
// variable holding the last time.
@@ -142,6 +192,7 @@
std::chrono::nanoseconds base_offset_ = std::chrono::nanoseconds(0);
// Dynamic part of the offset.
double offset_ = 0;
+ double offset_velocity_ = 0;
// Last time we had a sample for a direction.
aos::monotonic_clock::time_point last_fwd_time_ =
@@ -157,6 +208,11 @@
// Pointer to copy the sample to when it is updated.
double *sample_pointer_ = nullptr;
+
+ std::string rev_csv_file_name_;
+ std::string fwd_csv_file_name_;
+ FILE *fwd_fp_ = nullptr;
+ FILE *rev_fp_ = nullptr;
};
} // namespace message_bridge
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index d06923e..213353a 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -39,8 +39,9 @@
filter.Sample(aos::monotonic_clock::epoch() + chrono::seconds(3),
chrono::milliseconds(0));
- EXPECT_GT(filter.offset(), -0.9999);
- EXPECT_LT(filter.offset(), -0.999);
+ // We have velocity now, so we will continue extrapolating.
+ EXPECT_GT(filter.offset(), -1.001);
+ EXPECT_LT(filter.offset(), -1.0001);
EXPECT_EQ(filter.base_offset(), chrono::seconds(0));
EXPECT_TRUE(filter.has_sample());
}