Merge "Remove old starter and libevent"
diff --git a/WORKSPACE b/WORKSPACE
index 1b0c85c..6118a37 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -657,27 +657,6 @@
)
http_archive(
- name = "webrtc_x64",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "bd212b2a112a043d08d27f49027091788fa01c7c2ac5f072d096c17d9dbd976f",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-x64.tar.gz",
-)
-
-http_archive(
- name = "webrtc_arm",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "c34badaf313877cd03a0dfd6b71de024d806a7652550a7f1cd7dea523a7c813d",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-arm.tar.gz",
-)
-
-http_archive(
- name = "webrtc_rio",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "d86d3b030099b35ae5ea31c807fb4d0b0352598e79f1ea84877e5504e185faa8",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30376-4c4735b-linux-rio.tar.gz",
-)
-
-http_archive(
name = "build_bazel_rules_nodejs",
sha256 = "0d9660cf0894f1fe1e9840818553e0080fbce0851169812d77a70bdb9981c946",
urls = ["https://www.frc971.org/Build-Dependencies/rules_nodejs-0.37.0.tar.gz"],
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 05555c6..e295fc1 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -653,7 +653,10 @@
// data on that channel. Not all channels will end at the same point in
// time since they can be in different files.
VLOG(1) << "Found the last message on channel "
- << timestamped_message.channel_index;
+ << timestamped_message.channel_index << ", "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index));
// The user might be working with log files from 1 node but forgot to
// configure the infrastructure to log data for a remote channel on that
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 42e115b..2aa313d 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -412,8 +412,14 @@
cc_library(
name = "web_proxy",
- srcs = ["web_proxy.cc"],
- hdrs = ["web_proxy.h"],
+ srcs = [
+ "rawrtc.cc",
+ "web_proxy.cc",
+ ],
+ hdrs = [
+ "rawrtc.h",
+ "web_proxy.h",
+ ],
copts = [
"-DWEBRTC_POSIX",
],
@@ -426,9 +432,9 @@
"//aos/events:shm_event_loop",
"//aos/mutex",
"//aos/seasocks:seasocks_logger",
- "//third_party:webrtc",
"//third_party/seasocks",
"@com_github_google_glog//:glog",
+ "@com_github_rawrtc_rawrtc//:rawrtc",
],
)
@@ -502,7 +508,6 @@
"//aos/events:simulated_event_loop",
"//aos/events/logging:logfile_utils",
"//aos/time",
- "@com_github_stevengj_nlopt//:nlopt",
"@org_tuxfamily_eigen//:eigen",
],
)
@@ -543,7 +548,5 @@
":testing_time_converter",
":timestamp_filter",
"//aos/testing:googletest",
- "//third_party/gmp",
- "@com_github_stevengj_nlopt//:nlopt",
],
)
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 292d24d..d7bbff0 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -10,7 +10,6 @@
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "glog/logging.h"
-#include "nlopt.h"
DEFINE_bool(timestamps_to_csv, false,
"If true, write all the time synchronization information to a set "
@@ -43,101 +42,6 @@
// TODO(austin): Add a rate of change constraint from the last sample. 1
// ms/s. Figure out how to define it. Do this last. This lets us handle
// constraints going away, and constraints close in time.
-//
-// TODO(austin): When the new newton's method solver prooves it's worth, kill
-// the old SLSQP solver. It will be unreachable for a little bit.
-
-std::vector<double> TimestampProblem::SolveDouble() {
- MaybeUpdateNodeMapping();
- // TODO(austin): Add constraints for relevant segments.
- const size_t n = filters_.size() - 1u;
- // NLOPT_LD_MMA and NLOPT_LD_LBFGS are alternative solvers, but SLSQP is a
- // better fit for the quadratic nature of this problem.
- nlopt_opt opt = nlopt_create(NLOPT_LD_SLSQP, n);
- nlopt_set_min_objective(opt, TimestampProblem::DoCost, this);
-
- // Ask for really good. This is very quadratic, so it should be pretty
- // precise.
- nlopt_set_xtol_rel(opt, 1e-9);
-
- cost_call_count_ = 0;
-
- std::vector<double> result(n, 0.0);
- double minf = 0.0;
- nlopt_result status = nlopt_optimize(opt, result.data(), &minf);
- if (status < 0) {
- if (status == NLOPT_ROUNDOFF_LIMITED) {
- constexpr double kTolerance = 1e-9;
- std::vector<double> gradient(n, 0.0);
- Cost(result.data(), gradient.data());
- for (double g : gradient) {
- if (std::abs(g) > kTolerance) {
- // If we failed, update base_clock_ to the current time so it gets
- // printed inside Debug and explode with a CHECK saying the same
- // thing.
- std::vector<monotonic_clock::time_point> new_base =
- DoubleToMonotonic(result.data());
- // Put the result into base_clock_ so Debug prints out something
- // useful.
- base_clock_ = std::move(new_base);
- Debug();
- CHECK_LE(std::abs(g), kTolerance)
- << ": Optimization problem failed with a large gradient. "
- << nlopt_result_to_string(status);
- }
- }
- } else {
- LOG(FATAL) << "Failed to solve optimization problem "
- << nlopt_result_to_string(status);
- }
- }
-
- if (VLOG_IS_ON(1)) {
- PrintSolution(result);
- }
- nlopt_destroy(opt);
- return result;
-}
-
-void TimestampProblem::PrintSolution(const std::vector<double> &solution) {
- const size_t n = filters_.size() - 1u;
- std::vector<double> gradient(n, 0.0);
- const double minf = Cost(solution.data(), gradient.data());
-
- // High precision formatter for the gradient.
- struct MyFormatter {
- void operator()(std::string *out, double i) const {
- std::stringstream ss;
- ss << std::setprecision(12) << std::fixed << i;
- out->append(ss.str());
- }
- };
-
- LOG(INFO) << std::setprecision(12) << std::fixed << "Found minimum at f("
- << absl::StrJoin(solution, ", ") << ") -> " << minf << " grad ["
- << absl::StrJoin(gradient, ", ", MyFormatter()) << "] after "
- << cost_call_count_ << " cycles for node " << solution_node_ << ".";
-}
-
-std::vector<monotonic_clock::time_point> TimestampProblem::DoubleToMonotonic(
- const double *r) const {
- std::vector<monotonic_clock::time_point> result(filters_.size());
- for (size_t i = 0; i < result.size(); ++i) {
- if (live(i)) {
- result[i] = base_clock(i) + std::chrono::nanoseconds(static_cast<int64_t>(
- std::round(get_t(r, i))));
- } else {
- result[i] = monotonic_clock::min_time;
- }
- }
-
- return result;
-}
-
-std::vector<monotonic_clock::time_point> TimestampProblem::Solve() {
- std::vector<double> solution = SolveDouble();
- return DoubleToMonotonic(solution.data());
-}
bool TimestampProblem::ValidateSolution(
std::vector<monotonic_clock::time_point> solution) {
@@ -384,99 +288,26 @@
return result;
}
-double TimestampProblem::Cost(const double *time_offsets, double *grad) {
- ++cost_call_count_;
-
- if (grad != nullptr) {
- for (size_t i = 0; i < filters_.size() - 1u; ++i) {
- grad[i] = 0;
- }
-
- for (size_t i = 0u; i < filters_.size(); ++i) {
- for (const struct FilterPair &filter : filters_[i]) {
- if (i != solution_node_) {
- grad[NodeToSolutionIndex(i)] += filter.filter->DCostDta(
- base_clock_[i], get_t(time_offsets, i),
- base_clock_[filter.b_index], get_t(time_offsets, filter.b_index));
- }
- if (filter.b_index != solution_node_) {
- grad[NodeToSolutionIndex(filter.b_index)] += filter.filter->DCostDtb(
- base_clock_[i], get_t(time_offsets, i),
- base_clock_[filter.b_index], get_t(time_offsets, filter.b_index));
- }
- }
- }
- }
-
- double cost = 0;
- for (size_t i = 0u; i < filters_.size(); ++i) {
- for (const struct FilterPair &filter : filters_[i]) {
- cost += filter.filter->Cost(base_clock_[i], get_t(time_offsets, i),
- base_clock_[filter.b_index],
- get_t(time_offsets, filter.b_index));
- }
- }
-
- if (VLOG_IS_ON(2)) {
- struct MyFormatter {
- void operator()(std::string *out, monotonic_clock::time_point t) const {
- std::stringstream ss;
- ss << t;
- out->append(ss.str());
- }
- void operator()(std::string *out, double i) const {
- std::stringstream ss;
- ss << std::setprecision(12) << std::fixed << i;
- out->append(ss.str());
- }
- };
-
- std::string gradient;
- if (grad) {
- std::stringstream ss;
- ss << " grad ["
- << absl::StrJoin(absl::Span<const double>(grad, LiveNodesCount() - 1u),
- ", ", MyFormatter())
- << "]";
- gradient = ss.str();
- }
-
- LOG(INFO) << std::setprecision(12) << std::fixed
- << "Evaluated minimum at f("
- << absl::StrJoin(DoubleToMonotonic(time_offsets), ", ",
- MyFormatter())
- << ") -> " << cost << gradient << " after " << cost_call_count_
- << " cycles.";
- }
- return cost;
-}
-
void TimestampProblem::Debug() {
MaybeUpdateNodeMapping();
LOG(INFO) << "Solving for node " << solution_node_ << " at "
<< base_clock_[solution_node_];
- std::vector<std::string> cost;
- for (size_t i = 0u; i < filters_.size(); ++i) {
- for (const struct FilterPair &filter : filters_[i]) {
- cost.emplace_back(filter.filter->DebugCost(base_clock_[i], 0.0,
- base_clock_[filter.b_index],
- 0.0, i, filter.b_index));
- }
- }
- LOG(INFO) << "Cost: " << absl::StrJoin(cost, " + ");
-
std::vector<std::vector<std::string>> gradients(filters_.size());
for (size_t i = 0u; i < filters_.size(); ++i) {
- std::string gradient = "0.0";
for (const struct FilterPair &filter : filters_[i]) {
- if (i != solution_node_ && live(i)) {
- gradients[i].emplace_back(filter.filter->DebugDCostDta(
- base_clock_[i], 0.0, base_clock_[filter.b_index], 0.0, i,
- filter.b_index));
- }
- if (filter.b_index != solution_node_ && live(filter.b_index)) {
- gradients[filter.b_index].emplace_back(filter.filter->DebugDCostDtb(
+ if (live(i) && live(filter.b_index)) {
+ // TODO(austin): This should be right, but I haven't gone and spent a
+ // bunch of time making sure it all matches perfectly. We aren't
+ // hitting this anymore. I'm also likely the one who will be debugging
+ // it next and would rather spend the time debugging it when I get a bug
+ // report.
+ gradients[i].emplace_back(
+ std::string("- ") +
+ filter.filter->DebugOffsetError(base_clock_[i], 0.0,
+ base_clock_[filter.b_index], 0.0, i,
+ filter.b_index));
+ gradients[filter.b_index].emplace_back(filter.filter->DebugOffsetError(
base_clock_[i], 0.0, base_clock_[filter.b_index], 0.0, i,
filter.b_index));
}
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 338a515..929c837 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -57,40 +57,10 @@
filters_[a_index].emplace_back(filter, b_index);
}
- // Solves the optimization problem phrased and returns the offsets from the
- // base clock for each node, excluding the solution node.
- std::vector<double> SolveDouble();
- // Solves the optimization problem phrased and returns the optimal time on
- // each node.
- std::vector<monotonic_clock::time_point> Solve();
-
// Solves the optimization problem phrased using the symmetric Netwon's method
// solver and returns the optimal time on each node.
std::vector<monotonic_clock::time_point> SolveNewton();
- // Returns the squared error for all of the offsets.
- // time_offsets is the offsets from the base_clock for every node (in order)
- // except the solution node. It should be one element shorter than the number
- // of nodes this problem was constructed with. grad (if non-nullptr) is the
- // place to put the current gradient and needs to be the same length as
- // time_offsets.
- double Cost(const double *time_offsets, double *grad);
-
- // Returns the time offset from base for a node.
- double get_t(const double *time_offsets, size_t node_index) const {
- if (node_index == solution_node_) return 0.0;
- size_t mapped_index = NodeToSolutionIndex(node_index);
- if (mapped_index == std::numeric_limits<size_t>::max()) {
- return 0.0;
- }
- return time_offsets[mapped_index];
- }
-
- // Converts a list of solutions to the corresponding monotonic times for all
- // nodes, not just the nodes being solved.
- std::vector<monotonic_clock::time_point> DoubleToMonotonic(
- const double *r) const;
-
// Validates the solution, returning true if it meets all the constraints, and
// false otherwise.
bool ValidateSolution(std::vector<monotonic_clock::time_point> solution);
@@ -115,21 +85,7 @@
return count;
}
- // LOG(INFO)'s the provided solution, showing the arguments, the minimum
- // value, and the gradient.
- void PrintSolution(const std::vector<double> &solution);
-
private:
- // Static trampoline for nlopt. n is the number of constraints, time_offsets
- // is input solution to solve for, grad is the gradient to fill out (if not
- // nullptr), and data is an untyped pointer to a TimestampProblem.
- static double DoCost(unsigned n, const double *time_offsets, double *grad,
- void *data) {
- CHECK_EQ(n + 1u,
- reinterpret_cast<TimestampProblem *>(data)->filters_.size());
- return reinterpret_cast<TimestampProblem *>(data)->Cost(time_offsets, grad);
- }
-
// Returns the Hessian of the cost function at time_offsets.
Eigen::MatrixXd Hessian(const Eigen::Ref<Eigen::VectorXd> time_offsets) const;
// Returns the gradient of the cost function at time_offsets.
@@ -158,17 +114,6 @@
node_mapping_valid_ = true;
}
- // Converts from a node index to an index in the solution.
- size_t NodeToSolutionIndex(size_t node_index) const {
- CHECK_NE(node_index, solution_node_);
- // The solver is going to provide us a matrix with solution_node_ removed.
- // The indices of all nodes before solution_node_ are in the same spot, and
- // the indices of the nodes after solution node are shifted over.
- size_t mapped_node_index = NodeToFullSolutionIndex(node_index);
- return node_index < solution_node_ ? mapped_node_index
- : (mapped_node_index - 1);
- }
-
// Converts from a node index to an index in the solution without skipping the
// solution node.
size_t NodeToFullSolutionIndex(size_t node_index) const {
@@ -176,9 +121,6 @@
return node_mapping_[node_index];
}
- // Number of times Cost has been called for tracking.
- int cost_call_count_ = 0;
-
// The node to hold fixed when solving.
size_t solution_node_ = 0;
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 1d13601..f1cf0aa 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -8,8 +8,6 @@
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/testing_time_converter.h"
#include "gtest/gtest.h"
-#include "nlopt.h"
-#include "third_party/gmp/gmpxx.h"
namespace aos {
namespace message_bridge {
@@ -18,140 +16,6 @@
namespace chrono = std::chrono;
using aos::monotonic_clock;
-// Converts a int64_t into a mpq_class. This only uses 32 bit precision
-// internally, so it will work on ARM. This should only be used on 64 bit
-// platforms to test out the 32 bit implementation.
-inline mpq_class FromInt64(int64_t i) {
- uint64_t absi = std::abs(i);
- mpq_class bits(static_cast<uint32_t>((absi >> 32) & 0xffffffffu));
- bits *= mpq_class(0x10000);
- bits *= mpq_class(0x10000);
- bits += mpq_class(static_cast<uint32_t>(absi & 0xffffffffu));
-
- if (i < 0) {
- return -bits;
- } else {
- return bits;
- }
-}
-
-// Class to hold an affine function for the time offset.
-// O(t) = slope * t + offset
-//
-// This is stored using mpq_class, which stores everything as full rational
-// fractions.
-class Line {
- public:
- Line() {}
-
- // Constructs a line given the offset and slope.
- Line(mpq_class offset, mpq_class slope) : offset_(offset), slope_(slope) {}
-
- // TODO(austin): Remove this one.
- Line(std::chrono::nanoseconds offset, double slope)
- : offset_(DoFromInt64(offset.count())), slope_(slope) {}
-
- // Fits a line to 2 points and returns the associated line.
- static Line Fit(
- const std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> a,
- const std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds>
- b);
-
- // Returns the full precision slopes and offsets.
- mpq_class mpq_offset() const { return offset_; }
- mpq_class mpq_slope() const { return slope_; }
- void increment_mpq_offset(mpq_class increment) { offset_ += increment; }
-
- // Returns the rounded offsets and slopes.
- std::chrono::nanoseconds offset() const {
- double o = offset_.get_d();
- return std::chrono::nanoseconds(static_cast<int64_t>(o));
- }
- double slope() const { return slope_.get_d(); }
-
- std::string DebugString() const {
- std::stringstream ss;
- ss << "Offset " << mpq_offset() << " slope " << mpq_slope();
- return ss.str();
- }
-
- void Debug() const { LOG(INFO) << DebugString(); }
-
- // Returns the offset at a given time.
- // TODO(austin): get_d() ie double -> int64 can't be accurate...
- std::chrono::nanoseconds Eval(monotonic_clock::time_point pt) const {
- mpq_class result =
- mpq_class(FromInt64(pt.time_since_epoch().count())) * slope_ + offset_;
- return std::chrono::nanoseconds(static_cast<int64_t>(result.get_d()));
- }
-
- private:
- static mpq_class DoFromInt64(int64_t i) {
-#if GMP_NUMB_BITS == 32
- return FromInt64(i);
-#else
- return i;
-#endif
- }
-
- mpq_class offset_;
- mpq_class slope_;
-};
-
-Line Line::Fit(
- const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> a,
- const std::tuple<monotonic_clock::time_point, chrono::nanoseconds> b) {
- mpq_class slope = FromInt64((std::get<1>(b) - std::get<1>(a)).count()) /
- FromInt64((std::get<0>(b) - std::get<0>(a)).count());
- slope.canonicalize();
- mpq_class offset =
- FromInt64(std::get<1>(a).count()) -
- FromInt64(std::get<0>(a).time_since_epoch().count()) * slope;
- offset.canonicalize();
- Line f(offset, slope);
- return f;
-}
-
-mpq_class SolveExact(Line la, Line lb, monotonic_clock::time_point ta) {
- mpq_class ma = la.mpq_slope();
- mpq_class ba = la.mpq_offset();
- mpq_class mb = lb.mpq_slope();
- mpq_class bb = lb.mpq_offset();
- // The min of a quadratic is when the slope is 0. Solve algebraically.
- //
- // 2.0 * (tb - (1 + d.ma) * ta - d.ba) + 2.0 * ((1.0 + d.mb) * tb - ta +
- // d.bb) * (1.0 + d.mb) = 0;
- //
- // tb - (1 + d.ma) * ta - d.ba + ((1.0 + d.mb) *
- // tb - ta + d.bb) * (1.0 + d.mb) = 0;
- //
- // tb - (1 + d.ma) * ta - d.ba + (1 + d.mb) (1 + d.mb) * tb - (1 + d.mb) ta
- // + (1 + d.mb) d.bb = 0;
- //
- // (1 + (1 + d.mb) (1 + d.mb)) tb - ((1 + d.ma) + (1
- // + d.mb)) * ta - d.ba + (1 + d.mb) d.bb = 0;
- //
- // tb = (((1 + d.ma) + (1 + d.mb)) * ta + d.ba - (1 + d.mb) d.bb) / (1 + (1
- // + d.mb) (1 + d.mb))
-
- mpq_class mpq_ta(FromInt64(ta.time_since_epoch().count()));
- mpq_class one(1);
- mpq_class mpq_tb =
- (((one + ma) + (one + mb)) * mpq_ta + ba - (one + mb) * bb) /
- (one + (one + mb) * (one + mb));
- mpq_tb.canonicalize();
- return mpq_tb;
-}
-
-Line FitLine(const NoncausalTimestampFilter &filter) {
- if (filter.timestamps_size() == 1) {
- Line fit(std::get<1>(filter.timestamp(0)), 0.0);
- return fit;
- } else {
- return Line::Fit(filter.timestamp(0), filter.timestamp(1));
- }
-}
-
// Tests solution time(s) comparison and measure of invalid / inconsistent times
TEST(TimestampProblemTest, CompareTimes) {
const monotonic_clock::time_point e = monotonic_clock::epoch();
@@ -213,93 +77,6 @@
CHECK_EQ(InvalidDistance(times_b_mixed, times_a).count(), 3000);
}
-// Tests that an infinite precision solution matches our numeric solver solution
-// for a couple of simple problems.
-TEST(TimestampProblemTest, Solve) {
- const monotonic_clock::time_point e = monotonic_clock::epoch();
- const monotonic_clock::time_point ta = e + chrono::milliseconds(500);
-
- NoncausalTimestampFilter a(nullptr, nullptr);
- // Delivered at e, sent at e + offset.
- // Sent at 1.001, received at 0
- a.Sample(e, chrono::milliseconds(1001));
- a.Sample(e + chrono::milliseconds(1000), chrono::milliseconds(1001));
- a.Sample(e + chrono::milliseconds(3000), chrono::milliseconds(999));
-
- NoncausalTimestampFilter b(nullptr, nullptr);
- // Sent at 0.001s, received at 1.000s
- b.Sample(e + chrono::milliseconds(1000), -chrono::milliseconds(999));
- b.Sample(e + chrono::milliseconds(2000), -chrono::milliseconds(1000));
- b.Sample(e + chrono::milliseconds(4000), -chrono::milliseconds(1002));
-
- TimestampProblem problem(2);
- problem.set_base_clock(0, ta);
- problem.set_base_clock(1, e);
- problem.set_solution_node(0);
- problem.add_filter(0, &a, 1);
- problem.add_filter(1, &b, 0);
-
- // Solve the problem with infinite precision as a verification and compare the
- // result.
- {
- const std::vector<double> result = problem.SolveDouble();
-
- mpq_class tb_mpq =
- SolveExact(FitLine(a), FitLine(b), problem.base_clock(0));
- EXPECT_EQ(tb_mpq.get_d(), result[0])
- << std::setprecision(12) << std::fixed << " Expected " << tb_mpq.get_d()
- << " " << tb_mpq << " got " << result[0];
- }
-
- // Solve some other timestamps for grins.
- {
- problem.set_base_clock(0, e + chrono::milliseconds(500));
- std::vector<double> result = problem.SolveDouble();
-
- mpq_class tb_mpq =
- SolveExact(FitLine(a), FitLine(b), problem.base_clock(0));
-
- EXPECT_EQ(tb_mpq.get_d(), result[0])
- << std::setprecision(12) << std::fixed << " Expected " << tb_mpq.get_d()
- << " " << tb_mpq << " got " << result[0];
- }
-
- // Now do the second line segment.
- {
- NoncausalTimestampFilter a(nullptr, nullptr);
- a.Sample(e + chrono::milliseconds(1000), chrono::milliseconds(1001));
- a.Sample(e + chrono::milliseconds(3000), chrono::milliseconds(999));
-
- NoncausalTimestampFilter b(nullptr, nullptr);
- b.Sample(e + chrono::milliseconds(2000), -chrono::milliseconds(1000));
- b.Sample(e + chrono::milliseconds(4000), -chrono::milliseconds(1002));
- {
- problem.set_base_clock(0, e + chrono::milliseconds(1500));
- const std::vector<double> result = problem.SolveDouble();
-
- mpq_class tb_mpq =
- SolveExact(FitLine(a), FitLine(b), problem.base_clock(0));
-
- EXPECT_NEAR(tb_mpq.get_d(), result[0], 1e-6)
- << std::setprecision(12) << std::fixed << " Expected "
- << tb_mpq.get_d() << " " << tb_mpq << " got " << result[0];
- }
-
- {
- problem.set_base_clock(0, e + chrono::milliseconds(1600));
- const std::vector<double> result = problem.SolveDouble();
-
- mpq_class tb_mpq =
- SolveExact(FitLine(a), FitLine(b), problem.base_clock(0));
-
- EXPECT_NEAR(tb_mpq.get_d(), result[0], 1e-6)
- << std::setprecision(12) << std::fixed << " Expected "
- << tb_mpq.get_d() << " " << tb_mpq << " got " << result[0]
- << " difference of " << tb_mpq.get_d() - result[0];
- }
- }
-}
-
// Tests that a single timestamp InterpolatedTimeConverter returns equal
// results. 1 second should be 1 second everywhere.
TEST(InterpolatedTimeConverterTest, OneTime) {
diff --git a/aos/network/rawrtc.cc b/aos/network/rawrtc.cc
new file mode 100644
index 0000000..a92b408
--- /dev/null
+++ b/aos/network/rawrtc.cc
@@ -0,0 +1,283 @@
+#include "aos/network/rawrtc.h"
+
+extern "C" {
+#include <rawrtc.h>
+
+#include "external/com_github_rawrtc_rawrtc_common/include/rawrtcc/utils.h"
+}
+
+#include <functional>
+#include <string>
+
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace web_proxy {
+namespace {
+enum {
+ TRANSPORT_BUFFER_LENGTH = 1048576, // 1 MiB
+};
+}
+
+ScopedDataChannel::ScopedDataChannel() {}
+
+void ScopedDataChannel::Open(struct rawrtc_peer_connection *connection,
+ const std::string &label) {
+ label_ = label;
+ VLOG(1) << "(" << this << ") Opening " << label_;
+ struct rawrtc_data_channel_parameters *channel_parameters;
+ // Create data channel parameters
+ // TODO(austin): TYPE?
+ CHECK_RAWRTC(rawrtc_data_channel_parameters_create(
+ &channel_parameters, label.c_str(),
+ RAWRTC_DATA_CHANNEL_TYPE_RELIABLE_ORDERED, 0, NULL, false, 0));
+
+ // Create data channel
+ CHECK_RAWRTC(rawrtc_peer_connection_create_data_channel(
+ &data_channel_, connection, channel_parameters,
+ StaticDataChannelOpenHandler, StaticBufferedAmountLowHandler,
+ StaticDataChannelErrorHandler, StaticDataChannelCloseHandler,
+ StaticDataChannelMessageHandler, this));
+
+ // Un-reference data channel parameters
+ mem_deref(channel_parameters);
+}
+
+void ScopedDataChannel::Open(struct rawrtc_data_channel *const channel) {
+ struct rawrtc_data_channel_parameters *parameters;
+ enum rawrtc_code const ignore[] = {RAWRTC_CODE_NO_VALUE};
+ char *label = NULL;
+
+ // Get data channel label and protocol
+ CHECK_RAWRTC(rawrtc_data_channel_get_parameters(¶meters, channel));
+ CHECK_RAWRTC_IGNORE(
+ rawrtc_data_channel_parameters_get_label(&label, parameters), ignore);
+ if (label) {
+ label_ = label;
+ mem_deref(label);
+ }
+ mem_deref(parameters);
+
+ VLOG(1) << "(" << this << ") New data channel instance: " << label_;
+
+ mem_ref(channel);
+ data_channel_ = channel;
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_arg(data_channel_, this));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_open_handler(
+ data_channel_, StaticDataChannelOpenHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_buffered_amount_low_handler(
+ data_channel_, StaticBufferedAmountLowHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_error_handler(
+ data_channel_, StaticDataChannelErrorHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_close_handler(
+ data_channel_, StaticDataChannelCloseHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_message_handler(
+ data_channel_, StaticDataChannelMessageHandler));
+}
+
+ScopedDataChannel::~ScopedDataChannel() {
+ CHECK(opened_);
+ CHECK(closed_);
+ CHECK(data_channel_ == nullptr)
+ << ": Destroying open data channel " << this << ".";
+}
+
+void ScopedDataChannel::StaticDataChannelOpenHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ CHECK(!client->opened_);
+ CHECK(!client->closed_);
+ if (client->on_open_) client->on_open_();
+ client->opened_ = true;
+}
+
+void ScopedDataChannel::StaticBufferedAmountLowHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_buffered_amount_low_) client->on_buffered_amount_low_();
+}
+
+void ScopedDataChannel::StaticDataChannelErrorHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_error_) client->on_error_();
+}
+
+void ScopedDataChannel::StaticDataChannelCloseHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ CHECK(client->opened_);
+ CHECK(!client->closed_);
+ // Close() assumes that this method will do the final cleanup. The destructor
+ // CHECKs that.
+ client->closed_ = true;
+ struct rawrtc_data_channel *data_channel = client->data_channel_;
+ client->data_channel_ = nullptr;
+ if (client->on_close_) {
+ // Take the function so we can call it without referencing client.
+ // This could destroy the client when the function is deleted by releasing
+ // any shared_ptrs.
+ std::function<void()> on_close = std::move(client->on_close_);
+ on_close();
+ }
+ mem_deref(data_channel);
+}
+
+void ScopedDataChannel::StaticDataChannelMessageHandler(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ enum rawrtc_data_channel_message_flag const flags, void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_message_) client->on_message_(buffer, flags);
+}
+
+void ScopedDataChannel::Close() {
+ CHECK(opened_);
+ CHECK(!closed_);
+ CHECK_RAWRTC(rawrtc_data_channel_close(data_channel_));
+}
+
+void ScopedDataChannel::Send(const ::flatbuffers::DetachedBuffer &buffer) {
+  // Copy the flatbuffer into a new mbuf; die loudly if the alloc or copy fails.
+  struct mbuf *mbuffer = CHECK_NOTNULL(mbuf_alloc(buffer.size()));
+  CHECK_EQ(mbuf_write_mem(mbuffer, buffer.data(), buffer.size()), 0);
+  mbuf_set_pos(mbuffer, 0);  // Rewind so the send starts at the first byte.
+  Send(mbuffer);
+  // Release the reference taken by mbuf_alloc above.
+  mem_deref(mbuffer);
+}
+
+void ScopedDataChannel::Send(struct mbuf *buffer) {
+ // TODO(austin): Checking isn't right, handle errors more gracefully.
+ CHECK_RAWRTC(
+ rawrtc_data_channel_send(CHECK_NOTNULL(data_channel_), buffer, true));
+}
+
+uint64_t ScopedDataChannel::buffered_amount() {
+ return 0;
+
+ // TODO(austin): Not implemented yet...
+ uint64_t result;
+ CHECK_RAWRTC(rawrtc_data_channel_get_buffered_amount(
+ &result, CHECK_NOTNULL(data_channel_)));
+ return result;
+}
+
+RawRTCConnection::RawRTCConnection() {}
+
+void RawRTCConnection::Open() {
+ const char *const stun_google_com_urls[] = {"stun:stun.l.google.com:19302",
+ "stun:stun1.l.google.com:19302"};
+
+ struct rawrtc_peer_connection_configuration *configuration = nullptr;
+
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_create(
+ &configuration, RAWRTC_ICE_GATHER_POLICY_ALL));
+
+ // Add ICE servers to configuration
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_add_ice_server(
+ configuration, stun_google_com_urls, ARRAY_SIZE(stun_google_com_urls),
+ NULL, NULL, RAWRTC_ICE_CREDENTIAL_TYPE_NONE));
+
+ // Set the SCTP transport's buffer length
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_set_sctp_buffer_length(
+ configuration, TRANSPORT_BUFFER_LENGTH, TRANSPORT_BUFFER_LENGTH));
+
+ // Create peer connection
+ CHECK_RAWRTC(rawrtc_peer_connection_create(
+ &connection_, configuration, StaticNegotiationNeededHandler,
+ StaticLocalCandidateHandler,
+ StaticPeerConnectionLocalCandidateErrorHandler,
+ StaticSignalingStateChangeHandler, StaticIceTransportStateChangeHandler,
+ StaticIceGathererStateChangeHandler, StaticConnectionStateChangeHandler,
+ StaticDataChannelHandler, this));
+
+ mem_deref(configuration);
+}
+
+RawRTCConnection::~RawRTCConnection() {
+  if (connection_ == nullptr) return;  // Never opened; assumes nullptr default.
+  CHECK_RAWRTC(rawrtc_peer_connection_close(connection_));
+  mem_deref(connection_);
+}
+
+void RawRTCConnection::StaticNegotiationNeededHandler(void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_negotiation_needed_) client->on_negotiation_needed_();
+}
+
+void RawRTCConnection::StaticLocalCandidateHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, // read-only
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_local_candidate_) client->on_local_candidate_(candidate, url);
+}
+
+void RawRTCConnection::StaticPeerConnectionLocalCandidateErrorHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, uint16_t const error_code,
+ char const *const error_text, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ LOG(ERROR) << "(" << client << ") ICE candidate error, URL: " << url
+ << ", reason: " << error_text;
+ if (client->on_peer_connection_local_candidate_error_)
+ client->on_peer_connection_local_candidate_error_(candidate, url,
+ error_code, error_text);
+}
+
+void RawRTCConnection::StaticSignalingStateChangeHandler(
+ const enum rawrtc_signaling_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") Signaling state change: "
+ << rawrtc_signaling_state_to_name(state);
+ if (client->on_signaling_state_change_)
+ client->on_signaling_state_change_(state);
+}
+
+void RawRTCConnection::StaticIceTransportStateChangeHandler(
+ const enum rawrtc_ice_transport_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") ICE transport state: "
+ << rawrtc_ice_transport_state_to_name(state);
+ if (client->on_ice_transport_state_change_)
+ client->on_ice_transport_state_change_(state);
+}
+
+void RawRTCConnection::StaticIceGathererStateChangeHandler(
+ const enum rawrtc_ice_gatherer_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") ICE gatherer state: "
+ << rawrtc_ice_gatherer_state_to_name(state);
+ if (client->on_ice_gatherer_state_change_)
+ client->on_ice_gatherer_state_change_(state);
+}
+
+void RawRTCConnection::StaticConnectionStateChangeHandler(
+ const enum rawrtc_peer_connection_state state, // read-only
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") Peer connection state change: "
+ << rawrtc_peer_connection_state_to_name(state);
+ if (client->on_connection_state_change_)
+ client->on_connection_state_change_(state);
+}
+
+void RawRTCConnection::StaticDataChannelHandler(
+ struct rawrtc_data_channel
+ *const channel, // read-only, MUST be referenced when used
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_data_channel_) {
+ std::shared_ptr<ScopedDataChannel> new_channel =
+ std::make_shared<ScopedDataChannel>();
+ new_channel->Open(channel);
+ client->on_data_channel_(std::move(new_channel));
+ }
+}
+
+} // namespace web_proxy
+} // namespace aos
diff --git a/aos/network/rawrtc.h b/aos/network/rawrtc.h
new file mode 100644
index 0000000..3f37435
--- /dev/null
+++ b/aos/network/rawrtc.h
@@ -0,0 +1,223 @@
+#ifndef AOS_NETWORK_RAWRTC_H_
+#define AOS_NETWORK_RAWRTC_H_
+
+#include <functional>
+#include <string>
+
+extern "C" {
+#include <rawrtc.h>
+
+#include "external/com_github_rawrtc_rawrtc_common/include/rawrtcc/utils.h"
+}
+
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace web_proxy {
+
+// Runs x once and CHECK-fails, logging the decoded code, unless it succeeded.
+// TODO(austin): This doesn't allow streaming data in.
+#define CHECK_RAWRTC(x) \
+  [&]() { \
+    const enum rawrtc_code r = (x); \
+    return CHECK(r == RAWRTC_CODE_SUCCESS) << " actual " << rawrtc_code_to_str(r); \
+  }()
+
+// Like CHECK_RAWRTC, but result codes listed in i are accepted too.  On
+// failure the decoded code is logged, matching CHECK_RAWRTC.
+#define CHECK_RAWRTC_IGNORE(x, i) \
+  [&]() { \
+    const enum rawrtc_code r = (x); \
+    for (const auto w : i) if (w == r) return; \
+    return CHECK(r == RAWRTC_CODE_SUCCESS) << " actual " << rawrtc_code_to_str(r); \
+  }()
+
+// Wrapper around a RawRTC data channel to manage its lifetime and provide C++
+// std::function callbacks for all of the C callbacks.
+//
+// There are 3 phases of the object's lifetime.
+// 1) Initialization. Callbacks can be set here.
+// 2) Open. Calling Open transitions the channel to be open and triggers the
+// on_open callback to be called.
+// 3) Close. This must be called before destroying the channel and calls the
+// on_close callback and shuts down the channel.
+class ScopedDataChannel {
+ public:
+ ScopedDataChannel();
+ ScopedDataChannel(const ScopedDataChannel &) = delete;  // Not copyable.
+ ScopedDataChannel &operator=(const ScopedDataChannel &) = delete;
+
+ ~ScopedDataChannel();
+
+ // Setters for all the callbacks. These may be called whenever.
+
+ // Registers a callback to be called when the channel is opened. This only
+ // gets called once during or after Open is called.
+ void set_on_open(std::function<void()> &&fn) { on_open_ = std::move(fn); }
+
+ // Registers a callback to be called when the channel is closed. This only
+ // gets called once during or after Close is called.
+ void set_on_close(std::function<void()> &&fn) { on_close_ = std::move(fn); }
+
+ void set_on_buffered_amount_low(std::function<void()> &&fn) {
+ on_buffered_amount_low_ = std::move(fn);  // Replaces any earlier callback.
+ }
+ void set_on_error(std::function<void()> &&fn) { on_error_ = std::move(fn); }
+ void set_on_message(
+ std::function<void(struct mbuf *const,
+ enum rawrtc_data_channel_message_flag const)> &&fn) {
+ on_message_ = std::move(fn);  // presumably mirrors the message trampoline's (buffer, flags) — confirm
+ }
+
+ // Opens the channel on the provided connection with the provided label. This
+ // is separate so we can optionally register callbacks before opening.
+ void Open(struct rawrtc_peer_connection *connection,
+ const std::string &label);
+ // Takes over an already open channel.
+ void Open(struct rawrtc_data_channel *channel);
+
+ // Closes the channel. It must be open first.
+ void Close();
+
+ // Sends a buffer.
+ void Send(const ::flatbuffers::DetachedBuffer &buffer);
+ void Send(struct mbuf *buffer);  // Overload for a caller-provided mbuf.
+
+ std::string_view label() const { return label_; }  // View into label_; must not outlive this object.
+
+ // Returns the amount of data buffered.
+ uint64_t buffered_amount();
+
+ private:
+ // Trampolines from C -> C++.
+ static void StaticDataChannelOpenHandler(void *const arg);
+ static void StaticBufferedAmountLowHandler(void *const arg);
+ static void StaticDataChannelErrorHandler(void *const arg);
+ static void StaticDataChannelCloseHandler(void *const arg);
+ static void StaticDataChannelMessageHandler(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ enum rawrtc_data_channel_message_flag const flags, void *const arg);
+
+ // Our channel and the label for it.
+ std::string label_;
+ struct rawrtc_data_channel *data_channel_ = nullptr;  // nullptr until assigned (presumably by Open — confirm).
+
+ bool opened_ = false;  // NOTE(review): presumably tracks the Open phase — confirm in rawrtc.cc.
+ bool closed_ = false;  // NOTE(review): presumably tracks the Close phase — confirm in rawrtc.cc.
+
+ std::function<void()> on_open_;
+ std::function<void()> on_buffered_amount_low_;
+ std::function<void()> on_error_;
+ std::function<void()> on_close_;
+ std::function<void(struct mbuf *const,
+ enum rawrtc_data_channel_message_flag const)>
+ on_message_;
+
+ // Self referential pointer to keep ourselves in scope until Close() gets
+ // called.
+ std::shared_ptr<ScopedDataChannel> self_;
+};
+
+// Wrapper around a RawRTC connection to both manage its lifetime and provide
+// std::function interfaces for the callbacks.
+class RawRTCConnection {
+ public:
+ RawRTCConnection();
+
+ virtual ~RawRTCConnection();
+
+ void set_on_negotiation_needed(std::function<void()> &&fn) {
+ on_negotiation_needed_ = std::move(fn);  // Replaces any earlier callback.
+ }
+ void set_on_local_candidate(
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *)> &&fn) {
+ on_local_candidate_ = std::move(fn);  // presumably fn(candidate, url), as on the trampoline — confirm
+ }
+ // Sets the handler for a peer connection local candidate error. Arguments
+ // are the candidate, URL, error_code and error_text.
+ void set_on_peer_connection_local_candidate_error(
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *, uint16_t, char const *)> &&fn) {
+ on_peer_connection_local_candidate_error_ = std::move(fn);
+ }
+ void set_on_signaling_state_change(
+ std::function<void(enum rawrtc_signaling_state const)> &&fn) {
+ on_signaling_state_change_ = std::move(fn);
+ }
+ void set_on_ice_transport_state_change(
+ std::function<void(const enum rawrtc_ice_transport_state)> &&fn) {
+ on_ice_transport_state_change_ = std::move(fn);
+ }
+ void set_on_ice_gatherer_state_change(
+ std::function<void(const enum rawrtc_ice_gatherer_state)> &&fn) {
+ on_ice_gatherer_state_change_ = std::move(fn);
+ }
+ void set_on_connection_state_change(
+ std::function<void(const enum rawrtc_peer_connection_state)> &&fn) {
+ on_connection_state_change_ = std::move(fn);
+ }
+
+ // TODO(austin): Really, this should be a ScopedDataChannel object.
+ void set_on_data_channel(
+ std::function<void(std::shared_ptr<ScopedDataChannel>)> &&fn) {
+ on_data_channel_ = std::move(fn);  // fn receives shared ownership of the wrapped channel.
+ }
+
+ // Opens the connection. This lets us register callbacks before starting it.
+ void Open();
+
+ // Returns the connection if Open has been called; connection_ starts as nullptr.
+ struct rawrtc_peer_connection *connection() {
+ return connection_;
+ }
+
+ private:
+ // Trampolines from C -> C++.
+ static void StaticNegotiationNeededHandler(void *const arg);
+ static void StaticLocalCandidateHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, void *const arg);
+ static void StaticPeerConnectionLocalCandidateErrorHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, uint16_t const error_code,
+ char const *const error_text, void *const arg);
+ static void StaticSignalingStateChangeHandler(
+ const enum rawrtc_signaling_state state, void *const arg);
+ static void StaticIceTransportStateChangeHandler(
+ const enum rawrtc_ice_transport_state state, void *const arg);
+ static void StaticIceGathererStateChangeHandler(
+ const enum rawrtc_ice_gatherer_state state, void *const arg);
+ static void StaticConnectionStateChangeHandler(
+ const enum rawrtc_peer_connection_state state, void *const arg);
+ static void StaticDataChannelHandler(
+ struct rawrtc_data_channel *const channel, void *const arg);
+
+ // The connection.
+ struct rawrtc_peer_connection *connection_ = nullptr;
+
+ // Callbacks. Each is empty until the matching set_on_* setter stores one.
+ std::function<void()> on_negotiation_needed_;
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *)>
+ on_local_candidate_;
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *, uint16_t, char const *)>
+ on_peer_connection_local_candidate_error_;
+ std::function<void(enum rawrtc_signaling_state const)>
+ on_signaling_state_change_;
+ std::function<void(const enum rawrtc_ice_transport_state)>
+ on_ice_transport_state_change_;
+ std::function<void(const enum rawrtc_ice_gatherer_state)>
+ on_ice_gatherer_state_change_;
+ std::function<void(const enum rawrtc_peer_connection_state)>
+ on_connection_state_change_;
+ std::function<void(std::shared_ptr<ScopedDataChannel>)> on_data_channel_;
+};
+
+} // namespace web_proxy
+} // namespace aos
+
+#endif // AOS_NETWORK_RAWRTC_H_
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index dd77f40..1041b97 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -753,164 +753,7 @@
((tb - ta) - offset.second);
}
-double NoncausalTimestampFilter::Cost(aos::monotonic_clock::time_point ta_base,
- double ta,
- aos::monotonic_clock::time_point tb_base,
- double tb) const {
- NormalizeTimestamps(&ta_base, &ta);
- NormalizeTimestamps(&tb_base, &tb);
-
- // Squaring the error throws away half the digits. The optimizer uses the
- // gradient heavily to compensate, so we don't need to care much about
- // computing this carefully.
- return std::pow(OffsetError(ta_base, ta, tb_base, tb), 2.0);
-}
-
-double NoncausalTimestampFilter::DCostDta(
- aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const {
- // As a reminder, our cost function is:
- // (OffsetError(ta, tb))^2
- // ie
- // ((tb - ta - Offset(ta))^2
- //
- // Assuming Offset(ta) = m * ta + ba (linear): this becomes
- // ((tb - ta - (ma ta + ba))^2
- // ie
- // ((tb - (1 + ma) ta - ba)^2
- //
- // d cost/dta =>
- // 2 * (tb - (1 + ma) ta - ba) * (-(1 + ma))
- //
- // We don't actually want to compute tb - (1 + ma) ta for numerical precision
- // reasons. The important digits are small compared to the offset. Given our
- // original cost above, this is equivalent to:
- // - 2 * (tb - ta - Offset(ta)) * (1 + ma)
- //
- // We can compute this a lot more accurately.
-
- // Go find our timestamps for the interpolation.
- // Rather than lookup timestamps a number of times, look them up here and
- // inline the implementation of OffsetError.
- NormalizeTimestamps(&ta_base, &ta);
- NormalizeTimestamps(&tb_base, &tb);
-
- if (IsOutsideSamples(ta_base, ta)) {
- double slope = kMaxVelocity();
- if (IsAfterSamples(ta_base, ta)) {
- // If we're past the last sample point, use a negative slope
- slope = -kMaxVelocity();
- }
- // This implements the high precision version of the above equation:
- // - 2 * (tb - ta - Offset(ta)) * (1 + ma)
- // = -2 * OffsetError(ta,tb) * (1 + ma)
- // Where for extrapolated data, we have to extrapolate for Offset(ta)
- // and use +/- kMaxVelocity() for ma
- // (slope is positive if timepoint is before our first sample, and
- // negative if the point is after our last sample)
- return -2.0 * OffsetError(ta_base, ta, tb_base, tb) * (1.0 + slope);
- }
-
- std::pair<std::tuple<monotonic_clock::time_point, chrono::nanoseconds>,
- std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>
- points = FindTimestamps(ta_base, ta);
-
- const double m =
- static_cast<double>(
- (std::get<1>(points.second) - std::get<1>(points.first)).count()) /
- static_cast<double>(
- (std::get<0>(points.second) - std::get<0>(points.first)).count());
- // Subtract the integer offsets first and then handle the double remainder to
- // keep precision up.
- //
- // (tb - ta - Offset(ta)) ->
- // (tb_base - ta_base - OffsetBase + tb - ta - OffsetRemainder)
- // NOTE: We don't use OffsetError function here, in order to avoid
- // extra calls to FindTimestamps
- return -2.0 *
- (static_cast<double>((tb_base - ta_base -
- NoncausalTimestampFilter::InterpolateOffset(
- points.first, points.second, ta_base, ta))
- .count()) +
- (tb - ta) -
- NoncausalTimestampFilter::InterpolateOffsetRemainder(
- points.first, points.second, ta_base, ta)) *
- (1.0 + m);
-}
-
-std::string NoncausalTimestampFilter::DebugDCostDta(
- aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb, size_t node_a,
- size_t node_b) const {
- NormalizeTimestamps(&ta_base, &ta);
- NormalizeTimestamps(&tb_base, &tb);
-
- if (IsOutsideSamples(ta_base, ta)) {
- auto reference_timestamp = GetReferenceTimestamp(ta_base, ta);
- double slope = kMaxVelocity();
- std::string note = "_";
- if (IsAfterSamples(ta_base, ta)) {
- slope = -kMaxVelocity();
- note = "^";
- }
- // d cost / dta ==>
- // - 2 * (tb - ta - (ta - ref) * ma - ref_offset) * (1 + ma)
- return absl::StrFormat(
- "-2. * (t%d - t%d - ((t%d - %d) * %f - %d.) * (1 + %f.)%s", node_b,
- node_a, node_a,
- std::get<0>(reference_timestamp).time_since_epoch().count(), slope,
- std::get<1>(reference_timestamp).count(), slope, note);
- }
-
- std::pair<std::tuple<monotonic_clock::time_point, chrono::nanoseconds>,
- std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>
- points = FindTimestamps(ta_base, ta);
-
- // As a reminder, our cost function is essentially:
- // ((tb - ta - (ma ta + ba))^2
- // ie
- // ((tb - (1 + ma) ta - ba)^2
- //
- // d cost/dta =>
- // 2 * (tb - (1 + ma) ta - ba) * (-(1 + ma))
-
- const int64_t rise =
- (std::get<1>(points.second) - std::get<1>(points.first)).count();
- const int64_t run =
- (std::get<0>(points.second) - std::get<0>(points.first)).count();
-
- if (rise == 0) {
- return absl::StrFormat("-2. * (t%d - t%d %c %d.)", node_b, node_a,
- std::get<1>(points.first).count() >= 0 ? '-' : '+',
- std::abs(std::get<1>(points.first).count()));
- } else {
- return absl::StrFormat(
- "-2. * (t%d - t%d - (t%d - %d.) * %d. / %d. - %d.) * (1 + %d. / "
- "%d.)",
- node_b, node_a, node_a,
- std::get<0>(points.first).time_since_epoch().count(), rise, run,
- std::get<1>(points.first).count(), rise, run);
- }
-}
-
-double NoncausalTimestampFilter::DCostDtb(
- aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const {
- // As a reminder, our cost function is:
- // (OffsetError(ta, tb))^2
- //
- // d cost/dtb =>
- // 2 * OffsetError(ta, tb) * d/dtb OffsetError(ta, tb)
- //
- // OffsetError => (tb - (1 + ma) ta - ba), so
- // d/dtb OffsetError(ta, tb) = 1
- //
- // d cost/dtb => 2 * OffsetError(ta, tb)
- NormalizeTimestamps(&ta_base, &ta);
- return 2.0 * OffsetError(ta_base, ta, tb_base, tb);
-}
-
-std::string NoncausalTimestampFilter::DebugDCostDtb(
+std::string NoncausalTimestampFilter::DebugOffsetError(
aos::monotonic_clock::time_point ta_base, double ta,
aos::monotonic_clock::time_point tb_base, double tb, size_t node_a,
size_t node_b) const {
@@ -962,58 +805,6 @@
rise, run, std::get<1>(points.first).count());
}
-std::string NoncausalTimestampFilter::DebugCost(
- aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb, size_t node_a,
- size_t node_b) const {
- NormalizeTimestamps(&ta_base, &ta);
- NormalizeTimestamps(&tb_base, &tb);
-
- if (IsOutsideSamples(ta_base, ta)) {
- auto reference_timestamp = GetReferenceTimestamp(ta_base, ta);
- double slope = kMaxVelocity();
- std::string note = "_";
- if (IsAfterSamples(ta_base, ta)) {
- slope = -kMaxVelocity();
- note = "^";
- }
- // Cost = OffsetError(ta, tb) ** 2 = (tb - ta - Offset(ta, tb)) ** 2 ==>
- // (tb - ta - ((ta - ref) * ma - ref_offset)
- return absl::StrFormat(
- "(t%d - t%d - (t%d - %d.) * %f - %d.)%s ** 2.", node_b, node_a, node_a,
- std::get<0>(reference_timestamp).time_since_epoch().count(), slope,
- std::get<1>(reference_timestamp).count(), note);
- }
-
- std::pair<std::tuple<monotonic_clock::time_point, chrono::nanoseconds>,
- std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>
- points = FindTimestamps(ta_base, ta);
-
- // As a reminder, our cost function is essentially:
- // ((tb - ta - (ma ta + ba))^2
- // ie
- // ((tb - (1 + ma) ta - ba)^2
- //
- // d cost/dta =>
- // 2 * ((tb - (1 + ma) ta - ba)
-
- const int64_t rise =
- (std::get<1>(points.second) - std::get<1>(points.first)).count();
- const int64_t run =
- (std::get<0>(points.second) - std::get<0>(points.first)).count();
-
- if (rise == 0) {
- return absl::StrFormat("(t%d - t%d %c %d.) ** 2.", node_b, node_a,
- std::get<1>(points.first).count() < 0 ? '+' : '-',
- std::abs(std::get<1>(points.first).count()));
- } else {
- return absl::StrFormat("(t%d - t%d - (t%d - %d.) * %d. / %d. - %d.) ** 2.",
- node_b, node_a, node_a,
- std::get<0>(points.first).time_since_epoch().count(),
- rise, run, std::get<1>(points.first).count());
- }
-}
-
std::string NoncausalTimestampFilter::NodeNames() const {
return absl::StrCat(node_a_->name()->string_view(), " -> ",
node_b_->name()->string_view());
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6090fce..73c9006 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -261,29 +261,11 @@
// offset at ta.
double OffsetError(aos::monotonic_clock::time_point ta_base, double ta,
aos::monotonic_clock::time_point tb_base, double tb) const;
-
- // Returns the cost (OffsetError^2), ie (ob - oa - offset(oa, ob))^2,
- // calculated accurately.
- // Since this is designed to be used with a gradient based solver, it isn't
- // super important if Cost is precise.
- double Cost(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const;
- std::string DebugCost(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb,
- size_t node_a, size_t node_b) const;
-
- // Returns the partial derivitive dcost/dta
- double DCostDta(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const;
- std::string DebugDCostDta(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb,
- size_t node_a, size_t node_b) const;
- // Returns the partial derivitive dcost/dtb
- double DCostDtb(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const;
- std::string DebugDCostDtb(aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb,
- size_t node_a, size_t node_b) const;
+ // Returns the string representation of 2 * OffsetError(ta, tb)
+ std::string DebugOffsetError(aos::monotonic_clock::time_point ta_base,
+ double ta,
+ aos::monotonic_clock::time_point tb_base,
+ double tb, size_t node_a, size_t node_b) const;
// Confirms that the solution meets the constraints. Returns true on success.
bool ValidateSolution(aos::monotonic_clock::time_point ta,
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index a7e1dcf..95efe08 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -984,166 +984,6 @@
EXPECT_EQ(filter.Offset(t4, 0.0), std::make_pair(o3, offset_post));
}
-// Tests that the cost function handles a single point line properly, and the
-// derivatives are consistent. Do this with a massive offset to ensure that we
-// are subtracting out nominal offsets correctly to retain numerical precision
-// in the result.
-TEST_F(NoncausalTimestampFilterTest, CostAndSlopeSinglePoint) {
- const monotonic_clock::time_point e = monotonic_clock::epoch();
- const monotonic_clock::time_point t1 =
- e + chrono::nanoseconds(0) + chrono::seconds(10000000000);
- const chrono::nanoseconds o1 =
- chrono::nanoseconds(1000) - chrono::seconds(10000000000);
-
- NoncausalTimestampFilter filter(node_a, node_b);
-
- filter.Sample(t1, o1);
-
- // Spot check some known points.
- // NOTE that double arg must be 0 <= ta < 1. for Offset and OffsetError
- // We can handle values outside that range for Cost functions
- EXPECT_EQ(filter.Offset(t1, 0.0), std::make_pair(o1, 0.0));
- EXPECT_EQ(filter.Offset(t1, 0.5), std::make_pair(o1, -0.5 * kMaxVelocity()));
- EXPECT_EQ(filter.OffsetError(t1, 0.0, t1 + o1, 0.0), 0.0);
- EXPECT_EQ(filter.OffsetError(t1, 0.5, t1 + o1, 0.5), 0.5 * kMaxVelocity());
- EXPECT_EQ(filter.OffsetError(t1, 0.5, t1 + o1, 0.0),
- -0.5 + 0.5 * kMaxVelocity());
-
- EXPECT_EQ(filter.Cost(t1, 0.0, t1 + o1, 0.0), 0.0);
- EXPECT_EQ(filter.Cost(t1, 1.0, t1 + o1, 1.0),
- std::pow(-1.0 * kMaxVelocity(), 2.0));
- EXPECT_EQ(filter.Cost(t1, 1.0, t1 + o1, 0.0),
- std::pow(1.0 - kMaxVelocity(), 2.0));
- EXPECT_EQ(filter.Cost(t1, -0.5, t1 + o1, 0.0),
- std::pow(0.5 + 0.5 * kMaxVelocity(), 2.0));
- EXPECT_EQ(filter.Cost(t1, 0.5, t1 + o1, 0.0),
- std::pow(-0.5 + 0.5 * kMaxVelocity(), 2.0));
-
- constexpr double kDelta = 10.;
-
- // Perturb ta and tb so we make sure it works away from 0.
- // Avoid offsets of 0.0, since this function isn't well behaved there
- for (double ta_nominal : {-1000.0, 20.0, 1000.0}) {
- for (double tb_nominal : {-2000.0, 20.0, 2000.0}) {
- {
- // Evaluate functions around (ta (or tb) + kDelta)
- const double minus_costa =
- filter.Cost(t1, ta_nominal - kDelta, t1 + o1, tb_nominal);
- const double plus_costa =
- filter.Cost(t1, ta_nominal + kDelta, t1 + o1, tb_nominal);
-
- const double minus_costb =
- filter.Cost(t1, ta_nominal, t1 + o1, tb_nominal - kDelta);
- const double plus_costb =
- filter.Cost(t1, ta_nominal, t1 + o1, tb_nominal + kDelta);
-
- EXPECT_NEAR((plus_costa - minus_costa) / (2.0 * kDelta),
- filter.DCostDta(t1, ta_nominal, t1 + o1, tb_nominal), 1e-9);
- EXPECT_NEAR((plus_costb - minus_costb) / (2.0 * kDelta),
- filter.DCostDtb(t1, ta_nominal, t1 + o1, tb_nominal), 1e-9);
- }
- }
- }
-}
-
-TEST_F(NoncausalTimestampFilterTest, CostAndSlope) {
- const monotonic_clock::time_point e = monotonic_clock::epoch();
- // Note: t1, t2, t3 need to be picked such that the slope is small so filter
- // doesn't modify the timestamps.
- const monotonic_clock::time_point t1 =
- e + chrono::nanoseconds(0) + chrono::seconds(10000000000);
- const chrono::nanoseconds o1 =
- chrono::nanoseconds(1000) - chrono::seconds(10000000000);
-
- const monotonic_clock::time_point t2 =
- e + chrono::microseconds(1000) + chrono::seconds(10000000000);
- const chrono::nanoseconds o2 =
- chrono::nanoseconds(1500) - chrono::seconds(10000000000);
-
- const monotonic_clock::time_point t3 =
- e + chrono::microseconds(2000) + chrono::seconds(10000000000);
- const chrono::nanoseconds o3 =
- chrono::nanoseconds(500) - chrono::seconds(10000000000);
-
- NoncausalTimestampFilter filter(node_a, node_b);
-
- filter.Sample(t1, o1);
- filter.Sample(t2, o2);
- filter.Sample(t3, o3);
-
- // Spot check some known points.
- EXPECT_EQ(filter.OffsetError(t1, 0.0, t1 + o1, 0.0), 0.0);
- EXPECT_EQ(filter.OffsetError(t1, 0.5, t1 + o1, 0.5), -0.00025);
- EXPECT_EQ(filter.OffsetError(t2, 0.0, t2 + o2, 0.0), 0.0);
- EXPECT_EQ(filter.OffsetError(t3, 0.0, t3 + o3, 0.0), 0.0);
-
- EXPECT_EQ(filter.Cost(t1, 0.0, t1 + o1, 0.0), 0.0);
- EXPECT_EQ(filter.Cost(t2, 0.0, t2 + o2, 0.0), 0.0);
- EXPECT_EQ(filter.Cost(t3, 0.0, t3 + o3, 0.0), 0.0);
-
- // Perturb ta and tbd so we make sure it works away from 0.
- constexpr double kDelta = 10.;
-
- // Note: don't test 0 delta because that makes the computed slope at t2
- // wrong.
- for (double ta_nominal : {-1000.0, 20.0, 1000.0}) {
- for (double tb_nominal : {-4000.0, -2000.0, 20.0, 2000.0, 4000.0}) {
- // Check points round each of the 3 points in the polyline. Use 3 points
- // so if we mess up the point selection code, it shows up.
- {
- const double minus_costa =
- filter.Cost(t1, ta_nominal - kDelta, t1 + o1, tb_nominal);
- const double plus_costa =
- filter.Cost(t1, ta_nominal + kDelta, t1 + o1, tb_nominal);
-
- const double minus_costb =
- filter.Cost(t1, ta_nominal, t1 + o1, tb_nominal - kDelta);
- const double plus_costb =
- filter.Cost(t1, ta_nominal, t1 + o1, tb_nominal + kDelta);
-
- EXPECT_NEAR((plus_costa - minus_costa) / (2.0 * kDelta),
- filter.DCostDta(t1, ta_nominal, t1 + o1, tb_nominal), 1e-9);
- EXPECT_NEAR((plus_costb - minus_costb) / (2.0 * kDelta),
- filter.DCostDtb(t1, ta_nominal, t1 + o1, tb_nominal), 1e-9);
- }
-
- {
- const double minus_costa =
- filter.Cost(t2, ta_nominal - kDelta, t2 + o2, tb_nominal);
- const double plus_costa =
- filter.Cost(t2, ta_nominal + kDelta, t2 + o2, tb_nominal);
-
- const double minus_costb =
- filter.Cost(t2, ta_nominal, t2 + o2, tb_nominal - kDelta);
- const double plus_costb =
- filter.Cost(t2, ta_nominal, t2 + o2, tb_nominal + kDelta);
-
- EXPECT_NEAR((plus_costa - minus_costa) / (2.0 * kDelta),
- filter.DCostDta(t2, ta_nominal, t2 + o2, tb_nominal), 1e-9);
- EXPECT_NEAR((plus_costb - minus_costb) / (2.0 * kDelta),
- filter.DCostDtb(t2, ta_nominal, t2 + o2, tb_nominal), 1e-9);
- }
-
- {
- const double minus_costa =
- filter.Cost(t3, ta_nominal - kDelta, t3 + o3, tb_nominal);
- const double plus_costa =
- filter.Cost(t3, ta_nominal + kDelta, t3 + o3, tb_nominal);
-
- const double minus_costb =
- filter.Cost(t3, ta_nominal, t3 + o3, tb_nominal - kDelta);
- const double plus_costb =
- filter.Cost(t3, ta_nominal, t3 + o3, tb_nominal + kDelta);
-
- EXPECT_NEAR((plus_costa - minus_costa) / (2.0 * kDelta),
- filter.DCostDta(t3, ta_nominal, t3 + o3, tb_nominal), 1e-9);
- EXPECT_NEAR((plus_costb - minus_costb) / (2.0 * kDelta),
- filter.DCostDtb(t3, ta_nominal, t3 + o3, tb_nominal), 1e-9);
- }
- }
- }
-}
-
// Run a couple of points through the estimator and confirm it works.
TEST(NoncausalOffsetEstimatorTest, FullEstimator) {
const aos::FlatbufferDetachedBuffer<Node> node_a_buffer =
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c762b63..b53feab 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -5,74 +5,119 @@
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
-#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
#include "internal/Embedded.h"
+extern "C" {
+#include <rawrtc.h>
+
+#define DEBUG_LEVEL 7
+#define DEBUG_MODULE "web-proxy"
+#include <re_dbg.h>
+struct list *tmrl_get(void);
+}
+
DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
namespace aos {
namespace web_proxy {
-
-namespace {
-// Based on webrtc examples. In our controlled environment we expect setting sdp
-// to always succeed, and we can't do anything about a failure, so just ignore
-// everything.
-class DummySetSessionDescriptionObserver
- : public webrtc::SetSessionDescriptionObserver {
- public:
- static DummySetSessionDescriptionObserver *Create() {
- return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
- }
- virtual void OnSuccess() {}
- virtual void OnFailure(webrtc::RTCError /*error*/) {}
-};
-
-} // namespace
-
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
aos::EventLoop *event_loop, int buffer_size)
: server_(server),
config_(aos::CopyFlatBuffer(event_loop->configuration())),
event_loop_(event_loop) {
+ if (VLOG_IS_ON(2)) {  // Only call dbg_init when verbose logging level 2 is on.
+ dbg_init(DBG_DEBUG, DBG_ALL);
+ }
+ CHECK_RAWRTC(rawrtc_init(true));  // Dies unless rawrtc_init reports success.
+
// We need to reference findEmbeddedContent() to make the linker happy...
findEmbeddedContent("");
- const aos::Node *self = event_loop->node();
+ const aos::Node *self = event_loop_->node();
- for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
- auto channel = event_loop->configuration()->channels()->Get(i);
+ subscribers_.reserve(event_loop_->configuration()->channels()->size());  // One slot per configured channel.
+ for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
+ auto channel = event_loop_->configuration()->channels()->Get(i);
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
- auto fetcher = event_loop->MakeRawFetcher(channel);
+ auto fetcher = event_loop_->MakeRawFetcher(channel);
subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
std::move(fetcher), i, buffer_size));
+ } else {
+ subscribers_.emplace_back(nullptr);  // Placeholder so subscribers_[i] lines up with channel index i.
}
}
- TimerHandler *const timer = event_loop->AddTimer([this]() {
+ TimerHandler *const timer = event_loop_->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
- subscriber->RunIteration();
+ if (subscriber) subscriber->RunIteration();  // Skip the nullptr placeholders added above.
}
});
- event_loop->OnRun([timer, event_loop]() {
- timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
+ event_loop_->OnRun([this, timer]() {  // Captures this so the lambda reads event_loop_.
+ timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
});
}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
- std::unique_ptr<Connection> conn = std::make_unique<Connection>(
- sock, server_, subscribers_, config_, event_loop_);
- connections_.insert({sock, std::move(conn)});
+ std::unique_ptr<ApplicationConnection> connection =  // One ApplicationConnection per websocket.
+ std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
+ config_, event_loop_);
+
+ connections_.insert({sock, std::move(connection)});  // Keyed by sock; onData and onDisconnect look it up by the same key.
}
void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) {
- connections_[sock]->HandleWebSocketData(data, size);
+ const FlatbufferSpan<WebSocketMessage> message({data, size});
+ if (!message.Verify()) {  // Drop anything that does not verify as a WebSocketMessage.
+ LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ return;
+ }
+ VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
+ switch (message.message().payload_type()) {  // Dispatch on the payload union type.
+ case Payload::WebSocketSdp: {
+ const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
+ if (offer->type() != SdpType::OFFER) {  // Only offers are handled; anything else is logged and dropped.
+ LOG(WARNING) << "Got the wrong sdp type from client";
+ break;
+ }
+ const flatbuffers::String *sdp = offer->payload();  // NOTE(review): dereferenced below without a null check — confirm the schema requires payload.
+ connections_[sock]->OnSdp(sdp->c_str());  // NOTE(review): presumably onConnect always ran first for sock — confirm.
+ break;
+ }
+ case Payload::WebSocketIce: {
+ const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
+ connections_[sock]->OnIce(ice);
+ break;
+ }
+ default: { break; }  // Every other payload type is ignored.
+ }
}
void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
connections_.erase(sock);
}
+// Global epoll pointer
+static aos::internal::EPoll *global_epoll = nullptr;
+
+// Listen hook for re: registers fh with global_epoll for each bit set in flags.
+static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
+  // Like ReFdClose, die with a clear message instead of dereferencing null.
+  CHECK((flags & 0x7) == 0 || global_epoll != nullptr);
+  if (flags & 0x1)
+    global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
+  if (flags & 0x2)
+    global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
+  if (flags & 0x4)
+    global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
+  return 0;
+}
+
+static void ReFdClose(int fd) {  // Installed via re_fd_set_close_callback in the WebProxy constructor.
+ CHECK(global_epoll != nullptr);  // global_epoll is only non-null between WebProxy setup and teardown.
+ global_epoll->DeleteFd(fd);
+}
+
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
: WebProxy(event_loop, &internal_epoll_, buffer_size) {}
@@ -86,6 +131,23 @@
::seasocks::Logger::Level::Info)),
websocket_handler_(
new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ CHECK(!global_epoll);
+ global_epoll = epoll;
+
+ re_fd_set_listen_callback(&ReFdListen);
+ re_fd_set_close_callback(&ReFdClose);
+
+ epoll->BeforeWait([]() {
+ const uint64_t to = tmr_next_timeout(tmrl_get());
+ if (to != 0) {
+ VLOG(1) << "Next timeout " << to;
+ }
+ // Note: this only works because we are spinning on it...
+ // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
+ // for handling tmr.
+ tmr_poll(tmrl_get());
+ });
+
server_.addWebSocketHandler("/ws", websocket_handler_);
CHECK(server_.startListening(FLAGS_proxy_port));
@@ -117,27 +179,11 @@
epoll_->DeleteFd(server_.fd());
server_.terminate();
CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
+ CHECK(global_epoll == epoll_);
+ global_epoll = nullptr;
}
void Subscriber::RunIteration() {
- {
- // Manage updating the channels_ map given the pending_* members from the
- // *Listeners() methods.
- // We handle all the removals first so that we correctly handle the
- // situation where the user calls RemoveListener() and then AddListener()
- // between calls to RunIteration(). The reverse order (adding and then
- // removing before an update) is handled directly in RemoveListener() where
- // we remove things from the pending_channels_ map directly.
- MutexLocker lock(&mutex_);
- for (const auto &channel : pending_removal_) {
- channels_.erase(channel);
- }
- pending_removal_.clear();
- for (const auto &channel : pending_channels_) {
- channels_.insert(channel);
- }
- pending_channels_.clear();
- }
if (channels_.empty() && buffer_size_ == 0) {
return;
}
@@ -153,38 +199,47 @@
<< "packets";
for (int packet_index = 0;
packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
- flatbuffers::Offset<MessageHeader> message_offset =
- PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
- fbb_.Finish(message_offset);
+ // Pack directly into the mbuffer. This is admittedly a bit painful.
+ const size_t packet_size =
+ PackedMessageSize(fetcher_->context(), packet_index);
+ struct mbuf *mbuffer = mbuf_alloc(packet_size);
- const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+ {
+ // Wrap a pre-allocated builder around the mbuffer.
+ PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
+ flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
+ flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
+ &fbb, fetcher_->context(), channel_index_, packet_index);
+ fbb.Finish(message_offset);
+
+ // Now, the flatbuffer is built from the back to the front. So any
+ // extra memory will be at the front. Setup the end and start pointers
+ // on the mbuf.
+ mbuf_set_end(mbuffer, packet_size);
+ mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
+ }
message.data.emplace_back(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
+ std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
}
message_buffer_.push_back(std::move(message));
}
for (auto &conn : channels_) {
- rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
+ std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
ChannelInformation *channel_data = &conn.second;
if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
SkipToLastMessage(channel_data);
}
- const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
- while (buffer != nullptr) {
+ std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
+ while (buffer) {
+ // TODO(austin): This is a nop so we just buffer forever. Fix this when
+ // we care.
if (rtc_channel->buffered_amount() > 14000000) {
VLOG(1) << "skipping a send because buffered amount is too high";
break;
}
- // Call Send() from the signalling thread. The Invoke() call blocks until
- // the handler has been called, so we do not need to handle any
- // synchronization on this end. The body of the handler should be kept as
- // short as possible to avoid blocking the signalling thread continuously
- // for any longer than necessary.
- channel_data->signaling_thread->Invoke<void>(
- RTC_FROM_HERE,
- [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
+
+ rtc_channel->Send(buffer.get());
buffer = NextBuffer(channel_data);
}
}
@@ -195,24 +250,23 @@
}
}
-bool Subscriber::Compare(const Channel *channel) const {
- return channel->name()->string_view() ==
- fetcher_->channel()->name()->string_view() &&
- channel->type()->string_view() ==
- fetcher_->channel()->type()->string_view();
-}
-
-void Subscriber::AddListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method, rtc::Thread *signaling_thread) {
- MutexLocker lock(&mutex_);
+// Starts delivering this subscriber's messages to data_channel using
+// transfer_method. The channel is tracked in channels_ with fresh queue
+// state; if it is already tracked, the existing entry is left untouched.
+void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
+ TransferMethod transfer_method) {
+ VLOG(1) << "Adding listener for " << data_channel.get();
ChannelInformation info;
info.transfer_method = transfer_method;
- info.signaling_thread = signaling_thread;
- pending_channels_.emplace(channel, info);
+
+ channels_.emplace(data_channel, info);
}
-const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
+// Stops delivering messages to data_channel by dropping it from channels_.
+// Removing a channel that is not tracked is a no-op.
+void Subscriber::RemoveListener(
+ std::shared_ptr<ScopedDataChannel> data_channel) {
+ VLOG(1) << "Removing listener for " << data_channel.get();
+ channels_.erase(data_channel);
+}
+
+std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
+ ChannelInformation *channel) {
CHECK_NOTNULL(channel);
if (message_buffer_.empty()) {
return nullptr;
@@ -223,7 +277,7 @@
if (fell_behind) {
channel->current_queue_index = earliest_index;
channel->next_packet_number = 0;
- return &message_buffer_.front().data.at(0);
+ return message_buffer_.front().data.at(0);
}
if (channel->current_queue_index > latest_index) {
// We are still waiting on the next message to appear; return.
@@ -237,8 +291,8 @@
CHECK_LT(0u, packets_in_message);
CHECK_LT(channel->next_packet_number, packets_in_message);
- const webrtc::DataBuffer *data =
- &message_buffer_[channel->current_queue_index - earliest_index].data.at(
+ std::shared_ptr<struct mbuf> original_data =
+ message_buffer_[channel->current_queue_index - earliest_index].data.at(
channel->next_packet_number);
++channel->next_packet_number;
@@ -247,7 +301,9 @@
channel->next_packet_number = 0;
}
- return data;
+ // Trigger a copy of the mbuf without copying the data.
+ return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
+ mem_deref);
}
void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
@@ -260,229 +316,282 @@
channel->next_packet_number = 0;
}
-void Subscriber::RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- MutexLocker lock(&mutex_);
- pending_channels_.erase(channel);
- pending_removal_.push_back(channel);
-}
-
-Connection::Connection(
- ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+// Stores the websocket used for negotiation, packs the configuration into
+// config_headers_, hooks the rawrtc connection callbacks up to this object
+// and opens the connection.
+ApplicationConnection::ApplicationConnection(
+ ::seasocks::Server *server, ::seasocks::WebSocket *sock,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
const EventLoop *event_loop)
- : sock_(sock),
- server_(server),
+ : server_(server),
+ sock_(sock),
subscribers_(subscribers),
config_headers_(PackBuffer(config.span())),
- event_loop_(event_loop) {}
+ event_loop_(event_loop) {
+ // This side only answers offers (see OnSdp()), so a negotiation request is
+ // just logged.
+ connection_.set_on_negotiation_needed([]() {
+ VLOG(1) << "Negotiation needed, not offering so not creating offer.";
+ });
-// Function called for web socket data. Parses the flatbuffer and
-// handles it appropriately.
-void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
- const FlatbufferSpan<WebSocketMessage> message({data, size});
- if (!message.Verify()) {
- LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ // Forward local ICE candidates to LocalCandidate(), which relays them over
+ // the websocket.
+ connection_.set_on_local_candidate(
+ [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) { LocalCandidate(candidate, url); });
+
+ // Hand data channels reported by the connection to OnDataChannel().
+ connection_.set_on_data_channel(
+ [this](std::shared_ptr<ScopedDataChannel> channel) {
+ OnDataChannel(channel);
+ });
+
+ connection_.Open();
+}
+
+// Closes every subscription data channel still held in channels_, then the
+// signalling channel.
+ApplicationConnection::~ApplicationConnection() {
+ for (auto &it : channels_) {
+ // An entry's data_channel may already have been reset to nullptr, so check
+ // before dereferencing it.
+ if (it.second.data_channel) {
+ it.second.data_channel->Close();
+ it.second.data_channel = nullptr;
+ }
+ }
+
+ // Eh, we are done, tell the channel to shut down. If we didn't, it would
+ // just hang around until the connection closes, which is rather shortly
+ // after.
+ if (channel_) {
+ channel_->Close();
+ }
+}
+
+// Handles the SDP offer sent by the browser over the websocket: applies it as
+// the remote description, creates and applies an answer, and sends the answer
+// back to the browser as a WebSocketSdp message. An offer that fails to parse
+// is logged and ignored.
+void ApplicationConnection::OnSdp(const char *sdp) {
+ struct rawrtc_peer_connection_description *remote_description = nullptr;
+
+ auto error = rawrtc_peer_connection_description_create(
+ &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
+ if (error) {
+ LOG(WARNING) << "Cannot parse remote description: "
+ << rawrtc_code_to_str(error);
return;
}
- switch (message.message().payload_type()) {
- case Payload::WebSocketSdp: {
- const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
- if (offer->type() != SdpType::OFFER) {
- LOG(WARNING) << "Got the wrong sdp type from client";
- break;
- }
- const flatbuffers::String *sdp = offer->payload();
- webrtc::SdpParseError error;
- std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
- CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
- if (!desc) {
- LOG(WARNING) << "Failed to parse sdp description: "
- << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- // We can only start creating the PeerConnection once we have
- // something to give it, so we wait until we get an offer before
- // starting.
- webrtc::PeerConnectionInterface::RTCConfiguration config;
- config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
- config.enable_dtls_srtp = true;
- {
- webrtc::PeerConnectionInterface::IceServer ice_server;
- ice_server.urls.push_back("stun:stun.l.google.com:19302");
- config.servers.push_back(ice_server);
- }
+ CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
+ connection_.connection(), remote_description));
+ // The rawrtc API requires the caller to unreference descriptions it
+ // created; without this the parsed offer leaks on every negotiation.
+ mem_deref(remote_description);
- std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
- signaling_thread->SetName("signaling_thread", nullptr);
- signaling_thread->Start();
+ struct rawrtc_peer_connection_description *local_description = nullptr;
+ CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
+ connection_.connection()));
+ CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
+ connection_.connection(), local_description));
- signaling_thread_ = signaling_thread.get();
+ enum rawrtc_sdp_type type;
+ char *local_sdp = nullptr;
+ // Get SDP type & the SDP itself
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
+ &type, local_description));
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
+ local_description));
- webrtc::PeerConnectionFactoryDependencies factory_deps;
- factory_deps.signaling_thread = signaling_thread.release();
- rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
- CreateModularPeerConnectionFactory(std::move(factory_deps));
- {
- // Don't ignore *any* networks--by default, the loopback interface is
- // ignored, which makes it impossible to use WebRTC on devices with no
- // network.
- webrtc::PeerConnectionFactoryInterface::Options options;
- options.network_ignore_mask = 0;
- factory->SetOptions(options);
- }
-
- peer_connection_ =
- factory->CreatePeerConnection(config, nullptr, nullptr, this);
-
- peer_connection_->SetRemoteDescription(
- DummySetSessionDescriptionObserver::Create(), desc.release());
-
- peer_connection_->CreateAnswer(
- this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
- break;
- }
- case Payload::WebSocketIce: {
- const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
- std::string candidate = ice->candidate()->str();
- std::string sdpMid = ice->sdpMid()->str();
- int sdpMLineIndex = ice->sdpMLineIndex();
- webrtc::SdpParseError error;
- webrtc::IceCandidateInterface *ice_candidate =
- webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
- if (!ice_candidate) {
- LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- peer_connection_->AddIceCandidate(ice_candidate);
- break;
- }
- default: {
- break;
- }
- }
-}
-
-void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
- webrtc::DataBuffer data_buffer(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
- VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
- data_channel_->Send(data_buffer);
-}
-
-void Connection::OnDataChannel(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- data_channel_ = channel;
- data_channel_->RegisterObserver(this);
-}
-
-void Connection::OnIceCandidate(
- const webrtc::IceCandidateInterface *candidate) {
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string ice_string;
- candidate->ToString(&ice_string);
-
- flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
- fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
- candidate->sdp_mline_index());
- flatbuffers::Offset<WebSocketMessage> ice_message =
- CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
- fbb.Finish(ice_message);
-
- server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
-}
-
-// This is the callback for creating an sdp. We have to manually assign it
-// locally and send it to the client.
-void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
- peer_connection_->SetLocalDescription(
- DummySetSessionDescriptionObserver::Create(), desc);
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string answer_string;
- desc->ToString(&answer_string);
+ flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<WebSocketSdp> sdp_fb =
- CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
+ CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
flatbuffers::Offset<WebSocketMessage> answer_message =
CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
+
+ VLOG(1) << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, answer_message));
fbb.Finish(answer_message);
server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+ mem_deref(local_sdp);
+ // Release our reference to the answer as well; the SDP text has already
+ // been copied into the flatbuffer above.
+ mem_deref(local_description);
}
-// Wait until the data channel is ready for data before sending the config.
-void Connection::OnStateChange() {
- if (peer_connection_.get() != nullptr &&
- data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- for (const auto &header : config_headers_) {
- Send(header.buffer());
+// Handles an ICE candidate sent by the browser over the websocket and adds it
+// to the peer connection. Messages without a candidate are ignored. The
+// browser's input is not trusted, so a candidate that cannot be parsed or
+// added is logged instead of aborting the process.
+void ApplicationConnection::OnIce(const WebSocketIce *ice) {
+ if (!ice->has_candidate()) {
+ return;
+ }
+ // sdpMid is optional in the message, so do not dereference it blindly.
+ // NOTE(review): rawrtc documents the mid argument as nullable; if it is
+ // rejected the error path below reports it.
+ const char *const sdp_mid =
+ ice->sdpMid() != nullptr ? ice->sdpMid()->c_str() : nullptr;
+ uint8_t sdpMLineIndex = ice->sdpMLineIndex();
+
+ struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
+ const enum rawrtc_code create_error =
+ rawrtc_peer_connection_ice_candidate_create(
+ &ice_candidate, ice->candidate()->c_str(), sdp_mid, &sdpMLineIndex,
+ nullptr);
+ if (create_error != RAWRTC_CODE_SUCCESS) {
+ LOG(WARNING) << "Cannot parse remote ICE candidate: "
+ << rawrtc_code_to_str(create_error);
+ return;
+ }
+
+ const enum rawrtc_code add_error = rawrtc_peer_connection_add_ice_candidate(
+ connection_.connection(), ice_candidate);
+ if (add_error != RAWRTC_CODE_SUCCESS) {
+ LOG(WARNING) << "Cannot add remote ICE candidate: "
+ << rawrtc_code_to_str(add_error);
+ }
+
+ mem_deref(ice_candidate);
+}
+
+// Relays a local ICE candidate to the browser as a WebSocketIce message on
+// the websocket. The media line index is only included when rawrtc reports
+// one. url is only used for logging.
+// NOTE(review): a null candidate presumably marks the end of gathering; it is
+// ignored here -- confirm against the rawrtc documentation.
+void ApplicationConnection::LocalCandidate(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) {
+ if (candidate) {
+ flatbuffers::FlatBufferBuilder fbb;
+ char *sdpp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
+ char *midp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));
+
+ uint8_t media_line_index;
+ enum rawrtc_code error =
+ rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
+ &media_line_index, candidate);
+
+ // Strings must be created before the table builder is started.
+ flatbuffers::Offset<flatbuffers::String> sdpp_offset =
+ fbb.CreateString(sdpp);
+ flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
+ fbb.CreateString(midp);
+
+ WebSocketIce::Builder web_socket_ice_builder(fbb);
+
+ web_socket_ice_builder.add_candidate(sdpp_offset);
+ web_socket_ice_builder.add_sdpMid(sdp_mid_offset);
+
+ if (error == RAWRTC_CODE_SUCCESS) {
+ web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
}
+ flatbuffers::Offset<WebSocketIce> ice_offset =
+ web_socket_ice_builder.Finish();
+
+ flatbuffers::Offset<WebSocketMessage> ice_message =
+ CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
+ VLOG(1) << url << ": "
+ << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, ice_message));
+ fbb.Finish(ice_message);
+
+ server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+
+ // The strings returned by rawrtc are owned by us; release them now that
+ // they have been copied into the flatbuffer.
+ mem_deref(sdpp);
+ mem_deref(midp);
}
}
-// Handle DataChannel messages. Subscribe to each listener that matches the
-// subscribe message
-void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- // Sanity check--we are relying on the Add/RemoveListener calls being made
- // from the correct thread.
- CHECK(signaling_thread_->IsCurrent());
+// Handles a data channel reported by the peer connection. The channel
+// labelled "signalling" becomes channel_: subscription requests arriving on
+// it go to HandleSignallingData() and the packed configuration is sent once
+// it opens. Any other channel is closed immediately.
+void ApplicationConnection::OnDataChannel(
+ std::shared_ptr<ScopedDataChannel> channel) {
+ if (channel->label() == std::string_view("signalling")) {
+ CHECK(!channel_);
+ channel_ = channel;
+
+ channel_->set_on_message(
+ [this](struct mbuf *const buffer,
+ const enum rawrtc_data_channel_message_flag flags) {
+ HandleSignallingData(buffer, flags);
+ });
+
+ channel_->set_on_open([this]() {
+ for (const auto &header : config_headers_) {
+ channel_->Send(header.buffer());
+ }
+ });
+
+ channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
+
+ // Register an on_close callback which does nothing but keeps channel alive
+ // until it is done. This keeps the memory around until rawrtc can finish
+ // calling the close callback.
+ channel_->set_on_close([channel]() {});
+ } else {
+ // Same keep-alive trick for the rejected channel. It must be installed on
+ // `channel` itself: channel_ may still be null here, and otherwise it
+ // would replace the signalling channel's close callback.
+ channel->set_on_close([channel]() {});
+ channel->Close();
+ }
+}
+
+// Handles a SubscriberRequest received on the signalling channel. The request
+// lists every channel the client wants; data channels are opened for newly
+// requested channels and closed for channels that are no longer listed.
+// Invalid requests are logged and ignored.
+void ApplicationConnection::HandleSignallingData(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ const enum rawrtc_data_channel_message_flag /*flags*/) {
FlatbufferSpan<SubscriberRequest> message(
- {buffer.data.data(), buffer.data.size()});
+ {mbuf_buf(buffer), mbuf_get_left(buffer)});
if (!message.Verify()) {
LOG(ERROR) << "Invalid flatbuffer received from browser client.";
return;
}
- VLOG(2) << "Got a subscription message "
+ VLOG(1) << "Got a subscription message "
<< aos::FlatbufferToJson(&message.message());
if (!message.message().has_channels_to_transfer()) {
LOG(ERROR) << "No channels requested for transfer.";
return;
}
- for (auto &subscriber : subscribers_) {
- bool found_match = false;
- for (auto channel_request : *message.message().channels_to_transfer()) {
- const Channel *channel = channel_request->channel();
- if (channel == nullptr) {
- LOG(ERROR) << "Got unpopulated channel.";
- continue;
- }
- const TransferMethod transfer_method = channel_request->method();
- // Call GetChannel() before comparing the channel name/type to each
- // subscriber. This allows us to resolve any node or application specific
- // mappings.
- const Channel *comparison_channel =
- configuration::GetChannel(event_loop_->configuration(), channel,
- event_loop_->name(), event_loop_->node());
- if (comparison_channel == nullptr) {
- LOG(ERROR) << "Channel not available: "
- << configuration::StrippedChannelToString(channel);
- continue;
- }
- if (subscriber->Compare(comparison_channel)) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- if (it == channels_.end()) {
- auto pair = channels_.insert(
- {index, peer_connection_->CreateDataChannel(
- channel->name()->str() + "/" + channel->type()->str(),
- nullptr)});
- it = pair.first;
- }
- subscriber->AddListener(it->second, transfer_method, signaling_thread_);
- VLOG(1) << "Subscribe to: " << channel->type()->str();
- found_match = true;
- break;
- }
+ // The client each time sends a full list of everything it wants to be
+ // subscribed to. It is our responsibility to remove channels which aren't
+ // in that list and add ones which need to be.
+ //
+ // Start by clearing a tracking bit on each channel. This takes O(number of
+ // open channels), which should be small.
+ //
+ // Then open any new channels. For any we visit which are already open,
+ // don't update those.
+ //
+ // Finally, iterate over the channel list and purge anything which we didn't
+ // touch.
+ for (auto &it : channels_) {
+ it.second.requested = false;
+ }
+ for (auto channel_request : *message.message().channels_to_transfer()) {
+ const Channel *channel = channel_request->channel();
+ if (channel == nullptr) {
+ LOG(ERROR) << "Got unpopulated channel.";
+ continue;
}
- if (!found_match) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- subscriber->RemoveListener(it->second);
+ const TransferMethod transfer_method = channel_request->method();
+ // Call GetChannel() before comparing the channel name/type to each
+ // subscriber. This allows us to resolve any node or application
+ // specific mappings.
+ const Channel *comparison_channel =
+ configuration::GetChannel(event_loop_->configuration(), channel,
+ event_loop_->name(), event_loop_->node());
+ if (comparison_channel == nullptr) {
+ LOG(ERROR) << "Channel not available: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+
+ size_t channel_index = configuration::ChannelIndex(
+ event_loop_->configuration(), comparison_channel);
+
+ auto it = channels_.find(channel_index);
+ if (it == channels_.end()) {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ std::make_shared<ScopedDataChannel>();
+
+ std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
+
+ data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
+ channel_index]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we started.";
+ // The callbacks capture a weak_ptr so the channel does not hold a
+ // reference to itself; the subscriber takes a strong reference here
+ // and drops it again in the on_close callback below.
+ subscribers_[channel_index]->AddListener(data_channel, transfer_method);
+ });
+
+ Subscriber *subscriber = subscribers_[channel_index].get();
+ data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
+ subscriber->RemoveListener(data_channel);
+ });
+
+ data_channel->Open(
+ connection_.connection(),
+ absl::StrCat(channel->name()->str(), "/", channel->type()->str()));
+
+ auto pair = channels_.insert({channel_index, {data_channel, true}});
+ it = pair.first;
+ }
+
+ it->second.requested = true;
+
+ VLOG(1) << "Subscribe to: " << channel->type()->str();
+ }
+
+ // Close and forget every channel the client no longer asks for. The entry
+ // is erased (not just nulled) so that a later request for the same channel
+ // opens a fresh data channel, and so that no entry with a null data_channel
+ // is left behind to be dereferenced on the next request.
+ for (auto it = channels_.begin(); it != channels_.end();) {
+ if (it->second.requested) {
+ ++it;
+ continue;
+ }
+ if (it->second.data_channel) {
+ it->second.data_channel->Close();
+ }
+ it = channels_.erase(it);
}
}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index e7bf4e0..4ad0630 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -1,11 +1,15 @@
#ifndef AOS_NETWORK_WEB_PROXY_H_
#define AOS_NETWORK_WEB_PROXY_H_
+
+#include <deque>
#include <map>
#include <set>
+
#include "aos/events/event_loop.h"
#include "aos/events/shm_event_loop.h"
#include "aos/mutex/mutex.h"
#include "aos/network/connect_generated.h"
+#include "aos/network/rawrtc.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "flatbuffers/flatbuffers.h"
@@ -13,13 +17,12 @@
#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"
-#include "api/peer_connection_interface.h"
-
namespace aos {
namespace web_proxy {
class Connection;
class Subscriber;
+class ApplicationConnection;
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
@@ -34,12 +37,14 @@
void onDisconnect(::seasocks::WebSocket *sock) override;
private:
- std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
::seasocks::Server *server_;
std::vector<std::unique_ptr<Subscriber>> subscribers_;
const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
- const EventLoop *const event_loop_;
+ std::map<::seasocks::WebSocket *, std::unique_ptr<ApplicationConnection>>
+ connections_;
+
+ EventLoop *const event_loop_;
};
// Wrapper class that manages the seasocks server and WebsocketHandler.
@@ -92,135 +97,86 @@
public:
Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
int buffer_size)
- : fbb_(1024),
- fetcher_(std::move(fetcher)),
+ : fetcher_(std::move(fetcher)),
channel_index_(channel_index),
buffer_size_(buffer_size) {}
void RunIteration();
- void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method,
- rtc::Thread *signaling_thread);
+ void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
+ TransferMethod transfer_method);
- void RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
-
- // Check if the Channel passed matches the channel this fetchs.
- bool Compare(const Channel *channel) const;
-
- int index() const { return channel_index_; }
+ void RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel);
private:
struct ChannelInformation {
TransferMethod transfer_method;
uint32_t current_queue_index = 0;
size_t next_packet_number = 0;
- // Thread to use for making calls to the DataChannelInterface.
- rtc::Thread *signaling_thread;
};
struct Message {
uint32_t index = 0xffffffff;
- std::vector<webrtc::DataBuffer> data;
+ std::vector<std::shared_ptr<struct mbuf>> data;
};
- const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
+ std::shared_ptr<struct mbuf> NextBuffer(ChannelInformation *channel);
void SkipToLastMessage(ChannelInformation *channel);
- flatbuffers::FlatBufferBuilder fbb_;
std::unique_ptr<RawFetcher> fetcher_;
int channel_index_;
int buffer_size_;
std::deque<Message> message_buffer_;
- std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
- channels_;
- // In order to enable the Connection class to add/remove listeners
- // asyncrhonously, queue up all the newly added listeners in pending_*
- // members. Access to these members is controlled by mutex_.
- std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
- pending_channels_;
- std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>
- pending_removal_;
-
- aos::Mutex mutex_;
+ std::map<std::shared_ptr<ScopedDataChannel>, ChannelInformation> channels_;
};
-// Represents a single connection to a browser for the entire lifetime of the
-// connection.
-class Connection : public webrtc::PeerConnectionObserver,
- public webrtc::CreateSessionDescriptionObserver,
- public webrtc::DataChannelObserver {
+// Class to manage a WebRTC connection to a browser.
+//
+// SDP and ICE messages are exchanged with the browser over the websocket
+// (OnSdp()/OnIce() for incoming, server_/sock_ for outgoing). Subscription
+// requests then arrive on the data channel labelled "signalling", and one
+// data channel is opened per subscribed channel.
+class ApplicationConnection {
public:
- Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
- const std::vector<std::unique_ptr<Subscriber>> &subscribers,
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
- const EventLoop *event_loop);
+ ApplicationConnection(
+ ::seasocks::Server *server, ::seasocks::WebSocket *sock,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+ const EventLoop *event_loop);
- ~Connection() {
- // DataChannel may call OnStateChange after this is destroyed, so make sure
- // it doesn't.
- if (data_channel_) {
- data_channel_->UnregisterObserver();
- }
- }
+ ~ApplicationConnection();
- void HandleWebSocketData(const uint8_t *data, size_t size);
-
- void Send(const flatbuffers::DetachedBuffer &buffer) const;
-
- // PeerConnectionObserver implementation
- void OnSignalingChange(
- webrtc::PeerConnectionInterface::SignalingState) override {}
- void OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
- void OnRemoveStream(
- rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
- void OnDataChannel(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
- void OnRenegotiationNeeded() override {}
- void OnIceConnectionChange(
- webrtc::PeerConnectionInterface::IceConnectionState) override {}
- void OnIceGatheringChange(
- webrtc::PeerConnectionInterface::IceGatheringState) override {}
- void OnIceCandidate(const webrtc::IceCandidateInterface *candidate) override;
- void OnIceCandidateError(const std::string &host_candidate,
- const std::string &url, int error_code,
- const std::string &error_text) override {
- LOG(ERROR) << "ICE Candidate Error on " << host_candidate << " for " << url
- << " with error " << error_code << ": " << error_text;
- }
- void OnIceConnectionReceivingChange(bool) override {}
-
- // CreateSessionDescriptionObserver implementation
- void OnSuccess(webrtc::SessionDescriptionInterface *desc) override;
- void OnFailure(webrtc::RTCError /*error*/) override {}
- // CreateSessionDescriptionObserver is a refcounted object
- void AddRef() const override {}
- // We handle ownership with a unique_ptr so don't worry about actually
- // refcounting. We will delete when we are done.
- rtc::RefCountReleaseStatus Release() const override {
- return rtc::RefCountReleaseStatus::kOtherRefsRemained;
- }
-
- // DataChannelObserver implementation
- void OnStateChange() override;
- void OnMessage(const webrtc::DataBuffer &buffer) override;
- void OnBufferedAmountChange(uint64_t /*sent_data_size*/) override {}
+ // Handles a SDP sent through the negotiation channel.
+ void OnSdp(const char *sdp);
+ // Handles a ICE candidate sent through the negotiation channel.
+ void OnIce(const WebSocketIce *ice);
private:
- ::seasocks::WebSocket *sock_;
- ::seasocks::Server *server_;
- // The signaling thread is the thread on which most/all of the work we do with
- // WebRTC will happen--it is both where the handlers we register should be
- // called and where we should be calling Send() from.
- rtc::Thread *signaling_thread_;
- const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
- const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
- std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+ // Relays a local ICE candidate to the browser over the websocket. url is
+ // only used for logging.
+ void LocalCandidate(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url);
- rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
- rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
+ // Handles a signaling channel being made.
+ void OnDataChannel(std::shared_ptr<ScopedDataChannel> channel);
+
+ // Handles data coming in on the signaling channel requesting subscription.
+ void HandleSignallingData(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ const enum rawrtc_data_channel_message_flag /*flags*/);
+
+ // The underlying rawrtc peer connection.
+ RawRTCConnection connection_;
+
+ // Websocket (and the server that owns it) used to send SDP answers and ICE
+ // candidates back to the browser.
+ ::seasocks::Server *server_;
+ ::seasocks::WebSocket *sock_;
+
+ // State of one subscription data channel.
+ struct ChannelState {
+ std::shared_ptr<ScopedDataChannel> data_channel;
+ // Scratch flag used by HandleSignallingData() to mark the channels named
+ // in the most recent subscription request.
+ bool requested = true;
+ };
+
+ // Subscription data channels, keyed by the channel's index in the event
+ // loop's configuration.
+ std::map<int, ChannelState> channels_;
+ // Subscribers, looked up by that same channel index.
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+
+ // The configuration packed into MessageHeaders; sent on the signalling
+ // channel when it opens.
+ const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
const EventLoop *const event_loop_;
+
+ // The data channel labelled "signalling", once it has been created.
+ std::shared_ptr<ScopedDataChannel> channel_;
};
} // namespace web_proxy
diff --git a/aos/network/web_proxy_utils.cc b/aos/network/web_proxy_utils.cc
index deb4a38..d6148d4 100644
--- a/aos/network/web_proxy_utils.cc
+++ b/aos/network/web_proxy_utils.cc
@@ -9,6 +9,10 @@
// the middle which seems to work.
constexpr size_t kPacketSize = 125000;
+// Max header size we have seen is 72 bytes, followed by 48 bytes of scratch
+// space.
+constexpr size_t kMaxHeaderSize = 72 + 48;
+
int GetPacketCountFromSize(const int packet_size) {
return packet_size / kPacketSize + 1;
}
@@ -35,6 +39,19 @@
return GetPacketCountFromSize(context.size);
}
+// Returns the number of bytes to reserve for packet number packet_index of
+// the message in context: the payload carried by that packet plus
+// kMaxHeaderSize, rounded up to a multiple of 8 bytes.
+size_t PackedMessageSize(const Context &context, int packet_index) {
+ // Flatbuffers aligns the finished buffer up, so round the size up front to
+ // keep the reservation large enough.
+ constexpr size_t kAlignment = 8;
+ constexpr size_t kAlignMask = kAlignment - 1;
+
+ // Every packet except the last one carries exactly kPacketSize bytes.
+ const bool full_packet = kPacketSize * (packet_index + 1) < context.size;
+ if (full_packet) {
+ return (kPacketSize + kMaxHeaderSize + kAlignMask) & ~kAlignMask;
+ }
+
+ // Last packet: whatever is left after the preceding full packets.
+ const int bytes_before = kPacketSize * packet_index;
+ const auto unaligned = context.size - bytes_before + kMaxHeaderSize;
+ return (unaligned + kAlignMask) & ~kAlignMask;
+}
+
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, int packet_index) {
diff --git a/aos/network/web_proxy_utils.h b/aos/network/web_proxy_utils.h
index 09ad333..19b496d 100644
--- a/aos/network/web_proxy_utils.h
+++ b/aos/network/web_proxy_utils.h
@@ -8,15 +8,16 @@
int GetPacketCount(const Context &context);
-/*
- * Packs a message embedded in context into a MessageHeader on fbb. Handles
- * multipart messages by use of the packet_index.
- * TODO(alex): make this an iterator that returns each packet sequentially
- */
+// Packs a message embedded in context into a MessageHeader on fbb. Handles
+// multipart messages by use of the packet_index.
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, int packet_index);
+// Returns the size that the overall packed message packed by PackMessage will
+// be for the provided packet index.
+size_t PackedMessageSize(const Context &context, int packet_index);
+
// Packs the provided raw data into a series of MessageHeader's of the
// appropriate length.
std::vector<FlatbufferDetachedBuffer<MessageHeader>> PackBuffer(
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 706b123..fa62a87 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -75,7 +75,7 @@
target_compatible_with = ["@platforms//os:linux"],
deps = [
":starter_rpc_lib",
- "//aos/time:time",
+ "//aos/time",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/strings:str_format",
],
diff --git a/third_party/BUILD b/third_party/BUILD
index 4ecad0d..040d3f7 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -91,18 +91,6 @@
)
cc_library(
- name = "webrtc",
- target_compatible_with = ["@platforms//os:linux"],
- visibility = ["//visibility:public"],
- deps = cpu_select({
- "amd64": ["@webrtc_x64//:webrtc"],
- "armhf": ["@webrtc_arm//:webrtc"],
- "cortex-m": ["@webrtc_arm//:webrtc"],
- "roborio": ["@webrtc_rio//:webrtc"],
- }),
-)
-
-cc_library(
name = "lzma",
visibility = ["//visibility:public"],
deps = select({
diff --git a/third_party/rawrtc/rawrtc/BUILD b/third_party/rawrtc/rawrtc/BUILD
index d65d0af..dc278f7 100644
--- a/third_party/rawrtc/rawrtc/BUILD
+++ b/third_party/rawrtc/rawrtc/BUILD
@@ -28,6 +28,9 @@
"-DHAVE_STDBOOL_H",
"-DHAVE_INTTYPES_H",
],
+ defines = [
+ "HAVE_STDBOOL_H",
+ ],
includes = ["include/"],
local_defines = [
"RAWRTC_VERSION=\\\"0.5.1\\\"",
@@ -39,17 +42,36 @@
],
)
+cc_library(  # Sources under tools/helper, packaged so tool binaries can depend on them.
+ name = "tools_helper",
+ srcs = [
+ "tools/helper/common.c",
+ "tools/helper/handler.c",
+ "tools/helper/parameters.c",
+ "tools/helper/utils.c",
+ ],
+ hdrs = [
+ "tools/helper/common.h",
+ "tools/helper/handler.h",
+ "tools/helper/parameters.h",
+ "tools/helper/utils.h",
+ ],
+ copts = [
+ "-Wno-missing-braces",
+ "-Wno-incompatible-pointer-types-discards-qualifiers",  # NOTE(review): looks like the clang spelling of the gcc flag below; passed unconditionally - confirm gcc tolerates it
+ ] + compiler_select({
+ "clang": [],
+ "gcc": [
+ "-Wno-discarded-qualifiers",  # listed only under the "gcc" key
+ ],
+ }),
+ visibility = ["//visibility:public"],
+ deps = [":rawrtc"],
+)
+
cc_binary(
name = "peer-connection",
srcs = [
- "tools/helper/common.c",
- "tools/helper/common.h",
- "tools/helper/handler.c",
- "tools/helper/handler.h",
- "tools/helper/parameters.c",
- "tools/helper/parameters.h",
- "tools/helper/utils.c",
- "tools/helper/utils.h",
"tools/peer-connection.c",
],
copts = [
@@ -64,5 +86,8 @@
],
}),
includes = ["tools"],
- deps = [":rawrtc"],
+ deps = [
+ ":rawrtc",
+ ":tools_helper",
+ ],
)
diff --git a/third_party/rawrtc/re/include/re_main.h b/third_party/rawrtc/re/include/re_main.h
index cc9ac4b..c4e6293 100644
--- a/third_party/rawrtc/re/include/re_main.h
+++ b/third_party/rawrtc/re/include/re_main.h
@@ -29,8 +29,13 @@
*
* @param sig Signal number
*/
typedef void (re_signal_h)(int sig);
+
+/**
+ * Handler that fd_listen() calls in place of its own implementation once it
+ * has been installed with re_fd_set_listen_callback()
+ *
+ * @param fd     File descriptor passed to fd_listen()
+ * @param flags  Flags passed to fd_listen()
+ * @param fh     Event handler passed to fd_listen()
+ * @param arg    Handler argument passed to fd_listen()
+ *
+ * @return Value that fd_listen() hands back to its caller
+ */
+typedef int(re_fd_listen_h)(int fd, int flags, fd_h *fh, void *arg);
+
+/**
+ * Handler that fd_close() calls in place of its own implementation once it
+ * has been installed with re_fd_set_close_callback()
+ *
+ * @param fd  File descriptor passed to fd_close()
+ */
+typedef void(re_fd_close_h)(int fd);
+
+void re_fd_set_listen_callback(re_fd_listen_h *listenh);
+void re_fd_set_close_callback(re_fd_close_h *closeh);
int fd_listen(int fd, int flags, fd_h *fh, void *arg);
void fd_close(int fd);
diff --git a/third_party/rawrtc/re/src/main/main.c b/third_party/rawrtc/re/src/main/main.c
index 0243b4b..de3e68b 100644
--- a/third_party/rawrtc/re/src/main/main.c
+++ b/third_party/rawrtc/re/src/main/main.c
@@ -222,6 +222,25 @@
#endif
+/*
+ * Optional overrides for fd_listen() and fd_close().  While a pointer is
+ * non-NULL the corresponding function forwards the call to it instead of
+ * running its built-in implementation.
+ */
+static re_fd_listen_h *global_fd_listen_h = NULL;
+static re_fd_close_h *global_fd_close_h = NULL;
+
+/**
+ * Set the handler that fd_listen() forwards to
+ *
+ * @param listenh  Handler to forward to; NULL makes fd_listen() use its
+ *                 built-in implementation
+ */
+void re_fd_set_listen_callback(re_fd_listen_h *listenh)
+{
+  struct re *const ctx = re_get();
+
+  /* The pointer is stored while holding the re lock. */
+  re_lock(ctx);
+  global_fd_listen_h = listenh;
+  re_unlock(ctx);
+}
+
+/**
+ * Set the handler that fd_close() forwards to
+ *
+ * @param closeh  Handler to forward to; NULL makes fd_close() use its
+ *                built-in implementation
+ */
+void re_fd_set_close_callback(re_fd_close_h *closeh)
+{
+  struct re *const ctx = re_get();
+
+  /* The pointer is stored while holding the re lock. */
+  re_lock(ctx);
+  global_fd_close_h = closeh;
+  re_unlock(ctx);
+}
+
+
#if MAIN_DEBUG
/**
* Call the application event handler
@@ -566,7 +585,11 @@
struct re *re = re_get();
int err = 0;
- DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);
+ if (global_fd_listen_h != NULL) {
+ return (*global_fd_listen_h)(fd, flags, fh, arg);
+ }
+
+ DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);
if (fd < 0) {
DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd);
@@ -642,6 +665,11 @@
*/
void fd_close(int fd)
{
+ if (global_fd_close_h != NULL) { /* a registered close override takes over */
+ (*global_fd_close_h)(fd);
+ return; /* the fd_listen() call below is not made in this case */
+ }
+
(void)fd_listen(fd, 0, NULL, NULL);
}
diff --git a/y2020/BUILD b/y2020/BUILD
index 964def4..d95b687 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -146,6 +146,14 @@
aos_config(
name = "config",
src = "y2020.json",
+ flatbuffers = [
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:timestamp_fbs",
+ "//y2020/vision/sift:sift_fbs",
+ "//y2020/vision/sift:sift_training_fbs",
+ "//y2020/vision:vision_fbs",
+ ],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
@@ -157,14 +165,6 @@
":config_pi5",
":config_roborio",
],
- flatbuffers = [
- "//aos/network:message_bridge_client_fbs",
- "//aos/network:message_bridge_server_fbs",
- "//aos/network:timestamp_fbs",
- "//y2020/vision/sift:sift_fbs",
- "//y2020/vision/sift:sift_training_fbs",
- "//y2020/vision:vision_fbs",
- ]
)
[
@@ -179,7 +179,7 @@
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
"//aos/network:remote_message_fbs",
- "//y2020/vision:galactic_search_path_fbs"
+ "//y2020/vision:galactic_search_path_fbs",
],
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
diff --git a/y2020/actors/BUILD b/y2020/actors/BUILD
index aacf77a..6521b85 100644
--- a/y2020/actors/BUILD
+++ b/y2020/actors/BUILD
@@ -19,8 +19,8 @@
filegroup(
name = "spline_jsons",
srcs = glob([
- "splines/*.json"
- ]),
+ "splines/*.json",
+ ]),
visibility = ["//visibility:public"],
)
@@ -56,11 +56,11 @@
"//frc971/control_loops:profiled_subsystem_fbs",
"//frc971/control_loops/drivetrain:drivetrain_config",
"//frc971/control_loops/drivetrain:localizer_fbs",
+ "//frc971/control_loops/drivetrain:spline",
"//y2020/control_loops/drivetrain:drivetrain_base",
"//y2020/control_loops/superstructure:superstructure_goal_fbs",
"//y2020/control_loops/superstructure:superstructure_status_fbs",
"//y2020/vision:galactic_search_path_fbs",
- "//frc971/control_loops/drivetrain:spline",
],
)