Fix transmit timestamp logic in SimulatedNetworkBridge
We were seeing a case when monotonic_remote_transmit_times_ got out of
sync with the fetcher. This happened when another spurious wakeup
triggered us to FetchNext out of sync with the actual wakeups.
This really just points to the logic being overly convoluted. Use
monotonic_remote_transmit_times_ as the source of truth for everything
and sync the fetcher to it instead, and only feed it with timestamps on
the actual wakeups (reliable messages at startup + reconnect need some
special handling, but everything else is easy).
Change-Id: I9c1cabda98ea9765129e6b5347d2a676536d3872
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index ae50917..df6ab81 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -68,6 +68,7 @@
void EventScheduler::Startup() {
++boot_count_;
+ cached_event_list_monotonic_time_ = kInvalidCachedTime();
CHECK(!is_running_);
MaybeRunOnStartup();
CHECK(called_started_);
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 55fa9a4..84201ed 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -210,6 +210,10 @@
void MaybeRunOnStartup();
void MaybeRunOnRun();
+ constexpr monotonic_clock::time_point kInvalidCachedTime() {
+ return monotonic_clock::max_time;
+ }
+
// Current execution time.
monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
@@ -237,7 +241,7 @@
bool called_started_ = false;
std::optional<distributed_clock::time_point> cached_epoch_;
monotonic_clock::time_point cached_event_list_monotonic_time_ =
- monotonic_clock::max_time;
+ kInvalidCachedTime();
distributed_clock::time_point cached_event_list_time_ =
distributed_clock::max_time;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 4148738..1b05841 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -411,7 +411,8 @@
CHECK(!fell_behind_) << ": Got behind on "
<< configuration::StrippedChannelToString(
- simulated_channel_->channel());
+ simulated_channel_->channel())
+ << " on " << NodeName(event_loop()->node());
if (fn) {
Context context = msgs_.front()->context;
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index b717ea3..7a910b9 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -2495,6 +2495,63 @@
}
}
+// Simple class to call a function at a time with a timer.
+class FunctionScheduler {
+ public:
+ FunctionScheduler(aos::EventLoop *event_loop)
+ : event_loop_(event_loop), timer_(event_loop_->AddTimer([this]() {
+ IncrementTestTimer(event_loop_->context().monotonic_event_time);
+ })) {
+ timer_->set_name("function_timer");
+ event_loop_->OnRun([this]() {
+ IncrementTestTimer(event_loop_->context().monotonic_event_time);
+ });
+ }
+
+ // Schedules the function to be run at the provided time.
+ void ScheduleAt(std::function<void()> &&function,
+ aos::monotonic_clock::time_point time) {
+ functions_.insert(std::make_pair(time, std::move(function)));
+ timer_->Schedule(functions_.begin()->first);
+ }
+
+ private:
+ void IncrementTestTimer(aos::monotonic_clock::time_point now) {
+ while (true) {
+ if (functions_.empty()) return;
+ if (functions_.begin()->first > now) {
+ break;
+ }
+ CHECK_EQ(functions_.begin()->first, now);
+
+ functions_.begin()->second();
+ functions_.erase(functions_.begin());
+ }
+ timer_->Schedule(functions_.begin()->first);
+ }
+
+ aos::EventLoop *event_loop_;
+ aos::TimerHandler *timer_;
+
+ std::multimap<aos::monotonic_clock::time_point, std::function<void()>>
+ functions_;
+};
+
+// Struct to capture the expected time a message should be received (and its
+// value). This is from the perspective of the node receiving the message.
+struct ExpectedTimestamps {
+ // The time that the message was published on the sending node's monotonic
+ // clock.
+ monotonic_clock::time_point remote_time;
+ // The time that the message was virtually transmitted over the virtual
+ // network on the sending node's monotonic clock.
+ monotonic_clock::time_point remote_transmit_time;
+ // The time that the message was received on the receiving node's clock.
+ monotonic_clock::time_point event_time;
+ // The value inside the message.
+ int value;
+};
+
// Tests that rapidly sent messages get timestamped correctly.
TEST(SimulatedEventLoopTest, TransmitTimestamps) {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
@@ -2517,74 +2574,90 @@
{
::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ FunctionScheduler run_at(ping_event_loop.get());
aos::Sender<examples::Ping> test_message_sender =
ping_event_loop->MakeSender<examples::Ping>("/reliable");
+ aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
for (const std::chrono::nanoseconds dt :
{chrono::microseconds(5000), chrono::microseconds(1),
chrono::microseconds(2), chrono::microseconds(70),
- chrono::microseconds(63)}) {
- factory.RunFor(dt);
- SendPing(&test_message_sender, 1);
+ chrono::microseconds(63), chrono::microseconds(140)}) {
+ now += dt;
+ run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
}
- factory.RunFor(chrono::milliseconds(10));
+ now += chrono::milliseconds(10);
+
+ factory.RunFor(now - ping_event_loop->monotonic_now());
}
- ASSERT_TRUE(fetcher.FetchNext());
- EXPECT_EQ(fetcher.context().monotonic_remote_time,
- monotonic_clock::epoch() + chrono::microseconds(5000));
- // First message shows up after wakeup + network delay as expected.
- EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay());
- EXPECT_EQ(fetcher.context().monotonic_event_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay() + factory.network_delay());
+ const monotonic_clock::time_point e = monotonic_clock::epoch();
+ const chrono::nanoseconds send_delay = factory.send_delay();
+ const chrono::nanoseconds network_delay = factory.network_delay();
- ASSERT_TRUE(fetcher.FetchNext());
- EXPECT_EQ(fetcher.context().monotonic_remote_time,
- monotonic_clock::epoch() + chrono::microseconds(5001));
- // Next message is close enough that it gets picked up at the same wakeup.
- EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay());
- EXPECT_EQ(fetcher.context().monotonic_event_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay() + factory.network_delay());
+ const std::vector<ExpectedTimestamps> expected_values = {
+ // First message shows up after wakeup + network delay as expected.
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5000),
+ .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5000) + send_delay + network_delay,
+ .value = 1,
+ },
+ // Next message is close enough that it gets picked up at the same wakeup.
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5001),
+ .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5000) + send_delay + network_delay,
+ .value = 1,
+ },
+ // Same for the third.
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5003),
+ .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5000) + send_delay + network_delay,
+ .value = 1,
+ },
+ // Fourth waits long enough to do the right thing.
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5073),
+ .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5073) + send_delay + network_delay,
+ .value = 1,
+ },
+ // Fifth waits long enough to do the right thing as well (but kicks off
+ // while the fourth is in flight over the network).
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5136),
+ .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5136) + send_delay + network_delay,
+ .value = 1,
+ },
+ // Sixth waits long enough to do the right thing as well (but kicks off
+ // while the fifth is in flight over the network and has almost landed).
+ // The timer wakeup for the Timestamp message coming back will find the
+ // sixth message a little bit early.
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(5276),
+ .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
+ .event_time =
+ e + chrono::microseconds(5273) + send_delay + network_delay,
+ .value = 1,
+ },
+ };
- ASSERT_TRUE(fetcher.FetchNext());
- EXPECT_EQ(fetcher.context().monotonic_remote_time,
- monotonic_clock::epoch() + chrono::microseconds(5003));
- // Same for the third.
- EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay());
- EXPECT_EQ(fetcher.context().monotonic_event_time,
- monotonic_clock::epoch() + chrono::microseconds(5000) +
- factory.send_delay() + factory.network_delay());
-
- ASSERT_TRUE(fetcher.FetchNext());
- EXPECT_EQ(fetcher.context().monotonic_remote_time,
- monotonic_clock::epoch() + chrono::microseconds(5073));
- // Fourth waits long enough to do the right thing.
- EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
- monotonic_clock::epoch() + chrono::microseconds(5073) +
- factory.send_delay());
- EXPECT_EQ(fetcher.context().monotonic_event_time,
- monotonic_clock::epoch() + chrono::microseconds(5073) +
- factory.send_delay() + factory.network_delay());
-
- ASSERT_TRUE(fetcher.FetchNext());
- EXPECT_EQ(fetcher.context().monotonic_remote_time,
- monotonic_clock::epoch() + chrono::microseconds(5136));
- // Fifth waits long enough to do the right thing as well (but kicks off while
- // the fourth is in flight over the network).
- EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
- monotonic_clock::epoch() + chrono::microseconds(5136) +
- factory.send_delay());
- EXPECT_EQ(fetcher.context().monotonic_event_time,
- monotonic_clock::epoch() + chrono::microseconds(5136) +
- factory.send_delay() + factory.network_delay());
+ for (const ExpectedTimestamps value : expected_values) {
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ value.remote_transmit_time);
+ EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
+ EXPECT_EQ(fetcher->value(), value.value);
+ }
ASSERT_FALSE(fetcher.FetchNext());
}
@@ -2615,13 +2688,17 @@
{
aos::Sender<examples::Ping> pi1_reliable_sender =
pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+ FunctionScheduler run_at(pi1_event_loop.get());
+ aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
for (int i = 0; i < 100; ++i) {
- SendPing(&pi1_reliable_sender, i);
- factory.RunFor(chrono::milliseconds(100));
+ run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
+ now);
+ now += chrono::milliseconds(100);
}
- }
+ now += chrono::milliseconds(50);
- factory.RunFor(chrono::milliseconds(50));
+ factory.RunFor(now - pi1_event_loop->monotonic_now());
+ }
ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
@@ -2640,7 +2717,345 @@
factory.network_delay());
ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
+ // TODO(austin): Verify that the dropped packet count increases.
+
ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
}
+// Tests that if we disconnect while a message is in various states of being
+// queued, it gets either dropped or sent as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
+ time.StartEqual();
+ factory.SkipTimingReport();
+ factory.DisableStatistics();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+ std::unique_ptr<aos::EventLoop> pi2_event_loop =
+ pi2->MakeEventLoop("fetcher");
+ aos::Fetcher<examples::Ping> fetcher =
+ pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
+
+ ASSERT_FALSE(fetcher.Fetch());
+
+ aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+ {
+ FunctionScheduler run_at(pi1_event_loop.get());
+ aos::Sender<examples::Ping> pi1_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
+
+ int i = 0;
+ for (const std::chrono::nanoseconds dt :
+ {chrono::microseconds(5000), chrono::microseconds(1),
+ chrono::microseconds(2), chrono::microseconds(70),
+ chrono::microseconds(63), chrono::microseconds(140),
+ chrono::microseconds(160)}) {
+ run_at.ScheduleAt(
+ [&]() {
+ pi1->Connect(pi2->node());
+ pi2->Connect(pi1->node());
+ },
+ now);
+
+ now += chrono::milliseconds(100);
+
+ run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
+
+ now += dt;
+
+ run_at.ScheduleAt(
+ [&]() {
+ // Fully disconnect the nodes.
+ pi1->Disconnect(pi2->node());
+ pi2->Disconnect(pi1->node());
+ },
+ now);
+
+ now += chrono::milliseconds(100) - dt;
+ ++i;
+ }
+
+ factory.RunFor(now - pi1_event_loop->monotonic_now());
+ }
+
+ const monotonic_clock::time_point e = monotonic_clock::epoch();
+ const chrono::nanoseconds send_delay = factory.send_delay();
+ const chrono::nanoseconds network_delay = factory.network_delay();
+
+ const std::vector<ExpectedTimestamps> expected_values = {
+ ExpectedTimestamps{
+ .remote_time = e + chrono::milliseconds(100),
+ .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
+ .event_time =
+ e + chrono::milliseconds(100) + send_delay + network_delay,
+ .value = 0,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::milliseconds(1300),
+ .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
+ .event_time =
+ e + chrono::milliseconds(1300) + send_delay + network_delay,
+ .value = 6,
+ },
+ };
+
+ for (const ExpectedTimestamps value : expected_values) {
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ value.remote_transmit_time);
+ EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
+ EXPECT_EQ(fetcher->value(), value.value);
+ }
+
+ // TODO(austin): Verify that the dropped packet count increases.
+
+ ASSERT_FALSE(fetcher.Fetch());
+}
+
+class PingLogger {
+ public:
+ PingLogger(aos::EventLoop *event_loop, std::string_view channel,
+ std::vector<std::pair<aos::Context, int>> *msgs)
+ : event_loop_(event_loop),
+ fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
+ msgs_(msgs) {
+ event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
+ }
+
+ ~PingLogger() {
+ while (fetcher_.FetchNext()) {
+ msgs_->emplace_back(fetcher_.context(), fetcher_->value());
+ }
+ }
+
+ private:
+ aos::EventLoop *event_loop_;
+ aos::Fetcher<examples::Ping> fetcher_;
+ std::vector<std::pair<aos::Context, int>> *msgs_;
+};
+
+// Tests that rebooting while a message is in flight works as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
+ time.StartEqual();
+ for (int i = 0; i < 8; ++i) {
+ time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
+ }
+
+ factory.SkipTimingReport();
+ factory.DisableStatistics();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+ aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+ FunctionScheduler run_at(pi1_event_loop.get());
+ aos::Sender<examples::Ping> pi1_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
+
+ int i = 0;
+ for (const std::chrono::nanoseconds dt :
+ {chrono::microseconds(5000), chrono::microseconds(1),
+ chrono::microseconds(2), chrono::microseconds(70),
+ chrono::microseconds(63), chrono::microseconds(140),
+ chrono::microseconds(160)}) {
+ run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
+ now + chrono::seconds(10) - dt);
+
+ now += chrono::seconds(10);
+ ++i;
+ }
+
+ std::vector<std::pair<aos::Context, int>> msgs;
+
+ pi2->OnStartup([pi2, &msgs]() {
+ pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
+ });
+
+ factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
+
+ const monotonic_clock::time_point e = monotonic_clock::epoch();
+ const chrono::nanoseconds send_delay = factory.send_delay();
+ const chrono::nanoseconds network_delay = factory.network_delay();
+
+ const std::vector<ExpectedTimestamps> expected_values = {
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(9995000),
+ .remote_transmit_time =
+ e + chrono::microseconds(9995000) + send_delay,
+ .event_time =
+ e + chrono::microseconds(9995000) + send_delay + network_delay,
+ .value = 0,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(19999999),
+ .remote_transmit_time =
+ e + chrono::microseconds(19999999) + send_delay,
+ .event_time =
+ e + chrono::microseconds(-1) + send_delay + network_delay,
+ .value = 1,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(29999998),
+ .remote_transmit_time =
+ e + chrono::microseconds(29999998) + send_delay,
+ .event_time =
+ e + chrono::microseconds(-2) + send_delay + network_delay,
+ .value = 2,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(69999840),
+ .remote_transmit_time =
+ e + chrono::microseconds(69999840) + send_delay,
+ .event_time =
+ e + chrono::microseconds(9999840) + send_delay + network_delay,
+ .value = 6,
+ },
+ };
+
+ ASSERT_EQ(msgs.size(), expected_values.size());
+
+ for (size_t i = 0; i < msgs.size(); ++i) {
+ EXPECT_EQ(msgs[i].first.monotonic_remote_time,
+ expected_values[i].remote_time);
+ EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
+ expected_values[i].remote_transmit_time);
+ EXPECT_EQ(msgs[i].first.monotonic_event_time,
+ expected_values[i].event_time);
+ EXPECT_EQ(msgs[i].second, expected_values[i].value);
+ }
+
+ // TODO(austin): Verify that the dropped packet count increases.
+}
+
+// Tests that rebooting while a reliable message is in flight works as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
+ time.StartEqual();
+ for (int i = 0; i < 8; ++i) {
+ time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
+ }
+
+ factory.SkipTimingReport();
+ factory.DisableStatistics();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+ aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+ FunctionScheduler run_at(pi1_event_loop.get());
+ aos::Sender<examples::Ping> pi1_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+
+ int i = 0;
+ for (const std::chrono::nanoseconds dt :
+ {chrono::microseconds(5000), chrono::microseconds(1),
+ chrono::microseconds(2), chrono::microseconds(70),
+ chrono::microseconds(63), chrono::microseconds(140),
+ chrono::microseconds(160)}) {
+ run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
+ now + chrono::seconds(10) - dt);
+
+ now += chrono::seconds(10);
+ ++i;
+ }
+
+ std::vector<std::pair<aos::Context, int>> msgs;
+
+ PingLogger *logger;
+ pi2->OnStartup([pi2, &msgs, &logger]() {
+ logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
+ });
+
+ factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
+
+ // Stop the logger to flush the last boot of data.
+ pi2->Stop(logger);
+
+ const monotonic_clock::time_point e = monotonic_clock::epoch();
+ const chrono::nanoseconds send_delay = factory.send_delay();
+ const chrono::nanoseconds network_delay = factory.network_delay();
+
+ // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
+ // event times to confirm what should have been forwarded when.
+ const std::vector<ExpectedTimestamps> expected_values = {
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(9995000),
+ .remote_transmit_time =
+ e + chrono::microseconds(9995000) + send_delay,
+ .event_time =
+ e + chrono::microseconds(9995000) + send_delay + network_delay,
+ .value = 0,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(9995000),
+ .remote_transmit_time = e + chrono::microseconds(10000000),
+ .event_time = e + network_delay,
+ .value = 0,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(19999999),
+ .remote_transmit_time = e + chrono::microseconds(20000000),
+ .event_time = e + network_delay,
+ .value = 1,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(29999998),
+ .remote_transmit_time = e + chrono::microseconds(30000000),
+ .event_time = e + network_delay,
+ .value = 2,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(39999930),
+ .remote_transmit_time = e + chrono::microseconds(40000000),
+ .event_time = e + network_delay,
+ .value = 3,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(49999937),
+ .remote_transmit_time = e + chrono::microseconds(50000000),
+ .event_time = e + network_delay,
+ .value = 4,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(59999860),
+ .remote_transmit_time = e + chrono::microseconds(60000000),
+ .event_time = e + network_delay,
+ .value = 5,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(69999840),
+ .remote_transmit_time = e + chrono::microseconds(69999890),
+ .event_time = e + chrono::microseconds(9999890) + network_delay,
+ .value = 6,
+ },
+ ExpectedTimestamps{
+ .remote_time = e + chrono::microseconds(69999840),
+ .remote_transmit_time = e + chrono::microseconds(70000000),
+ .event_time = e + network_delay,
+ .value = 6,
+ },
+ };
+
+ ASSERT_EQ(msgs.size(), expected_values.size());
+
+ for (size_t i = 0; i < msgs.size(); ++i) {
+ EXPECT_EQ(msgs[i].first.monotonic_remote_time,
+ expected_values[i].remote_time);
+ EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
+ expected_values[i].remote_transmit_time);
+ EXPECT_EQ(msgs[i].first.monotonic_event_time,
+ expected_values[i].event_time);
+ EXPECT_EQ(msgs[i].second, expected_values[i].value);
+ }
+
+ // TODO(austin): Verify that the dropped packet count increases.
+}
+
} // namespace aos::testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index e070270..66c902f 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -13,8 +13,10 @@
// This class delays messages forwarded between two factories.
//
// The basic design is that we need to use the distributed_clock to convert
-// monotonic times from the source to the destination node. We also use a
-// fetcher to manage the queue of data, and a timer to schedule the sends.
+// monotonic times from the source to the destination node. We use a list of
+// timestamps added each time a message is delivered to the server side to drive
+// the client side publishing. This pulls the data from the fetcher to match
+// with the timestamps queued.
class RawMessageDelayer {
public:
RawMessageDelayer(const Channel *channel, const Connection *connection,
@@ -42,9 +44,8 @@
void SetFetchEventLoop(aos::EventLoop *fetch_event_loop,
MessageBridgeServerStatus *server_status,
ChannelTimestampSender *timestamp_loggers) {
- sent_ = false;
- reliable_scheduled_ = false;
- published_ = false;
+ // Clear out state when the source node restarts.
+ last_sent_ = TransmitTime();
fetch_event_loop_ = fetch_event_loop;
if (fetch_event_loop_) {
fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
@@ -123,12 +124,15 @@
const Channel *channel() const { return channel_; }
- uint32_t time_to_live() {
+ // Returns true if the connection is reliable.
+ bool reliable() const { return time_to_live() == 0; }
+
+ uint32_t time_to_live() const {
return configuration::ConnectionToNode(channel_, send_node_factory_->node())
->time_to_live();
}
- std::string Name() {
+ std::string Name() const {
std::string result;
result +=
(fetch_event_loop_ ? fetch_event_loop_->node()->name()->string_view()
@@ -142,42 +146,52 @@
return result;
}
+ // Schedules forwarding any reliable messages when a node boots.
void ScheduleReliable() {
if (forwarding_disabled()) {
return;
}
+ // There is no sending side awake, don't do work.
if (!fetcher_) {
return;
}
- if (fetcher_->context().data == nullptr || sent_) {
- fetcher_->Fetch();
- sent_ = fetcher_->context().data == nullptr;
- published_ = sent_;
- reliable_scheduled_ = true;
+
+ // The network connection is disconnected, forget about this message. If
+ // this is a reliable message, it will get picked up in Connect() so we
+ // don't need to follow it here.
+ if (server_connection_->state() != State::CONNECTED) {
+ return;
}
+ // If there is no receiving side, bail.
if (!timer_) {
return;
}
- if (server_connection_->state() != State::CONNECTED) {
- reliable_scheduled_ = false;
- sent_ = true;
+ // We only want the newest message, grab it and see if there's anything to
+ // do.
+ fetcher_->Fetch();
+
+ // No data, bail.
+ if (fetcher_->context().data == nullptr) {
return;
}
- FetchNext();
- if (fetcher_->context().data == nullptr || sent_) {
- return;
- }
+ // Now, we know we've got a message we need to deliver, mark it down.
+ QueueMessage(fetcher_->context().queue_index,
+ fetcher_->context().monotonic_event_time,
+ fetch_event_loop_->monotonic_now());
// Send at startup. It is the best we can do.
- const monotonic_clock::time_point monotonic_delivered_time =
- send_node_factory_->monotonic_now() +
- send_node_factory_->network_delay();
+ const logger::BootTimestamp monotonic_delivery_time =
+ DeliveredTime(monotonic_remote_transmit_times_.front().transmit_time);
- CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ // This can only happen if a node reboots in under 100 uS. That's crazy,
+ // CHECK for now and handle it if someone actually has a good need.
+ CHECK_EQ(monotonic_delivery_time.boot, send_node_factory_->boot_count());
+
+ CHECK_GE(monotonic_delivery_time.time, send_node_factory_->monotonic_now())
<< ": Trying to deliver message in the past on channel "
<< configuration::StrippedChannelToString(fetcher_->channel())
<< " to node " << send_event_loop_->node()->name()->string_view()
@@ -186,42 +200,40 @@
if (!timer_scheduled_) {
server_status_->AddSentPacket(server_index_, channel_);
- timer_->Schedule(monotonic_delivered_time);
+ timer_->Schedule(monotonic_delivery_time.time);
timer_scheduled_ = true;
-
- QueueTransmitTimestamp(fetcher_->context().queue_index,
- fetcher_->context().monotonic_event_time,
- fetch_event_loop_->monotonic_now());
}
}
- bool timer_scheduled_ = false;
-
+ // Handles a message being delivered to message_bridge_server, and either
+ // drops it or queues it up.
void MessageWatcherCallback(uint32_t sent_queue_index,
monotonic_clock::time_point monotonic_sent_time,
monotonic_clock::time_point transmit_time) {
- if (!reliable_scheduled_) {
- QueueTransmitTimestamp(sent_queue_index, monotonic_sent_time,
- transmit_time);
- } else {
- reliable_scheduled_ = false;
+ if (server_connection_->state() != State::CONNECTED) {
+ server_status_->AddDroppedPacket(server_index_, channel_);
+ return;
}
+
+ QueueMessage(sent_queue_index, monotonic_sent_time, transmit_time);
Schedule();
}
- void QueueTransmitTimestamp(uint32_t sent_queue_index,
- monotonic_clock::time_point monotonic_sent_time,
- monotonic_clock::time_point transmit_time) {
- if (forwarding_disabled()) return;
+ void QueueMessage(uint32_t sent_queue_index,
+ monotonic_clock::time_point monotonic_sent_time,
+ monotonic_clock::time_point transmit_time) {
+ CHECK(!forwarding_disabled());
+ // When a reliable message gets queued, we can both receive the wakeup from
+ // the watcher, and from ScheduleReliable. In that case, detect that it is
+ // already in the queue and deduplicate with it.
if (monotonic_remote_transmit_times_.size() > 0u) {
- // FetchNext can discover messages before we do in the same nanosecond. In
- // that case, make sure the contents match and don't add it a second time.
- auto back = monotonic_remote_transmit_times_
+ const TransmitTime back = monotonic_remote_transmit_times_
[monotonic_remote_transmit_times_.size() - 1];
if (back.sent_queue_index == sent_queue_index) {
CHECK_EQ(back.monotonic_sent_time, monotonic_sent_time) << this;
- CHECK_EQ(back.transmit_time, transmit_time) << this;
+ CHECK(reliable());
+ CHECK_LE(back.transmit_time, transmit_time) << this;
return;
}
}
@@ -235,24 +247,41 @@
});
}
+ // Handles this node connecting to the network.
void Connect() {
- if (time_to_live() == 0 && published_ == false) {
- if (forwarding_disabled()) {
- return;
- }
- CHECK(fetcher_);
+ CHECK(fetcher_);
- fetcher_->Fetch();
- sent_ = fetcher_->context().data == nullptr;
- reliable_scheduled_ = true;
+ // We only send the last message. Point the fetcher to the latest to handle
+ // getting too far behind.
+ fetcher_->Fetch();
- QueueTransmitTimestamp(fetcher_->context().queue_index,
- fetcher_->context().monotonic_event_time,
- fetch_event_loop_->monotonic_now());
- Schedule();
+ // Unreliable messages aren't resent on reconnect.
+ if (!reliable()) {
+ return;
}
+
+ if (forwarding_disabled()) {
+ return;
+ }
+
+ // Ignore it if there is no data.
+ if (fetcher_->context().data == nullptr) {
+ return;
+ }
+
+ // See if the newest message got sent already. If it hasn't, queue it up to
+ // be sent.
+ if (fetcher_->context().queue_index != last_sent_.sent_queue_index) {
+ QueueMessage(fetcher_->context().queue_index,
+ fetcher_->context().monotonic_event_time,
+ fetch_event_loop_->monotonic_now());
+ }
+
+ Schedule();
}
+ // Returns true if we know that this connection sends to the destination node.
+ // Returns false if the destination hasn't been constructed.
bool SendingTo(const Node *destination) {
return send_event_loop_ && send_event_loop_->node() == destination;
}
@@ -260,27 +289,40 @@
// Kicks us to re-fetch and schedule the timer.
void Schedule() {
CHECK(!forwarding_disabled());
+ // Can't receive, bail.
if (!fetcher_) {
return;
}
+
+ // Already scheduled, nothing to see here.
if (timer_scheduled_) {
return;
}
- FetchNext();
- if (fetcher_->context().data == nullptr || sent_) {
+
+ // We've finally caught up, nothing to do.
+ if (monotonic_remote_transmit_times_.empty()) {
return;
}
- CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
const monotonic_clock::time_point transmit_time =
monotonic_remote_transmit_times_[0].transmit_time;
// Compute the time to publish this message.
- const monotonic_clock::time_point monotonic_delivered_time =
+ const logger::BootTimestamp monotonic_delivery_time =
DeliveredTime(transmit_time);
- CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
- << ": Trying to deliver message in the past on channel "
+ // This should be published after the reboot. Forget about it.
+ if (monotonic_delivery_time.boot != send_node_factory_->boot_count()) {
+ CHECK_GT(monotonic_delivery_time.boot, send_node_factory_->boot_count());
+
+ monotonic_remote_transmit_times_.erase(
+ monotonic_remote_transmit_times_.begin());
+ CHECK(monotonic_remote_transmit_times_.empty());
+ return;
+ }
+
+ CHECK_GE(monotonic_delivery_time.time, send_node_factory_->monotonic_now())
+ << ": " << this << " Trying to deliver message in the past on channel "
<< configuration::StrippedChannelToString(fetcher_->channel())
<< " to node " << send_event_loop_->node()->name()->string_view()
<< " sent from " << fetcher_->channel()->source_node()->string_view()
@@ -288,96 +330,40 @@
CHECK(timer_);
server_status_->AddSentPacket(server_index_, channel_);
- timer_->Schedule(monotonic_delivered_time);
+ timer_->Schedule(monotonic_delivery_time.time);
timer_scheduled_ = true;
}
private:
- void FetchNext() {
- CHECK(server_connection_);
- // Keep pulling messages out of the fetcher until we find one in the future.
- while (true) {
- if (fetcher_->context().data == nullptr || sent_) {
- sent_ = !fetcher_->FetchNext();
- if (!sent_) {
- published_ = false;
- }
- if (!sent_) {
- if (monotonic_remote_transmit_times_.size() == 0u) {
- QueueTransmitTimestamp(fetcher_->context().queue_index,
- fetcher_->context().monotonic_event_time,
- fetch_event_loop_->monotonic_now());
- }
- }
- }
- if (sent_) {
- break;
- }
-
- if (server_connection_->state() != State::CONNECTED) {
- CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
- CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
- fetcher_->context().monotonic_event_time)
- << this << " " << Name();
- CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
- fetcher_->context().queue_index)
- << this << " " << Name();
-
- monotonic_remote_transmit_times_.erase(
- monotonic_remote_transmit_times_.begin());
- sent_ = true;
- reliable_scheduled_ = false;
- published_ = false;
- server_status_->AddDroppedPacket(server_index_, channel_);
- continue;
- }
-
- if (fetcher_->context().monotonic_event_time +
- send_node_factory_->network_delay() +
- send_node_factory_->send_delay() >=
- fetch_node_factory_->monotonic_now() ||
- time_to_live() == 0) {
- break;
- }
-
- // TODO(austin): Not cool. We want to actually forward these. This means
- // we need a more sophisticated concept of what is running.
- LOG(WARNING) << "Not forwarding message on "
- << configuration::CleanedChannelToString(fetcher_->channel())
- << " because we aren't running. Sent at "
- << fetcher_->context().monotonic_event_time << " now is "
- << fetch_node_factory_->monotonic_now();
- sent_ = true;
- reliable_scheduled_ = false;
- published_ = false;
- server_status_->AddDroppedPacket(server_index_, channel_);
- }
- }
-
// Actually sends the message, and reschedules.
void Send() {
timer_scheduled_ = false;
+
CHECK(sender_);
CHECK(client_status_);
+ CHECK(fetcher_);
+
+ CHECK(!monotonic_remote_transmit_times_.empty());
+ while (fetcher_->context().queue_index !=
+ monotonic_remote_transmit_times_.front().sent_queue_index) {
+ if (!fetcher_->FetchNext()) {
+ break;
+ }
+ }
// Confirm that the first element in the times list is ours, and pull the
// transmit time out of it.
- CHECK(!monotonic_remote_transmit_times_.empty());
CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
fetcher_->context().monotonic_event_time);
CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
fetcher_->context().queue_index);
- const monotonic_clock::time_point monotonic_remote_transmit_time =
- monotonic_remote_transmit_times_[0].transmit_time;
+ const TransmitTime timestamp = monotonic_remote_transmit_times_[0];
monotonic_remote_transmit_times_.erase(
monotonic_remote_transmit_times_.begin());
if (server_connection_->state() != State::CONNECTED) {
- sent_ = true;
- reliable_scheduled_ = false;
- published_ = false;
Schedule();
return;
}
@@ -386,11 +372,14 @@
sender_->CheckOk(sender_->Send(
fetcher_->context().data, fetcher_->context().size,
fetcher_->context().monotonic_event_time,
- fetcher_->context().realtime_event_time, monotonic_remote_transmit_time,
+ fetcher_->context().realtime_event_time, timestamp.transmit_time,
fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
+ // Record that this got sent.
+ last_sent_ = timestamp;
+
// And simulate message_bridge's offset recovery.
- client_status_->SampleFilter(client_index_, monotonic_remote_transmit_time,
+ client_status_->SampleFilter(client_index_, timestamp.transmit_time,
sender_->monotonic_sent_time(),
fetcher_->context().source_boot_uuid);
@@ -426,7 +415,7 @@
message_header_builder.add_remote_queue_index(
fetcher_->context().queue_index);
message_header_builder.add_monotonic_remote_transmit_time(
- monotonic_remote_transmit_time.time_since_epoch().count());
+ timestamp.transmit_time.time_since_epoch().count());
message_header_builder.add_monotonic_sent_time(
sender_->monotonic_sent_time().time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
@@ -443,9 +432,6 @@
ScheduleTimestamp();
}
- sent_ = true;
- reliable_scheduled_ = false;
- published_ = true;
Schedule();
}
@@ -488,15 +474,14 @@
}
// Converts from time on the sending node to time on the receiving node.
- monotonic_clock::time_point DeliveredTime(
+ logger::BootTimestamp DeliveredTime(
const monotonic_clock::time_point transmit_time) const {
const distributed_clock::time_point distributed_sent_time =
fetch_node_factory_->ToDistributedClock(transmit_time);
const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
distributed_sent_time + send_node_factory_->network_delay());
- CHECK_EQ(t.boot, send_node_factory_->boot_count());
- return t.time;
+ return t;
}
const Channel *channel_;
@@ -512,6 +497,7 @@
aos::EventLoop *send_event_loop_ = nullptr;
// Timer used to send.
aos::TimerHandler *timer_ = nullptr;
+ bool timer_scheduled_ = false;
// Timer used to send timestamps out.
aos::TimerHandler *timestamp_timer_ = nullptr;
// Time that the timer is scheduled for. Used to track if it needs to be
@@ -525,10 +511,6 @@
MessageBridgeServerStatus *server_status_ = nullptr;
const size_t destination_node_index_;
- // True if we have sent the message in the fetcher.
- bool sent_ = false;
- bool published_ = false;
- bool reliable_scheduled_ = false;
ServerConnection *server_connection_ = nullptr;
int server_index_ = -1;
@@ -540,15 +522,19 @@
aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
struct TransmitTime {
- monotonic_clock::time_point monotonic_sent_time;
- uint32_t sent_queue_index;
- monotonic_clock::time_point transmit_time;
+ monotonic_clock::time_point monotonic_sent_time = monotonic_clock::min_time;
+ uint32_t sent_queue_index = 0xffffffff;
+ monotonic_clock::time_point transmit_time = monotonic_clock::min_time;
};
- // Stores tthe time the message was handed to the kernel to be published on
+ // Stores the time the message was handed to the kernel to be published on
// the remote node over the network for all forwarded relevant messages.
std::vector<TransmitTime> monotonic_remote_transmit_times_;
+ // Stores the last message which was published. This is used to know if we
+ // need to re-transmit something on reconnect or not.
+ TransmitTime last_sent_;
+
struct Timestamp {
Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
monotonic_clock::time_point new_monotonic_timestamp_time)
@@ -665,10 +651,9 @@
monotonic_clock::time_point monotonic_sent_time) {
for (std::unique_ptr<RawMessageDelayer> &delayer :
captured_delayers->v) {
- delayer->QueueTransmitTimestamp(
+ delayer->MessageWatcherCallback(
sent_queue_index, monotonic_sent_time,
source_event_loop->second.event_loop->monotonic_now());
- delayer->Schedule();
}
});
} else {
@@ -898,7 +883,7 @@
}
event_loop->OnRun([this]() {
for (RawMessageDelayer *destination_delayer : destination_delayers_) {
- if (destination_delayer->time_to_live() == 0) {
+ if (destination_delayer->reliable()) {
destination_delayer->ScheduleReliable();
}
}
@@ -917,7 +902,7 @@
// the message, then that would trigger the watchers in the delayers.
// However, we so far have continued to support Sending while stopped....
for (RawMessageDelayer *source_delayer : source_delayers_) {
- if (source_delayer->time_to_live() == 0) {
+ if (source_delayer->reliable()) {
source_delayer->ScheduleReliable();
}
}