Merge changes Id0f3fe5a,I1657b15c,I80e69a93
* changes:
Create an ExitHandle interface
autocxx: Support multiple AUTOCXX_RS_JSON_ARCHIVE entries
autocxx: Generate impls for abstract types too
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 278934b..deca15b 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -373,6 +373,7 @@
visibility = ["//visibility:public"],
deps = [
":aos_logging",
+ ":epoll",
":event_loop",
":simple_channel",
"//aos:init",
@@ -395,6 +396,7 @@
target_compatible_with = ["@platforms//os:linux"],
deps = [
":simulated_event_loop",
+ "//aos/network:testing_time_converter",
"//aos/testing:googletest",
"@com_github_google_glog//:glog",
],
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index 8cb553b..71e3c9b 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -24,6 +24,7 @@
void TimerFd::SetTime(monotonic_clock::time_point start,
monotonic_clock::duration interval) {
+ CHECK_GE(start, monotonic_clock::epoch());
struct itimerspec new_value;
new_value.it_interval = ::aos::time::to_timespec(interval);
new_value.it_value = ::aos::time::to_timespec(start);
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 3ceb240..2bcde2f 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -462,6 +462,7 @@
// Timer should sleep until base, base + offset, base + offset * 2, ...
// If repeat_offset isn't set, the timer only expires once.
+ // base must be greater than or equal to monotonic_clock::epoch().
virtual void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset =
::aos::monotonic_clock::zero()) = 0;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 2d7decc..0eeecf8 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -752,6 +752,14 @@
"/test/invalid");
}
+// Verify that setting up a timer before monotonic_clock::epoch() fails.
+TEST_P(AbstractEventLoopDeathTest, NegativeTimeTimer) {
+ auto loop = Make();
+ TimerHandler *time = loop->AddTimer([]() {});
+ EXPECT_DEATH(time->Setup(monotonic_clock::epoch() - std::chrono::seconds(1)),
+ "-1.000");
+}
+
// Verify that registering a watcher twice for "/test" fails.
TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
auto loop = Make();
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index cb0c629..97c1e83 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -10,6 +10,7 @@
EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
Event *callback) {
+ CHECK_LE(monotonic_clock::epoch(), time);
return events_list_.emplace(time, callback);
}
@@ -35,6 +36,12 @@
}
aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
+ // If we haven't started yet, schedule a special event for the epoch to allow
+ // ourselves to boot.
+ if (!called_started_) {
+ return aos::monotonic_clock::epoch();
+ }
+
if (events_list_.empty()) {
return monotonic_clock::max_time;
}
@@ -42,14 +49,27 @@
return events_list_.begin()->first;
}
-void EventScheduler::Shutdown() { on_shutdown_(); }
+void EventScheduler::Shutdown() {
+ CHECK(!is_running_);
+ on_shutdown_();
+}
void EventScheduler::Startup() {
++boot_count_;
- RunOnStartup();
+ CHECK(!is_running_);
+ MaybeRunOnStartup();
+ CHECK(called_started_);
}
void EventScheduler::CallOldestEvent() {
+ if (!called_started_) {
+ // If we haven't started, start.
+ MaybeRunOnStartup();
+ MaybeRunOnRun();
+ CHECK(called_started_);
+ return;
+ }
+ CHECK(is_running_);
CHECK_GT(events_list_.size(), 0u);
auto iter = events_list_.begin();
const logger::BootTimestamp t =
@@ -66,6 +86,7 @@
}
void EventScheduler::RunOnRun() {
+ CHECK(is_running_);
while (!on_run_.empty()) {
std::function<void()> fn = std::move(*on_run_.begin());
on_run_.erase(on_run_.begin());
@@ -75,6 +96,7 @@
void EventScheduler::RunOnStartup() noexcept {
while (!on_startup_.empty()) {
+ CHECK(!is_running_);
std::function<void()> fn = std::move(*on_startup_.begin());
on_startup_.erase(on_startup_.begin());
fn();
@@ -82,14 +104,39 @@
}
void EventScheduler::RunStarted() {
+ CHECK(!is_running_);
if (started_) {
started_();
}
+ is_running_ = true;
}
-void EventScheduler::RunStopped() {
- if (stopped_) {
- stopped_();
+void EventScheduler::MaybeRunStopped() {
+ CHECK(is_running_);
+ is_running_ = false;
+ if (called_started_) {
+ called_started_ = false;
+ if (stopped_) {
+ stopped_();
+ }
+ }
+}
+
+void EventScheduler::MaybeRunOnStartup() {
+ CHECK(!called_started_);
+ CHECK(!is_running_);
+ const logger::BootTimestamp t =
+ FromDistributedClock(scheduler_scheduler_->distributed_now());
+ if (t.boot == boot_count_ && t.time >= monotonic_clock::epoch()) {
+ called_started_ = true;
+ RunOnStartup();
+ }
+}
+
+void EventScheduler::MaybeRunOnRun() {
+ if (called_started_) {
+ RunStarted();
+ RunOnRun();
}
}
@@ -110,6 +157,88 @@
scheduler->scheduler_scheduler_ = this;
}
+void EventSchedulerScheduler::MaybeRunStopped() {
+ CHECK(!is_running_);
+ for (EventScheduler *scheduler : schedulers_) {
+ if (scheduler->is_running()) {
+ scheduler->MaybeRunStopped();
+ }
+ }
+}
+
+bool EventSchedulerScheduler::RunUntil(
+ realtime_clock::time_point end_time, EventScheduler *scheduler,
+ std::function<std::chrono::nanoseconds()> fn_realtime_offset) {
+ logging::ScopedLogRestorer prev_logger;
+ MaybeRunOnStartup();
+
+ bool reached_end_time = false;
+
+ RunMaybeRealtimeLoop([this, scheduler, end_time, fn_realtime_offset,
+ &reached_end_time]() {
+ std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+ OldestEvent();
+ aos::distributed_clock::time_point oldest_event_time_distributed =
+ std::get<0>(oldest_event);
+ logger::BootTimestamp test_time_monotonic =
+ scheduler->FromDistributedClock(oldest_event_time_distributed);
+ realtime_clock::time_point oldest_event_realtime(
+ test_time_monotonic.time_since_epoch() + fn_realtime_offset());
+
+ if ((std::get<0>(oldest_event) == distributed_clock::max_time) ||
+ (oldest_event_realtime > end_time &&
+ (reboots_.empty() ||
+ std::get<0>(reboots_.front()) > oldest_event_time_distributed))) {
+ is_running_ = false;
+ reached_end_time = true;
+
+ // We have to nudge our time back to the distributed time
+ // corresponding to our desired realtime time.
+ const aos::monotonic_clock::time_point end_monotonic =
+ aos::monotonic_clock::epoch() + end_time.time_since_epoch() -
+ fn_realtime_offset();
+ const aos::distributed_clock::time_point end_time_distributed =
+ scheduler->ToDistributedClock(end_monotonic);
+
+ now_ = end_time_distributed;
+
+ return;
+ }
+
+ if (!reboots_.empty() &&
+ std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+ // Reboot is next.
+ CHECK_LE(now_,
+ std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+ now_ = std::get<0>(reboots_.front());
+ Reboot();
+ reboots_.erase(reboots_.begin());
+ return;
+ }
+
+ // We get to pick our tradeoffs here. Either we assume that there are
+ // no backward step changes in our time function for each node, or we
+ // have to let time go backwards. We currently only really see this
+ // happen when 2 events are scheduled for "now", time changes, and
+ // there is a nanosecond or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+
+ now_ = std::get<0>(oldest_event);
+
+ std::get<1>(oldest_event)->CallOldestEvent();
+ });
+
+ MaybeRunStopped();
+
+ return reached_end_time;
+}
+
void EventSchedulerScheduler::Reboot() {
const std::vector<logger::BootTimestamp> &times =
std::get<1>(reboots_.front());
@@ -131,7 +260,7 @@
rebooted.emplace_back(node_index);
CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
times[node_index].boot);
- schedulers_[node_index]->RunStopped();
+ schedulers_[node_index]->MaybeRunStopped();
schedulers_[node_index]->Shutdown();
}
}
@@ -140,16 +269,10 @@
// (especially message_bridge), it could try to send stuff out. We want
// to move everything over to the new boot before doing that.
for (const size_t node_index : rebooted) {
- CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
schedulers_[node_index]->Startup();
}
-
for (const size_t node_index : rebooted) {
- schedulers_[node_index]->RunStarted();
- }
-
- for (const size_t node_index : rebooted) {
- schedulers_[node_index]->RunOnRun();
+ schedulers_[node_index]->MaybeRunOnRun();
}
is_running_ = true;
}
@@ -157,11 +280,10 @@
void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
distributed_clock::time_point end_time = now_ + duration;
logging::ScopedLogRestorer prev_logger;
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
// Run all the sub-event-schedulers.
- while (is_running_) {
+ RunMaybeRealtimeLoop([this, end_time]() {
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
if (!reboots_.empty() &&
@@ -170,7 +292,7 @@
if (std::get<0>(reboots_.front()) > end_time) {
// Reboot is after our end time, give up.
is_running_ = false;
- break;
+ return;
}
CHECK_LE(now_,
@@ -180,14 +302,14 @@
now_ = std::get<0>(reboots_.front());
Reboot();
reboots_.erase(reboots_.begin());
- continue;
+ return;
}
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time ||
std::get<0>(oldest_event) > end_time) {
is_running_ = false;
- break;
+ return;
}
// We get to pick our tradeoffs here. Either we assume that there are no
@@ -199,22 +321,23 @@
// //aos/events/logging:logger_test triggers this.
CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
+ // push time forwards
now_ = std::get<0>(oldest_event);
std::get<1>(oldest_event)->CallOldestEvent();
- }
+ });
now_ = end_time;
- RunStopped();
+ MaybeRunStopped();
}
void EventSchedulerScheduler::Run() {
logging::ScopedLogRestorer prev_logger;
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
+
// Run all the sub-event-schedulers.
- while (is_running_) {
+ RunMaybeRealtimeLoop([this]() {
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
if (!reboots_.empty() &&
@@ -227,11 +350,12 @@
now_ = std::get<0>(reboots_.front());
Reboot();
reboots_.erase(reboots_.begin());
- continue;
+ return;
}
// No events left, bail.
if (std::get<0>(oldest_event) == distributed_clock::max_time) {
- break;
+ is_running_ = false;
+ return;
}
// We get to pick our tradeoffs here. Either we assume that there are no
@@ -246,11 +370,55 @@
now_ = std::get<0>(oldest_event);
std::get<1>(oldest_event)->CallOldestEvent();
- }
+ });
- is_running_ = false;
+ MaybeRunStopped();
+}
- RunStopped();
+template <typename F>
+void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
+ internal::TimerFd timerfd;
+ CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
+ distributed_clock::time_point last_distributed_clock =
+ std::get<0>(OldestEvent());
+ monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
+ timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
+ epoll_.OnReadable(
+ timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
+ &timerfd, loop_body]() {
+ const uint64_t read_result = timerfd.Read();
+ if (!is_running_) {
+ epoll_.Quit();
+ return;
+ }
+ CHECK_EQ(read_result, 1u);
+ // Call loop_body() at least once; if we are in infinite-speed replay,
+ // we don't actually want/need the context switches from the epoll
+ // setup, so just loop.
+ // Note: The performance impacts of this code have not been carefully
+ // inspected (e.g., how much does avoiding the context-switch help; does
+ // the timerfd_settime call matter).
+ // This is deliberately written to support the user changing replay
+ // rates dynamically.
+ do {
+ loop_body();
+ if (is_running_) {
+ const monotonic_clock::time_point next_trigger =
+ last_monotonic_clock +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ (now_ - last_distributed_clock) / replay_rate_);
+ timerfd.SetTime(next_trigger, std::chrono::seconds(0));
+ last_monotonic_clock = next_trigger;
+ last_distributed_clock = now_;
+ } else {
+ epoll_.Quit();
+ }
+ } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
+ is_running_);
+ });
+
+ epoll_.Run();
+ epoll_.DeleteFd(timerfd.fd());
}
std::tuple<distributed_clock::time_point, EventScheduler *>
@@ -284,12 +452,23 @@
const bool was_running = is_running_;
if (is_running_) {
is_running_ = false;
- RunStopped();
+ MaybeRunStopped();
}
fn();
if (was_running) {
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
+ }
+}
+
+void EventSchedulerScheduler::MaybeRunOnStartup() {
+ is_running_ = true;
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->MaybeRunOnStartup();
+ }
+ // We must trigger all the OnRun's *after* all the OnStartup callbacks are
+ // triggered because that is the contract that we have stated.
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->MaybeRunOnRun();
}
}
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index d91a75e..237a240 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -8,6 +8,7 @@
#include <utility>
#include <vector>
+#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
#include "aos/logging/implementations.h"
@@ -108,47 +109,50 @@
// Returns an iterator to the event
Token Schedule(monotonic_clock::time_point time, Event *callback);
- // Schedules a callback when the event scheduler starts.
+ // Schedules a callback whenever the event scheduler starts, after we have
+ // entered the running state. Callbacks are cleared after being called once.
+ // Will not get called until a node starts (a node does not start until its
+ // monotonic clock has reached at least monotonic_clock::epoch()).
void ScheduleOnRun(std::function<void()> callback) {
on_run_.emplace_back(std::move(callback));
}
- // Schedules a callback when the event scheduler starts.
+ // Schedules a callback whenever the event scheduler starts, before we have
+ // entered the running state. Callbacks are cleared after being called once.
+ // Will not get called until a node starts (a node does not start until its
+ // monotonic clock has reached at least monotonic_clock::epoch()).
void ScheduleOnStartup(std::function<void()> callback) {
on_startup_.emplace_back(std::move(callback));
}
+ // Schedules a callback for whenever a node reboots, after we have exited the
+ // running state. Does not get called when the event scheduler stops (unless
+ // it is stopping to execute the reboot).
void set_on_shutdown(std::function<void()> callback) {
on_shutdown_ = std::move(callback);
}
+ // Identical to ScheduleOnStartup, except that only one callback may get set
+ // and it will not be cleared after being called.
void set_started(std::function<void()> callback) {
started_ = std::move(callback);
}
+ // Schedules a callback for whenever the scheduler exits the running state
+ // (running will be false during the callback). This includes both node
+ // reboots and the end of regular execution. Will not be called if the node
+ // never started.
void set_stopped(std::function<void()> callback) {
stopped_ = std::move(callback);
}
- std::function<void()> started_;
- std::function<void()> stopped_;
- std::function<void()> on_shutdown_;
-
Token InvalidToken() { return events_list_.end(); }
// Deschedule an event by its iterator
void Deschedule(Token token);
- // Runs the OnRun callbacks.
- void RunOnRun();
-
- // Runs the OnStartup callbacks.
- void RunOnStartup() noexcept;
-
// Runs the Started callback.
- void RunStarted();
- // Runs the Started callback.
- void RunStopped();
+ void MaybeRunStopped();
// Returns true if events are being handled.
inline bool is_running() const;
@@ -185,12 +189,24 @@
size_t node_index() const { return node_index_; }
+ private:
+ friend class EventSchedulerScheduler;
+
+ // Runs the OnRun callbacks.
+ void RunOnRun();
+
+ // Runs the OnStartup callbacks.
+ void RunOnStartup() noexcept;
+
+ // Runs the Started callback.
+ void RunStarted();
+
// For implementing reboots.
void Shutdown();
void Startup();
- private:
- friend class EventSchedulerScheduler;
+ void MaybeRunOnStartup();
+ void MaybeRunOnRun();
// Current execution time.
monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
@@ -213,6 +229,15 @@
// distinguish which one.
size_t node_index_ = 0;
+ // Whether this individual scheduler is currently running.
+ bool is_running_ = false;
+ // Whether we have called all the startup handlers during this boot.
+ bool called_started_ = false;
+
+ std::function<void()> started_;
+ std::function<void()> stopped_;
+ std::function<void()> on_shutdown_;
+
// Converts time by doing nothing to it.
class UnityConverter final : public TimeConverter {
public:
@@ -270,33 +295,28 @@
// Stops running.
void Exit() { is_running_ = false; }
- bool is_running() const { return is_running_; }
-
// Runs for a duration on the distributed clock. Time on the distributed
// clock should be very representative of time on each node, but won't be
// exactly the same.
void RunFor(distributed_clock::duration duration);
+ // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+ // try to play events in realtime. 0.5 will run at half speed. Use infinity
+ // (the default) to run as fast as possible. This can be changed during
+ // run-time.
+ void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
+ internal::EPoll *epoll() { return &epoll_; }
+
+ // Run until time. fn_realtime_offset is a function that returns the
+ // realtime offset.
+ // Returns true if it ran until time (i.e., Exit() was not called before
+ // end_time).
+ bool RunUntil(realtime_clock::time_point end_time, EventScheduler *scheduler,
+ std::function<std::chrono::nanoseconds()> fn_realtime_offset);
+
// Returns the current distributed time.
distributed_clock::time_point distributed_now() const { return now_; }
- void RunOnStartup() {
- CHECK(!is_running_);
- for (EventScheduler *scheduler : schedulers_) {
- scheduler->RunOnStartup();
- }
- for (EventScheduler *scheduler : schedulers_) {
- scheduler->RunStarted();
- }
- }
-
- void RunStopped() {
- CHECK(!is_running_);
- for (EventScheduler *scheduler : schedulers_) {
- scheduler->RunStopped();
- }
- }
-
void SetTimeConverter(TimeConverter *time_converter) {
time_converter->set_reboot_found(
[this](distributed_clock::time_point reboot_time,
@@ -314,20 +334,20 @@
void TemporarilyStopAndRun(std::function<void()> fn);
private:
- // Handles running the OnRun functions.
- void RunOnRun() {
- CHECK(!is_running_);
- is_running_ = true;
- for (EventScheduler *scheduler : schedulers_) {
- scheduler->RunOnRun();
- }
- }
-
void Reboot();
+ void MaybeRunStopped();
+ void MaybeRunOnStartup();
+
// Returns the next event time and scheduler on which to run it.
std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
+ // Handles running loop_body repeatedly until complete. loop_body should
+ // advance now_ to the time of the event it handled (it returns nothing), and
+ // set is_running_ to false once we should stop.
+ template <typename F>
+ void RunMaybeRealtimeLoop(F loop_body);
+
// True if we are running.
bool is_running_ = false;
// The current time.
@@ -339,6 +359,9 @@
std::vector<std::tuple<distributed_clock::time_point,
std::vector<logger::BootTimestamp>>>
reboots_;
+
+ double replay_rate_ = std::numeric_limits<double>::infinity();
+ internal::EPoll epoll_;
};
inline distributed_clock::time_point EventScheduler::distributed_now() const {
@@ -353,9 +376,7 @@
return t.time;
}
-inline bool EventScheduler::is_running() const {
- return scheduler_scheduler_->is_running();
-}
+inline bool EventScheduler::is_running() const { return is_running_; }
} // namespace aos
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 54fb91a..c32399c 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -2,6 +2,7 @@
#include <chrono>
+#include "aos/network/testing_time_converter.h"
#include "gtest/gtest.h"
namespace aos {
@@ -67,6 +68,16 @@
std::vector<UUID> uuids_;
};
+class FunctionEvent : public EventScheduler::Event {
+ public:
+ FunctionEvent(std::function<void()> fn) : fn_(fn) {}
+
+ void Handle() noexcept override { fn_(); }
+
+ private:
+ std::function<void()> fn_;
+};
+
// Tests that the default parameters (slope of 1, offest of 0) behave as
// an identity.
TEST(EventSchedulerTest, IdentityTimeConversion) {
@@ -108,4 +119,408 @@
distributed_clock::epoch() + chrono::seconds(1));
}
+// Test that RunUntil() stops at the appointed time and returns correctly.
+TEST(EventSchedulerTest, RunUntil) {
+ int counter = 0;
+ EventSchedulerScheduler scheduler_scheduler;
+ EventScheduler scheduler(0);
+ scheduler_scheduler.AddEventScheduler(&scheduler);
+
+ FunctionEvent e([&counter]() { counter += 1; });
+ FunctionEvent quitter(
+ [&scheduler_scheduler]() { scheduler_scheduler.Exit(); });
+ scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(3), &quitter);
+ scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+ ASSERT_TRUE(scheduler_scheduler.RunUntil(
+ realtime_clock::epoch() + std::chrono::seconds(2), &scheduler,
+ []() { return std::chrono::nanoseconds{0}; }));
+ EXPECT_EQ(counter, 1);
+ ASSERT_FALSE(scheduler_scheduler.RunUntil(
+ realtime_clock::epoch() + std::chrono::seconds(4), &scheduler,
+ []() { return std::chrono::nanoseconds{0}; }));
+ EXPECT_EQ(counter, 1);
+ ASSERT_TRUE(scheduler_scheduler.RunUntil(
+ realtime_clock::epoch() + std::chrono::seconds(6), &scheduler,
+ []() { return std::chrono::nanoseconds{0}; }));
+ EXPECT_EQ(counter, 2);
+}
+
+enum class RunMode {
+ kRun,
+ kRunUntil,
+ kRunFor,
+};
+
+// Sets up a parameterized test case that will exercise all three of the Run(),
+// RunFor(), and RunUntil() methods of the EventSchedulerScheduler. This exposes
+// a ParamRunFor() to the test case that will nominally run for the specified
+// time (except for when in kRun mode, where it will just call Run()).
+class EventSchedulerParamTest : public testing::TestWithParam<RunMode> {
+ public:
+ EventSchedulerParamTest() {
+ schedulers_.reserve(kNumNodes);
+ for (size_t ii = 0; ii < kNumNodes; ++ii) {
+ schedulers_.emplace_back(ii);
+ schedulers_.back().SetTimeConverter(ii, &time_);
+ scheduler_scheduler_.AddEventScheduler(&schedulers_.back());
+ }
+ scheduler_scheduler_.SetTimeConverter(&time_);
+ }
+
+ void StartClocksAtEpoch() {
+ time_.AddMonotonic({BootTimestamp::epoch(), BootTimestamp::epoch()});
+ }
+
+ protected:
+ static constexpr size_t kNumNodes = 2;
+
+ void CheckSchedulersRunning(bool running) {
+ for (EventScheduler &scheduler : schedulers_) {
+ EXPECT_EQ(running, scheduler.is_running());
+ }
+ }
+
+ void ParamRunFor(std::chrono::nanoseconds t) {
+ switch (GetParam()) {
+ case RunMode::kRun:
+ scheduler_scheduler_.Run();
+ break;
+ case RunMode::kRunUntil:
+ scheduler_scheduler_.RunUntil(
+ realtime_clock::time_point(
+ schedulers_.at(0).monotonic_now().time_since_epoch() + t),
+ &schedulers_.at(0), []() { return std::chrono::nanoseconds(0); });
+ break;
+ case RunMode::kRunFor:
+ scheduler_scheduler_.RunFor(t);
+ break;
+ }
+ }
+
+ message_bridge::TestingTimeConverter time_{kNumNodes};
+ std::vector<EventScheduler> schedulers_;
+ EventSchedulerScheduler scheduler_scheduler_;
+};
+
+// Tests that we correctly handle exiting during startup.
+TEST_P(EventSchedulerParamTest, ExitOnStartup) {
+ StartClocksAtEpoch();
+ bool observed_handler = false;
+ schedulers_.at(0).ScheduleOnStartup([this, &observed_handler]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ observed_handler = true;
+ scheduler_scheduler_.Exit();
+ });
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_TRUE(observed_handler);
+}
+
+// Test that creating an event and running the scheduler runs the event.
+TEST_P(EventSchedulerParamTest, ScheduleEvent) {
+ StartClocksAtEpoch();
+ int counter = 0;
+
+ FunctionEvent e([&counter]() { counter += 1; });
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_EQ(counter, 1);
+ auto token = schedulers_.at(0).Schedule(
+ monotonic_clock::epoch() + chrono::seconds(2), &e);
+ schedulers_.at(0).Deschedule(token);
+ ParamRunFor(std::chrono::seconds(2));
+ EXPECT_EQ(counter, 1);
+}
+
+// Tests that a node that would have a negative monotonic time at boot does not
+// get started until later.
+TEST_P(EventSchedulerParamTest, NodeWaitsTillEpochToBoot) {
+ time_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp{0, monotonic_clock::epoch()},
+ BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)}});
+ bool observed_startup_0 = false;
+ bool observed_startup_1 = false;
+ bool observed_on_run_1 = false;
+ schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+ observed_startup_0 = true;
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ EXPECT_FALSE(schedulers_.at(1).is_running());
+ EXPECT_EQ(distributed_clock::epoch(),
+ scheduler_scheduler_.distributed_now());
+ EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+ EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(1),
+ schedulers_.at(1).monotonic_now());
+ });
+ schedulers_.at(1).ScheduleOnStartup([this, &observed_startup_1]() {
+ observed_startup_1 = true;
+ // Note that we do not *stop* execution on node zero just to get 1 started.
+ EXPECT_TRUE(schedulers_.at(0).is_running());
+ EXPECT_FALSE(schedulers_.at(1).is_running());
+ EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+ scheduler_scheduler_.distributed_now());
+ EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+ schedulers_.at(0).monotonic_now());
+ EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+ });
+ schedulers_.at(1).ScheduleOnRun([this, &observed_on_run_1]() {
+ observed_on_run_1 = true;
+ // Note that we do not *stop* execution on node zero just to get 1 started.
+ EXPECT_TRUE(schedulers_.at(0).is_running());
+ EXPECT_TRUE(schedulers_.at(1).is_running());
+ EXPECT_EQ(distributed_clock::epoch() + chrono::seconds(1),
+ scheduler_scheduler_.distributed_now());
+ EXPECT_EQ(monotonic_clock::epoch() + chrono::seconds(1),
+ schedulers_.at(0).monotonic_now());
+ EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(1).monotonic_now());
+ });
+
+ FunctionEvent e([]() {});
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ ParamRunFor(chrono::seconds(1));
+ EXPECT_TRUE(observed_startup_0);
+ EXPECT_TRUE(observed_startup_1);
+ EXPECT_TRUE(observed_on_run_1);
+}
+
+// Tests that a node that never boots does not get any of its handlers run.
+TEST_P(EventSchedulerParamTest, NodeNeverBootsIfAlwaysNegative) {
+ time_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp{0, monotonic_clock::epoch()},
+ BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+ bool observed_startup_0 = false;
+ schedulers_.at(0).ScheduleOnStartup([this, &observed_startup_0]() {
+ observed_startup_0 = true;
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ EXPECT_FALSE(schedulers_.at(1).is_running());
+ EXPECT_EQ(distributed_clock::epoch(),
+ scheduler_scheduler_.distributed_now());
+ EXPECT_EQ(monotonic_clock::epoch(), schedulers_.at(0).monotonic_now());
+ EXPECT_EQ(monotonic_clock::epoch() - chrono::seconds(10),
+ schedulers_.at(1).monotonic_now());
+ });
+ schedulers_.at(1).ScheduleOnStartup(
+ []() { FAIL() << "Should never have hit startup handlers for node 1."; });
+ schedulers_.at(1).ScheduleOnRun(
+ []() { FAIL() << "Should never have hit OnRun handlers for node 1."; });
+ schedulers_.at(1).set_stopped(
+ []() { FAIL() << "Should never have hit stopped handlers for node 1."; });
+
+ FunctionEvent e([this]() { scheduler_scheduler_.Exit(); });
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ ParamRunFor(chrono::seconds(1));
+ EXPECT_TRUE(observed_startup_0);
+}
+
+// Checks for regressions in how the startup/shutdown handlers behave.
+TEST_P(EventSchedulerParamTest, StartupShutdownHandlers) {
+ StartClocksAtEpoch();
+ time_.AddNextTimestamp(
+ distributed_clock::epoch() + chrono::seconds(3),
+ {BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)},
+ BootTimestamp{0, monotonic_clock::epoch() + chrono::seconds(3)}});
+ time_.RebootAt(0, distributed_clock::epoch() + chrono::seconds(4));
+ // Expected behavior:
+ // If all handlers get called during a reboot, they should sequence as:
+ // * is_running_ = false
+ // * stopped()
+ // * on_shutdown()
+ // * on_startup()
+ // * started()
+ // * is_running_ = true
+ // * OnRun()
+ //
+ // on_shutdown handlers should not get called at end of execution (e.g., when
+ // TemporarilyStopAndRun is called)--only when a node reboots.
+ //
+ // startup and OnRun handlers get cleared after being called once; these are
+ // also the only handlers that can have more than one handler registered.
+ //
+ // Create counters for all the handlers on the 0 node. Create separate a/b
+ // counters for the handlers that can/should get cleared.
+ int shutdown_counter = 0;
+ int stopped_counter = 0;
+ int startup_counter_a = 0;
+ int startup_counter_b = 0;
+ int started_counter = 0;
+ int on_run_counter_a = 0;
+ int on_run_counter_b = 0;
+
+ schedulers_.at(1).set_on_shutdown([]() {
+ FAIL() << "Should never reach the node 1 shutdown handler, since it never "
+ "reboots.";
+ });
+
+ auto startup_handler_a = [this, &startup_counter_a]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ ++startup_counter_a;
+ };
+
+ auto startup_handler_b = [this, &startup_counter_b]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ ++startup_counter_b;
+ };
+
+ auto on_run_handler_a = [this, &on_run_counter_a]() {
+ EXPECT_TRUE(schedulers_.at(0).is_running());
+ ++on_run_counter_a;
+ };
+
+ auto on_run_handler_b = [this, &on_run_counter_b]() {
+ EXPECT_TRUE(schedulers_.at(0).is_running());
+ ++on_run_counter_b;
+ };
+
+ schedulers_.at(0).set_stopped([this, &stopped_counter]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ ++stopped_counter;
+ });
+ schedulers_.at(0).set_on_shutdown(
+ [this, &shutdown_counter, startup_handler_a, on_run_handler_a]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+ schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+ ++shutdown_counter;
+ });
+ schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+ schedulers_.at(0).set_started([this, &started_counter]() {
+ EXPECT_FALSE(schedulers_.at(0).is_running());
+ ++started_counter;
+ });
+ schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+
+ FunctionEvent e([]() {});
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_EQ(shutdown_counter, 0);
+ EXPECT_EQ(stopped_counter, 1);
+ EXPECT_EQ(started_counter, 1);
+ EXPECT_EQ(startup_counter_a, 1);
+ EXPECT_EQ(on_run_counter_a, 1);
+ EXPECT_EQ(startup_counter_b, 0);
+ EXPECT_EQ(on_run_counter_b, 0);
+
+ // In the middle, execute a TemporarilyStopAndRun. Use it to re-register the
+ // startup handlers.
+ schedulers_.at(0).ScheduleOnStartup(startup_handler_b);
+ schedulers_.at(0).ScheduleOnRun(on_run_handler_b);
+ FunctionEvent stop_and_run([this, startup_handler_a, on_run_handler_a]() {
+ scheduler_scheduler_.TemporarilyStopAndRun(
+ [this, startup_handler_a, on_run_handler_a]() {
+ schedulers_.at(0).ScheduleOnStartup(startup_handler_a);
+ schedulers_.at(0).ScheduleOnRun(on_run_handler_a);
+ });
+ });
+ schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+ &stop_and_run);
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_EQ(shutdown_counter, 0);
+ EXPECT_EQ(stopped_counter, 3);
+ EXPECT_EQ(started_counter, 3);
+ EXPECT_EQ(startup_counter_a, 2);
+ EXPECT_EQ(on_run_counter_a, 2);
+ EXPECT_EQ(startup_counter_b, 1);
+ EXPECT_EQ(on_run_counter_b, 1);
+
+ // Next, execute a reboot in the middle of running and confirm that things
+ // tally correctly. We do not re-register the startup/on_run handlers before
+ // starting here, but do in the shutdown handler, so should see the A handlers
+ // increment.
+ // We need to schedule at least one event so that the reboot is actually
+ // observable (otherwise Run() will just terminate immediately, since there
+ // are no scheduled events that could possibly observe the reboot anyways).
+ schedulers_.at(1).Schedule(monotonic_clock::epoch() + chrono::seconds(5), &e);
+ ParamRunFor(std::chrono::seconds(5));
+ EXPECT_EQ(shutdown_counter, 1);
+ EXPECT_EQ(stopped_counter, 5);
+ EXPECT_EQ(started_counter, 5);
+ EXPECT_EQ(startup_counter_a, 3);
+ EXPECT_EQ(on_run_counter_a, 3);
+ EXPECT_EQ(startup_counter_b, 1);
+ EXPECT_EQ(on_run_counter_b, 1);
+}
+
+// Test that descheduling an already scheduled event doesn't run the event.
+TEST_P(EventSchedulerParamTest, DescheduleEvent) {
+ StartClocksAtEpoch();
+ int counter = 0;
+ FunctionEvent e([&counter]() { counter += 1; });
+ auto token = schedulers_.at(0).Schedule(
+ monotonic_clock::epoch() + chrono::seconds(1), &e);
+ schedulers_.at(0).Deschedule(token);
+ ParamRunFor(std::chrono::seconds(2));
+ EXPECT_EQ(counter, 0);
+}
+
+// Test that TemporarilyStopAndRun respects and preserves running.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRun) {
+ StartClocksAtEpoch();
+ int counter = 0;
+
+ scheduler_scheduler_.TemporarilyStopAndRun([this]() {
+ SCOPED_TRACE("StopAndRun while stopped.");
+ CheckSchedulersRunning(false);
+ });
+ {
+ SCOPED_TRACE("After StopAndRun while stopped.");
+ CheckSchedulersRunning(false);
+ }
+
+ FunctionEvent e([&]() {
+ counter += 1;
+ {
+ SCOPED_TRACE("Before StopAndRun while running.");
+ CheckSchedulersRunning(true);
+ }
+ scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+ SCOPED_TRACE("StopAndRun while running.");
+ CheckSchedulersRunning(false);
+ });
+ {
+ SCOPED_TRACE("After StopAndRun while running.");
+ CheckSchedulersRunning(true);
+ }
+ });
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_EQ(counter, 1);
+}
+
+// Test that TemporarilyStopAndRun leaves stopped nodes stopped.
+TEST_P(EventSchedulerParamTest, TemporarilyStopAndRunStaggeredStart) {
+ time_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp{0, monotonic_clock::epoch()},
+ BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(10)}});
+ int counter = 0;
+
+ schedulers_[1].ScheduleOnRun([]() { FAIL(); });
+ schedulers_[1].ScheduleOnStartup([]() { FAIL(); });
+ schedulers_[1].set_on_shutdown([]() { FAIL(); });
+ schedulers_[1].set_started([]() { FAIL(); });
+ schedulers_[1].set_stopped([]() { FAIL(); });
+
+ FunctionEvent e([this, &counter]() {
+ counter += 1;
+ EXPECT_TRUE(schedulers_[0].is_running());
+ EXPECT_FALSE(schedulers_[1].is_running());
+ scheduler_scheduler_.TemporarilyStopAndRun([&]() {
+ SCOPED_TRACE("StopAndRun while running.");
+ CheckSchedulersRunning(false);
+ });
+ EXPECT_TRUE(schedulers_[0].is_running());
+ EXPECT_FALSE(schedulers_[1].is_running());
+ });
+ FunctionEvent exiter([this]() { scheduler_scheduler_.Exit(); });
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
+ schedulers_.at(0).Schedule(monotonic_clock::epoch() + chrono::seconds(2),
+ &exiter);
+ ParamRunFor(std::chrono::seconds(1));
+ EXPECT_EQ(counter, 1);
+}
+
+INSTANTIATE_TEST_SUITE_P(EventSchedulerParamTest, EventSchedulerParamTest,
+ testing::Values(RunMode::kRun, RunMode::kRunFor));
+
} // namespace aos
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 9f6234a..0afdda0 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,10 +1,11 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
flatbuffer_cc_library(
name = "logger_fbs",
srcs = ["logger.fbs"],
- gen_reflections = 1,
+ gen_reflections = True,
includes = [
"//aos:configuration_fbs_includes",
],
@@ -12,6 +13,20 @@
visibility = ["//visibility:public"],
)
+flatbuffer_cc_library(
+ name = "replay_timing_fbs",
+ srcs = ["replay_timing.fbs"],
+ gen_reflections = True,
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
+cc_static_flatbuffer(
+ name = "replay_timing_schema",
+ function = "aos::timing::ReplayTimingSchema",
+ target = ":replay_timing_fbs_reflection_out",
+ visibility = ["//visibility:public"],
+)
+
cc_library(
name = "boot_timestamp",
srcs = ["boot_timestamp.cc"],
@@ -265,9 +280,13 @@
":log_writer",
":logfile_utils",
":logger_fbs",
+ ":replay_timing_fbs",
+ "//aos:condition",
"//aos:uuid",
"//aos/events:event_loop",
+ "//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
+ "//aos/mutex",
"//aos/network:message_bridge_server_fbs",
"//aos/network:multinode_timestamp_filter",
"//aos/network:remote_message_fbs",
@@ -276,6 +295,7 @@
"//aos/network:timestamp_filter",
"//aos/time",
"//aos/util:file",
+ "//aos/util:threaded_queue",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_google_absl//absl/strings",
],
@@ -446,6 +466,41 @@
deps = ["//aos/events:aos_config"],
)
+aos_config(
+ name = "multinode_pingpong_triangle_split_config",
+ src = "multinode_pingpong_triangle_split.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:timestamp_fbs",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = ["//aos/events:aos_config"],
+)
+
+cc_test(
+ name = "realtime_replay_test",
+ srcs = ["realtime_replay_test.cc"],
+ data = [
+ "//aos/events:pingpong_config",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":log_reader",
+ ":log_writer",
+ "//aos/events:ping_lib",
+ "//aos/events:pong_lib",
+ "//aos/events:shm_event_loop",
+ "//aos/events:simulated_event_loop",
+ "//aos/testing:googletest",
+ "//aos/testing:path",
+ "//aos/testing:tmpdir",
+ ],
+)
+
cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
@@ -460,6 +515,7 @@
":multinode_pingpong_split4_config",
":multinode_pingpong_split4_reliable_config",
":multinode_pingpong_split_config",
+ ":multinode_pingpong_triangle_split_config",
"//aos/events:pingpong_config",
],
shard_count = 10,
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..87ad72c 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
@@ -46,6 +47,19 @@
end_time, "",
"If set, end at this point in time in the log on the realtime clock.");
+DEFINE_bool(drop_realtime_messages_before_start, false,
+ "If set, will drop any messages sent before the start of the "
+ "logfile in realtime replay. Setting this guarantees consistency "
+ "in timing with the original logfile, but means that you lose "
+ "access to fetched low-frequency messages.");
+
+DEFINE_double(
+ threaded_look_ahead_seconds, 2.0,
+ "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+ "Can validly be zero, but higher values are encouraged for realtime replay "
+ "in order to prevent the replay from ever having to block on waiting for "
+ "the reader to find the next message.");
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +176,11 @@
~EventNotifier() { event_timer_->Disable(); }
+ // Sets the clock offset for realtime playback.
+ void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+ clock_offset_ = clock_offset;
+ }
+
// Returns the event trigger time.
realtime_clock::time_point realtime_event_time() const {
return realtime_event_time_;
@@ -189,7 +208,7 @@
// Whops, time went backwards. Just do it now.
HandleTime();
} else {
- event_timer_->Setup(candidate_monotonic);
+ event_timer_->Setup(candidate_monotonic + clock_offset_);
}
}
@@ -208,6 +227,8 @@
const realtime_clock::time_point realtime_event_time_ =
realtime_clock::min_time;
+ std::chrono::nanoseconds clock_offset_{0};
+
bool called_ = false;
};
@@ -325,9 +346,7 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(std::make_unique<State>(
- std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
- nullptr));
+ states_.resize(1);
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -405,6 +424,46 @@
state->OnStart(std::move(fn));
}
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+ if (threading_ == ThreadedBuffering::kYes) {
+ CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+ message_queuer_.emplace(
+ [this](const BootTimestamp queue_until) {
+ // This will be called whenever anything prompts us for any state
+ // change; there may be wakeups that result in us not having any new
+ // data to push (even if we aren't done), in which case we will return
+ // nullopt but not done().
+ if (last_queued_message_.has_value() &&
+ queue_until < last_queued_message_) {
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{
+ std::nullopt, false,
+ last_queued_message_ == BootTimestamp::max_time()};
+ }
+ TimestampedMessage *message = timestamp_mapper_->Front();
+ // Upon reaching the end of the log, exit.
+ if (message == nullptr) {
+ last_queued_message_ = BootTimestamp::max_time();
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{std::nullopt,
+ false, true};
+ }
+ last_queued_message_ = message->monotonic_event_time;
+ const util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult result{
+ *message, queue_until >= last_queued_message_, false};
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
+ return result;
+ },
+ time);
+ // Spin until the first few seconds of messages are queued up so that we
+ // don't end up with delays/inconsistent timing during the first few seconds
+ // of replay.
+ message_queuer_->WaitForNoMoreWork();
+ }
+}
+
void LogReader::State::OnStart(std::function<void()> fn) {
on_starts_.emplace_back(std::move(fn));
}
@@ -464,6 +523,51 @@
stopped_ = true;
started_ = true;
+ if (message_queuer_.has_value()) {
+ message_queuer_->StopPushing();
+ }
+}
+
+std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+LogReader::State::NonExclusiveChannels() {
+ CHECK_NOTNULL(node_event_loop_factory_);
+ const aos::Configuration *config = node_event_loop_factory_->configuration();
+ std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+ result{// Timing reports can be sent by logged and replayed applications.
+ {aos::configuration::GetChannel(config, "/aos",
+ "aos.timing.Report", "", node_),
+ NodeEventLoopFactory::ExclusiveSenders::kNo},
+ // AOS_LOG may be used in the log and in replay.
+ {aos::configuration::GetChannel(
+ config, "/aos", "aos.logging.LogMessageFbs", "", node_),
+ NodeEventLoopFactory::ExclusiveSenders::kNo}};
+ for (const Node *const node : configuration::GetNodes(config)) {
+ if (node == nullptr) {
+ break;
+ }
+ const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
+ config,
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ "aos.message_bridge.RemoteMessage", "", node_, /*quiet=*/true);
+ // The old-style remote timestamp channel can be populated from any
+ // channel, simulated or replayed.
+ if (old_timestamp_channel != nullptr) {
+ result.push_back(std::make_pair(
+ old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
+ }
+ }
+ // Remove any channels that weren't found due to not existing in the
+ // config.
+ for (size_t ii = 0; ii < result.size();) {
+ if (result[ii].first == nullptr) {
+ result.erase(result.begin() + ii);
+ } else {
+ ++ii;
+ }
+ }
+ return result;
}
void LogReader::Register() {
@@ -490,11 +594,15 @@
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+ // We don't run with threading on the buffering for simulated event loops
+ // because we haven't attempted to validate how the interactions between the
+ // buffering and the timestamp mapper work when running multiple nodes
+ // concurrently.
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- node);
+ filters_.get(), node, State::ThreadedBuffering::kNo);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -532,7 +640,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
continue;
}
++live_nodes_;
@@ -581,7 +689,7 @@
// If we are replaying a log, we don't want a bunch of redundant messages
// from both the real message bridge and simulated message bridge.
- event_loop_factory_->DisableStatistics();
+ event_loop_factory_->PermanentlyDisableStatistics();
}
// Write pseudo start times out to file now that we are all setup.
@@ -661,8 +769,58 @@
return nullptr;
}
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
void LogReader::Register(EventLoop *event_loop) {
- Register(event_loop, event_loop->node());
+ filters_ =
+ std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+ event_loop->configuration(), logged_configuration(),
+ log_files_[0].boots, FLAGS_skip_order_validation,
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+
+ std::vector<TimestampMapper *> timestamp_mappers;
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ filters_.get(), node, State::ThreadedBuffering::kYes);
+ State *state = states_[node_index].get();
+
+ state->SetChannelCount(logged_configuration()->channels()->size());
+ timestamp_mappers.emplace_back(state->timestamp_mapper());
+ }
+
+ filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ if (node == nullptr || node->name()->string_view() ==
+ event_loop->node()->name()->string_view()) {
+ Register(event_loop, event_loop->node());
+ } else {
+ Register(nullptr, node);
+ }
+ }
}
void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -671,10 +829,13 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
return;
}
- ++live_nodes_;
+
+ if (event_loop != nullptr) {
+ ++live_nodes_;
+ }
if (event_loop_factory_ != nullptr) {
event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +848,14 @@
}
void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
- if (event_loop) {
+ if (event_loop != nullptr) {
CHECK(event_loop->configuration() == configuration());
}
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
- if (!event_loop) {
+ if (event_loop == nullptr) {
state->ClearTimeFlags();
}
@@ -703,7 +864,7 @@
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- if (event_loop) {
+ if (event_loop != nullptr) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
}
@@ -716,10 +877,10 @@
logged_configuration()->channels()->Get(logged_channel_index));
const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
State *source_state = nullptr;
+
if (!configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
@@ -741,7 +902,10 @@
state->SetChannel(
logged_channel_index,
configuration::ChannelIndex(configuration(), channel),
- event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+ event_loop && logged &&
+ configuration::ChannelIsReadableOnNode(channel, node)
+ ? event_loop->MakeRawSender(channel)
+ : nullptr,
filter, is_forwarded, source_state);
if (is_forwarded && logged) {
@@ -758,10 +922,12 @@
states_[configuration::GetNodeIndex(
configuration(), connection->name()->string_view())]
.get();
- destination_state->SetRemoteTimestampSender(
- logged_channel_index,
- event_loop ? state->RemoteTimestampSender(channel, connection)
- : nullptr);
+ if (destination_state) {
+ destination_state->SetRemoteTimestampSender(
+ logged_channel_index,
+ event_loop ? state->RemoteTimestampSender(channel, connection)
+ : nullptr);
+ }
}
}
}
@@ -775,14 +941,12 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
- VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
- << "at " << state->event_loop()->context().monotonic_event_time
- << " now " << state->monotonic_now();
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
- if (exit_on_finish_ && live_nodes_ == 0) {
- event_loop_factory_->Exit();
+ if (exit_on_finish_ && live_nodes_ == 0 &&
+ event_loop_factory_ != nullptr) {
+ CHECK_NOTNULL(event_loop_factory_)->Exit();
}
return;
}
@@ -794,32 +958,37 @@
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
- } else if (BootTimestamp{.boot = state->boot_count(),
- .time = monotonic_now} !=
- timestamped_message.monotonic_event_time) {
- LOG(WARNING) << "Check failed: monotonic_now == "
- "timestamped_message.monotonic_event_time) ("
- << monotonic_now << " vs. "
- << timestamped_message.monotonic_event_time
- << "): " << FlatbufferToJson(state->event_loop()->node())
- << " Now " << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
+ if (event_loop_factory_ != nullptr) {
+ // Only enforce exact timing in simulation.
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (BootTimestamp{.boot = state->boot_count(),
+ .time = monotonic_now} !=
+ timestamped_message.monotonic_event_time) {
+ LOG(WARNING) << "Check failed: monotonic_now == "
+ "timestamped_message.monotonic_event_time) ("
+ << monotonic_now << " vs. "
+ << timestamped_message.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
}
if (timestamped_message.monotonic_event_time.time >
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
- event_loop_factory_ != nullptr) {
+ event_loop_factory_ != nullptr ||
+ !FLAGS_drop_realtime_messages_before_start) {
if (timestamped_message.data != nullptr && !state->found_last_message()) {
if (timestamped_message.monotonic_remote_time !=
- BootTimestamp::min_time()) {
+ BootTimestamp::min_time() &&
+ !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
@@ -890,7 +1059,8 @@
timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << timestamped_message.monotonic_event_time;
+ << timestamped_message.monotonic_event_time << " "
+ << state->DebugString();
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
@@ -982,16 +1152,16 @@
}
}
} else {
- LOG(WARNING) << "Not sending data from before the start of the log file. "
- << timestamped_message.monotonic_event_time.time
- .time_since_epoch()
- .count()
- << " start "
- << monotonic_start_time().time_since_epoch().count() << " "
- << *timestamped_message.data;
+ LOG(WARNING)
+ << "Not sending data from before the start of the log file. "
+ << timestamped_message.monotonic_event_time.time.time_since_epoch()
+ .count()
+ << " start "
+ << monotonic_start_time(state->node()).time_since_epoch().count()
+ << " timestamped_message.data is null";
}
- const BootTimestamp next_time = state->OldestMessageTime();
+ const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
if (next_time != BootTimestamp::max_time()) {
if (next_time.boot != state->boot_count()) {
VLOG(1) << "Next message for "
@@ -1002,18 +1172,28 @@
state->NotifyLogfileEnd();
return;
}
- VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
- << "wakeup for " << next_time.time << "("
- << state->ToDistributedClock(next_time.time)
- << " distributed), now is " << state->monotonic_now();
+ if (event_loop_factory_ != nullptr) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << "("
+ << state->ToDistributedClock(next_time.time)
+ << " distributed), now is " << state->monotonic_now();
+ } else {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << ", now is "
+ << state->monotonic_now();
+ }
+ // TODO(james): This can result in negative times getting passed through
+ // in realtime replay.
state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
state->NotifyLogfileEnd();
// Set a timer up immediately after now to die. If we don't do this,
- // then the senders waiting on the message we just read will never get
+ // then the watchers waiting on the message we just read will never get
// called.
+ // Doesn't apply to single-EventLoop replay since the watchers in question
+ // are not under our control.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
@@ -1025,7 +1205,9 @@
<< state->monotonic_now();
}));
- if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ state->SeedSortedMessages();
+
+ if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
if (start_time_ != realtime_clock::min_time) {
@@ -1035,8 +1217,16 @@
state->SetEndTimeFlag(end_time_);
}
event_loop->OnRun([state]() {
- BootTimestamp next_time = state->OldestMessageTime();
+ BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
+ // Queue up messages and then set clock offsets (we don't want to set
+ // clock offsets before we've done the work of getting the first messages
+ // primed).
+ state->QueueThreadUntil(
+ next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(
+ FLAGS_threaded_look_ahead_seconds)));
+ state->MaybeSetClockOffset();
state->Setup(next_time.time);
state->SetupStartupTimer();
});
@@ -1451,9 +1641,14 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
- const Node *node)
- : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+ std::unique_ptr<TimestampMapper> timestamp_mapper,
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+ const Node *node, LogReader::State::ThreadedBuffering threading)
+ : timestamp_mapper_(std::move(timestamp_mapper)),
+ node_(node),
+ multinode_filters_(multinode_filters),
+ threading_(threading) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1499,6 +1694,52 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
+void LogReader::State::TrackMessageSendTiming(
+ const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+ if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+ return;
+ }
+
+ timing::MessageTimingT sample;
+ sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+ sender.channel());
+ sample.expected_send_time = expected_send_time.time_since_epoch().count();
+ sample.actual_send_time =
+ sender.monotonic_sent_time().time_since_epoch().count();
+ sample.send_time_error = aos::time::DurationInSeconds(
+ expected_send_time - sender.monotonic_sent_time());
+ send_timings_.push_back(sample);
+
+ // Somewhat arbitrarily send out timing information in batches of 100. No need
+ // to create excessive overhead in regenerated logfiles.
+ // TODO(james): The overhead may be fine.
+ constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+ CHECK(timing_statistics_sender_.valid());
+ if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+ SendMessageTimings();
+ }
+}
+
+void LogReader::State::SendMessageTimings() {
+ if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+ return;
+ }
+ auto builder = timing_statistics_sender_.MakeBuilder();
+ std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+ for (const auto &timing : send_timings_) {
+ timing_offsets.push_back(
+ timing::MessageTiming::Pack(*builder.fbb(), &timing));
+ }
+ send_timings_.clear();
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+ timings_offset = builder.fbb()->CreateVector(timing_offsets);
+ timing::ReplayTiming::Builder timing_builder =
+ builder.MakeBuilder<timing::ReplayTiming>();
+ timing_builder.add_messages(timings_offset);
+ timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
bool LogReader::State::Send(const TimestampedMessage ×tamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
@@ -1572,6 +1813,23 @@
source_state->boot_count());
}
+ if (event_loop_factory_ != nullptr &&
+ channel_source_state_[timestamped_message.channel_index] != nullptr &&
+ multinode_filters_ != nullptr) {
+ // Sanity check that we are using consistent boot uuids.
+ State *source_state =
+ channel_source_state_[timestamped_message.channel_index];
+ CHECK_EQ(multinode_filters_->boot_uuid(
+ configuration::GetNodeIndex(event_loop_->configuration(),
+ source_state->node()),
+ timestamped_message.monotonic_remote_time.boot),
+ CHECK_NOTNULL(
+ CHECK_NOTNULL(
+ channel_source_state_[timestamped_message.channel_index])
+ ->event_loop_)
+ ->boot_uuid());
+ }
+
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const auto err = sender->Send(
@@ -1580,11 +1838,22 @@
timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
- ? CHECK_NOTNULL(
- channel_source_state_[timestamped_message.channel_index])
- ->event_loop_->boot_uuid()
+ ? CHECK_NOTNULL(multinode_filters_)
+ ->boot_uuid(configuration::GetNodeIndex(
+ event_loop_->configuration(),
+ channel_source_state_[timestamped_message
+ .channel_index]
+ ->node()),
+ timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
+ if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+ timestamped_message.monotonic_event_time.time) {
+ // Only track errors for non-fetched messages.
+ TrackMessageSendTiming(
+ *sender,
+ timestamped_message.monotonic_event_time.time + clock_offset());
+ }
if (queue_index_map_[timestamped_message.channel_index]) {
CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1629,6 +1898,11 @@
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
+ // TODO(james): Currently, if running replay against a single event loop,
+ // remote timestamps will not get replayed because this code-path only
+ // gets triggered on the event loop that receives the forwarded message
+ // that the timestamps correspond to. This code, as written, also doesn't
+ // correctly handle a non-zero clock_offset for the *_remote_time fields.
State *source_state =
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
@@ -1802,27 +2076,55 @@
}
TimestampedMessage LogReader::State::PopOldest() {
- CHECK(timestamp_mapper_ != nullptr);
- TimestampedMessage *result_ptr = timestamp_mapper_->Front();
- CHECK(result_ptr != nullptr);
+ if (message_queuer_.has_value()) {
+ std::optional<TimestampedMessage> message = message_queuer_->Pop();
+ CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+ message_queuer_->SetState(
+ message.value().monotonic_event_time +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+ return message.value();
+ } else {
+ CHECK(timestamp_mapper_ != nullptr);
+ TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+ CHECK(result_ptr != nullptr);
- TimestampedMessage result = std::move(*result_ptr);
+ TimestampedMessage result = std::move(*result_ptr);
- VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
- << result.monotonic_event_time;
- timestamp_mapper_->PopFront();
- SeedSortedMessages();
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ << result.monotonic_event_time;
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
- CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+ CHECK_EQ(result.monotonic_event_time.boot, boot_count());
- VLOG(1) << "Popped " << result
- << configuration::CleanedChannelToString(
- event_loop_->configuration()->channels()->Get(
- factory_channel_index_[result.channel_index]));
- return result;
+ VLOG(1) << "Popped " << result
+ << configuration::CleanedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ factory_channel_index_[result.channel_index]));
+ return result;
+ }
}
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+ if (!message_queuer_.has_value()) {
+ return SingleThreadedOldestMessageTime();
+ }
+ std::optional<TimestampedMessage> message = message_queuer_->Peek();
+ if (!message.has_value()) {
+ return BootTimestamp::max_time();
+ }
+ if (message.value().monotonic_event_time.boot == boot_count()) {
+ ObserveNextMessage(message.value().monotonic_event_time.time,
+ message.value().realtime_event_time);
+ }
+ return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+ CHECK(!message_queuer_.has_value())
+ << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+ "is created.";
if (timestamp_mapper_ == nullptr) {
return BootTimestamp::max_time();
}
@@ -1832,12 +2134,10 @@
}
VLOG(2) << MaybeNodeName(node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
-
if (result_ptr->monotonic_event_time.boot == boot_count()) {
ObserveNextMessage(result_ptr->monotonic_event_time.time,
result_ptr->realtime_event_time);
}
-
return result_ptr->monotonic_event_time;
}
@@ -1862,6 +2162,7 @@
event_loop_ = nullptr;
timer_handler_ = nullptr;
node_event_loop_factory_ = nullptr;
+ timing_statistics_sender_ = Sender<timing::ReplayTiming>();
}
void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
@@ -1934,5 +2235,27 @@
}
}
+void LogReader::State::MaybeSetClockOffset() {
+ if (node_event_loop_factory_ == nullptr) {
+ // If not running with simulated event loop, set the monotonic clock
+ // offset.
+ clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
+
+ if (start_event_notifier_) {
+ start_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ if (end_event_notifier_) {
+ end_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ }
+}
+
+void LogReader::SetRealtimeReplayRate(double replay_rate) {
+ CHECK(event_loop_factory_ != nullptr)
+ << ": Can't set replay rate without an event loop factory (have you "
+ "called Register()?).";
+ event_loop_factory_->SetRealtimeReplayRate(replay_rate);
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..9e44996 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -3,20 +3,26 @@
#include <chrono>
#include <deque>
+#include <queue>
#include <string_view>
#include <tuple>
#include <vector>
+#include "aos/condition.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
+#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/mutex/mutex.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
+#include "aos/util/threaded_queue.h"
#include "aos/uuid.h"
#include "flatbuffers/flatbuffers.h"
@@ -92,10 +98,19 @@
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
+
// Registers callbacks for all the events after the log file starts. This is
// only useful when replaying live.
void Register(EventLoop *event_loop);
+ // Sets a sender that should be used for tracking timing statistics. If not
+ // set, no statistics will be recorded.
+ void set_timing_accuracy_sender(
+ const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+ states_[configuration::GetNodeIndex(configuration(), node)]
+ ->set_timing_accuracy_sender(std::move(timing_sender));
+ }
+
// Called whenever a log file starts for a node.
void OnStart(std::function<void()> fn);
void OnStart(const Node *node, std::function<void()> fn);
@@ -216,6 +231,13 @@
exit_on_finish_ = exit_on_finish;
}
+ // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+ // try to play events in realtime. 0.5 will run at half speed. Use infinity
+ // (the default) to run as fast as possible. This can be changed during
+ // run-time.
+ // Only applies when running against a SimulatedEventLoopFactory.
+ void SetRealtimeReplayRate(double replay_rate);
+
private:
void Register(EventLoop *event_loop, const Node *node);
@@ -285,7 +307,13 @@
// State per node.
class State {
public:
- State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+ // Whether we should spin up a separate thread for buffering up messages.
+ // Only allowed in realtime replay--see comments on threading_ member for
+ // details.
+ enum class ThreadedBuffering { kYes, kNo };
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+ const Node *node, ThreadedBuffering threading);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -297,12 +325,25 @@
TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
- BootTimestamp OldestMessageTime();
+ BootTimestamp SingleThreadedOldestMessageTime();
+ // Returns the monotonic time of the oldest message, handling querying the
+ // separate thread if ThreadedBuffering was set.
+ BootTimestamp MultiThreadedOldestMessageTime();
size_t boot_count() const {
// If we are replaying directly into an event loop, we can't reboot. So
// we will stay stuck on the 0th boot.
- if (!node_event_loop_factory_) return 0u;
+ if (!node_event_loop_factory_) {
+ if (event_loop_ == nullptr) {
+ // If boot_count is being checked after startup for any of the
+ // non-primary nodes, then returning 0 may not be accurate (since
+ // remote nodes *can* reboot even if the EventLoop being played to
+ // can't).
+ CHECK(!started_);
+ CHECK(!stopped_);
+ }
+ return 0u;
+ }
return node_event_loop_factory_->boot_count();
}
@@ -319,8 +360,10 @@
NotifyLogfileStart();
return;
}
- CHECK_GE(start_time, event_loop_->monotonic_now());
- startup_timer_->Setup(start_time);
+ if (node_event_loop_factory_) {
+ CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+ }
+ startup_timer_->Setup(start_time + clock_offset());
}
void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +425,7 @@
// distributed clock.
distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_);
return node_event_loop_factory_->ToDistributedClock(time);
}
@@ -409,12 +453,14 @@
// ensure we are remapping channels correctly.
event_loop_unique_ptr_ = node_event_loop_factory_->MakeEventLoop(
"log_reader", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kYes,
+ NonExclusiveChannels()});
return event_loop_unique_ptr_.get();
}
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_);
return channel_source_state_[channel_index]
->node_event_loop_factory_->ToDistributedClock(time);
}
@@ -425,7 +471,7 @@
}
monotonic_clock::time_point monotonic_now() const {
- return node_event_loop_factory_->monotonic_now();
+ return event_loop_->monotonic_now();
}
// Sets the number of channels.
@@ -487,12 +533,16 @@
// Sets the next wakeup time on the replay callback.
void Setup(monotonic_clock::time_point next_time) {
- timer_handler_->Setup(next_time);
+ timer_handler_->Setup(
+ std::max(monotonic_now(), next_time + clock_offset()));
}
// Sends a buffer on the provided channel index.
bool Send(const TimestampedMessage ×tamped_message);
+ void MaybeSetClockOffset();
+ std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
// Returns a debug string for the channel merger.
std::string DebugString() const {
if (!timestamp_mapper_) {
@@ -522,7 +572,21 @@
return last_message_[channel_index];
}
+ void set_timing_accuracy_sender(
+ aos::Sender<timing::ReplayTiming> timing_sender) {
+ timing_statistics_sender_ = std::move(timing_sender);
+ OnEnd([this]() { SendMessageTimings(); });
+ }
+
+ // If running with ThreadedBuffering::kYes, will start the processing thread
+ // and queue up messages until the specified time. No-op if
+ // ThreadedBuffering::kNo is set. Should only be called once.
+ void QueueThreadUntil(BootTimestamp time);
+
private:
+ void TrackMessageSendTiming(const RawSender &sender,
+ monotonic_clock::time_point expected_send_time);
+ void SendMessageTimings();
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
@@ -556,6 +620,12 @@
uint32_t actual_queue_index = 0xffffffff;
};
+ // Returns a list of channels which LogReader will send on but which may
+ // *also* get sent on by other applications in replay.
+ std::vector<
+ std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
+ NonExclusiveChannels();
+
// Stores all the timestamps that have been sent on this channel. This is
// only done for channels which are forwarded and on the node which
// initially sends the message. Compress using ranges and offsets.
@@ -582,6 +652,7 @@
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
@@ -598,14 +669,46 @@
absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
timestamp_loggers_;
+ // Time offset between the log's monotonic clock and the current event
+ // loop's monotonic clock. Useful when replaying logs with non-simulated
+ // event loops.
+ std::chrono::nanoseconds clock_offset_{0};
+
std::vector<std::function<void()>> on_starts_;
std::vector<std::function<void()>> on_ends_;
- bool stopped_ = false;
- bool started_ = false;
+ std::atomic<bool> stopped_ = false;
+ std::atomic<bool> started_ = false;
bool found_last_message_ = false;
std::vector<bool> last_message_;
+
+ std::vector<timing::MessageTimingT> send_timings_;
+ aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+ // Protects access to any internal state after Run() is called. Designed
+ // assuming that only one node is actually executing in replay.
+ // Threading design:
+ // * The worker passed to message_queuer_ has full ownership over all
+ // the log-reading code, timestamp filters, last_queued_message_, etc.
+ // * The main thread should only have exclusive access to the replay
+ // event loop and associated features (mainly senders).
+ // It will pop an item out of the queue (which does maintain a shared_ptr
+ // reference which may also be being used by the message_queuer_ thread,
+ // but having shared_ptr's accessing the same memory from
+ // separate threads is permissible).
+ // Enabling this in simulation is currently infeasible due to a lack of
+ // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+ // when the message_queuer_ thread attempts to read/pop messages from the
+ // timestamp_mapper_, it will end up calling callbacks that update the
+ // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+ // the event scheduler that is running in the main thread to orchestrate the
+ // simulation will be querying the estimator to know what the clocks on the
+ // various nodes are at, leading to potential issues.
+ ThreadedBuffering threading_;
+ std::optional<BootTimestamp> last_queued_message_;
+ std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+ message_queuer_;
};
// Node index -> State.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5f265e2..a463f4f 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -181,6 +181,46 @@
}
}
+// Tests that we die if the replayer attempts to send on a logged channel.
+TEST_F(LoggerDeathTest, DieOnDuplicateReplayChannels) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/pingpong_config.json"));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ const ::std::string tmpdir = aos::testing::TestTmpDir();
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string config_file =
+ absl::StrCat(base_name, kSingleConfigSha256, ".bfbs");
+ const ::std::string logfile = base_name + ".part0.bfbs";
+ // Remove the log file.
+ unlink(config_file.c_str());
+ unlink(logfile.c_str());
+
+ LOG(INFO) << "Logging data to " << logfile;
+
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory.MakeEventLoop("logger");
+
+ Logger logger(logger_event_loop.get());
+ logger.set_separate_config(false);
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
+
+ event_loop_factory.RunFor(chrono::seconds(2));
+ }
+
+ LogReader reader(logfile);
+
+ reader.Register();
+
+ std::unique_ptr<EventLoop> test_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+ EXPECT_DEATH(test_event_loop->MakeSender<examples::Ping>("/test"),
+ "exclusive channel.*examples.Ping");
+}
+
// Tests calling StopLogging twice.
TEST_F(LoggerDeathTest, ExtraStop) {
const ::std::string tmpdir = aos::testing::TestTmpDir();
@@ -442,7 +482,8 @@
std::unique_ptr<EventLoop> ping_spammer_event_loop =
event_loop_factory.GetNodeEventLoopFactory(nullptr)->MakeEventLoop(
"ping_spammer", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}});
aos::Sender<examples::Ping> ping_sender =
ping_spammer_event_loop->MakeSender<examples::Ping>("/test");
@@ -2192,6 +2233,149 @@
reader.Deregister();
}
+// Tests that we observe all the same events in log replay (for a given node)
+// whether we just register an event loop for that node or if we register a full
+// event loop factory.
+TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
+ time_converter_.StartEqual();
+ constexpr chrono::milliseconds kStartupDelay(95);
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(kStartupDelay);
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader full_reader(SortParts(logfiles_));
+ LogReader single_node_reader(SortParts(logfiles_));
+
+ SimulatedEventLoopFactory full_factory(full_reader.configuration());
+ SimulatedEventLoopFactory single_node_factory(
+ single_node_reader.configuration());
+ single_node_factory.SkipTimingReport();
+ single_node_factory.DisableStatistics();
+ std::unique_ptr<EventLoop> replay_event_loop =
+ single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
+ "log_reader");
+
+ full_reader.Register(&full_factory);
+ single_node_reader.Register(replay_event_loop.get());
+
+ const Node *full_pi1 =
+ configuration::GetNode(full_factory.configuration(), "pi1");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+ // else should have moved.
+ std::unique_ptr<EventLoop> full_event_loop =
+ full_factory.MakeEventLoop("test", full_pi1);
+ full_event_loop->SkipTimingReport();
+ full_event_loop->SkipAosLog();
+ // maps are indexed on channel index.
+ // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
+ std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
+ observed_messages;
+ std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
+ for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
+ ++ii) {
+ const Channel *channel =
+ full_event_loop->configuration()->channels()->Get(ii);
+ // We currently don't support replaying remote timestamp channels in
+ // realtime replay.
+ if (channel->name()->string_view().find("remote_timestamp") !=
+ std::string_view::npos) {
+ continue;
+ }
+ if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
+ observed_messages[ii] = {};
+ fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
+ full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
+ if (fetchers[ii]->Fetch()) {
+ observed_messages[ii].push_back(std::make_pair(
+ fetchers[ii]->context().monotonic_event_time, true));
+ }
+ });
+ full_event_loop->MakeRawNoArgWatcher(
+ channel, [ii, &observed_messages](const Context &context) {
+ observed_messages[ii].push_back(
+ std::make_pair(context.monotonic_event_time, false));
+ });
+ }
+ }
+
+ full_factory.Run();
+ fetchers.clear();
+ full_reader.Deregister();
+
+ const Node *single_node_pi1 =
+ configuration::GetNode(single_node_factory.configuration(), "pi1");
+ std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
+
+ std::unique_ptr<EventLoop> single_node_event_loop =
+ single_node_factory.MakeEventLoop("test", single_node_pi1);
+ single_node_event_loop->SkipTimingReport();
+ single_node_event_loop->SkipAosLog();
+ for (size_t ii = 0;
+ ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
+ const Channel *channel =
+ single_node_event_loop->configuration()->channels()->Get(ii);
+ single_node_factory.DisableForwarding(channel);
+ if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
+ single_node_fetchers[ii] =
+ single_node_event_loop->MakeRawFetcher(channel);
+ single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
+ EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
+ << "Single EventLoop replay doesn't support pre-loading fetchers. "
+ << configuration::StrippedChannelToString(channel);
+ });
+ single_node_event_loop->MakeRawNoArgWatcher(
+ channel, [ii, &observed_messages, channel,
+ kStartupDelay](const Context &context) {
+ if (observed_messages[ii].empty()) {
+ FAIL() << "Observed extra message at "
+ << context.monotonic_event_time << " on "
+ << configuration::StrippedChannelToString(channel);
+ return;
+ }
+ const std::pair<monotonic_clock::time_point, bool> &message =
+ observed_messages[ii].front();
+ if (message.second) {
+ EXPECT_LE(message.first,
+ context.monotonic_event_time + kStartupDelay)
+ << "Mismatched message times " << context.monotonic_event_time
+ << " and " << message.first << " on "
+ << configuration::StrippedChannelToString(channel);
+ } else {
+ EXPECT_EQ(message.first,
+ context.monotonic_event_time + kStartupDelay)
+ << "Mismatched message times " << context.monotonic_event_time
+ << " and " << message.first << " on "
+ << configuration::StrippedChannelToString(channel);
+ }
+ observed_messages[ii].erase(observed_messages[ii].begin());
+ });
+ }
+ }
+
+ single_node_factory.Run();
+
+ single_node_fetchers.clear();
+
+ single_node_reader.Deregister();
+
+ for (const auto &pair : observed_messages) {
+ EXPECT_TRUE(pair.second.empty())
+ << "Missed " << pair.second.size() << " messages on "
+ << configuration::StrippedChannelToString(
+ single_node_event_loop->configuration()->channels()->Get(
+ pair.first));
+ }
+}
+
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
@@ -4349,6 +4533,163 @@
ConfirmReadable(filenames);
}
+// Tests that we can replay a logfile that has timestamps such that at least one
+// node's epoch is at a positive distributed_clock (and thus will have to be
+// booted after the other node(s)).
+TEST_P(MultinodeLoggerTest, StartOneNodeBeforeOther) {
+ std::vector<std::string> filenames;
+
+ CHECK_EQ(pi1_index_, 0u);
+ CHECK_EQ(pi2_index_, 1u);
+
+ time_converter_.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch()});
+
+ const chrono::nanoseconds before_reboot_duration = chrono::milliseconds(1000);
+ time_converter_.RebootAt(
+ 0, distributed_clock::time_point(before_reboot_duration));
+
+ const chrono::nanoseconds test_duration = time_converter_.AddMonotonic(
+ {chrono::milliseconds(10000), chrono::milliseconds(10000)});
+
+ const std::string kLogfile =
+ aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+ util::UnlinkRecursive(kLogfile);
+
+ pi2_->Disconnect(pi1_->node());
+ pi1_->Disconnect(pi2_->node());
+
+ {
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ pi2_logger.StartLogger(kLogfile);
+ event_loop_factory_.RunFor(before_reboot_duration);
+
+ pi2_->Connect(pi1_->node());
+ pi1_->Connect(pi2_->node());
+
+ event_loop_factory_.RunFor(test_duration);
+
+ pi2_logger.AppendAllFilenames(&filenames);
+ }
+
+ const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ ConfirmReadable(filenames);
+
+ {
+ LogReader reader(sorted_parts);
+ SimulatedEventLoopFactory replay_factory(reader.configuration());
+ reader.RegisterWithoutStarting(&replay_factory);
+
+ NodeEventLoopFactory *const replay_node =
+ reader.event_loop_factory()->GetNodeEventLoopFactory("pi1");
+
+ std::unique_ptr<EventLoop> test_event_loop =
+ replay_node->MakeEventLoop("test_reader");
+ replay_node->OnStartup([replay_node]() {
+ // Check that we didn't boot until at least t=0.
+ CHECK_LE(monotonic_clock::epoch(), replay_node->monotonic_now());
+ });
+ test_event_loop->OnRun([&test_event_loop]() {
+ // Check that we didn't boot until at least t=0.
+ EXPECT_LE(monotonic_clock::epoch(), test_event_loop->monotonic_now());
+ });
+ reader.event_loop_factory()->Run();
+ reader.Deregister();
+ }
+}
+
+// Tests that when we have a loop without all the logs at all points in time, we
+// can sort it properly.
+TEST(MultinodeLoggerLoopTest, DISABLED_Loop) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ArtifactPath(
+ "aos/events/logging/multinode_pingpong_triangle_split_config.json"));
+ message_bridge::TestingTimeConverter time_converter(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ event_loop_factory.SetTimeConverter(&time_converter);
+
+ NodeEventLoopFactory *const pi1 =
+ event_loop_factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *const pi2 =
+ event_loop_factory.GetNodeEventLoopFactory("pi2");
+ NodeEventLoopFactory *const pi3 =
+ event_loop_factory.GetNodeEventLoopFactory("pi3");
+
+ const std::string kLogfile1_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile1/";
+ const std::string kLogfile2_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile2/";
+ const std::string kLogfile3_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile3/";
+ util::UnlinkRecursive(kLogfile1_1);
+ util::UnlinkRecursive(kLogfile2_1);
+ util::UnlinkRecursive(kLogfile3_1);
+
+ {
+ // Make pi1 boot before everything else.
+ time_converter.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(),
+ BootTimestamp::epoch() - chrono::milliseconds(100),
+ BootTimestamp::epoch() - chrono::milliseconds(300)});
+ }
+
+ // We want to setup a situation such that 2 of the 3 legs of the loop are very
+ // confident about time being X, and the third leg is pulling the average off
+ // to one side.
+ //
+ // It's easiest to visualize this in timestamp_plotter.
+
+ std::vector<std::string> filenames;
+ {
+ // Have pi1 send out a reliable message at startup. This sets up a long
+ // forwarding time message at the start to bias time.
+ std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+ {
+ aos::Sender<examples::Ping> ping_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+
+ aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
+ examples::Ping::Builder ping_builder =
+ builder.MakeBuilder<examples::Ping>();
+ CHECK_EQ(builder.Send(ping_builder.Finish()), RawSender::Error::kOk);
+ }
+
+ // Wait a while so there's enough data to let the worst case be rather off.
+ event_loop_factory.RunFor(chrono::seconds(1000));
+
+ // Now start a receiving node first. This sets up 2 tight bounds between 2
+ // of the nodes.
+ LoggerState pi2_logger = LoggerState::MakeLogger(
+ pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ pi2_logger.StartLogger(kLogfile2_1);
+
+ event_loop_factory.RunFor(chrono::seconds(100));
+
+ // And now start the third leg.
+ LoggerState pi3_logger = LoggerState::MakeLogger(
+ pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ pi3_logger.StartLogger(kLogfile3_1);
+
+ LoggerState pi1_logger = LoggerState::MakeLogger(
+ pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ pi1_logger.StartLogger(kLogfile1_1);
+
+ event_loop_factory.RunFor(chrono::seconds(100));
+
+ pi1_logger.AppendAllFilenames(&filenames);
+ pi2_logger.AppendAllFilenames(&filenames);
+ pi3_logger.AppendAllFilenames(&filenames);
+ }
+
+ // Make sure we can read this.
+ const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ auto result = ConfirmReadable(filenames);
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/multinode_pingpong_triangle_split.json b/aos/events/logging/multinode_pingpong_triangle_split.json
new file mode 100644
index 0000000..226718e
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong_triangle_split.json
@@ -0,0 +1,343 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi3",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ /* Logged on pi1 locally */
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi3",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.DynamicLogCommand",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.DynamicLogCommand",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.logging.DynamicLogCommand",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi2", "pi3"],
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_LOGGER"
+ },
+ {
+ "name": "pi3",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_LOGGER"
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1", "pi3"],
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ },
+ {
+ "name": "pi3",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_LOGGER"
+ }
+ ]
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1", "pi2"],
+ "source_node": "pi3",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_LOGGER"
+ },
+ {
+ "name": "pi2",
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_LOGGER"
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi3/pi1/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi3/pi2/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos/remote_timestamps/pi1/pi3/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi3/aos/remote_timestamps/pi2/pi3/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi3"
+ },
+ /* Reliable channel */
+ {
+ "name": "/reliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi2", "pi3"],
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": [
+ "pi1"
+ ]
+ },
+ {
+ "name": "pi3",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": [
+ "pi1"
+ ]
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/reliable/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1",
+ "frequency": 150
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi3/reliable/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1",
+ "frequency": 150
+ },
+ /* Forwarded to pi2 */
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": [
+ "pi1"
+ ],
+ "time_to_live": 5000000
+ }
+ ],
+ "frequency": 150
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1",
+ "frequency": 150
+ },
+ /* Forwarded back to pi1.
+ * The message is logged both on the sending node and the receiving node
+ * (to make it easier to look at the results for now).
+ *
+ * The timestamps are logged on the receiving node.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ],
+ "frequency": 150
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/pi3/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
new file mode 100644
index 0000000..c7744d4
--- /dev/null
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -0,0 +1,76 @@
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+class RealtimeLoggerTest : public ::testing::Test {
+ protected:
+ RealtimeLoggerTest()
+ : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+ config_file_(
+ aos::testing::ArtifactPath("aos/events/pingpong_config.json")),
+ config_(aos::configuration::ReadConfig(config_file_)),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ ping_(ping_event_loop_.get()) {
+ FLAGS_shm_base = shm_dir_;
+
+ // Nuke the shm dir to ensure we aren't affected by state left over from
+ // earlier tests.
+ aos::util::UnlinkRecursive(shm_dir_);
+ }
+
+ gflags::FlagSaver flag_saver_;
+ std::string shm_dir_;
+
+ const std::string config_file_;
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+ // Factory and Ping class to generate a test logfile.
+ SimulatedEventLoopFactory event_loop_factory_;
+ std::unique_ptr<EventLoop> ping_event_loop_;
+ Ping ping_;
+};
+
+TEST_F(RealtimeLoggerTest, RealtimeReplay) {
+ const std::string tmpdir = aos::testing::TestTmpDir();
+ const std::string base_name = tmpdir + "/logfile/";
+ aos::util::UnlinkRecursive(base_name);
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+ Logger logger(logger_event_loop.get());
+ logger.set_separate_config(false);
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
+ event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+ }
+
+ LogReader reader(logger::SortParts(logger::FindLogs(base_name)));
+ ShmEventLoop shm_event_loop(reader.configuration());
+ reader.Register(&shm_event_loop);
+ reader.OnEnd(shm_event_loop.node(),
+ [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+ Fetcher<examples::Ping> ping_fetcher =
+ shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+ shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+ ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+ shm_event_loop.Run();
+ reader.Deregister();
+
+ ASSERT_TRUE(ping_fetcher.Fetch());
+ ASSERT_EQ(ping_fetcher->value(), 210);
+}
+} // namespace aos::logger::testing
diff --git a/aos/events/logging/replay_timing.fbs b/aos/events/logging/replay_timing.fbs
new file mode 100644
index 0000000..dbe7159
--- /dev/null
+++ b/aos/events/logging/replay_timing.fbs
@@ -0,0 +1,16 @@
+namespace aos.timing;
+
+table MessageTiming {
+ channel:uint (id: 0);
+ // Expected and actual monotonic send times, in nanoseconds.
+ expected_send_time:int64 (id: 1);
+ actual_send_time:int64 (id: 2);
+ // expected - actual, in seconds (redundant: derivable from the fields above).
+ send_time_error:double (id: 3);
+}
+
+table ReplayTiming {
+ messages:[MessageTiming] (id: 0);
+}
+
+root_type ReplayTiming;
diff --git a/aos/events/logging/timestamp_plot.cc b/aos/events/logging/timestamp_plot.cc
index 9c5f2dd..1239eb2 100644
--- a/aos/events/logging/timestamp_plot.cc
+++ b/aos/events/logging/timestamp_plot.cc
@@ -64,7 +64,9 @@
}
std::vector<std::string_view> l = absl::StrSplit(n, ", ");
- CHECK_EQ(l.size(), 4u);
+ if (l.size() != 4u) {
+ continue;
+ }
double t;
double o;
CHECK(absl::SimpleAtod(l[0], &t));
@@ -166,7 +168,9 @@
}
std::vector<std::string_view> l = absl::StrSplit(n, ", ");
- CHECK_EQ(l.size(), 3u);
+ if (l.size() != 3u) {
+ continue;
+ }
double t;
double o;
CHECK(absl::SimpleAtod(l[0], &t));
diff --git a/aos/events/multinode_pingpong_test_combined.json b/aos/events/multinode_pingpong_test_combined.json
index 2d58dd0..66ef8df 100644
--- a/aos/events/multinode_pingpong_test_combined.json
+++ b/aos/events/multinode_pingpong_test_combined.json
@@ -250,6 +250,20 @@
"time_to_live": 5000000
}
]
+ },
+ {
+ "name": "/reliable2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"],
+ "time_to_live": 0
+ }
+ ]
}
],
"maps": [
diff --git a/aos/events/multinode_pingpong_test_split.json b/aos/events/multinode_pingpong_test_split.json
index b160c42..049c407 100644
--- a/aos/events/multinode_pingpong_test_split.json
+++ b/aos/events/multinode_pingpong_test_split.json
@@ -163,6 +163,12 @@
"source_node": "pi1"
},
{
+ "name": "/pi2/aos/remote_timestamps/pi1/reliable2/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -266,6 +272,20 @@
"time_to_live": 5000000
}
]
+ },
+ {
+ "name": "/reliable2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"],
+ "time_to_live": 0
+ }
+ ]
}
],
"maps": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 88504a5..f570296 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -773,6 +773,10 @@
}
}
+void SimulatedEventLoopFactory::SetRealtimeReplayRate(double replay_rate) {
+ scheduler_scheduler_.SetReplayRate(replay_rate);
+}
+
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
@@ -962,7 +966,27 @@
CHECK(allow_new_senders_)
<< ": Attempted to create a new sender on exclusive channel "
<< configuration::StrippedChannelToString(channel_);
- if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
+ std::optional<ExclusiveSenders> per_channel_option;
+ for (const std::pair<const aos::Channel *, ExclusiveSenders> &per_channel :
+ event_loop->options().per_channel_exclusivity) {
+ if (per_channel.first->name()->string_view() ==
+ channel_->name()->string_view() &&
+ per_channel.first->type()->string_view() ==
+ channel_->type()->string_view()) {
+ CHECK(!per_channel_option.has_value())
+ << ": Channel " << configuration::StrippedChannelToString(channel_)
+ << " listed twice in per-channel list.";
+ per_channel_option = per_channel.second;
+ }
+ }
+ if (!per_channel_option.has_value()) {
+ // This could just as easily be implemented by setting
+ // per_channel_option to the global setting when we initialize it, but
+ // then we'd lose track of whether a given channel appears twice in
+ // the list.
+ per_channel_option = event_loop->options().exclusive_senders;
+ }
+ if (per_channel_option.value() == ExclusiveSenders::kYes) {
CHECK_EQ(0, sender_count_)
<< ": Attempted to add an exclusive sender on a channel with existing "
"senders: "
@@ -1157,6 +1181,7 @@
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) {
+ CHECK_GE(base, monotonic_clock::epoch());
// The allocations in here are due to infrastructure and don't count in the no
// mallocs in RT code.
ScopedNotRealtime nrt;
@@ -1486,7 +1511,14 @@
void SimulatedEventLoopFactory::DisableStatistics() {
CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
- bridge_->DisableStatistics();
+ bridge_->DisableStatistics(
+ message_bridge::SimulatedMessageBridge::DestroySenders::kNo);
+}
+
+void SimulatedEventLoopFactory::PermanentlyDisableStatistics() {
+ CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+ bridge_->DisableStatistics(
+ message_bridge::SimulatedMessageBridge::DestroySenders::kYes);
}
void SimulatedEventLoopFactory::EnableStatistics() {
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 7b6eaa7..22eedc1 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -123,6 +123,9 @@
// Disables the messages sent by the simulated message gateway.
void DisableStatistics();
+ // Disables statistics sent by the simulated message gateway, and prevents
+ // EnableStatistics from ever being called again (used by LogReader).
+ void PermanentlyDisableStatistics();
// Enables the messages sent by the simulated message gateway.
void EnableStatistics();
@@ -136,6 +139,15 @@
// starts up without stopping execution.
void AllowApplicationCreationDuring(std::function<void()> fn);
+ // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
+ // try to play events in realtime. 0.5 will run at half speed. Use infinity
+ // (the default) to run as fast as possible. This can be changed during
+ // run-time.
+ void SetRealtimeReplayRate(double replay_rate);
+
+ // Access to the internal scheduler's epoll object for realtime replay.
+ internal::EPoll *scheduler_epoll() { return scheduler_scheduler_.epoll(); }
+
private:
friend class NodeEventLoopFactory;
friend class SimulatedFactoryExitHandle;
@@ -172,14 +184,18 @@
struct EventLoopOptions {
CheckSentTooFast check_sent_too_fast;
ExclusiveSenders exclusive_senders;
+ // per_channel_exclusivity is used to list any exceptions to the overall
+ // exclusive_senders policy for this event loop.
+ std::vector<std::pair<const aos::Channel *, ExclusiveSenders>>
+ per_channel_exclusivity;
};
// Takes the name for the event loop and a struct of options for selecting
// what checks to run for the event loop in question.
std::unique_ptr<EventLoop> MakeEventLoop(
std::string_view name,
- EventLoopOptions options = EventLoopOptions{CheckSentTooFast::kYes,
- ExclusiveSenders::kNo});
+ EventLoopOptions options = EventLoopOptions{
+ CheckSentTooFast::kYes, ExclusiveSenders::kNo, {}});
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index eddacb1..2922460 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -139,72 +139,6 @@
aos::FlatbufferDetachedBuffer<aos::Configuration> config;
};
-class FunctionEvent : public EventScheduler::Event {
- public:
- FunctionEvent(std::function<void()> fn) : fn_(fn) {}
-
- void Handle() noexcept override { fn_(); }
-
- private:
- std::function<void()> fn_;
-};
-
-// Test that creating an event and running the scheduler runs the event.
-TEST(EventSchedulerTest, ScheduleEvent) {
- int counter = 0;
- EventSchedulerScheduler scheduler_scheduler;
- EventScheduler scheduler(0);
- scheduler_scheduler.AddEventScheduler(&scheduler);
-
- FunctionEvent e([&counter]() { counter += 1; });
- scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
- scheduler_scheduler.Run();
- EXPECT_EQ(counter, 1);
- auto token =
- scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2), &e);
- scheduler.Deschedule(token);
- scheduler_scheduler.Run();
- EXPECT_EQ(counter, 1);
-}
-
-// Test that descheduling an already scheduled event doesn't run the event.
-TEST(EventSchedulerTest, DescheduleEvent) {
- int counter = 0;
- EventSchedulerScheduler scheduler_scheduler;
- EventScheduler scheduler(0);
- scheduler_scheduler.AddEventScheduler(&scheduler);
-
- FunctionEvent e([&counter]() { counter += 1; });
- auto token =
- scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
- scheduler.Deschedule(token);
- scheduler_scheduler.Run();
- EXPECT_EQ(counter, 0);
-}
-
-// Test that TemporarilyStopAndRun respects and preserves running.
-TEST(EventSchedulerTest, TemporarilyStopAndRun) {
- int counter = 0;
- EventSchedulerScheduler scheduler_scheduler;
- EventScheduler scheduler(0);
- scheduler_scheduler.AddEventScheduler(&scheduler);
-
- scheduler_scheduler.TemporarilyStopAndRun(
- [&]() { CHECK(!scheduler_scheduler.is_running()); });
- ASSERT_FALSE(scheduler_scheduler.is_running());
-
- FunctionEvent e([&]() {
- counter += 1;
- CHECK(scheduler_scheduler.is_running());
- scheduler_scheduler.TemporarilyStopAndRun(
- [&]() { CHECK(!scheduler_scheduler.is_running()); });
- CHECK(scheduler_scheduler.is_running());
- });
- scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1), &e);
- scheduler_scheduler.Run();
- EXPECT_EQ(counter, 1);
-}
-
// Test that sending a message after running gets properly notified.
TEST(SimulatedEventLoopTest, SendAfterRunFor) {
SimulatedEventLoopTestFactory factory;
@@ -262,7 +196,8 @@
simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
->MakeEventLoop("too_fast_sender",
{NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo});
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}});
aos::Sender<TestMessage> too_fast_message_sender =
too_fast_event_loop->MakeSender<TestMessage>("/test");
@@ -292,9 +227,13 @@
::std::unique_ptr<EventLoop> exclusive_event_loop =
simulated_event_loop_factory.GetNodeEventLoopFactory(nullptr)
- ->MakeEventLoop("too_fast_sender",
- {NodeEventLoopFactory::CheckSentTooFast::kYes,
- NodeEventLoopFactory::ExclusiveSenders::kYes});
+ ->MakeEventLoop(
+ "too_fast_sender",
+ {NodeEventLoopFactory::CheckSentTooFast::kYes,
+ NodeEventLoopFactory::ExclusiveSenders::kYes,
+ {{configuration::GetChannel(factory.configuration(), "/test1",
+ "aos.TestMessage", "", nullptr),
+ NodeEventLoopFactory::ExclusiveSenders::kNo}}});
exclusive_event_loop->SkipAosLog();
exclusive_event_loop->SkipTimingReport();
::std::unique_ptr<EventLoop> normal_event_loop =
@@ -313,6 +252,12 @@
normal_event_loop->MakeSender<TestMessage>("/test");
EXPECT_DEATH(exclusive_event_loop->MakeSender<TestMessage>("/test"),
"TestMessage");
+
+ // And check an explicitly exempted channel:
+ aos::Sender<TestMessage> non_exclusive_sender =
+ exclusive_event_loop->MakeSender<TestMessage>("/test1");
+ aos::Sender<TestMessage> non_exclusive_sender_regular_event_loop =
+ normal_event_loop->MakeSender<TestMessage>("/test1");
}
void TestSentTooFastCheckEdgeCase(
@@ -1336,9 +1281,34 @@
pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
"/pi3/aos");
+ std::vector<std::unique_ptr<aos::EventLoop>> statistics_watcher_loops;
+ statistics_watcher_loops.emplace_back(pi1->MakeEventLoop("test"));
+ statistics_watcher_loops.emplace_back(pi2->MakeEventLoop("test"));
+ statistics_watcher_loops.emplace_back(pi3->MakeEventLoop("test"));
+ // The current contract is that, if all nodes boot simultaneously in
+ // simulation, they should all act as if they are already connected,
+ // without ever observing the transition from disconnected to connected (note
+ // that on a real system the ServerStatistics message will get resent for each
+ // and every new connection, even if the new connections happen
+ // "simultaneously"--in simulation, we are essentially acting as if we are
+ // starting execution in an already running system, rather than observing the
+ // boot process).
+ for (auto &event_loop : statistics_watcher_loops) {
+ event_loop->MakeWatcher(
+ "/aos", [](const message_bridge::ServerStatistics &msg) {
+ for (const message_bridge::ServerConnection *connection :
+ *msg.connections()) {
+ EXPECT_EQ(message_bridge::State::CONNECTED, connection->state())
+ << connection->node()->name()->string_view();
+ }
+ });
+ }
+
simulated_event_loop_factory.RunFor(chrono::seconds(2) +
chrono::milliseconds(5));
+ statistics_watcher_loops.clear();
+
EXPECT_EQ(pi1_pong_counter.count(), 201u);
EXPECT_EQ(pi2_pong_counter.count(), 201u);
@@ -1631,8 +1601,13 @@
std::unique_ptr<EventLoop> pi2_pong_event_loop =
simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ aos::Sender<examples::Ping> pi2_reliable_sender =
+ pi2_pong_event_loop->MakeSender<examples::Ping>("/reliable2");
+ SendPing(&pi2_reliable_sender, 1);
MessageCounter<examples::Ping> pi2_reliable_counter(pi2_pong_event_loop.get(),
"/reliable");
+ MessageCounter<examples::Ping> pi1_reliable_counter(ping_event_loop.get(),
+ "/reliable2");
MessageCounter<examples::Ping> pi2_unreliable_counter(
pi2_pong_event_loop.get(), "/unreliable");
aos::Fetcher<examples::Ping> reliable_on_pi2_fetcher =
@@ -1688,6 +1663,7 @@
SendPing(&pi1_unreliable_sender, 2);
simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
EXPECT_EQ(pi2_reliable_counter.count(), 2u);
+ EXPECT_EQ(pi1_reliable_counter.count(), 1u);
EXPECT_EQ(pi2_unreliable_counter.count(), 1u);
EXPECT_EQ(reliable_timestamp_count, 2u);
@@ -2186,6 +2162,71 @@
EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
}
+TEST(SimulatedEventLoopTest, ReliableMessageSentOnStaggeredBoot) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ time.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp{0, monotonic_clock::epoch()},
+ BootTimestamp{0, monotonic_clock::epoch() - chrono::seconds(1)},
+ BootTimestamp{0, monotonic_clock::epoch()}});
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ const UUID pi1_boot_uuid = pi1->boot_uuid();
+ const UUID pi2_boot_uuid = pi2->boot_uuid();
+ EXPECT_NE(pi1_boot_uuid, UUID::Zero());
+ EXPECT_NE(pi2_boot_uuid, UUID::Zero());
+
+ {
+ ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("ping");
+ aos::Sender<examples::Ping> pi1_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+ SendPing(&pi1_sender, 1);
+ }
+ ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("ping");
+ aos::Sender<examples::Ping> pi2_sender =
+ pi2_event_loop->MakeSender<examples::Ping>("/reliable2");
+ SendPing(&pi2_sender, 1);
+ // Verify that we staggered the OnRun callback correctly.
+ pi2_event_loop->OnRun([pi1, pi2]() {
+ EXPECT_EQ(pi1->monotonic_now(),
+ monotonic_clock::epoch() + std::chrono::seconds(1));
+ EXPECT_EQ(pi2->monotonic_now(), monotonic_clock::epoch());
+ });
+
+ factory.RunFor(chrono::seconds(2));
+
+ {
+ ::std::unique_ptr<EventLoop> pi2_event_loop = pi2->MakeEventLoop("pong");
+ aos::Fetcher<examples::Ping> fetcher =
+ pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + factory.network_delay());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch());
+ }
+ {
+ ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
+ aos::Fetcher<examples::Ping> fetcher =
+ pi1_event_loop->MakeFetcher<examples::Ping>("/reliable2");
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + std::chrono::seconds(1) +
+ factory.network_delay());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() - std::chrono::seconds(1));
+ }
+}
+
class SimulatedEventLoopDisconnectTest : public ::testing::Test {
public:
SimulatedEventLoopDisconnectTest()
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 19ca658..31f3f0e 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,6 +32,10 @@
bool forwarding_disabled() const { return forwarding_disabled_; }
void set_forwarding_disabled(bool forwarding_disabled) {
forwarding_disabled_ = forwarding_disabled;
+ if (!forwarding_disabled_) {
+ CHECK(timestamp_logger_ == nullptr);
+ CHECK(sender_ == nullptr);
+ }
}
void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
@@ -50,7 +54,8 @@
server_connection_ =
server_status_->FindServerConnection(send_node_factory_->node());
}
- if (delivery_time_is_logged_ && timestamp_loggers != nullptr) {
+ if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
+ !forwarding_disabled_) {
timestamp_logger_ =
timestamp_loggers->SenderForChannel(channel_, connection_);
} else {
@@ -79,7 +84,7 @@
MessageBridgeClientStatus *client_status) {
sent_ = false;
send_event_loop_ = send_event_loop;
- if (send_event_loop_) {
+ if (send_event_loop_ && !forwarding_disabled_) {
sender_ = send_event_loop_->MakeRawSender(channel_);
} else {
sender_ = nullptr;
@@ -133,7 +138,6 @@
if (fetcher_->context().data == nullptr || sent_) {
return;
}
- CHECK(!timer_scheduled_);
// Send at startup. It is the best we can do.
const monotonic_clock::time_point monotonic_delivered_time =
@@ -561,16 +565,17 @@
destination_state->second.SetClientState(source, state);
}
-void SimulatedMessageBridge::DisableStatistics() {
+void SimulatedMessageBridge::DisableStatistics(DestroySenders destroy_senders) {
for (std::pair<const Node *const, State> &state : event_loop_map_) {
- state.second.DisableStatistics();
+ state.second.DisableStatistics(destroy_senders);
}
}
-void SimulatedMessageBridge::DisableStatistics(const Node *node) {
+void SimulatedMessageBridge::DisableStatistics(const Node *node,
+ DestroySenders destroy_senders) {
auto it = event_loop_map_.find(node);
CHECK(it != event_loop_map_.end());
- it->second.DisableStatistics();
+ it->second.DisableStatistics(destroy_senders);
}
void SimulatedMessageBridge::EnableStatistics() {
@@ -625,7 +630,7 @@
timestamp_loggers = ChannelTimestampSender(event_loop.get());
server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
if (disable_statistics_) {
- server_status->DisableStatistics();
+ server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
}
{
@@ -659,7 +664,7 @@
}
client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
if (disable_statistics_) {
- client_status->DisableStatistics();
+ client_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
}
for (size_t i = 0;
@@ -701,8 +706,16 @@
configuration::ConnectionDeliveryTimeIsLoggedOnNode(
connection, event_loop->node());
+ const RawMessageDelayer *delayer = nullptr;
+ for (const RawMessageDelayer *candidate : source_delayers_) {
+ if (candidate->channel() == channel) {
+ delayer = candidate;
+ }
+ }
+
// And the timestamps are then logged back by us again.
- if (!delivery_time_is_logged) {
+ if (!delivery_time_is_logged ||
+ CHECK_NOTNULL(delayer)->forwarding_disabled()) {
continue;
}
@@ -725,6 +738,25 @@
destination_delayer->ScheduleReliable();
}
}
+ // Note: This exists to work around the fact that some users like to be able
+ // to send reliable messages while execution is stopped, creating a
+ // situation where the following sequencing can occur:
+ // 1) <While stopped> Send a reliable message on Node A (to be forwarded to
+ // Node B).
+ // 2) Node B starts up.
+ // 3) Anywhere from 0 to N seconds later, Node A starts up.
+ //
+ // In this case, we need the reliable message to make it to Node B, but it
+ // also shouldn't make it to Node B until Node A has started up.
+ //
+ // Ideally, if the user were to wait for the Node B OnRun callbacks to send
+ // the message, then that would trigger the watchers in the delayers.
+ // However, we so far have continued to support Sending while stopped....
+ for (RawMessageDelayer *source_delayer : source_delayers_) {
+ if (source_delayer->time_to_live() == 0) {
+ source_delayer->ScheduleReliable();
+ }
+ }
});
}
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index f5b3eae..9565029 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -35,8 +35,10 @@
// Disables generating and sending the messages which message_gateway sends.
// The messages are the ClientStatistics, ServerStatistics and Timestamp
// messages.
- void DisableStatistics();
- void DisableStatistics(const Node *node);
+ enum class DestroySenders { kNo, kYes };
+ void DisableStatistics(DestroySenders destroy_senders = DestroySenders::kNo);
+ void DisableStatistics(const Node *node,
+ DestroySenders destroy_senders = DestroySenders::kNo);
void EnableStatistics();
void EnableStatistics(const Node *node);
@@ -55,13 +57,16 @@
}
State(const State &state) = delete;
- void DisableStatistics() {
+ void DisableStatistics(DestroySenders destroy_senders) {
disable_statistics_ = true;
+ destroy_senders_ = destroy_senders;
if (server_status) {
- server_status->DisableStatistics();
+ server_status->DisableStatistics(destroy_senders ==
+ DestroySenders::kYes);
}
if (client_status) {
- client_status->DisableStatistics();
+ client_status->DisableStatistics(destroy_senders ==
+ DestroySenders::kYes);
}
}
@@ -88,7 +93,8 @@
// logfiles.
SetEventLoop(node_factory_->MakeEventLoop(
"message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo}));
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}}));
}
void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
@@ -218,7 +224,9 @@
std::vector<RawMessageDelayer *> destination_delayers_;
bool disable_statistics_ = false;
+ DestroySenders destroy_senders_ = DestroySenders::kNo;
};
+
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
std::map<const Node *, State> event_loop_map_;
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 57dd94b..b708ebc 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -211,8 +211,8 @@
Event ready2;
// Start the thread.
- ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 5); });
- ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 4); });
+ ::std::thread t1([this, &ready1]() { RunUntilWakeup(&ready1, 2); });
+ ::std::thread t2([this, &ready2]() { RunUntilWakeup(&ready2, 1); });
ready1.Wait();
ready2.Wait();
@@ -330,20 +330,27 @@
namespace {
-// Temporarily pins the current thread to CPUs 0 and 1.
+// Temporarily pins the current thread to the first 2 available CPUs.
// This speeds up the test on some machines a lot (~4x). It also preserves
// opportunities for the 2 threads to race each other.
class PinForTest {
public:
PinForTest() {
- PCHECK(sched_getaffinity(0, sizeof(old_), &old_) == 0);
- cpu_set_t new_set;
- CPU_ZERO(&new_set);
- CPU_SET(0, &new_set);
- CPU_SET(1, &new_set);
- PCHECK(sched_setaffinity(0, sizeof(new_set), &new_set) == 0);
+ cpu_set_t cpus = GetCurrentThreadAffinity();
+ old_ = cpus;
+ int number_found = 0;
+ for (int i = 0; i < CPU_SETSIZE; ++i) {
+ if (CPU_ISSET(i, &cpus)) {
+ if (number_found < 2) {
+ ++number_found;
+ } else {
+ CPU_CLR(i, &cpus);
+ }
+ }
+ }
+ SetCurrentThreadAffinity(cpus);
}
- ~PinForTest() { PCHECK(sched_setaffinity(0, sizeof(old_), &old_) == 0); }
+ ~PinForTest() { SetCurrentThreadAffinity(old_); }
private:
cpu_set_t old_;
diff --git a/aos/json_to_flatbuffer.h b/aos/json_to_flatbuffer.h
index 7168e4c..45920a8 100644
--- a/aos/json_to_flatbuffer.h
+++ b/aos/json_to_flatbuffer.h
@@ -102,7 +102,8 @@
std::ostreambuf_iterator<char>(file));
}
-// Parses a file as JSON and returns the corresponding Flatbuffer, or dies.
+// Parses a file as JSON and returns the corresponding Flatbuffer. Dies if
+// reading the file fails; returns an empty buffer if the contents are invalid.
template <typename T>
inline FlatbufferDetachedBuffer<T> JsonFileToFlatbuffer(
const std::string_view path) {
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1cc8e17..34e40a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -17,6 +17,9 @@
DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
DEFINE_double(monotonic_start_time, -1.0, "Start time (sec)");
DEFINE_double(monotonic_end_time, -1.0, "End time (sec)");
+DEFINE_double(
+ replay_rate, -1,
+ "-1 to replay as fast as possible; 1.0 = realtime, 0.5 = half speed.");
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -31,6 +34,12 @@
reader.Register();
+ // If going for "as fast as possible" don't actually use infinity, because we
+ // don't want the log reading blocking our use of the epoll handlers.
+ reader.SetRealtimeReplayRate(FLAGS_replay_rate == -1.0
+ ? std::numeric_limits<double>::max()
+ : FLAGS_replay_rate);
+
std::unique_ptr<aos::EventLoop> event_loop;
if (FLAGS_node.empty()) {
@@ -55,13 +64,12 @@
}
aos::web_proxy::WebProxy web_proxy(
- event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
+ event_loop.get(),
+ reader.event_loop_factory()->scheduler_epoll(),
+ aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
web_proxy.SetDataPath(FLAGS_data_dir.c_str());
- // Keep the web proxy alive past when we finish reading the logfile.
- reader.set_exit_on_finish(false);
-
if (FLAGS_monotonic_end_time > 0) {
event_loop->AddTimer([&web_proxy]() { web_proxy.StopRecording(); })
->Setup(aos::monotonic_clock::time_point(
@@ -70,4 +78,11 @@
}
reader.event_loop_factory()->Run();
+
+ // Keep the web proxy alive past when we finish reading the logfile, but crank
+ // down the replay rate so that we don't peg our entire CPU just trying to
+ // service timers in the web proxy code.
+ reader.set_exit_on_finish(false);
+ reader.SetRealtimeReplayRate(1.0);
+ reader.event_loop_factory()->Run();
}
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index aa2484d..f3511a0 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -219,12 +219,16 @@
filter->Sample(monotonic_delivered_time, offset);
}
-void MessageBridgeClientStatus::DisableStatistics() {
+void MessageBridgeClientStatus::DisableStatistics(bool destroy_sender) {
statistics_timer_->Disable();
send_ = false;
+ if (destroy_sender) {
+ sender_ = aos::Sender<ClientStatistics>();
+ }
}
void MessageBridgeClientStatus::EnableStatistics() {
+ CHECK(sender_.valid());
send_ = true;
statistics_timer_->Setup(event_loop_->monotonic_now() + kStatisticsPeriod,
kStatisticsPeriod);
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
index 9c21169..033af95 100644
--- a/aos/network/message_bridge_client_status.h
+++ b/aos/network/message_bridge_client_status.h
@@ -54,7 +54,10 @@
void Connect(int client_index);
// Disables sending out any statistics messages.
- void DisableStatistics();
+ // If destroy_sender is set, will clear the ClientStatistics Sender.
+ // EnableStatistics cannot be called again if destroy_sender is set. This is
+ // used by the LogReader to enforce one-sender-per-channel checks.
+ void DisableStatistics(bool destroy_sender);
// Enables sending out any statistics messages.
void EnableStatistics();
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index c82158b..4f6abff 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -411,13 +411,19 @@
}
}
-void MessageBridgeServerStatus::DisableStatistics() {
+void MessageBridgeServerStatus::DisableStatistics(bool destroy_senders) {
send_ = false;
statistics_timer_->Disable();
+ if (destroy_senders) {
+ sender_ = aos::Sender<ServerStatistics>();
+ timestamp_sender_ = aos::Sender<Timestamp>();
+ }
}
void MessageBridgeServerStatus::EnableStatistics() {
send_ = true;
+ CHECK(sender_.valid());
+ CHECK(timestamp_sender_.valid());
statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
kPingPeriod);
}
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index feb1b05..327930c 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -73,7 +73,7 @@
}
// Disables sending out any statistics messages.
- void DisableStatistics();
+ void DisableStatistics(bool destroy_senders);
// Enables sending out any statistics messages.
void EnableStatistics();
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 81fc03f..5b63d18 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -182,7 +182,7 @@
constexpr double kMinNetworkDelay = 2.0;
const std::pair<NoncausalTimestampFilter::Pointer,
- std::pair<chrono::nanoseconds, double>>
+ std::tuple<chrono::nanoseconds, double, double>>
offset_error =
FLAGS_bounds_offset_error
? filter.filter->BoundsOffsetError(
@@ -198,8 +198,8 @@
filter.pointer = offset_error.first;
const std::pair<chrono::nanoseconds, double> error =
- std::make_pair(offset_error.second.first,
- offset_error.second.second - kMinNetworkDelay);
+ std::make_pair(std::get<0>(offset_error.second),
+ std::get<1>(offset_error.second) - kMinNetworkDelay);
std::pair<chrono::nanoseconds, double> grad;
double hess;
@@ -1856,6 +1856,7 @@
std::optional<std::tuple<logger::BootTimestamp, logger::BootDuration>>
result = next_node_filter->Consume();
CHECK(result);
+ WriteFilter(next_node_filter, *result);
next_node_filter->Pop(std::get<0>(*result) -
time_estimation_buffer_seconds_);
}
@@ -2061,17 +2062,22 @@
<< (last_monotonics_[i].time - result_times[i].time).count()
<< "ns";
}
+ UpdateSolution(std::move(result_times));
+ WriteFilter(next_filter, sample);
+ FlushAndClose(false);
LOG(FATAL)
<< "Found a solution before the last returned solution on node "
<< solution_node_index;
break;
case TimeComparison::kEq:
+ WriteFilter(next_filter, sample);
return NextTimestamp();
case TimeComparison::kInvalid: {
const chrono::nanoseconds invalid_distance =
InvalidDistance(last_monotonics_, result_times);
if (invalid_distance <=
chrono::nanoseconds(FLAGS_max_invalid_distance_ns)) {
+ WriteFilter(next_filter, sample);
return NextTimestamp();
}
LOG(INFO) << "Times can't be compared by " << invalid_distance.count()
@@ -2083,6 +2089,9 @@
<< (last_monotonics_[i].time - result_times[i].time).count()
<< "ns";
}
+ UpdateSolution(std::move(result_times));
+ WriteFilter(next_filter, sample);
+ FlushAndClose(false);
LOG(FATAL) << "Please investigate. Use --max_invalid_distance_ns="
<< invalid_distance.count() << " to ignore this.";
} break;
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 0954984..253bb82 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -192,7 +192,7 @@
// TODO(austin): Pop here instead of in log reader.
void ObserveTimePassed(distributed_clock::time_point time) override;
- // Queues 1 more timestammp in the interpolation list. This is public for
+ // Queues 1 more timestamp in the interpolation list. This is public for
// timestamp_extractor so it can hammer on the log until everything is queued.
std::optional<const std::tuple<distributed_clock::time_point,
std::vector<logger::BootTimestamp>> *>
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index dab8e06..c078e28 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -393,16 +393,17 @@
// Confirm that the error is almost equal for both directions. The solution
// is an integer solution, so there will be a little bit of error left over.
- std::pair<chrono::nanoseconds, double> a_error =
+ std::tuple<chrono::nanoseconds, double, double> a_error =
a.OffsetError(nullptr, NoncausalTimestampFilter::Pointer(),
std::get<0>(result1)[0], 0.0, std::get<0>(result1)[1], 0.0)
.second;
- std::pair<chrono::nanoseconds, double> b_error =
+ std::tuple<chrono::nanoseconds, double, double> b_error =
b.OffsetError(nullptr, NoncausalTimestampFilter::Pointer(),
std::get<0>(result1)[1], 0.0, std::get<0>(result1)[0], 0.0)
.second;
- EXPECT_NEAR(static_cast<double>((a_error.first - b_error.first).count()) +
- (a_error.second - b_error.second),
+ EXPECT_NEAR(static_cast<double>(
+ (std::get<0>(a_error) - std::get<0>(b_error)).count()) +
+ (std::get<1>(a_error) - std::get<1>(b_error)),
0.0, 0.5);
}
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index e35a170..d95edda 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -59,6 +59,8 @@
aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
const Connection *connection);
+ void ClearSenderForChannel(const Channel *channel,
+ const Connection *connection);
private:
aos::EventLoop *event_loop_;
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index 8d1c9fe..99dd671 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -773,17 +773,17 @@
chrono::nanoseconds NoncausalTimestampFilter::ExtrapolateOffset(
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
monotonic_clock::time_point ta) {
- return ExtrapolateOffset(p0, ta, 0.0).first;
+ return std::get<0>(ExtrapolateOffset(p0, ta, 0.0));
}
chrono::nanoseconds NoncausalTimestampFilter::InterpolateOffset(
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
monotonic_clock::time_point ta) {
- return InterpolateOffset(p0, p1, ta, 0.0).first;
+ return std::get<0>(InterpolateOffset(p0, p1, ta, 0.0));
}
-std::pair<chrono::nanoseconds, double>
+std::tuple<chrono::nanoseconds, double, double>
NoncausalTimestampFilter::InterpolateOffset(
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
@@ -821,12 +821,12 @@
//
// We have good tests which confirm for small offsets this matches nicely. For
// large offsets, the 128 bit math will take care of us.
+ const double slope =
+ static_cast<double>(doffset.count()) / static_cast<double>(dt.count());
const double remainder =
static_cast<double>(numerator % absl::int128(dt.count())) / dt.count() +
- (numerator > 0 ? -0.5 : 0.5) +
- ta * static_cast<double>(doffset.count()) /
- static_cast<double>(dt.count());
- return std::make_pair(integer, remainder);
+ (numerator > 0 ? -0.5 : 0.5) + ta * slope;
+ return std::make_tuple(integer, remainder, slope);
}
chrono::nanoseconds NoncausalTimestampFilter::BoundOffset(
@@ -841,21 +841,22 @@
NoncausalTimestampFilter::ExtrapolateOffset(p1, ta));
}
-std::pair<chrono::nanoseconds, double> NoncausalTimestampFilter::BoundOffset(
+std::tuple<chrono::nanoseconds, double, double>
+NoncausalTimestampFilter::BoundOffset(
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p0,
std::tuple<monotonic_clock::time_point, chrono::nanoseconds> p1,
monotonic_clock::time_point ta_base, double ta) {
DCHECK_GE(ta, 0.0);
DCHECK_LT(ta, 1.0);
- const std::pair<chrono::nanoseconds, double> o0 =
+ const std::tuple<chrono::nanoseconds, double, double> o0 =
NoncausalTimestampFilter::ExtrapolateOffset(p0, ta_base, ta);
- const std::pair<chrono::nanoseconds, double> o1 =
+ const std::tuple<chrono::nanoseconds, double, double> o1 =
NoncausalTimestampFilter::ExtrapolateOffset(p1, ta_base, ta);
// Want to calculate max(o0 + o0r, o1 + o1r) without precision problems.
- if (static_cast<double>((o0.first - o1.first).count()) >
- o1.second - o0.second) {
+ if (static_cast<double>((std::get<0>(o0) - std::get<0>(o1)).count()) >
+ std::get<1>(o1) - std::get<1>(o0)) {
// Ok, o0 is now > o1. We want the max, so return o0.
return o0;
} else {
@@ -863,7 +864,7 @@
}
}
-std::pair<chrono::nanoseconds, double>
+std::tuple<chrono::nanoseconds, double, double>
NoncausalTimestampFilter::ExtrapolateOffset(
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
monotonic_clock::time_point ta_base, double ta) {
@@ -892,12 +893,13 @@
const absl::int128 numerator =
(absl::int128(dt.count() - MaxVelocityRatio::den / 2) *
absl::int128(MaxVelocityRatio::num));
- return std::make_pair(
+ return std::make_tuple(
std::get<1>(p0) + chrono::nanoseconds(static_cast<int64_t>(
numerator / absl::int128(MaxVelocityRatio::den))),
static_cast<double>(numerator % absl::int128(MaxVelocityRatio::den)) /
static_cast<double>(MaxVelocityRatio::den) +
- 0.5 + ta * kMaxVelocity());
+ 0.5 + ta * kMaxVelocity(),
+ kMaxVelocity());
} else {
// Extrapolate forwards, using the (negative) MaxVelocity slope
// Same concept, except going foward past our last (most recent) sample:
@@ -913,12 +915,13 @@
const absl::int128 numerator =
absl::int128(dt.count() + MaxVelocityRatio::den / 2) *
absl::int128(MaxVelocityRatio::num);
- return std::make_pair(
+ return std::make_tuple(
std::get<1>(p0) - chrono::nanoseconds(static_cast<int64_t>(
numerator / absl::int128(MaxVelocityRatio::den))),
-static_cast<double>(numerator % absl::int128(MaxVelocityRatio::den)) /
static_cast<double>(MaxVelocityRatio::den) +
- 0.5 - ta * kMaxVelocity());
+ 0.5 - ta * kMaxVelocity(),
+ -kMaxVelocity());
}
}
@@ -946,7 +949,7 @@
points.second.first, points.second.second, ta));
}
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
NoncausalTimestampFilter::SingleFilter::Offset(
const SingleFilter *other, Pointer pointer,
monotonic_clock::time_point ta_base, double ta) const {
@@ -976,7 +979,7 @@
points.second.first, points.second.second, ta_base, ta));
}
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
NoncausalTimestampFilter::SingleFilter::BoundsOffset(
const SingleFilter *other, Pointer pointer,
monotonic_clock::time_point ta_base, double ta) const {
@@ -1005,7 +1008,7 @@
points.second.second, ta_base, ta));
}
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
NoncausalTimestampFilter::SingleFilter::OffsetError(
const SingleFilter *other, Pointer pointer,
aos::monotonic_clock::time_point ta_base, double ta,
@@ -1013,17 +1016,19 @@
NormalizeTimestamps(&ta_base, &ta);
NormalizeTimestamps(&tb_base, &tb);
- const std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> offset =
- Offset(other, pointer, ta_base, ta);
+ const std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ offset = Offset(other, pointer, ta_base, ta);
// Compute the integer portion first, and the double portion second. Subtract
// the results of each. This handles large offsets without losing precision.
return std::make_pair(
- offset.first, std::make_pair(((tb_base - ta_base) - offset.second.first),
- (tb - ta) - offset.second.second));
+ offset.first,
+ std::make_tuple(((tb_base - ta_base) - std::get<0>(offset.second)),
+ (tb - ta) - std::get<1>(offset.second),
+ std::get<2>(offset.second)));
}
-std::pair<Pointer, std::pair<chrono::nanoseconds, double>>
+std::pair<Pointer, std::tuple<chrono::nanoseconds, double, double>>
NoncausalTimestampFilter::SingleFilter::BoundsOffsetError(
const SingleFilter *other, Pointer pointer,
aos::monotonic_clock::time_point ta_base, double ta,
@@ -1031,14 +1036,16 @@
NormalizeTimestamps(&ta_base, &ta);
NormalizeTimestamps(&tb_base, &tb);
- const std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> offset =
- BoundsOffset(other, pointer, ta_base, ta);
+ const std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ offset = BoundsOffset(other, pointer, ta_base, ta);
// Compute the integer portion first, and the double portion second. Subtract
// the results of each. This handles large offsets without losing precision.
return std::make_pair(
- offset.first, std::make_pair((tb_base - ta_base) - offset.second.first,
- (tb - ta) - offset.second.second));
+ offset.first,
+ std::make_tuple((tb_base - ta_base) - std::get<0>(offset.second),
+ (tb - ta) - std::get<1>(offset.second),
+ std::get<2>(offset.second)));
}
std::string NoncausalTimestampFilter::DebugOffsetError(
@@ -1128,22 +1135,24 @@
auto reference_timestamp = GetReferenceTimestamp(ta_base, ta);
// Special case size = 1 or ta before first timestamp, so we extrapolate
- const std::pair<chrono::nanoseconds, double> offset =
+ const std::tuple<chrono::nanoseconds, double, double> offset =
NoncausalTimestampFilter::ExtrapolateOffset(reference_timestamp.second,
ta_base, ta);
// We want to do offset + ta > tb, but we need to do it with minimal
// numerical precision problems.
// See below for why this is a >=
- if (static_cast<double>((offset.first + ta_base - tb_base).count()) >=
- tb - ta - offset.second) {
+ if (static_cast<double>(
+ (std::get<0>(offset) + ta_base - tb_base).count()) >=
+ tb - ta - std::get<1>(offset)) {
LOG(ERROR) << node_names_ << " "
- << TimeString(ta_base, ta, offset.first, offset.second)
+ << TimeString(ta_base, ta, std::get<0>(offset),
+ std::get<1>(offset))
<< " > solution time "
<< tb_base + chrono::nanoseconds(
static_cast<int64_t>(std::round(tb)))
<< ", " << tb - std::round(tb) << " foo";
- LOG(INFO) << "Remainder " << offset.second;
+ LOG(INFO) << "Remainder " << std::get<1>(offset);
return false;
}
return true;
@@ -1159,14 +1168,15 @@
std::pair<std::tuple<monotonic_clock::time_point, chrono::nanoseconds>,
std::tuple<monotonic_clock::time_point, chrono::nanoseconds>>>
points = FindTimestamps(other, false, pointer, ta_base, ta);
- const std::pair<chrono::nanoseconds, double> offset =
+ const std::tuple<chrono::nanoseconds, double, double> offset =
NoncausalTimestampFilter::BoundOffset(points.second.first,
points.second.second, ta_base, ta);
// See below for why this is a >=
- if (static_cast<double>((offset.first + ta_base - tb_base).count()) >=
- tb - offset.second - ta) {
+ if (static_cast<double>((std::get<0>(offset) + ta_base - tb_base).count()) >=
+ tb - std::get<1>(offset) - ta) {
LOG(ERROR) << node_names_ << " "
- << TimeString(ta_base, ta, offset.first, offset.second)
+ << TimeString(ta_base, ta, std::get<0>(offset),
+ std::get<1>(offset))
<< " > solution time " << tb_base << ", " << tb;
LOG(ERROR) << "Bracketing times are " << TimeString(points.second.first)
<< " and " << TimeString(points.second.second);
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 2b5ff47..5aaa7fd 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -311,29 +311,31 @@
other_points_;
};
- // Returns the error between the offset in the provided timestamps, and the
- // offset at ta. Also returns a pointer to the timestamps used for the
- // lookup to be passed back in again for a more efficient second lookup.
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> OffsetError(
- const NoncausalTimestampFilter *other, Pointer pointer,
- logger::BootTimestamp ta_base, double ta, logger::BootTimestamp tb_base,
- double tb) const {
+ // Returns the error between the offset in the provided timestamps and the
+ // offset at ta, together with d error/dta. Also returns a pointer to the
+ // timestamps used for the lookup, to be passed back in again for a more
+ // efficient second lookup.
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ OffsetError(const NoncausalTimestampFilter *other, Pointer pointer,
+ logger::BootTimestamp ta_base, double ta,
+ logger::BootTimestamp tb_base, double tb) const {
const BootFilter *boot_filter = filter(pointer, ta_base.boot, tb_base.boot);
const SingleFilter *other_filter =
other == nullptr
? nullptr
: other->maybe_single_filter(tb_base.boot, ta_base.boot);
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
- boot_filter->filter.OffsetError(other_filter, pointer, ta_base.time, ta,
- tb_base.time, tb);
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ result = boot_filter->filter.OffsetError(
+ other_filter, pointer, ta_base.time, ta, tb_base.time, tb);
result.first.boot_filter_ = boot_filter;
return result;
}
- // Returns the error between the offset in the provided timestamps, and the
- // bounds offset at ta. Also returns a pointer to the timestamps used for the
- // lookup to be passed back in again for a more efficient second lookup.
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+ // Returns the error between the offset in the provided timestamps and the
+ // bounds offset at ta, together with d error/dta. Also returns a pointer to
+ // the timestamps used for the lookup, to be passed back in again for a more
+ // efficient second lookup.
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
BoundsOffsetError(const NoncausalTimestampFilter *other, Pointer pointer,
logger::BootTimestamp ta_base, double ta,
logger::BootTimestamp tb_base, double tb) const {
@@ -342,8 +344,8 @@
other == nullptr
? nullptr
: other->maybe_single_filter(tb_base.boot, ta_base.boot);
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
- boot_filter->filter.BoundsOffsetError(
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ result = boot_filter->filter.BoundsOffsetError(
other_filter, pointer, ta_base.time, ta, tb_base.time, tb);
result.first.boot_filter_ = boot_filter;
return result;
@@ -573,17 +575,17 @@
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
monotonic_clock::time_point ta);
- static std::pair<std::chrono::nanoseconds, double> InterpolateOffset(
+ static std::tuple<std::chrono::nanoseconds, double, double> InterpolateOffset(
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p1,
monotonic_clock::time_point ta_base, double ta);
- static std::pair<std::chrono::nanoseconds, double> BoundOffset(
+ static std::tuple<std::chrono::nanoseconds, double, double> BoundOffset(
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p1,
monotonic_clock::time_point ta_base, double ta);
- static std::pair<std::chrono::nanoseconds, double> ExtrapolateOffset(
+ static std::tuple<std::chrono::nanoseconds, double, double> ExtrapolateOffset(
std::tuple<monotonic_clock::time_point, std::chrono::nanoseconds> p0,
monotonic_clock::time_point ta_base, double ta);
@@ -634,20 +636,20 @@
std::pair<Pointer, std::chrono::nanoseconds> Offset(
const SingleFilter *other, Pointer pointer,
monotonic_clock::time_point ta) const;
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> Offset(
- const SingleFilter *other, Pointer pointer,
- monotonic_clock::time_point ta_base, double ta) const;
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ Offset(const SingleFilter *other, Pointer pointer,
+ monotonic_clock::time_point ta_base, double ta) const;
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
BoundsOffset(const SingleFilter *other, Pointer pointer,
monotonic_clock::time_point ta_base, double ta) const;
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> OffsetError(
- const SingleFilter *other, Pointer pointer,
- aos::monotonic_clock::time_point ta_base, double ta,
- aos::monotonic_clock::time_point tb_base, double tb) const;
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ OffsetError(const SingleFilter *other, Pointer pointer,
+ aos::monotonic_clock::time_point ta_base, double ta,
+ aos::monotonic_clock::time_point tb_base, double tb) const;
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>>
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
BoundsOffsetError(const SingleFilter *other, Pointer pointer,
aos::monotonic_clock::time_point ta_base, double ta,
aos::monotonic_clock::time_point tb_base,
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
index 5d8eeeb..adb6172 100644
--- a/aos/network/timestamp_filter_test.cc
+++ b/aos/network/timestamp_filter_test.cc
@@ -48,34 +48,36 @@
.second};
}
- std::pair<logger::BootDuration, double> Offset(
+ std::tuple<logger::BootDuration, double, double> Offset(
const TestingNoncausalTimestampFilter *other, Pointer pointer,
logger::BootTimestamp ta_base, double ta, size_t sample_boot) const {
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
- filter(ta_base.boot, sample_boot)
- ->filter.Offset(
- other == nullptr
- ? nullptr
- : &other->filter(sample_boot, ta_base.boot)->filter,
- pointer, ta_base.time, ta);
- return std::make_pair(
- logger::BootDuration{sample_boot, result.second.first},
- result.second.second);
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ result =
+ filter(ta_base.boot, sample_boot)
+ ->filter.Offset(
+ other == nullptr
+ ? nullptr
+ : &other->filter(sample_boot, ta_base.boot)->filter,
+ pointer, ta_base.time, ta);
+ return std::make_tuple(
+ logger::BootDuration{sample_boot, std::get<0>(result.second)},
+ std::get<1>(result.second), std::get<2>(result.second));
}
- std::pair<logger::BootDuration, double> BoundsOffset(
+ std::tuple<logger::BootDuration, double, double> BoundsOffset(
const TestingNoncausalTimestampFilter *other, Pointer pointer,
logger::BootTimestamp ta_base, double ta, size_t sample_boot) const {
- std::pair<Pointer, std::pair<std::chrono::nanoseconds, double>> result =
- filter(ta_base.boot, sample_boot)
- ->filter.BoundsOffset(
- other == nullptr
- ? nullptr
- : &other->filter(sample_boot, ta_base.boot)->filter,
- pointer, ta_base.time, ta);
- return std::make_pair(
- logger::BootDuration{sample_boot, result.second.first},
- result.second.second);
+ std::pair<Pointer, std::tuple<std::chrono::nanoseconds, double, double>>
+ result =
+ filter(ta_base.boot, sample_boot)
+ ->filter.BoundsOffset(
+ other == nullptr
+ ? nullptr
+ : &other->filter(sample_boot, ta_base.boot)->filter,
+ pointer, ta_base.time, ta);
+ return std::make_tuple(
+ logger::BootDuration{sample_boot, std::get<0>(result.second)},
+ std::get<1>(result.second), std::get<2>(result.second));
}
};
@@ -866,19 +868,21 @@
const monotonic_clock::time_point t2 = t1 + chrono::nanoseconds(1000);
const chrono::nanoseconds o2 = chrono::nanoseconds(150);
+ const double slope = 50.0 / 1000.0;
+
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1),
o1);
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1, 0.0),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2),
o2);
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2, 0.0),
- std::make_pair(o2, 0.0));
+ std::make_tuple(o2, 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -887,7 +891,7 @@
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
t1 + chrono::nanoseconds(500), 0.0),
- std::make_pair(o1 + chrono::nanoseconds(25), 0.0));
+ std::make_tuple(o1 + chrono::nanoseconds(25), 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -896,7 +900,7 @@
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
t1 - chrono::nanoseconds(200), 0.0),
- std::make_pair(o1 - chrono::nanoseconds(10), 0.0));
+ std::make_tuple(o1 - chrono::nanoseconds(10), 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -905,7 +909,7 @@
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
t1 + chrono::nanoseconds(200), 0.0),
- std::make_pair(o1 + chrono::nanoseconds(10), 0.0));
+ std::make_tuple(o1 + chrono::nanoseconds(10), 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -914,7 +918,7 @@
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
t1 + chrono::nanoseconds(800), 0.0),
- std::make_pair(o1 + chrono::nanoseconds(40), 0.0));
+ std::make_tuple(o1 + chrono::nanoseconds(40), 0.0, slope));
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
@@ -923,7 +927,7 @@
EXPECT_EQ(NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2),
t1 + chrono::nanoseconds(1200), 0.0),
- std::make_pair(o1 + chrono::nanoseconds(60), 0.0));
+ std::make_tuple(o1 + chrono::nanoseconds(60), 0.0, slope));
for (int i = -MaxVelocityRatio::den * MaxVelocityRatio::num * 6;
i <
@@ -941,18 +945,19 @@
NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base);
- const std::pair<chrono::nanoseconds, double> offset =
+ const std::tuple<chrono::nanoseconds, double, double> offset =
NoncausalTimestampFilter::InterpolateOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base, ta);
- EXPECT_EQ(expected_offset, offset.first);
+ EXPECT_EQ(expected_offset, std::get<0>(offset));
const double expected_double_offset =
static_cast<double>(o1.count()) +
static_cast<double>(ta_orig) / static_cast<double>((t2 - t1).count()) *
(o2 - o1).count();
- EXPECT_NEAR(static_cast<double>(offset.first.count()) + offset.second,
- expected_double_offset, 1e-9)
+ EXPECT_NEAR(
+ static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
+ expected_double_offset, 1e-9)
<< ": i " << i << " t " << ta_base << " " << ta << " t1 " << t1
<< " o1 " << o1.count() << "ns t2 " << t2 << " o2 " << o2.count()
<< "ns Non-rounded: " << expected_offset.count() << "ns";
@@ -999,14 +1004,14 @@
// Test base + double version
EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
e, 0.),
- std::make_pair(chrono::nanoseconds(90), 0.0));
+ std::make_tuple(chrono::nanoseconds(90), 0.0, kMaxVelocity()));
EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
t1, 0.),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, -kMaxVelocity()));
EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
t1, 0.5),
- std::make_pair(o1, -0.5 * kMaxVelocity()));
+ std::make_tuple(o1, -0.5 * kMaxVelocity(), -kMaxVelocity()));
EXPECT_EQ(
NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t2, o2), t2),
@@ -1014,7 +1019,7 @@
EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t2, o2),
t2, 0.0),
- std::make_pair(o2, 0.0));
+ std::make_tuple(o2, 0.0, -kMaxVelocity()));
// Test points past our last sample
EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(
@@ -1022,10 +1027,10 @@
chrono::nanoseconds(
static_cast<int64_t>(o2.count() - 10000. * kMaxVelocity())));
- EXPECT_EQ(
- NoncausalTimestampFilter::ExtrapolateOffset(
- std::make_tuple(t2, o2), t2 + chrono::nanoseconds(10000), 0.5),
- std::make_pair(o2 - chrono::nanoseconds(10), -0.5 * kMaxVelocity()));
+ EXPECT_EQ(NoncausalTimestampFilter::ExtrapolateOffset(
+ std::make_tuple(t2, o2), t2 + chrono::nanoseconds(10000), 0.5),
+ std::make_tuple(o2 - chrono::nanoseconds(10), -0.5 * kMaxVelocity(),
+ -kMaxVelocity()));
// Now, test that offset + remainder functions add up to the right answer for
// a lot of cases. This is enough to catch all the various rounding cases.
@@ -1043,13 +1048,13 @@
NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
ta_base);
- std::pair<chrono::nanoseconds, double> offset =
+ std::tuple<chrono::nanoseconds, double, double> offset =
NoncausalTimestampFilter::ExtrapolateOffset(std::make_tuple(t1, o1),
ta_base, ta);
- EXPECT_EQ(expected_offset, offset.first);
+ EXPECT_EQ(expected_offset, std::get<0>(offset));
EXPECT_NEAR(
- static_cast<double>(offset.first.count()) + offset.second,
+ static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
static_cast<double>(o1.count()) - std::abs(ta_orig) * kMaxVelocity(),
1e-9)
<< ": i " << i << " t " << ta_base << " " << ta
@@ -1072,14 +1077,14 @@
o1);
EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t1, 0.0),
- std::pair(o1, 0.0));
+ std::tuple(o1, 0.0, -kMaxVelocity()));
EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(std::make_tuple(t1, o1),
std::make_tuple(t2, o2), t2),
o2);
EXPECT_EQ(NoncausalTimestampFilter::BoundOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), t2, 0.0),
- std::pair(o2, 0.0));
+ std::tuple(o2, 0.0, -kMaxVelocity()));
// Iterate from before t1 to after t2 and confirm that the solution is right.
// We must always be >= than interpolation, and must also be equal to the max
@@ -1114,19 +1119,21 @@
// / \/ \ |
// / \ |
- const std::pair<chrono::nanoseconds, double> offset =
+ const std::tuple<chrono::nanoseconds, double, double> offset =
NoncausalTimestampFilter::BoundOffset(
std::make_tuple(t1, o1), std::make_tuple(t2, o2), ta_base, ta);
- EXPECT_EQ(std::max(expected_offset_1, expected_offset_2), offset.first);
+ EXPECT_EQ(std::max(expected_offset_1, expected_offset_2),
+ std::get<0>(offset));
const double expected_double_offset = std::max(
static_cast<double>(o1.count()) - std::abs(ta_orig) * kMaxVelocity(),
static_cast<double>(o2.count()) -
std::abs(ta_orig - (t2 - t1).count()) * kMaxVelocity());
- EXPECT_NEAR(static_cast<double>(offset.first.count()) + offset.second,
- expected_double_offset, 1e-9)
+ EXPECT_NEAR(
+ static_cast<double>(std::get<0>(offset).count()) + std::get<1>(offset),
+ expected_double_offset, 1e-9)
<< ": i " << i << " t " << ta_base << " " << ta << " t1 " << t1
<< " o1 " << o1.count() << "ns t2 " << t2 << " o2 " << o2.count()
<< "ns Non-rounded: "
@@ -1468,9 +1475,15 @@
const BootTimestamp t2 = e + chrono::microseconds(2000);
const BootDuration o2{0, chrono::nanoseconds(150)};
+ const double slope12 = static_cast<double>((o2 - o1).duration.count()) /
+ static_cast<double>((t2 - t1).duration.count());
+
const BootTimestamp t3 = e + chrono::microseconds(3000);
const BootDuration o3{0, chrono::nanoseconds(50)};
+ const double slope23 = static_cast<double>((o3 - o2).duration.count()) /
+ static_cast<double>((t3 - t2).duration.count());
+
const BootTimestamp t4 = e + chrono::microseconds(4000);
TestingNoncausalTimestampFilter filter(node_a, node_b);
@@ -1480,87 +1493,88 @@
// 1 point is handled properly.
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0), o1);
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0.0, 0),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, -kMaxVelocity()));
EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t1, 0.0, 0),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, -kMaxVelocity()));
// Check if we ask for something away from point that we get an offset
// based on the MaxVelocity allowed
const double offset_pre = -(t1.time - e.time).count() * kMaxVelocity();
EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0),
o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)));
- EXPECT_EQ(
- filter.Offset(nullptr, Pointer(), e, 0.0, 0),
- std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
- 0.0));
- EXPECT_EQ(
- filter.BoundsOffset(nullptr, Pointer(), e, 0.0, 0),
- std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
- 0.0));
+ EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0.0, 0),
+ std::make_tuple(
+ o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+ kMaxVelocity()));
+ EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), e, 0.0, 0),
+ std::make_tuple(
+ o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+ kMaxVelocity()));
double offset_post = -(t2.time - t1.time).count() * kMaxVelocity();
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0),
o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)));
- EXPECT_EQ(
- filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
- std::make_pair(
- o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
- EXPECT_EQ(
- filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
- std::make_pair(
- o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
+ EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
+ std::make_tuple(
+ o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+ 0.0, -kMaxVelocity()));
+ EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
+ std::make_tuple(
+ o1 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+ 0.0, -kMaxVelocity()));
filter.Sample(t2, o2);
filter.Sample(t3, o3);
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0), o1);
EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t1, 0.0, 0),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, -kMaxVelocity()));
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0), o2);
EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2, 0.0, 0),
- std::make_pair(o2, 0.0));
+ std::make_tuple(o2, 0.0, -kMaxVelocity()));
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t3, 0), o3);
EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t3, 0.0, 0),
- std::make_pair(o3, 0.0));
+ std::make_tuple(o3, 0.0, -kMaxVelocity()));
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t1, 0.0, 0),
- std::make_pair(o1, 0.0));
+ std::make_tuple(o1, 0.0, slope12));
EXPECT_EQ(
filter.Offset(nullptr, Pointer(),
e + (t2.time_since_epoch() + t1.time_since_epoch()) / 2,
0.0, 0),
- std::make_pair(o1 + (o2 - o1) / 2, 0.0));
+ std::make_tuple(o1 + (o2 - o1) / 2, 0.0, slope12));
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t2, 0.0, 0),
- std::make_pair(o2, 0.0));
+ std::make_tuple(o2, 0.0, slope23));
EXPECT_EQ(
filter.Offset(nullptr, Pointer(),
e + (t2.time_since_epoch() + t3.time_since_epoch()) / 2,
0.0, 0),
- std::make_pair((o2 + o3) / 2, 0.0));
+ std::make_tuple((o2 + o3) / 2, 0.0, slope23));
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t3, 0.0, 0),
- std::make_pair(o3, 0.0));
+ std::make_tuple(o3, 0.0, -kMaxVelocity()));
// Check that we still get same answer for times before our sample data...
EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0),
o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)));
- EXPECT_EQ(
- filter.Offset(nullptr, Pointer(), e, 0.0, 0),
- std::make_pair(o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)),
- 0.0));
+ EXPECT_EQ(filter.Offset(nullptr, Pointer(), e, 0.0, 0),
+ std::make_tuple(
+ o1 + chrono::nanoseconds(static_cast<int64_t>(offset_pre)), 0.0,
+ kMaxVelocity()));
// ... and after
offset_post = -(t4.time - t3.time).count() * kMaxVelocity();
EXPECT_EQ(filter.Offset(nullptr, Pointer(), t4, 0),
(o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post))));
- EXPECT_EQ(
- filter.Offset(nullptr, Pointer(), t4, 0.0, 0),
- std::make_pair(
- o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post)), 0.0));
+ EXPECT_EQ(filter.Offset(nullptr, Pointer(), t4, 0.0, 0),
+ std::make_tuple(
+ o3 + chrono::nanoseconds(static_cast<int64_t>(offset_post)),
+ 0.0, -kMaxVelocity()));
- EXPECT_EQ(filter.BoundsOffset(nullptr, Pointer(), t2 + (t3 - t2) / 2, 0.0, 0),
- std::make_pair(o2 - chrono::nanoseconds(500), 0.0));
+ EXPECT_EQ(
+ filter.BoundsOffset(nullptr, Pointer(), t2 + (t3 - t2) / 2, 0.0, 0),
+ std::make_tuple(o2 - chrono::nanoseconds(500), 0.0, -kMaxVelocity()));
}
// Tests that adding duplicates gets correctly deduplicated.
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0c1d1dc..2b57c05 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -77,6 +77,8 @@
int per_channel_buffer_size_bytes);
WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
int per_channel_buffer_size_bytes);
+ WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+ StoreHistory store_history, int per_channel_buffer_size_bytes);
~WebProxy();
void SetDataPath(const char *path) { server_.setStaticPath(path); }
@@ -85,9 +87,6 @@
void StopRecording();
private:
- WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
- StoreHistory store_history, int per_channel_buffer_size_bytes);
-
aos::internal::EPoll internal_epoll_;
aos::internal::EPoll *const epoll_;
::seasocks::Server server_;
diff --git a/aos/util/BUILD b/aos/util/BUILD
index faa3dc1..bc62f87 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -385,3 +385,24 @@
visibility = ["//visibility:public"],
deps = ["//aos:python_init"],
)
+
+cc_library(
+ name = "threaded_queue",
+ hdrs = [
+ "threaded_queue.h",
+ "threaded_queue_tmpl.h",
+ ],
+ deps = [
+ "//aos:condition",
+ "//aos/mutex",
+ ],
+)
+
+cc_test(
+ name = "threaded_queue_test",
+ srcs = ["threaded_queue_test.cc"],
+ deps = [
+ ":threaded_queue",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/util/bitpacking_test.cc b/aos/util/bitpacking_test.cc
index 80d1977..087c719 100644
--- a/aos/util/bitpacking_test.cc
+++ b/aos/util/bitpacking_test.cc
@@ -1,5 +1,6 @@
#include "aos/util/bitpacking.h"
+#include <array>
#include <cstdint>
#include "gtest/gtest.h"
diff --git a/aos/util/threaded_queue.h b/aos/util/threaded_queue.h
new file mode 100644
index 0000000..0b06632
--- /dev/null
+++ b/aos/util/threaded_queue.h
@@ -0,0 +1,99 @@
+#ifndef AOS_UTIL_THREADED_QUEUE_H_
+#define AOS_UTIL_THREADED_QUEUE_H_
+#include <functional>
+#include <optional>
+#include <queue>
+#include <thread>
+
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos::util {
+// This class implements a queue of objects of type T in which a worker thread
+// pushes and the calling thread pops from the queue.
+//
+// This is set up such that the user will pass a worker function that will get
+// called in a separate thread whenever we think that there might be something
+// to trigger more work to be done. All the methods on this class are intended
+// to be called from the main thread (not from within the worker function).
+// Communication between the main thread and the worker can be achieved either
+// by manually handling your own state, or preferably by using the SharedState
+// object, which will get passed into the worker. The worker gets called every
+// time that SetState() is called.
+template <typename T, typename SharedState>
+class ThreadedQueue {
+ public:
+ // PushResult is returned from the worker to indicate its current state.
+ struct PushResult {
+ // The new item to push, if any. If set to nullopt, nothing gets pushed.
+ std::optional<T> item = std::nullopt;
+ // Set to true if the worker knows that there is more work that it has to do
+ // and so should be immediately called again. If set to false, then the
+ // worker will not get called again until one of the following events
+ // occurs:
+ // 1) The queue is successfully popped from.
+ // 2) SetState() is called.
+ bool more_to_push = false;
+ // Set to true if there is no more work to be done. The worker should not
+ // get called after setting done to true.
+ bool done = false;
+ };
+ ThreadedQueue(
+ std::function<PushResult(SharedState)> push_request_handler,
+ SharedState initial_state);
+ ~ThreadedQueue();
+ // Sets state. Triggers a new call to push_request_handler.
+ void SetState(const SharedState &state);
+ // Returns the current front of the queue, blocking until a new item is
+ // available. Will only return nullopt if done is true and there will be no
+ // more items in the queue.
+ std::optional<T> Peek();
+ // Identical to Peek(), except that it also removes the item from the queue.
+ std::optional<T> Pop();
+ // Waits until the push_request_handler has returned more_to_push = false, and
+ // so is just spinning. Useful if you want to ensure that the worker has done
+ // all the work it can before going to the next step.
+ void WaitForNoMoreWork();
+ // Stops any further calls to push_request_handler. Used to terminate the
+ // queue from the calling thread.
+ void StopPushing();
+
+ private:
+ // Safely grabs the current state and returns a copy.
+ SharedState State();
+ // Implements the Peek()/Pop() methods, blocking until we are either done or
+ // the next item is available in the queue.
+ std::optional<T> PeekOrPop(bool pop);
+
+ // Mutex controlling access to all shared state (in this case, all the members
+ // of this class).
+ aos::Mutex mutex_;
+ // Condition variable used to indicate when anything has happened that should
+ // cause us to check for whether the worker can add anything new to the queue.
+ // Named "popped_" because it may be signalled when an item is popped from the
+ // queue.
+ aos::Condition popped_;
+ // Condition variable to indicate when an item has either been pushed to the
+ // queue or there is some other state change that consumers may care about
+ // (e.g., being done).
+ aos::Condition pushed_;
+ // TODO(jkuszmaul): Evaluate impact of dynamic memory allocation in
+ // std::queue, consider using aos::RingBuffer or similarly statically
+ // allocated buffer.
+ std::queue<T> queue_;
+ // Whether we are done processing entirely.
+ bool done_{false};
+ // Set while the pusher thread is waiting on popped_.
+ // Used to notice when the push handler is out of work to do.
+ bool pusher_waiting_{false};
+ // Set when SetState() is called, cleared when State() is read. Used to track
+ // whether the push handler has been called with the most recent state.
+ bool state_updated_{false};
+ SharedState state_;
+ std::thread pusher_thread_;
+};
+
+} // namespace aos::util
+
+#include "aos/util/threaded_queue_tmpl.h"
+#endif // AOS_UTIL_THREADED_QUEUE_H_
diff --git a/aos/util/threaded_queue_test.cc b/aos/util/threaded_queue_test.cc
new file mode 100644
index 0000000..4f27c57
--- /dev/null
+++ b/aos/util/threaded_queue_test.cc
@@ -0,0 +1,105 @@
+#include "aos/util/threaded_queue.h"
+
+#include "gtest/gtest.h"
+
+namespace aos::util {
+
+TEST(ThreadedQueueTest, BasicFunction) {
+ std::atomic<int> counter{10000};
+ int state = 0;
+ std::atomic<int> observed_state{0};
+ ThreadedQueue<int, int> queue(
+ [&counter, &observed_state](const int state) {
+ // Because this handler always returns more_to_push = false, it will
+ // only get called when the queue is popped from.
+ observed_state = state;
+ int count = --counter;
+ return ThreadedQueue<int, int>::PushResult{count, false, count == 0};
+ },
+ state);
+ while (true) {
+ std::optional<int> peek_result = queue.Peek();
+ std::optional<int> pop_result = queue.Pop();
+ ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+ if (peek_result.has_value()) {
+ ASSERT_EQ(peek_result.value(), pop_result.value());
+ } else {
+ break;
+ }
+ state++;
+ queue.SetState(state);
+ }
+ ASSERT_EQ(counter, 0);
+ // Our API doesn't make any guarantee about the push/pop cycle being kept in
+ // lock-step, so just check that the observed state got incremented at all.
+ ASSERT_LT(1, observed_state);
+ ASSERT_EQ(state, 10000);
+}
+
+// Test running a queue where the consumer wants to have X entries pre-loaded
+// and so communicates its current state back to the pusher.
+TEST(ThreadedQueueTest, StatefulQueue) {
+ std::atomic<int> counter{10000};
+ int state = counter;
+ constexpr int kMaxLookahead = 10;
+ std::atomic<int> observed_state{0};
+ ThreadedQueue<int, int> queue(
+ [&counter, &observed_state](const int state) {
+ observed_state = state;
+ if (counter + kMaxLookahead < state) {
+ return ThreadedQueue<int, int>::PushResult{std::nullopt, false,
+ counter == 0};
+ } else {
+ int count = --counter;
+ return ThreadedQueue<int, int>::PushResult{count, true, count == 0};
+ }
+ },
+ state);
+ while (true) {
+ std::optional<int> peek_result = queue.Peek();
+ std::optional<int> pop_result = queue.Pop();
+ ASSERT_EQ(peek_result.has_value(), pop_result.has_value());
+ if (peek_result.has_value()) {
+ ASSERT_EQ(peek_result.value(), pop_result.value());
+ } else {
+ break;
+ }
+ for (int ii = 0; ii < 2 * kMaxLookahead; ++ii) {
+ // Trigger the internal condition variable a bunch of times to cause
+ // trouble.
+ queue.Peek();
+ }
+ // The pusher should never be more than the permissible distance ahead.
+ ASSERT_GE(counter + kMaxLookahead + 1, state);
+ ASSERT_GE(observed_state, state);
+ state--;
+ queue.SetState(state);
+ // Periodically pause, ensure that the pusher has enough time to catch up,
+ // and check that it has indeed pre-queued kMaxLookahead items.
+ if (state % 1000 == 0 && state > 0) {
+ queue.WaitForNoMoreWork();
+ ASSERT_EQ(observed_state, state);
+ ASSERT_EQ(counter + kMaxLookahead + 1, state);
+ }
+ }
+ ASSERT_EQ(counter, 0);
+ ASSERT_EQ(state, 0);
+}
+
+
+// Tests that we can exit early without any issues.
+TEST(ThreadedQueueTest, ExitEarly) {
+ // There used to exist a deadlock in this case where StopPushing would
+ // improperly synchronize things internally, but required very (un)lucky
+ // timing to hit.
+ for (int ii = 0; ii < 10000; ++ii) {
+ ThreadedQueue<int, int> queue(
+ [](int) {
+ return ThreadedQueue<int, int>::PushResult{971, false, false};
+ },
+ 0);
+ queue.StopPushing();
+ }
+}
+
+} // namespace aos::util
diff --git a/aos/util/threaded_queue_tmpl.h b/aos/util/threaded_queue_tmpl.h
new file mode 100644
index 0000000..cb4cdfc
--- /dev/null
+++ b/aos/util/threaded_queue_tmpl.h
@@ -0,0 +1,99 @@
+namespace aos::util {
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::ThreadedQueue(
+ std::function<PushResult(SharedState)> push_request_handler,
+ SharedState initial_state)
+ : popped_(&mutex_),
+ pushed_(&mutex_),
+ state_(initial_state),
+ pusher_thread_([this, push_request_handler]() {
+ while (true) {
+ PushResult result = push_request_handler(State());
+ {
+ MutexLocker locker(&mutex_);
+ done_ = done_ || result.done;
+ if (result.item.has_value()) {
+ queue_.push(std::move(result.item.value()));
+ }
+ pushed_.Broadcast();
+ if (done_) {
+ return;
+ }
+ if (result.more_to_push || state_updated_) {
+ continue;
+ } else {
+ pusher_waiting_ = true;
+ CHECK(!popped_.Wait());
+ pusher_waiting_ = false;
+ }
+ }
+ }
+ }) {}
+
+template <typename T, typename SharedState>
+ThreadedQueue<T, SharedState>::~ThreadedQueue() {
+ StopPushing();
+ pusher_thread_.join();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::WaitForNoMoreWork() {
+ MutexLocker locker(&mutex_);
+ while (state_updated_ || (!pusher_waiting_ && !done_)) {
+ CHECK(!pushed_.Wait());
+ }
+}
+
+template <typename T, typename SharedState>
+SharedState ThreadedQueue<T, SharedState>::State() {
+ MutexLocker locker(&mutex_);
+ state_updated_ = false;
+ return state_;
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::SetState(const SharedState &state) {
+ MutexLocker locker(&mutex_);
+ state_ = state;
+ state_updated_ = true;
+ popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+void ThreadedQueue<T, SharedState>::StopPushing() {
+ // Ensure that the mutex is locked before doing anything, to make sure that
+ // the pushing thread actually observes the change.
+ MutexLocker locker(&mutex_);
+ done_ = true;
+ popped_.Broadcast();
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Peek() {
+ return PeekOrPop(false);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::Pop() {
+ return PeekOrPop(true);
+}
+
+template <typename T, typename SharedState>
+std::optional<T> ThreadedQueue<T, SharedState>::PeekOrPop(bool pop) {
+ MutexLocker locker(&mutex_);
+ while (!done_ && queue_.empty()) {
+ CHECK(!pushed_.Wait());
+ }
+ if (queue_.empty()) {
+ return std::nullopt;
+ }
+ if (pop) {
+ T result = std::move(queue_.front());
+ queue_.pop();
+ popped_.Broadcast();
+ return result;
+ } else {
+ return queue_.front();
+ }
+}
+} // namespace aos::util
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 545740d..33d999e 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,8 @@
event_loop_factory_(&config_.message()),
event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
- web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
+ web_proxy_(event_loop_.get(), event_loop_factory_.scheduler_epoll(),
+ aos::web_proxy::StoreHistory::kYes, -1),
builder_(plot_sender_.MakeBuilder()) {
web_proxy_.SetDataPath(kDataPath);
event_loop_->SkipTimingReport();
@@ -41,7 +42,11 @@
ColorWheelColor{.name = "white", .color = {1, 1, 1}});
}
-void Plotter::Spin() { event_loop_factory_.Run(); }
+void Plotter::Spin() {
+ // Set non-infinite replay rate to avoid pegging a full CPU.
+ event_loop_factory_.SetRealtimeReplayRate(1.0);
+ event_loop_factory_.Run();
+}
void Plotter::Title(std::string_view title) {
title_ = builder_.fbb()->CreateString(title);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index f0bcd88..91b5505 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -91,7 +91,7 @@
return locations;
}
-constexpr std::chrono::seconds kPiTimeOffset(-10);
+constexpr std::chrono::seconds kPiTimeOffset(10);
} // namespace
namespace chrono = std::chrono;