Logger: Pipe the monotonic_remote_transmit_time through event loop
Populate the field in all the network bridges and pipe it through
all the required spots in the event loop. Update all the tests to match
the update.
As part of this, we realized that our modeling of network delay was
wrong. Wakeups don't always take 50 uS if something else triggers the
wakeup and the message is ready in shared memory. This wasn't being
handled properly.
Change-Id: Idf94c5c6d7c87f4d65868c71b1cceedca7bf3853
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e500417..926d0c5 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -509,7 +509,7 @@
":multinode_pingpong_test_combined_config",
":multinode_pingpong_test_split_config",
],
- shard_count = 4,
+ shard_count = 16,
target_compatible_with = ["@platforms//os:linux"],
deps = [
":event_loop_param_test",
diff --git a/aos/events/context.h b/aos/events/context.h
index 6648949..c50b5c0 100644
--- a/aos/events/context.h
+++ b/aos/events/context.h
@@ -24,6 +24,9 @@
monotonic_clock::time_point monotonic_remote_time;
realtime_clock::time_point realtime_remote_time;
+ // Time that the message was published over the network on the remote node.
+ monotonic_clock::time_point monotonic_remote_transmit_time;
+
// Index in the queue.
uint32_t queue_index;
// Index into the remote queue. Useful to determine if data was lost. In a
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index ba71b97..424971d 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -83,9 +83,11 @@
RawSender::Error RawSender::DoSend(
const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid) {
return DoSend(data->data(), data->size(), monotonic_remote_time,
- realtime_remote_time, remote_queue_index, source_boot_uuid);
+ realtime_remote_time, monotonic_remote_transmit_time,
+ remote_queue_index, source_boot_uuid);
}
void RawSender::RecordSendResult(const Error error, size_t message_size) {
@@ -113,6 +115,7 @@
timing_(event_loop_->ChannelIndex(channel)) {
context_.monotonic_event_time = monotonic_clock::min_time;
context_.monotonic_remote_time = monotonic_clock::min_time;
+ context_.monotonic_remote_transmit_time = monotonic_clock::min_time;
context_.realtime_event_time = realtime_clock::min_time;
context_.realtime_remote_time = realtime_clock::min_time;
context_.queue_index = 0xffffffff;
@@ -628,6 +631,7 @@
void EventLoop::ClearContext() {
context_.monotonic_event_time = monotonic_clock::min_time;
context_.monotonic_remote_time = monotonic_clock::min_time;
+ context_.monotonic_remote_transmit_time = monotonic_clock::min_time;
context_.realtime_event_time = realtime_clock::min_time;
context_.realtime_remote_time = realtime_clock::min_time;
context_.queue_index = 0xffffffffu;
@@ -642,6 +646,7 @@
monotonic_clock::time_point monotonic_event_time) {
context_.monotonic_event_time = monotonic_event_time;
context_.monotonic_remote_time = monotonic_clock::min_time;
+ context_.monotonic_remote_transmit_time = monotonic_clock::min_time;
context_.realtime_event_time = realtime_clock::min_time;
context_.realtime_remote_time = realtime_clock::min_time;
context_.queue_index = 0xffffffffu;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index ac43acb..83393c2 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -140,12 +140,14 @@
// size() bytes into the data backed by data(). They then call Send to send.
// Returns Error::kOk on a successful send, or
// Error::kMessagesSentTooFast if messages were sent too fast. If provided,
- // monotonic_remote_time, realtime_remote_time, and remote_queue_index are
- // attached to the message and are available in the context on the read side.
- // If they are not populated, the read side will get the sent times instead.
+ // monotonic_remote_time, realtime_remote_time,
+ // monotonic_remote_transmit_time, and remote_queue_index are attached to the
+ // message and are available in the context on the read side. If they are not
+ // populated, the read side will get the sent times instead.
Error Send(size_t size);
Error Send(size_t size, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid);
// Sends a single block of data by copying it.
@@ -155,6 +157,7 @@
Error Send(const void *data, size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid);
// CHECKs that no sending Error occurred and logs the channel_ data if
@@ -168,6 +171,7 @@
Error Send(const SharedSpan data,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &remote_boot_uuid);
const Channel *channel() const { return channel_; }
@@ -216,21 +220,22 @@
private:
friend class EventLoop;
- virtual Error DoSend(const void *data, size_t size,
- monotonic_clock::time_point monotonic_remote_time,
- realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index,
- const UUID &source_boot_uuid) = 0;
- virtual Error DoSend(size_t size,
- monotonic_clock::time_point monotonic_remote_time,
- realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index,
- const UUID &source_boot_uuid) = 0;
- virtual Error DoSend(const SharedSpan data,
- monotonic_clock::time_point monotonic_remote_time,
- realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index,
- const UUID &source_boot_uuid);
+ virtual Error DoSend(
+ const void *data, size_t size,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid) = 0;
+ virtual Error DoSend(
+ size_t size, monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid) = 0;
+ virtual Error DoSend(
+ const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid);
void RecordSendResult(const Error error, size_t message_size);
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index d2d3aa9..4885f5c 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -358,6 +358,8 @@
EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
@@ -381,6 +383,8 @@
const aos::realtime_clock::time_point realtime_now = loop2->realtime_now();
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time,
fetcher.context().realtime_remote_time);
@@ -429,6 +433,8 @@
monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().monotonic_remote_time,
monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
@@ -449,6 +455,8 @@
EXPECT_EQ(fetcher.context().monotonic_event_time, monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
@@ -470,6 +478,8 @@
fetcher.context().monotonic_remote_time);
EXPECT_EQ(fetcher.context().realtime_event_time,
fetcher.context().realtime_remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_GE(fetcher.context().monotonic_event_time, monotonic_now - kEpsilon);
EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
@@ -1464,6 +1474,8 @@
auto test_timer = loop->AddTimer([this, ×, &expected_times, &loop]() {
times.push_back(loop->monotonic_now());
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
@@ -1914,6 +1926,8 @@
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(loop1->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
const aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -1961,6 +1975,8 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
@@ -2012,6 +2028,8 @@
EXPECT_EQ(loop1->context().monotonic_remote_time,
loop1->context().monotonic_event_time);
+ EXPECT_EQ(loop1->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
@@ -2048,6 +2066,8 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
@@ -2106,6 +2126,8 @@
EXPECT_EQ(loop1->context().monotonic_remote_time,
monotonic_clock::min_time);
+ EXPECT_EQ(loop1->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
EXPECT_EQ(loop1->context().realtime_event_time,
realtime_clock::min_time);
@@ -3116,6 +3138,8 @@
aos::monotonic_clock::time_point(chrono::seconds(1501));
const aos::realtime_clock::time_point realtime_remote_time =
aos::realtime_clock::time_point(chrono::seconds(3132));
+ const aos::monotonic_clock::time_point monotonic_remote_transmit_time =
+ aos::monotonic_clock::time_point(chrono::seconds(1601));
const uint32_t remote_queue_index = 0x254971;
const UUID source_boot_uuid = UUID::Random();
@@ -3130,7 +3154,8 @@
loop2->OnRun([&]() {
EXPECT_EQ(sender->Send(kMessage.span().data(), kMessage.span().size(),
monotonic_remote_time, realtime_remote_time,
- remote_queue_index, source_boot_uuid),
+ monotonic_remote_transmit_time, remote_queue_index,
+ source_boot_uuid),
RawSender::Error::kOk);
});
@@ -3138,9 +3163,9 @@
loop2->MakeRawWatcher(
configuration::GetChannel(loop2->configuration(), "/test",
"aos.TestMessage", "", nullptr),
- [this, monotonic_remote_time, realtime_remote_time, source_boot_uuid,
- remote_queue_index, &fetcher,
- &happened](const Context &context, const void * /*message*/) {
+ [this, monotonic_remote_time, realtime_remote_time,
+ monotonic_remote_transmit_time, source_boot_uuid, remote_queue_index,
+ &fetcher, &happened](const Context &context, const void * /*message*/) {
happened = true;
EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time);
EXPECT_EQ(realtime_remote_time, context.realtime_remote_time);
@@ -3152,6 +3177,8 @@
fetcher->context().monotonic_remote_time);
EXPECT_EQ(realtime_remote_time,
fetcher->context().realtime_remote_time);
+ EXPECT_EQ(monotonic_remote_transmit_time,
+ fetcher->context().monotonic_remote_transmit_time);
this->Exit();
});
@@ -3159,6 +3186,19 @@
EXPECT_FALSE(happened);
Run();
EXPECT_TRUE(happened);
+
+ // Confirm everything goes back.
+ EXPECT_EQ(loop2->context().monotonic_event_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop2->context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop2->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
+ EXPECT_EQ(loop2->context().realtime_event_time, realtime_clock::min_time);
+ EXPECT_EQ(loop2->context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(loop2->context().source_boot_uuid, loop2->boot_uuid());
+ EXPECT_EQ(loop2->context().queue_index, 0xffffffffu);
+ EXPECT_EQ(loop2->context().size, 0u);
+ EXPECT_EQ(loop2->context().data, nullptr);
+ EXPECT_EQ(loop2->context().buffer_index, -1);
}
// Tests that a raw sender fills out sent data.
@@ -3406,6 +3446,8 @@
EXPECT_EQ(loop->context().monotonic_event_time, monotonic_clock::min_time);
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
@@ -3423,6 +3465,8 @@
monotonic_event_time_on_run = loop->context().monotonic_event_time;
EXPECT_LE(monotonic_event_time_on_run, loop->monotonic_now());
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
@@ -3442,6 +3486,8 @@
EXPECT_EQ(loop->context().monotonic_event_time, monotonic_clock::min_time);
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+ EXPECT_EQ(loop->context().monotonic_remote_transmit_time,
+ monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
diff --git a/aos/events/event_loop_runtime.h b/aos/events/event_loop_runtime.h
index 0505852..afc96e2 100644
--- a/aos/events/event_loop_runtime.h
+++ b/aos/events/event_loop_runtime.h
@@ -25,6 +25,7 @@
int64_t monotonic_remote_time;
int64_t realtime_remote_time;
+ int64_t monotonic_remote_transmit_time;
uint32_t queue_index;
uint32_t remote_queue_index;
@@ -48,6 +49,8 @@
offsetof(RustContext, monotonic_remote_time));
static_assert(offsetof(Context, realtime_remote_time) ==
offsetof(RustContext, realtime_remote_time));
+static_assert(offsetof(Context, monotonic_remote_transmit_time) ==
+ offsetof(RustContext, monotonic_remote_transmit_time));
static_assert(offsetof(Context, queue_index) ==
offsetof(RustContext, queue_index));
static_assert(offsetof(Context, remote_queue_index) ==
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index cc8e5c3..3f41ec1 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -197,15 +197,17 @@
inline RawSender::Error RawSender::Send(size_t size) {
return Send(size, monotonic_clock::min_time, realtime_clock::min_time,
- 0xffffffffu, event_loop_->boot_uuid());
+ monotonic_clock::min_time, 0xffffffffu, event_loop_->boot_uuid());
}
inline RawSender::Error RawSender::Send(
size_t size, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ aos::monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &uuid) {
- const auto err = DoSend(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index, uuid);
+ const auto err =
+ DoSend(size, monotonic_remote_time, realtime_remote_time,
+ monotonic_remote_transmit_time, remote_queue_index, uuid);
RecordSendResult(err, size);
if (err == Error::kOk) {
ftrace_.FormatMessage(
@@ -219,16 +221,18 @@
inline RawSender::Error RawSender::Send(const void *data, size_t size) {
return Send(data, size, monotonic_clock::min_time, realtime_clock::min_time,
- 0xffffffffu, event_loop_->boot_uuid());
+ monotonic_clock::min_time, 0xffffffffu, event_loop_->boot_uuid());
}
inline RawSender::Error RawSender::Send(
const void *data, size_t size,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ aos::monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &uuid) {
- const auto err = DoSend(data, size, monotonic_remote_time,
- realtime_remote_time, remote_queue_index, uuid);
+ const auto err =
+ DoSend(data, size, monotonic_remote_time, realtime_remote_time,
+ monotonic_remote_transmit_time, remote_queue_index, uuid);
RecordSendResult(err, size);
if (err == RawSender::Error::kOk) {
ftrace_.FormatMessage(
@@ -242,17 +246,20 @@
inline RawSender::Error RawSender::Send(const SharedSpan data) {
return Send(std::move(data), monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffu, event_loop_->boot_uuid());
+ realtime_clock::min_time, monotonic_clock::min_time, 0xffffffffu,
+ event_loop_->boot_uuid());
}
inline RawSender::Error RawSender::Send(
const SharedSpan data,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ aos::monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &uuid) {
const size_t size = data->size();
- const auto err = DoSend(std::move(data), monotonic_remote_time,
- realtime_remote_time, remote_queue_index, uuid);
+ const auto err =
+ DoSend(std::move(data), monotonic_remote_time, realtime_remote_time,
+ monotonic_remote_transmit_time, remote_queue_index, uuid);
RecordSendResult(err, size);
if (err == Error::kOk) {
ftrace_.FormatMessage(
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 3c46de9..7e448fe 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1521,7 +1521,8 @@
const RawSender::Error err = sender->Send(
SharedSpan(timestamped_message.data, ×tamped_message.data->span),
timestamped_message.monotonic_remote_time.time,
- timestamped_message.realtime_remote_time, remote_queue_index,
+ timestamped_message.realtime_remote_time, monotonic_clock::min_time,
+ remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
? CHECK_NOTNULL(multinode_filters_)
->boot_uuid(configuration::GetNodeIndex(
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index a3cda43..5dddc60 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -490,11 +490,18 @@
EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
pi2_ping_count * chrono::milliseconds(10) +
realtime_clock::epoch());
- EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
- chrono::microseconds(150),
+ // The message at the start of each second doesn't have wakeup latency
+ // since timing reports and server statistics wake us up already at that
+ // point in time.
+ chrono::nanoseconds offset = chrono::microseconds(150);
+ if (pi2_event_loop->context().monotonic_remote_time.time_since_epoch() %
+ chrono::seconds(1) ==
+ chrono::seconds(0)) {
+ offset = chrono::microseconds(100);
+ }
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time + offset,
pi2_event_loop->context().monotonic_event_time);
- EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
- chrono::microseconds(150),
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time + offset,
pi2_event_loop->context().realtime_event_time);
++pi2_ping_count;
});
@@ -502,63 +509,76 @@
constexpr ssize_t kQueueIndexOffset = -9;
// Confirm that the ping and pong counts both match, and the value also
// matches.
- pi1_event_loop->MakeWatcher(
- "/test", [&pi1_event_loop, &pi1_ping_count,
- &pi1_pong_count](const examples::Pong &pong) {
- VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
- << pi1_event_loop->context().monotonic_remote_time << " -> "
- << pi1_event_loop->context().monotonic_event_time;
+ pi1_event_loop->MakeWatcher("/test", [&pi1_event_loop, &pi1_ping_count,
+ &pi1_pong_count](
+ const examples::Pong &pong) {
+ VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+ << pi1_event_loop->context().monotonic_remote_time << " -> "
+ << pi1_event_loop->context().monotonic_event_time;
- EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
- pi1_pong_count + kQueueIndexOffset);
- EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
- chrono::microseconds(200) +
- pi1_pong_count * chrono::milliseconds(10) +
- monotonic_clock::epoch());
- EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
- chrono::microseconds(200) +
- pi1_pong_count * chrono::milliseconds(10) +
- realtime_clock::epoch());
+ EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
+ pi1_pong_count + kQueueIndexOffset);
- EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
- chrono::microseconds(150),
- pi1_event_loop->context().monotonic_event_time);
- EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
- chrono::microseconds(150),
- pi1_event_loop->context().realtime_event_time);
+ chrono::nanoseconds offset = chrono::microseconds(200);
+ if ((pi1_event_loop->context().monotonic_remote_time.time_since_epoch() -
+ chrono::microseconds(150)) %
+ chrono::seconds(1) ==
+ chrono::seconds(0)) {
+ offset = chrono::microseconds(150);
+ }
- EXPECT_EQ(pong.value(), pi1_pong_count + 1);
- ++pi1_pong_count;
- EXPECT_EQ(pi1_ping_count, pi1_pong_count);
- });
- pi2_event_loop->MakeWatcher(
- "/test", [&pi2_event_loop, &pi2_ping_count,
- &pi2_pong_count](const examples::Pong &pong) {
- VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
- << pi2_event_loop->context().monotonic_remote_time << " -> "
- << pi2_event_loop->context().monotonic_event_time;
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+ offset + pi1_pong_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+ offset + pi1_pong_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
- EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
- pi2_pong_count + kQueueIndexOffset);
+ EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
+ chrono::microseconds(150),
+ pi1_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
+ chrono::microseconds(150),
+ pi1_event_loop->context().realtime_event_time);
- EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
- chrono::microseconds(200) +
- pi2_pong_count * chrono::milliseconds(10) +
- monotonic_clock::epoch());
- EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
- chrono::microseconds(200) +
- pi2_pong_count * chrono::milliseconds(10) +
- realtime_clock::epoch());
+ EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+ ++pi1_pong_count;
+ EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+ });
+ pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pi2_ping_count,
+ &pi2_pong_count](
+ const examples::Pong &pong) {
+ VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+ << pi2_event_loop->context().monotonic_remote_time << " -> "
+ << pi2_event_loop->context().monotonic_event_time;
- EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
- pi2_event_loop->context().monotonic_event_time);
- EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
- pi2_event_loop->context().realtime_event_time);
+ EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
+ pi2_pong_count + kQueueIndexOffset);
- EXPECT_EQ(pong.value(), pi2_pong_count + 1);
- ++pi2_pong_count;
- EXPECT_EQ(pi2_ping_count, pi2_pong_count);
- });
+ chrono::nanoseconds offset = chrono::microseconds(200);
+ if ((pi2_event_loop->context().monotonic_remote_time.time_since_epoch() -
+ chrono::microseconds(150)) %
+ chrono::seconds(1) ==
+ chrono::seconds(0)) {
+ offset = chrono::microseconds(150);
+ }
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+ offset + pi2_pong_count * chrono::milliseconds(10) +
+ monotonic_clock::epoch());
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+ offset + pi2_pong_count * chrono::milliseconds(10) +
+ realtime_clock::epoch());
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+ pi2_event_loop->context().monotonic_event_time);
+ EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+ pi2_event_loop->context().realtime_event_time);
+
+ EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+ ++pi2_pong_count;
+ EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+ });
log_reader_factory.Run();
EXPECT_EQ(pi1_ping_count, 2010);
@@ -1908,11 +1928,28 @@
ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+ // Timestamps don't have wakeup delay, so they show back up after 2
+ // times the network delay on the source node. Confirm that matches
+ // when we are reading the log.
+ EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+ pi1_context->monotonic_event_time + 2 * network_delay);
} else if (header.channel_index() == ping_timestamp_channel) {
ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
pi1_context = &ping_on_pi1_fetcher.context();
pi2_context = &ping_on_pi2_fetcher.context();
+ // Ping messages get picked up faster at the start of each message
+ // when timers wake up. Verify all that behavior matches exactly as
+ // expected when reading the log.
+ EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+ pi1_context->monotonic_event_time + 2 * network_delay +
+ ((pi1_event_loop->context().monotonic_event_time -
+ 2 * network_delay)
+ .time_since_epoch() %
+ chrono::nanoseconds(1000000000) ==
+ chrono::nanoseconds(0)
+ ? chrono::nanoseconds(0)
+ : send_delay));
} else {
LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
<< configuration::CleanedChannelToString(
@@ -1942,13 +1979,6 @@
header_realtime_remote_time);
EXPECT_EQ(pi1_context->monotonic_event_time,
header_monotonic_remote_time);
-
- // Time estimation isn't perfect, but we know the clocks were
- // identical when logged, so we know when this should have come back.
- // Confirm we got it when we expected.
- EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
- pi1_context->monotonic_event_time + 2 * network_delay +
- send_delay);
});
}
for (std::pair<int, std::string> channel :
@@ -1988,11 +2018,20 @@
ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+ // Again, timestamps don't have wakeup delay, so they show back up
+ // after 2 times the network delay on the source node.
+ EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+ pi2_context->monotonic_event_time + 2 * network_delay);
} else if (header.channel_index() == pong_timestamp_channel) {
ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
pi2_context = &pong_on_pi2_fetcher.context();
pi1_context = &pong_on_pi1_fetcher.context();
+ // And Pong messages come back repeatably since they aren't at the
+ // start of a second.
+ EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+ pi2_context->monotonic_event_time + 2 * network_delay +
+ send_delay);
} else {
LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
<< configuration::CleanedChannelToString(
@@ -2022,13 +2061,6 @@
header_realtime_remote_time);
EXPECT_EQ(pi2_context->monotonic_event_time,
header_monotonic_remote_time);
-
- // Time estimation isn't perfect, but we know the clocks were
- // identical when logged, so we know when this should have come back.
- // Confirm we got it when we expected.
- EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
- pi2_context->monotonic_event_time + 2 * network_delay +
- send_delay);
});
}
@@ -2266,7 +2298,7 @@
case 3:
EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
- chrono::nanoseconds(2322999462))
+ chrono::nanoseconds(2323000000))
<< " on " << file;
break;
default:
@@ -2293,7 +2325,7 @@
case 5:
EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
- chrono::nanoseconds(2322999462))
+ chrono::nanoseconds(2323000000))
<< " on " << file;
break;
default:
@@ -2439,7 +2471,7 @@
monotonic_clock::time_point(chrono::microseconds(100000)))
<< file;
EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(100150)))
+ monotonic_clock::time_point(chrono::microseconds(100100)))
<< file;
break;
case 2:
@@ -2479,7 +2511,7 @@
monotonic_clock::time_point(chrono::microseconds(1423000)))
<< file;
EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
- monotonic_clock::time_point(chrono::microseconds(10200150)))
+ monotonic_clock::time_point(chrono::microseconds(10200100)))
<< file;
break;
default:
@@ -3427,7 +3459,7 @@
chrono::seconds(1)));
EXPECT_THAT(result[2].second,
::testing::ElementsAre(realtime_clock::epoch() +
- chrono::microseconds(34900150)));
+ chrono::microseconds(34900100)));
}
// Tests that local data before remote data after reboot is properly replayed.
@@ -3579,7 +3611,7 @@
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
EXPECT_THAT(result[0].second,
::testing::ElementsAre(realtime_clock::epoch() +
- chrono::microseconds(11000350)));
+ chrono::microseconds(11000300)));
EXPECT_THAT(result[1].first,
::testing::ElementsAre(
@@ -3587,13 +3619,13 @@
realtime_clock::epoch() + chrono::microseconds(107005000)));
EXPECT_THAT(result[1].second,
::testing::ElementsAre(
- realtime_clock::epoch() + chrono::microseconds(4000150),
- realtime_clock::epoch() + chrono::microseconds(111000200)));
+ realtime_clock::epoch() + chrono::microseconds(4000100),
+ realtime_clock::epoch() + chrono::microseconds(111000150)));
EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
EXPECT_THAT(result[2].second,
::testing::ElementsAre(realtime_clock::epoch() +
- chrono::microseconds(11000150)));
+ chrono::microseconds(11000100)));
auto start_stop_result = ConfirmReadable(
filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
@@ -3737,7 +3769,7 @@
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
EXPECT_THAT(result[0].second,
::testing::ElementsAre(realtime_clock::epoch() +
- chrono::microseconds(11000350)));
+ chrono::microseconds(11000300)));
EXPECT_THAT(result[1].first,
::testing::ElementsAre(
@@ -3745,13 +3777,13 @@
realtime_clock::epoch() + chrono::microseconds(6005000)));
EXPECT_THAT(result[1].second,
::testing::ElementsAre(
- realtime_clock::epoch() + chrono::microseconds(4900150),
- realtime_clock::epoch() + chrono::microseconds(11000200)));
+ realtime_clock::epoch() + chrono::microseconds(4900100),
+ realtime_clock::epoch() + chrono::microseconds(11000150)));
EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
EXPECT_THAT(result[2].second,
::testing::ElementsAre(realtime_clock::epoch() +
- chrono::microseconds(11000150)));
+ chrono::microseconds(11000100)));
// Confirm we observed the correct start and stop times. We should see the
// reboot here.
@@ -3771,7 +3803,7 @@
realtime_clock::epoch() + chrono::microseconds(6005000)));
EXPECT_THAT(start_stop_result[1].second,
::testing::ElementsAre(
- realtime_clock::epoch() + chrono::microseconds(4900150),
+ realtime_clock::epoch() + chrono::microseconds(4900100),
realtime_clock::epoch() + chrono::seconds(8)));
EXPECT_THAT(
start_stop_result[2].first,
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index 8f64f66..adc41a3 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -76,13 +76,13 @@
};
constexpr std::string_view kCombinedConfigSha1() {
- return "71eb8341221fbabefb4ddde43bcebf794fd5855e3ad77786a1db0f9e27a39091";
+ return "adf6a65be9b9a00d85ad1db4c78495e46d9c35b883ef95581c46222b2624d79d";
}
constexpr std::string_view kSplitConfigSha1() {
- return "f61d45dc0bda026e852e2da9b3e5c2c7f1c89c9f7958cfba3d02e2c960416f04";
+ return "7998834e993bcf000c8f03f6fcc5cc63650fdbd1f42ff0a2d2bdbbf1182e3104";
}
constexpr std::string_view kReloggedSplitConfigSha1() {
- return "3d8fd3d13955b517ee3d66a50b5e4dd7a13fd648f469d16910990418bcfc6beb";
+ return "5e800fdbaf6a088f33d5df42a58d803a33caa33eea269fef9e390b0306e9c11e";
}
LoggerState MakeLoggerState(NodeEventLoopFactory *node,
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7f06ee0..9d010b7 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -237,6 +237,7 @@
ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
+ &context_.monotonic_remote_transmit_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
&context_.source_boot_uuid, &context_.size, copy_buffer, std::move(fn));
@@ -457,6 +458,7 @@
Error DoSend(size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ aos::monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override {
shm_event_loop()->CheckCurrentThread();
@@ -464,9 +466,9 @@
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
const auto result = lockless_queue_sender_.Send(
- length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
- source_boot_uuid, &monotonic_sent_time_, &realtime_sent_time_,
- &sent_queue_index_);
+ length, monotonic_remote_time, realtime_remote_time,
+ monotonic_remote_transmit_time, remote_queue_index, source_boot_uuid,
+ &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
@@ -480,6 +482,7 @@
Error DoSend(const void *msg, size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ aos::monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override {
shm_event_loop()->CheckCurrentThread();
@@ -488,8 +491,9 @@
<< configuration::CleanedChannelToString(channel());
const auto result = lockless_queue_sender_.Send(
reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
- realtime_remote_time, remote_queue_index, source_boot_uuid,
- &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
+ realtime_remote_time, monotonic_remote_transmit_time,
+ remote_queue_index, source_boot_uuid, &monotonic_sent_time_,
+ &realtime_sent_time_, &sent_queue_index_);
CHECK_NE(result, ipc_lib::LocklessQueueSender::Result::INVALID_REDZONE)
<< ": Somebody wrote outside the buffer of their message on "
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 4db0c7d..bae834a 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -25,12 +25,12 @@
}
// Clean up anything left there before.
- unlink((FLAGS_shm_base + "/test/aos.TestMessage.v6").c_str());
- unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v6").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v6").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v6").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v6").c_str());
+ unlink((FLAGS_shm_base + "/test/aos.TestMessage.v7").c_str());
+ unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v7").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v7").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v7").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v7").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v7").c_str());
}
~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 5cb67b4..4148738 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -356,18 +356,21 @@
Error DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
Error DoSend(const void *msg, size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
Error DoSend(const SharedSpan data,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
@@ -857,8 +860,13 @@
->emplace(SimpleChannel(channel),
std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
channel,
+ // There are a lot of tests which assume that 100 hz
+ // messages can actually be sent out at 100 hz and
+ // forwarded. The jitter in wakeups causes small
+ // variation in timing. Ignore that.
configuration::ChannelStorageDuration(
- configuration(), channel),
+ configuration(), channel) -
+ send_delay(),
scheduler_)))
.first;
}
@@ -1038,7 +1046,7 @@
// Remove times that are greater than or equal to a channel_storage_duration_
// ago
while (!last_times_.empty() &&
- (now - last_times_.front() >= channel_storage_duration_)) {
+ (now >= channel_storage_duration_ + last_times_.front())) {
last_times_.pop();
}
@@ -1101,6 +1109,7 @@
RawSender::Error SimulatedSender::DoSend(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid) {
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
@@ -1120,6 +1129,8 @@
message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
message_->context.realtime_remote_time = realtime_remote_time;
message_->context.source_boot_uuid = source_boot_uuid;
+ message_->context.monotonic_remote_transmit_time =
+ monotonic_remote_transmit_time;
CHECK_LE(length, message_->context.size);
message_->context.size = length;
@@ -1131,19 +1142,14 @@
VLOG(1) << simulated_event_loop_->distributed_now() << " "
<< NodeName(simulated_event_loop_->node())
<< simulated_event_loop_->monotonic_now() << " "
- << simulated_event_loop_->name()
- << "\nMessages were sent too fast:\n"
- << "For channel: "
- << configuration::CleanedChannelToString(
- simulated_channel_->channel())
- << '\n'
- << "Tried to send more than " << simulated_channel_->queue_size()
+ << simulated_event_loop_->name() << " -> SentTooFast "
+ << configuration::StrippedChannelToString(channel())
+ << ", Tried to send more than " << simulated_channel_->queue_size()
<< " (queue size) messages in the last "
<< std::chrono::duration<double>(
simulated_channel_->channel_storage_duration())
.count()
- << " seconds (channel storage duration)"
- << "\n\n";
+ << " seconds (channel storage duration)";
return Error::kMessagesSentTooFast;
}
@@ -1162,6 +1168,7 @@
const void *msg, size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid) {
CHECK_LE(size, this->size())
<< ": Attempting to send too big a message on "
@@ -1176,12 +1183,14 @@
memcpy(mutable_span.data(), msg, size);
return DoSend(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index, source_boot_uuid);
+ monotonic_remote_transmit_time, remote_queue_index,
+ source_boot_uuid);
}
RawSender::Error SimulatedSender::DoSend(
const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid) {
CHECK_LE(data->size(), this->size())
<< ": Attempting to send too big a message on "
@@ -1192,7 +1201,8 @@
message_ = SimulatedMessage::Make(simulated_channel_, data);
return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
- remote_queue_index, source_boot_uuid);
+ monotonic_remote_transmit_time, remote_queue_index,
+ source_boot_uuid);
}
SimulatedTimerHandler::SimulatedTimerHandler(
@@ -1604,6 +1614,8 @@
result->SkipTimingReport();
}
+ // TODO(austin): You shouldn't be able to make an event loop before t=0...
+
VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
<< monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
return result;
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 050c9a3..b717ea3 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -715,7 +715,7 @@
EXPECT_EQ(connection->partial_deliveries(), 0);
EXPECT_TRUE(connection->has_monotonic_offset());
- EXPECT_EQ(connection->monotonic_offset(), 150000);
+ EXPECT_EQ(connection->monotonic_offset(), 100000);
EXPECT_EQ(connection->connection_count(), 1u);
EXPECT_EQ(connection->connected_since_time(), 0);
}
@@ -735,7 +735,7 @@
EXPECT_GT(connection->received_packets(), 50);
EXPECT_EQ(connection->partial_deliveries(), 0);
EXPECT_TRUE(connection->has_monotonic_offset());
- EXPECT_EQ(connection->monotonic_offset(), 150000);
+ EXPECT_EQ(connection->monotonic_offset(), 100000);
EXPECT_EQ(connection->connection_count(), 1u);
EXPECT_EQ(connection->connected_since_time(), 0);
++pi2_client_statistics_count;
@@ -754,7 +754,7 @@
EXPECT_GE(connection->received_packets(), 5);
EXPECT_EQ(connection->partial_deliveries(), 0);
EXPECT_TRUE(connection->has_monotonic_offset());
- EXPECT_EQ(connection->monotonic_offset(), 150000);
+ EXPECT_EQ(connection->monotonic_offset(), 100000);
EXPECT_EQ(connection->connection_count(), 1u);
EXPECT_EQ(connection->connected_since_time(), 0);
++pi3_client_statistics_count;
@@ -798,8 +798,10 @@
[pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
&ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
&pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
- channel_index = channel.first](const RemoteMessage &header) {
- VLOG(1) << aos::FlatbufferToJson(&header);
+ channel_index = channel.first,
+ channel_name = channel.second](const RemoteMessage &header) {
+ VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
+ << aos::FlatbufferToJson(&header);
EXPECT_TRUE(header.has_boot_uuid());
EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
@@ -811,6 +813,9 @@
chrono::nanoseconds(header.realtime_sent_time()));
const aos::monotonic_clock::time_point header_monotonic_remote_time(
chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::monotonic_clock::time_point
+ header_monotonic_remote_transmit_time(
+ chrono::nanoseconds(header.monotonic_remote_transmit_time()));
const aos::realtime_clock::time_point header_realtime_remote_time(
chrono::nanoseconds(header.realtime_remote_time()));
@@ -836,6 +841,9 @@
pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+
+ EXPECT_EQ(header_monotonic_remote_transmit_time,
+ pi2_context->monotonic_remote_time);
} else if (header.channel_index() == ping_timestamp_channel) {
// Find the forwarded message.
while (ping_on_pi2_fetcher.context().monotonic_event_time <
@@ -851,6 +859,10 @@
pi1_context = &ping_on_pi1_fetcher.context();
pi2_context = &ping_on_pi2_fetcher.context();
+
+ EXPECT_EQ(header_monotonic_remote_transmit_time,
+ pi2_context->monotonic_event_time -
+ simulated_event_loop_factory.network_delay());
} else {
LOG(FATAL) << "Unknown channel";
}
@@ -868,6 +880,8 @@
header_realtime_remote_time);
EXPECT_EQ(pi2_context->monotonic_remote_time,
header_monotonic_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
+ header_monotonic_remote_transmit_time);
// Confirm the forwarded message also matches the source message.
EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
@@ -2195,7 +2209,15 @@
::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
aos::Fetcher<examples::Ping> fetcher =
ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
- EXPECT_TRUE(fetcher.Fetch());
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch());
+ // Message bridge picks up the Ping message immediately on reboot.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + factory.network_delay());
+ ASSERT_FALSE(fetcher.Fetch());
}
factory.RunFor(chrono::seconds(1));
@@ -2204,7 +2226,15 @@
::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
aos::Fetcher<examples::Ping> fetcher =
ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
- EXPECT_TRUE(fetcher.Fetch());
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch());
+ // Message bridge picks up the Ping message immediately on reboot.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::seconds(1));
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + factory.network_delay());
+ ASSERT_FALSE(fetcher.Fetch());
}
EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
}
@@ -2260,6 +2290,8 @@
monotonic_clock::epoch() + factory.network_delay());
EXPECT_EQ(fetcher.context().monotonic_remote_time,
monotonic_clock::epoch());
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::seconds(1));
}
{
::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
@@ -2271,6 +2303,8 @@
factory.network_delay());
EXPECT_EQ(fetcher.context().monotonic_remote_time,
monotonic_clock::epoch() - std::chrono::seconds(1));
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch());
}
}
@@ -2461,4 +2495,152 @@
}
}
+// Tests that rapidly sent messages get timestamped correctly.
+TEST(SimulatedEventLoopTest, TransmitTimestamps) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+ message_bridge::TestingTimeConverter time(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory factory(&config.message());
+ factory.SetTimeConverter(&time);
+ time.StartEqual();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+ aos::Fetcher<examples::Ping> fetcher =
+ ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+ EXPECT_FALSE(fetcher.Fetch());
+
+ {
+ ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+ aos::Sender<examples::Ping> test_message_sender =
+ ping_event_loop->MakeSender<examples::Ping>("/reliable");
+ for (const std::chrono::nanoseconds dt :
+ {chrono::microseconds(5000), chrono::microseconds(1),
+ chrono::microseconds(2), chrono::microseconds(70),
+ chrono::microseconds(63)}) {
+ factory.RunFor(dt);
+ SendPing(&test_message_sender, 1);
+ }
+
+ factory.RunFor(chrono::milliseconds(10));
+ }
+
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000));
+ // First message shows up after wakeup + network delay as expected.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay() + factory.network_delay());
+
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::microseconds(5001));
+ // Next message is close enough that it gets picked up at the same wakeup.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay() + factory.network_delay());
+
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::microseconds(5003));
+ // Same for the third.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::microseconds(5000) +
+ factory.send_delay() + factory.network_delay());
+
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::microseconds(5073));
+ // Fourth waits long enough to do the right thing.
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::microseconds(5073) +
+ factory.send_delay());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::microseconds(5073) +
+ factory.send_delay() + factory.network_delay());
+
+ ASSERT_TRUE(fetcher.FetchNext());
+ EXPECT_EQ(fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::microseconds(5136));
+ // Fifth waits long enough to do the right thing as well (but kicks off while
+ // the fourth is in flight over the network).
+ EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::microseconds(5136) +
+ factory.send_delay());
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::microseconds(5136) +
+ factory.send_delay() + factory.network_delay());
+
+ ASSERT_FALSE(fetcher.FetchNext());
+}
+
+// Tests that a reliable message gets forwarded if it was sent originally when
+// nodes were disconnected.
+TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
+ time.StartEqual();
+ factory.SkipTimingReport();
+ factory.DisableStatistics();
+
+ NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+ NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+ // Fully disconnect the nodes.
+ pi1->Disconnect(pi2->node());
+ pi2->Disconnect(pi1->node());
+
+ std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+ std::unique_ptr<aos::EventLoop> pi2_event_loop =
+ pi2->MakeEventLoop("fetcher");
+ aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
+ pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+
+ factory.RunFor(chrono::milliseconds(100));
+
+ {
+ aos::Sender<examples::Ping> pi1_reliable_sender =
+ pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+ for (int i = 0; i < 100; ++i) {
+ SendPing(&pi1_reliable_sender, i);
+ factory.RunFor(chrono::milliseconds(100));
+ }
+ }
+
+ factory.RunFor(chrono::milliseconds(50));
+
+ ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
+
+ pi1->Connect(pi2->node());
+ pi2->Connect(pi1->node());
+
+ factory.RunFor(chrono::milliseconds(1));
+
+ ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
+ ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
+ monotonic_clock::epoch() + chrono::milliseconds(10000));
+ ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
+ monotonic_clock::epoch() + chrono::milliseconds(10150));
+ ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
+ monotonic_clock::epoch() + chrono::milliseconds(10150) +
+ factory.network_delay());
+ ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
+
+ ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
+}
+
} // namespace aos::testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 5c97292..82ee6d1 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -1,6 +1,7 @@
#include "aos/events/simulated_network_bridge.h"
#include "absl/strings/str_cat.h"
+#include "glog/logging.h"
#include "aos/configuration.h"
#include "aos/events/event_loop.h"
@@ -42,6 +43,8 @@
MessageBridgeServerStatus *server_status,
ChannelTimestampSender *timestamp_loggers) {
sent_ = false;
+ reliable_scheduled_ = false;
+ published_ = false;
fetch_event_loop_ = fetch_event_loop;
if (fetch_event_loop_) {
fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
@@ -50,7 +53,7 @@
}
server_status_ = server_status;
- if (server_status) {
+ if (server_status_) {
server_connection_ =
server_status_->FindServerConnection(send_node_factory_->node());
server_index_ = configuration::GetNodeIndex(
@@ -84,7 +87,6 @@
void SetSendEventLoop(aos::EventLoop *send_event_loop,
MessageBridgeClientStatus *client_status) {
- sent_ = false;
send_event_loop_ = send_event_loop;
if (send_event_loop_ && !forwarding_disabled_) {
sender_ = send_event_loop_->MakeRawSender(channel_);
@@ -126,14 +128,43 @@
->time_to_live();
}
+ std::string Name() {
+ std::string result;
+ result +=
+ (fetch_event_loop_ ? fetch_event_loop_->node()->name()->string_view()
+ : std::string_view("?"));
+ result += " -> ";
+ result +=
+ (send_event_loop_ ? send_event_loop_->node()->name()->string_view()
+ : std::string_view("?"));
+ result += " ";
+ result += aos::configuration::StrippedChannelToString(channel());
+ return result;
+ }
+
void ScheduleReliable() {
- if (forwarding_disabled()) return;
+ if (forwarding_disabled()) {
+ return;
+ }
if (!fetcher_) {
return;
}
if (fetcher_->context().data == nullptr || sent_) {
- sent_ = !fetcher_->Fetch();
+ fetcher_->Fetch();
+ sent_ = fetcher_->context().data == nullptr;
+ published_ = sent_;
+ reliable_scheduled_ = true;
+ }
+
+ if (!timer_) {
+ return;
+ }
+
+ if (server_connection_->state() != State::CONNECTED) {
+ reliable_scheduled_ = false;
+ sent_ = true;
+ return;
}
FetchNext();
@@ -154,17 +185,87 @@
<< " at " << fetch_node_factory_->monotonic_now();
if (timer_) {
- server_status_->AddSentPacket(server_index_, channel_);
- timer_->Schedule(monotonic_delivered_time);
- timer_scheduled_ = true;
+ if (!timer_scheduled_) {
+ server_status_->AddSentPacket(server_index_, channel_);
+ timer_->Schedule(monotonic_delivered_time);
+ timer_scheduled_ = true;
+
+ QueueTransmitTimestamp(fetcher_->context().queue_index,
+ fetcher_->context().monotonic_event_time,
+ fetch_event_loop_->monotonic_now());
+ }
} else {
+ // TODO(austin): When do we hit this? Can we add a test to make sure this
+ // is right?
server_status_->AddDroppedPacket(server_index_, channel_);
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = false;
}
}
bool timer_scheduled_ = false;
+ void MessageWatcherCallback(uint32_t sent_queue_index,
+ monotonic_clock::time_point monotonic_sent_time,
+ monotonic_clock::time_point transmit_time) {
+ if (!reliable_scheduled_) {
+ QueueTransmitTimestamp(sent_queue_index, monotonic_sent_time,
+ transmit_time);
+ } else {
+ reliable_scheduled_ = false;
+ }
+ Schedule();
+ }
+
+ void QueueTransmitTimestamp(uint32_t sent_queue_index,
+ monotonic_clock::time_point monotonic_sent_time,
+ monotonic_clock::time_point transmit_time) {
+ if (forwarding_disabled()) return;
+
+ if (monotonic_remote_transmit_times_.size() > 0u) {
+ // FetchNext can discover messages before we do in the same nanosecond. In
+ // that case, make sure the contents match and don't add it a second time.
+ auto back = monotonic_remote_transmit_times_
+ [monotonic_remote_transmit_times_.size() - 1];
+ if (back.sent_queue_index == sent_queue_index) {
+ CHECK_EQ(back.monotonic_sent_time, monotonic_sent_time) << this;
+ CHECK_EQ(back.transmit_time, transmit_time) << this;
+ return;
+ }
+ }
+
+ // Capture the time this message was published over the network on the
+ // remote node
+ monotonic_remote_transmit_times_.push_back(TransmitTime{
+ .monotonic_sent_time = monotonic_sent_time,
+ .sent_queue_index = sent_queue_index,
+ .transmit_time = transmit_time,
+ });
+ }
+
+ void Connect() {
+ if (time_to_live() == 0 && published_ == false) {
+ if (forwarding_disabled()) {
+ return;
+ }
+ CHECK(fetcher_);
+
+ fetcher_->Fetch();
+ sent_ = fetcher_->context().data == nullptr;
+ reliable_scheduled_ = true;
+
+ QueueTransmitTimestamp(fetcher_->context().queue_index,
+ fetcher_->context().monotonic_event_time,
+ fetch_event_loop_->monotonic_now());
+ Schedule();
+ }
+ }
+
+ bool SendingTo(const Node *destination) {
+ return send_event_loop_ && send_event_loop_->node() == destination;
+ }
+
// Kicks us to re-fetch and schedule the timer.
void Schedule() {
CHECK(!forwarding_disabled());
@@ -179,9 +280,13 @@
return;
}
+ CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
+ const monotonic_clock::time_point transmit_time =
+ monotonic_remote_transmit_times_[0].transmit_time;
+
// Compute the time to publish this message.
const monotonic_clock::time_point monotonic_delivered_time =
- DeliveredTime(fetcher_->context());
+ DeliveredTime(transmit_time);
CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
<< ": Trying to deliver message in the past on channel "
@@ -197,6 +302,8 @@
} else {
server_status_->AddDroppedPacket(server_index_, channel_);
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = false;
Schedule();
}
}
@@ -208,20 +315,42 @@
while (true) {
if (fetcher_->context().data == nullptr || sent_) {
sent_ = !fetcher_->FetchNext();
+ if (!sent_) {
+ published_ = false;
+ }
+ if (!sent_) {
+ if (monotonic_remote_transmit_times_.size() == 0u) {
+ QueueTransmitTimestamp(fetcher_->context().queue_index,
+ fetcher_->context().monotonic_event_time,
+ fetch_event_loop_->monotonic_now());
+ }
+ }
}
if (sent_) {
break;
}
if (server_connection_->state() != State::CONNECTED) {
+ CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
+ CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
+ fetcher_->context().monotonic_event_time)
+ << this << " " << Name();
+ CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
+ fetcher_->context().queue_index)
+ << this << " " << Name();
+
+ monotonic_remote_transmit_times_.erase(
+ monotonic_remote_transmit_times_.begin());
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = false;
server_status_->AddDroppedPacket(server_index_, channel_);
continue;
}
if (fetcher_->context().monotonic_event_time +
send_node_factory_->network_delay() +
- send_node_factory_->send_delay() >
+ send_node_factory_->send_delay() >=
fetch_node_factory_->monotonic_now() ||
time_to_live() == 0) {
break;
@@ -229,14 +358,14 @@
// TODO(austin): Not cool. We want to actually forward these. This means
// we need a more sophisticated concept of what is running.
- // TODO(james): This fails if multiple messages are sent on the same
- // channel within the same callback.
LOG(WARNING) << "Not forwarding message on "
<< configuration::CleanedChannelToString(fetcher_->channel())
<< " because we aren't running. Sent at "
<< fetcher_->context().monotonic_event_time << " now is "
<< fetch_node_factory_->monotonic_now();
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = false;
server_status_->AddDroppedPacket(server_index_, channel_);
}
}
@@ -246,22 +375,40 @@
timer_scheduled_ = false;
CHECK(sender_);
CHECK(client_status_);
+
+ // Confirm that the first element in the times list is ours, and pull the
+ // transmit time out of it.
+ CHECK(!monotonic_remote_transmit_times_.empty());
+ CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
+ fetcher_->context().monotonic_event_time);
+ CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
+ fetcher_->context().queue_index);
+
+ const monotonic_clock::time_point monotonic_remote_transmit_time =
+ monotonic_remote_transmit_times_[0].transmit_time;
+
+ monotonic_remote_transmit_times_.erase(
+ monotonic_remote_transmit_times_.begin());
+
if (server_connection_->state() != State::CONNECTED) {
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = false;
Schedule();
return;
}
+
// Fill out the send times.
sender_->CheckOk(sender_->Send(
fetcher_->context().data, fetcher_->context().size,
fetcher_->context().monotonic_event_time,
- fetcher_->context().realtime_event_time,
+ fetcher_->context().realtime_event_time, monotonic_remote_transmit_time,
fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
// And simulate message_bridge's offset recovery.
- client_status_->SampleFilter(
- client_index_, fetcher_->context().monotonic_event_time,
- sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
+ client_status_->SampleFilter(client_index_, monotonic_remote_transmit_time,
+ sender_->monotonic_sent_time(),
+ fetcher_->context().source_boot_uuid);
client_connection_->mutate_received_packets(
client_connection_->received_packets() + 1);
@@ -294,7 +441,8 @@
fetcher_->context().realtime_event_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(
fetcher_->context().queue_index);
-
+ message_header_builder.add_monotonic_remote_transmit_time(
+ monotonic_remote_transmit_time.time_since_epoch().count());
message_header_builder.add_monotonic_sent_time(
sender_->monotonic_sent_time().time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
@@ -312,6 +460,8 @@
}
sent_ = true;
+ reliable_scheduled_ = false;
+ published_ = true;
Schedule();
}
@@ -354,13 +504,13 @@
}
// Converts from time on the sending node to time on the receiving node.
- monotonic_clock::time_point DeliveredTime(const Context &context) const {
+ monotonic_clock::time_point DeliveredTime(
+ const monotonic_clock::time_point transmit_time) const {
const distributed_clock::time_point distributed_sent_time =
- fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+ fetch_node_factory_->ToDistributedClock(transmit_time);
const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
- distributed_sent_time + send_node_factory_->network_delay() +
- send_node_factory_->send_delay());
+ distributed_sent_time + send_node_factory_->network_delay());
CHECK_EQ(t.boot, send_node_factory_->boot_count());
return t.time;
}
@@ -393,6 +543,8 @@
const size_t destination_node_index_;
// True if we have sent the message in the fetcher.
bool sent_ = false;
+ bool published_ = false;
+ bool reliable_scheduled_ = false;
ServerConnection *server_connection_ = nullptr;
int server_index_ = -1;
@@ -403,6 +555,16 @@
size_t channel_index_;
aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
+ struct TransmitTime {
+ monotonic_clock::time_point monotonic_sent_time;
+ uint32_t sent_queue_index;
+ monotonic_clock::time_point transmit_time;
+ };
+
+ // Stores tthe time the message was handed to the kernel to be published on
+ // the remote node over the network for all forwarded relevant messages.
+ std::vector<TransmitTime> monotonic_remote_transmit_times_;
+
struct Timestamp {
Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
monotonic_clock::time_point new_monotonic_timestamp_time)
@@ -440,9 +602,9 @@
size_t node_index = 0;
for (const std::optional<MessageBridgeServerStatus::NodeState>
- &connection : node_state->server_status->nodes()) {
+ &connection : node_state->server_status_->nodes()) {
if (connection.has_value()) {
- node_state->server_status->ResetFilter(node_index);
+ node_state->server_status_->ResetFilter(node_index);
}
++node_index;
}
@@ -514,9 +676,14 @@
if (channel == timestamp_channel) {
source_event_loop->second.SetSendData(
- [captured_delayers = delayers.get()]() {
+ [source_event_loop, captured_delayers = delayers.get()](
+ uint32_t sent_queue_index,
+ monotonic_clock::time_point monotonic_sent_time) {
for (std::unique_ptr<RawMessageDelayer> &delayer :
captured_delayers->v) {
+ delayer->QueueTransmitTimestamp(
+ sent_queue_index, monotonic_sent_time,
+ source_event_loop->second.event_loop->monotonic_now());
delayer->Schedule();
}
});
@@ -588,11 +755,21 @@
it->second.EnableStatistics();
}
+void SimulatedMessageBridge::State::MakeEventLoop() {
+ // Message bridge isn't the thing that should be catching sent-too-fast,
+ // and may need to be able to forward too-fast messages replayed from old
+ // logfiles.
+ SetEventLoop(node_factory_->MakeEventLoop(
+ "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+ NodeEventLoopFactory::ExclusiveSenders::kNo,
+ {}}));
+}
+
void SimulatedMessageBridge::State::SetEventLoop(
std::unique_ptr<aos::EventLoop> loop) {
if (!loop) {
timestamp_loggers = ChannelTimestampSender(nullptr);
- server_status.reset();
+ server_status_.reset();
client_status.reset();
for (RawMessageDelayer *source_delayer : source_delayers_) {
source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
@@ -615,38 +792,42 @@
// Don't register watchers if we know we aren't forwarding.
if (watcher.second->disable_forwarding) continue;
event_loop->MakeRawNoArgWatcher(
- watcher.first, [captured_delayers = watcher.second](const Context &) {
+ watcher.first,
+ [this, captured_delayers = watcher.second](const Context &context) {
// We might get told after registering, so don't forward at that point
// too.
for (std::unique_ptr<RawMessageDelayer> &delayer :
captured_delayers->v) {
- delayer->Schedule();
+ delayer->MessageWatcherCallback(context.queue_index,
+ context.monotonic_event_time,
+ event_loop->monotonic_now());
}
});
}
timestamp_loggers = ChannelTimestampSender(event_loop.get());
- server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
+ server_status_ =
+ std::make_unique<MessageBridgeServerStatus>(event_loop.get());
if (disable_statistics_) {
- server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
+ server_status_->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
}
{
size_t node_index = 0;
for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
- server_status->nodes()) {
+ server_status_->nodes()) {
if (connection.has_value()) {
if (boot_uuids_[node_index] != UUID::Zero()) {
switch (server_state_[node_index]) {
case message_bridge::State::DISCONNECTED:
- server_status->Disconnect(node_index);
+ server_status_->Disconnect(node_index);
break;
case message_bridge::State::CONNECTED:
- server_status->Connect(node_index, event_loop->monotonic_now());
+ server_status_->Connect(node_index, event_loop->monotonic_now());
break;
}
} else {
- server_status->Disconnect(node_index);
+ server_status_->Disconnect(node_index);
}
}
++node_index;
@@ -655,11 +836,11 @@
for (size_t i = 0; i < boot_uuids_.size(); ++i) {
if (boot_uuids_[i] != UUID::Zero()) {
- server_status->SetBootUUID(i, boot_uuids_[i]);
+ server_status_->SetBootUUID(i, boot_uuids_[i]);
}
}
if (fn_) {
- server_status->set_send_data(fn_);
+ server_status_->set_send_data(fn_);
}
client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
if (disable_statistics_) {
@@ -724,7 +905,7 @@
}
for (RawMessageDelayer *source_delayer : source_delayers_) {
- source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
+ source_delayer->SetFetchEventLoop(event_loop.get(), server_status_.get(),
×tamp_loggers);
}
for (RawMessageDelayer *destination_delayer : destination_delayers_) {
@@ -759,4 +940,110 @@
});
}
+void SimulatedMessageBridge::State::SetSendData(
+ std::function<void(uint32_t, monotonic_clock::time_point)> fn) {
+ CHECK(!fn_);
+ fn_ = std::move(fn);
+ if (server_status_) {
+ server_status_->set_send_data(fn_);
+ }
+}
+
+void SimulatedMessageBridge::State::SetBootUUID(size_t node_index,
+ const UUID &boot_uuid) {
+ boot_uuids_[node_index] = boot_uuid;
+ const Node *node = node_factory_->configuration()->nodes()->Get(node_index);
+ if (server_status_) {
+ ServerConnection *connection = server_status_->FindServerConnection(node);
+ if (connection) {
+ if (boot_uuid == UUID::Zero()) {
+ server_status_->Disconnect(node_index);
+ server_status_->ResetFilter(node_index);
+ } else {
+ switch (server_state_[node_index]) {
+ case message_bridge::State::DISCONNECTED:
+ server_status_->Disconnect(node_index);
+ break;
+ case message_bridge::State::CONNECTED:
+ server_status_->Connect(node_index, event_loop->monotonic_now());
+ break;
+ }
+ server_status_->ResetFilter(node_index);
+ server_status_->SetBootUUID(node_index, boot_uuid);
+ }
+ }
+ }
+ if (client_status) {
+ const int client_index =
+ client_status->FindClientIndex(node->name()->string_view());
+ client_status->SampleReset(client_index);
+ if (boot_uuid == UUID::Zero()) {
+ client_status->Disconnect(client_index);
+ } else {
+ switch (client_state_[node_index]) {
+ case message_bridge::State::CONNECTED:
+ client_status->Connect(client_index);
+ break;
+ case message_bridge::State::DISCONNECTED:
+ client_status->Disconnect(client_index);
+ break;
+ }
+ }
+ }
+}
+
+void SimulatedMessageBridge::State::SetServerState(
+ const Node *destination, message_bridge::State state) {
+ const size_t node_index =
+ configuration::GetNodeIndex(node_factory_->configuration(), destination);
+ server_state_[node_index] = state;
+ if (server_status_) {
+ ServerConnection *connection =
+ server_status_->FindServerConnection(destination);
+ if (connection == nullptr) return;
+
+ if (state == connection->state()) {
+ return;
+ }
+ switch (state) {
+ case message_bridge::State::DISCONNECTED:
+ server_status_->Disconnect(node_index);
+ break;
+ case message_bridge::State::CONNECTED:
+ server_status_->Connect(node_index, event_loop->monotonic_now());
+ for (RawMessageDelayer *delayer : source_delayers_) {
+ if (delayer->SendingTo(destination)) {
+ delayer->Connect();
+ }
+ }
+ break;
+ }
+ }
+}
+
+void SimulatedMessageBridge::State::SetClientState(
+ const Node *source, message_bridge::State state) {
+ const size_t node_index =
+ configuration::GetNodeIndex(node_factory_->configuration(), source);
+ client_state_[node_index] = state;
+ if (client_status) {
+ const int client_index =
+ client_status->FindClientIndex(source->name()->string_view());
+ ClientConnection *connection = client_status->GetClientConnection(source);
+
+ // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
+ // calls?
+ if (connection->state() != state) {
+ switch (state) {
+ case message_bridge::State::CONNECTED:
+ client_status->Connect(client_index);
+ break;
+ case message_bridge::State::DISCONNECTED:
+ client_status->Disconnect(client_index);
+ break;
+ }
+ }
+ }
+}
+
} // namespace aos::message_bridge
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 14a7321..e850396 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -59,9 +59,9 @@
void DisableStatistics(DestroySenders destroy_senders) {
disable_statistics_ = true;
destroy_senders_ = destroy_senders;
- if (server_status) {
- server_status->DisableStatistics(destroy_senders ==
- DestroySenders::kYes);
+ if (server_status_) {
+ server_status_->DisableStatistics(destroy_senders ==
+ DestroySenders::kYes);
}
if (client_status) {
client_status->DisableStatistics(destroy_senders ==
@@ -71,8 +71,8 @@
void EnableStatistics() {
disable_statistics_ = false;
- if (server_status) {
- server_status->EnableStatistics();
+ if (server_status_) {
+ server_status_->EnableStatistics();
}
if (client_status) {
client_status->EnableStatistics();
@@ -86,121 +86,22 @@
destination_delayers_.emplace_back(delayer);
}
- void MakeEventLoop() {
- // Message bridge isn't the thing that should be catching sent-too-fast,
- // and may need to be able to forward too-fast messages replayed from old
- // logfiles.
- SetEventLoop(node_factory_->MakeEventLoop(
- "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
- NodeEventLoopFactory::ExclusiveSenders::kNo,
- {}}));
- }
+ void MakeEventLoop();
void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
- void SetSendData(std::function<void()> fn) {
- CHECK(!fn_);
- fn_ = std::move(fn);
- if (server_status) {
- server_status->set_send_data(fn_);
- }
- }
+ void SetSendData(
+ std::function<void(uint32_t, monotonic_clock::time_point)> fn);
void AddDelayerWatcher(const Channel *channel, DelayersVector *v) {
delayer_watchers_.emplace_back(channel, v);
}
- void SetBootUUID(size_t node_index, const UUID &boot_uuid) {
- boot_uuids_[node_index] = boot_uuid;
- const Node *node =
- node_factory_->configuration()->nodes()->Get(node_index);
- if (server_status) {
- ServerConnection *connection =
- server_status->FindServerConnection(node);
- if (connection) {
- if (boot_uuid == UUID::Zero()) {
- server_status->Disconnect(node_index);
- server_status->ResetFilter(node_index);
- } else {
- switch (server_state_[node_index]) {
- case message_bridge::State::DISCONNECTED:
- server_status->Disconnect(node_index);
- break;
- case message_bridge::State::CONNECTED:
- server_status->Connect(node_index, event_loop->monotonic_now());
- break;
- }
- server_status->ResetFilter(node_index);
- server_status->SetBootUUID(node_index, boot_uuid);
- }
- }
- }
- if (client_status) {
- const int client_index =
- client_status->FindClientIndex(node->name()->string_view());
- client_status->SampleReset(client_index);
- if (boot_uuid == UUID::Zero()) {
- client_status->Disconnect(client_index);
- } else {
- switch (client_state_[node_index]) {
- case message_bridge::State::CONNECTED:
- client_status->Connect(client_index);
- break;
- case message_bridge::State::DISCONNECTED:
- client_status->Disconnect(client_index);
- break;
- }
- }
- }
- }
+ void SetBootUUID(size_t node_index, const UUID &boot_uuid);
- void SetServerState(const Node *destination, message_bridge::State state) {
- const size_t node_index = configuration::GetNodeIndex(
- node_factory_->configuration(), destination);
- server_state_[node_index] = state;
- if (server_status) {
- ServerConnection *connection =
- server_status->FindServerConnection(destination);
- if (connection == nullptr) return;
+ void SetServerState(const Node *destination, message_bridge::State state);
- if (state == connection->state()) {
- return;
- }
- switch (state) {
- case message_bridge::State::DISCONNECTED:
- server_status->Disconnect(node_index);
- break;
- case message_bridge::State::CONNECTED:
- server_status->Connect(node_index, event_loop->monotonic_now());
- break;
- }
- }
- }
-
- void SetClientState(const Node *source, message_bridge::State state) {
- const size_t node_index =
- configuration::GetNodeIndex(node_factory_->configuration(), source);
- client_state_[node_index] = state;
- if (client_status) {
- const int client_index =
- client_status->FindClientIndex(source->name()->string_view());
- ClientConnection *connection =
- client_status->GetClientConnection(source);
-
- // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
- // calls?
- if (connection->state() != state) {
- switch (state) {
- case message_bridge::State::CONNECTED:
- client_status->Connect(client_index);
- break;
- case message_bridge::State::DISCONNECTED:
- client_status->Disconnect(client_index);
- break;
- }
- }
- }
- }
+ void SetClientState(const Node *source, message_bridge::State state);
std::vector<UUID> boot_uuids_;
std::vector<message_bridge::State> client_state_;
@@ -208,12 +109,12 @@
std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
- std::function<void()> fn_;
+ std::function<void(uint32_t, monotonic_clock::time_point)> fn_;
NodeEventLoopFactory *node_factory_;
std::unique_ptr<aos::EventLoop> event_loop;
ChannelTimestampSender timestamp_loggers;
- std::unique_ptr<MessageBridgeServerStatus> server_status;
+ std::unique_ptr<MessageBridgeServerStatus> server_status_;
std::unique_ptr<MessageBridgeClientStatus> client_status;
// List of delayers to update whenever this node starts or stops.
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 9d14d8f..57a2e9e 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -953,6 +953,7 @@
const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
@@ -962,13 +963,15 @@
// adhere to this convention and place it at the end.
memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
return Send(length, monotonic_remote_time, realtime_remote_time,
- remote_queue_index, source_boot_uuid, monotonic_sent_time,
- realtime_sent_time, queue_index);
+ monotonic_remote_transmit_time, remote_queue_index,
+ source_boot_uuid, monotonic_sent_time, realtime_sent_time,
+ queue_index);
}
LocklessQueueSender::Result LocklessQueueSender::Send(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
@@ -997,6 +1000,8 @@
message->header.source_boot_uuid = source_boot_uuid;
message->header.monotonic_remote_time = monotonic_remote_time;
message->header.realtime_remote_time = realtime_remote_time;
+ message->header.monotonic_remote_transmit_time =
+ monotonic_remote_transmit_time;
Index to_replace = Index::Invalid();
while (true) {
@@ -1298,6 +1303,7 @@
monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time,
monotonic_clock::time_point *monotonic_remote_time,
+ monotonic_clock::time_point *monotonic_remote_transmit_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
char *data,
@@ -1379,6 +1385,8 @@
context.monotonic_event_time = m->header.monotonic_sent_time;
context.realtime_event_time = m->header.realtime_sent_time;
context.monotonic_remote_time = m->header.monotonic_remote_time;
+ context.monotonic_remote_transmit_time =
+ m->header.monotonic_remote_transmit_time;
context.realtime_remote_time = m->header.realtime_remote_time;
context.queue_index = queue_index.index();
if (m->header.remote_queue_index == 0xffffffffu) {
@@ -1443,6 +1451,7 @@
*realtime_sent_time = context.realtime_event_time;
*remote_queue_index = context.remote_queue_index;
*monotonic_remote_time = context.monotonic_remote_time;
+ *monotonic_remote_transmit_time = context.monotonic_remote_transmit_time;
*realtime_remote_time = context.realtime_remote_time;
*source_boot_uuid = context.source_boot_uuid;
*length = context.size;
@@ -1597,6 +1606,12 @@
<< m->header.monotonic_remote_time << " 0x" << std::hex
<< m->header.monotonic_remote_time.time_since_epoch().count()
<< std::dec << ::std::endl;
+ ::std::cout
+ << " monotonic_clock::time_point "
+ "monotonic_remote_transmit_time = "
+ << m->header.monotonic_remote_transmit_time << " 0x" << std::hex
+ << m->header.monotonic_remote_transmit_time.time_since_epoch().count()
+ << std::dec << ::std::endl;
::std::cout << " realtime_clock::time_point realtime_remote_time = "
<< m->header.realtime_remote_time << " 0x" << std::hex
<< m->header.realtime_remote_time.time_since_epoch().count()
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 4f867b9..462b2b6 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -91,6 +91,7 @@
// passed through.
monotonic_clock::time_point monotonic_remote_time;
realtime_clock::time_point realtime_remote_time;
+ monotonic_clock::time_point monotonic_remote_transmit_time;
// Queue index from the remote node.
uint32_t remote_queue_index;
@@ -326,6 +327,7 @@
LocklessQueueSender::Result Send(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time = nullptr,
realtime_clock::time_point *realtime_sent_time = nullptr,
@@ -336,6 +338,7 @@
const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
+ monotonic_clock::time_point monotonic_remote_transmit_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time = nullptr,
realtime_clock::time_point *realtime_sent_time = nullptr,
@@ -442,6 +445,7 @@
uint32_t queue_index, monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time,
monotonic_clock::time_point *monotonic_remote_time,
+ monotonic_clock::time_point *monotonic_remote_transmit_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
char *data,
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index ccf95f7..bc5ebb7 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -79,10 +79,11 @@
for (int i = 0; i < 5; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
- ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl,
- UUID::Zero(), nullptr, nullptr, nullptr),
- LocklessQueueSender::Result::GOOD);
+ ASSERT_EQ(
+ sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, monotonic_clock::min_time,
+ 0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
// Pin a message, so when we keep writing we will exercise the pinning
// logic.
if (i == 1) {
@@ -156,10 +157,11 @@
// Send a message to make sure that the queue still works.
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", 971);
- ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl,
- UUID::Zero(), nullptr, nullptr, nullptr),
- LocklessQueueSender::Result::GOOD);
+ ASSERT_EQ(
+ sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, monotonic_clock::min_time,
+ 0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
}
// Now loop through the queue and make sure the number in the snprintf
@@ -175,17 +177,18 @@
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
+ monotonic_clock::time_point monotonic_remote_transmit_time;
realtime_clock::time_point realtime_remote_time;
uint32_t remote_queue_index;
UUID source_boot_uuid;
char read_data[1024];
size_t length;
- LocklessQueueReader::Result read_result =
- reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &source_boot_uuid, &length,
- &(read_data[0]), std::ref(should_read));
+ LocklessQueueReader::Result read_result = reader.Read(
+ i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &monotonic_remote_transmit_time,
+ &realtime_remote_time, &remote_queue_index, &source_boot_uuid,
+ &length, &(read_data[0]), std::ref(should_read));
if (read_result != LocklessQueueReader::Result::GOOD) {
if (read_result == LocklessQueueReader::Result::TOO_OLD) {
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index bfd9916..a2c0992 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -251,8 +251,8 @@
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i);
ASSERT_EQ(sender.Send(data, s, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffu, UUID::Zero(),
- nullptr, nullptr, nullptr),
+ realtime_clock::min_time, monotonic_clock::min_time,
+ 0xffffffffu, UUID::Zero(), nullptr, nullptr, nullptr),
LocklessQueueSender::Result::GOOD);
// Confirm that the queue index still makes sense. This is easier since the
@@ -263,6 +263,7 @@
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
+ monotonic_clock::time_point monotonic_remote_transmit_time;
realtime_clock::time_point realtime_remote_time;
uint32_t remote_queue_index;
UUID source_boot_uuid;
@@ -277,8 +278,9 @@
}
LocklessQueueReader::Result read_result = reader.Read(
index.index(), &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &source_boot_uuid, &length, &(read_data[0]), std::ref(should_read));
+ &monotonic_remote_time, &monotonic_remote_transmit_time,
+ &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
+ &(read_data[0]), std::ref(should_read));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
@@ -450,6 +452,7 @@
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
+ monotonic_clock::time_point monotonic_remote_transmit_time;
realtime_clock::time_point realtime_remote_time;
uint32_t remote_queue_index;
UUID source_boot_uuid;
@@ -458,8 +461,9 @@
LocklessQueueReader::Result read_result = reader.Read(
i, &monotonic_sent_time, &realtime_sent_time, &monotonic_remote_time,
- &realtime_remote_time, &remote_queue_index, &source_boot_uuid, &length,
- &(read_data[0]), should_read_callback);
+ &monotonic_remote_transmit_time, &realtime_remote_time,
+ &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]),
+ should_read_callback);
if (read_result != LocklessQueueReader::Result::GOOD) {
if (read_result == LocklessQueueReader::Result::TOO_OLD) {
@@ -526,10 +530,11 @@
for (int i = 0; i < 5; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
- ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl,
- UUID::Zero(), nullptr, nullptr, nullptr),
- LocklessQueueSender::Result::GOOD);
+ ASSERT_EQ(
+ sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, monotonic_clock::min_time,
+ 0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
}
},
[config, &tid](void *raw_memory) {
@@ -549,10 +554,11 @@
{
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
- ASSERT_EQ(sender.Send(data, s + 1, monotonic_clock::min_time,
- realtime_clock::min_time, 0xffffffffl,
- UUID::Zero(), nullptr, nullptr, nullptr),
- LocklessQueueSender::Result::GOOD);
+ ASSERT_EQ(
+ sender.Send(data, s + 1, monotonic_clock::min_time,
+ realtime_clock::min_time, monotonic_clock::min_time,
+ 0xffffffffl, UUID::Zero(), nullptr, nullptr, nullptr),
+ LocklessQueueSender::Result::GOOD);
}
// Now, make sure we can send 1 message and receive it to confirm we
diff --git a/aos/ipc_lib/memory_mapped_queue.cc b/aos/ipc_lib/memory_mapped_queue.cc
index fc486d4..dc457fe 100644
--- a/aos/ipc_lib/memory_mapped_queue.cc
+++ b/aos/ipc_lib/memory_mapped_queue.cc
@@ -16,7 +16,7 @@
std::string ShmPath(std::string_view shm_base, const Channel *channel) {
CHECK(channel->has_type());
- return ShmFolder(shm_base, channel) + channel->type()->str() + ".v6";
+ return ShmFolder(shm_base, channel) + channel->type()->str() + ".v7";
}
void PageFaultDataWrite(char *data, size_t size, const long page_size) {
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index aa73f2b..2797c24 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -197,7 +197,8 @@
++started_writes_;
auto result =
sender.Send(sizeof(ThreadPlusCount), aos::monotonic_clock::min_time,
- aos::realtime_clock::min_time, 0xffffffff,
+ aos::realtime_clock::min_time,
+ aos::monotonic_clock::min_time, 0xffffffff,
UUID::FromSpan(absl::Span<const uint8_t>(
reinterpret_cast<const uint8_t *>(&tpc),
sizeof(ThreadPlusCount))),
@@ -309,6 +310,7 @@
monotonic_clock::time_point monotonic_sent_time;
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
+ monotonic_clock::time_point monotonic_remote_transmit_time;
realtime_clock::time_point realtime_remote_time;
UUID source_boot_uuid;
uint32_t remote_queue_index;
@@ -321,14 +323,16 @@
0xffffffffu, LocklessQueueSize(queue_.memory())));
LocklessQueueReader::Result read_result =
set_should_read
- ? reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &source_boot_uuid, &length,
- &(read_data[0]), std::ref(should_read))
+ ? reader.Read(
+ wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &monotonic_remote_transmit_time,
+ &realtime_remote_time, &remote_queue_index, &source_boot_uuid,
+ &length, &(read_data[0]), std::ref(should_read))
: reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &source_boot_uuid, &length,
- &(read_data[0]), nop);
+ &monotonic_remote_time,
+ &monotonic_remote_transmit_time,
+ &realtime_remote_time, &remote_queue_index,
+ &source_boot_uuid, &length, &(read_data[0]), nop);
// The code in lockless_queue.cc reads everything but data, checks that the
// header hasn't changed, then reads the data. So, if we succeed and both
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 620a069..90c0cc0 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -97,6 +97,7 @@
message_header_builder.add_queue_index(0);
message_header_builder.add_monotonic_remote_time(0);
message_header_builder.add_realtime_remote_time(0);
+ message_header_builder.add_monotonic_remote_transmit_time(0);
message_header_builder.add_remote_queue_index(0);
fbb.Finish(message_header_builder.Finish());
@@ -323,12 +324,14 @@
chrono::nanoseconds(remote_data->monotonic_sent_time())),
realtime_clock::time_point(
chrono::nanoseconds(remote_data->realtime_sent_time())),
+ monotonic_clock::time_point(
+ chrono::nanoseconds(remote_data->monotonic_remote_transmit_time())),
remote_data->queue_index(), remote_boot_uuid));
client_status_->SampleFilter(
client_index_,
monotonic_clock::time_point(
- chrono::nanoseconds(remote_data->monotonic_sent_time())),
+ chrono::nanoseconds(remote_data->monotonic_remote_transmit_time())),
sender->monotonic_sent_time(), remote_boot_uuid);
if (stream_reply_with_timestamp_[stream]) {
@@ -339,6 +342,8 @@
.queue_index = remote_data->queue_index(),
.monotonic_remote_time =
sender->monotonic_sent_time().time_since_epoch().count(),
+ .monotonic_remote_transmit_time =
+ remote_data->monotonic_remote_transmit_time(),
.realtime_remote_time =
sender->realtime_sent_time().time_since_epoch().count(),
.remote_queue_index = sender->sent_queue_index(),
@@ -402,6 +407,9 @@
timestamp.channel_index);
message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
timestamp.monotonic_sent_time);
+ message_reception_reply_.mutable_message()
+ ->mutate_monotonic_remote_transmit_time(
+ timestamp.monotonic_remote_transmit_time);
message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
timestamp.realtime_sent_time);
message_reception_reply_.mutable_message()->mutate_queue_index(
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 56d2563..97d5946 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -57,6 +57,7 @@
int64_t realtime_sent_time;
uint32_t queue_index;
int64_t monotonic_remote_time;
+ int64_t monotonic_remote_transmit_time;
int64_t realtime_remote_time;
uint32_t remote_queue_index;
};
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index caecc81..3c7a8d2 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -95,6 +95,8 @@
context.realtime_event_time.time_since_epoch().count());
remote_data_builder.add_data(data_offset);
remote_data_builder.add_boot_uuid(boot_uuid_offset);
+ remote_data_builder.add_monotonic_remote_transmit_time(
+ monotonic_clock::now().time_since_epoch().count());
// TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
// Only useful when not logging.
@@ -304,6 +306,8 @@
remote_message_builder.add_remote_queue_index(
message_header->queue_index());
remote_message_builder.add_boot_uuid(boot_uuid_offset);
+ remote_message_builder.add_monotonic_remote_transmit_time(
+ message_header->monotonic_remote_transmit_time());
server_status->AddPartialDeliveries(peer.node_index,
partial_deliveries);
@@ -418,7 +422,10 @@
timestamp_loggers_(event_loop_),
server_(max_channels() + kControlStreams(), "",
event_loop->node()->port(), requested_authentication),
- server_status_(event_loop, [this]() { timestamp_state_->SendData(); }),
+ server_status_(event_loop,
+ [this](uint32_t, monotonic_clock::time_point) {
+ timestamp_state_->SendData();
+ }),
config_sha256_(std::move(config_sha256)),
allocator_(0),
refresh_key_timer_(event_loop->AddTimer([this]() { RequestAuthKey(); })),
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 4df717f..50a8cea 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -90,7 +90,8 @@
} // namespace
MessageBridgeServerStatus::MessageBridgeServerStatus(
- aos::EventLoop *event_loop, std::function<void()> send_data)
+ aos::EventLoop *event_loop,
+ std::function<void(uint32_t, monotonic_clock::time_point)> send_data)
: event_loop_(event_loop),
sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
statistics_(MakeServerStatistics(
@@ -100,7 +101,7 @@
client_statistics_fetcher_(
event_loop_->MakeFetcher<ClientStatistics>("/aos")),
timestamp_sender_(event_loop_->MakeSender<Timestamp>("/aos")),
- send_data_(send_data) {
+ send_data_(std::move(send_data)) {
server_connection_offsets_.reserve(
statistics_.message().connections()->size());
client_offsets_.reserve(statistics_.message().connections()->size());
@@ -484,7 +485,8 @@
// Since we are building up the timestamp to send here, we need to trigger
// the SendData call ourselves.
if (send_data_) {
- send_data_();
+ send_data_(timestamp_sender_.sent_queue_index(),
+ timestamp_sender_.monotonic_sent_time());
}
}
}
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index 1248c27..11b74a0 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -52,7 +52,8 @@
MessageBridgeServerStatus(
aos::EventLoop *event_loop,
- std::function<void()> send_data = std::function<void()>());
+ std::function<void(uint32_t, monotonic_clock::time_point)> send_data =
+ std::function<void(uint32_t, monotonic_clock::time_point)>());
MessageBridgeServerStatus(const MessageBridgeServerStatus &) = delete;
MessageBridgeServerStatus(MessageBridgeServerStatus &&) = delete;
@@ -60,7 +61,8 @@
delete;
MessageBridgeServerStatus &operator=(MessageBridgeServerStatus &&) = delete;
- void set_send_data(std::function<void()> send_data) {
+ void set_send_data(
+ std::function<void(uint32_t, monotonic_clock::time_point)> send_data) {
send_data_ = send_data;
}
@@ -152,7 +154,7 @@
aos::monotonic_clock::time_point last_statistics_send_time_ =
aos::monotonic_clock::min_time;
- std::function<void()> send_data_;
+ std::function<void(uint32_t, monotonic_clock::time_point)> send_data_;
bool send_ = true;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index c211c89..d55438f 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -344,6 +344,9 @@
chrono::nanoseconds(header.realtime_sent_time()));
const aos::monotonic_clock::time_point header_monotonic_remote_time(
chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::monotonic_clock::time_point
+ header_monotonic_remote_transmit_time(
+ chrono::nanoseconds(header.monotonic_remote_transmit_time()));
const aos::realtime_clock::time_point header_realtime_remote_time(
chrono::nanoseconds(header.realtime_remote_time()));
@@ -396,12 +399,19 @@
EXPECT_EQ(pi2_context->monotonic_remote_time,
header_monotonic_remote_time);
+ EXPECT_LT(header_monotonic_remote_transmit_time,
+ pi2_context->monotonic_event_time);
+ EXPECT_GT(header_monotonic_remote_transmit_time,
+ pi2_context->monotonic_remote_time);
+
// Confirm the forwarded message also matches the source message.
EXPECT_EQ(pi1_context->queue_index, header.queue_index());
EXPECT_EQ(pi1_context->monotonic_event_time,
header_monotonic_remote_time);
EXPECT_EQ(pi1_context->realtime_event_time,
header_realtime_remote_time);
+ EXPECT_EQ(header_monotonic_remote_transmit_time,
+ pi2_context->monotonic_remote_transmit_time);
});
}
@@ -860,6 +870,8 @@
&pi1_remote_timestamp_event_loop);
{
+ const aos::monotonic_clock::time_point startup_time =
+ aos::monotonic_clock::now();
// Now spin up a client for 2 seconds.
MakePi2Client();
@@ -878,6 +890,10 @@
0u);
EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
+ startup_time);
+ EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
+ aos::monotonic_clock::now());
EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
EXPECT_EQ(ping_timestamp_count, 1);
@@ -990,6 +1006,8 @@
&pi1_remote_timestamp_event_loop);
{
+ const aos::monotonic_clock::time_point startup_time =
+ aos::monotonic_clock::now();
// Now, spin up a server for 2 seconds.
MakePi1Server();
@@ -1008,6 +1026,11 @@
0u);
EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
+ startup_time);
+ EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
+ aos::monotonic_clock::now());
+
EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
EXPECT_EQ(ping_timestamp_count, 1);
LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
@@ -1144,6 +1167,8 @@
}
{
+ const aos::monotonic_clock::time_point startup_time =
+ aos::monotonic_clock::now();
// Now, spin up a client for 2 seconds.
MakePi2Client();
@@ -1165,6 +1190,11 @@
// We should have gotten precisely one more ping message--the latest one
// sent should've made it, but no previous ones.
EXPECT_TRUE(ping_fetcher.FetchNext());
+ EXPECT_GT(ping_fetcher.context().monotonic_remote_transmit_time,
+ startup_time);
+ EXPECT_LT(ping_fetcher.context().monotonic_remote_transmit_time,
+ aos::monotonic_clock::now());
+
EXPECT_EQ(ping_index, ping_fetcher->value());
EXPECT_FALSE(ping_fetcher.FetchNext());
diff --git a/aos/network/remote_data.fbs b/aos/network/remote_data.fbs
index 0813e57..4e4c7af 100644
--- a/aos/network/remote_data.fbs
+++ b/aos/network/remote_data.fbs
@@ -20,6 +20,12 @@
// UUID for this boot. This is 16 bytes long.
boot_uuid:[uint8] (id: 5);
+
+ // Time that the message was handed to the kernel to be published over the
+ // network on the remote node.
+ //
+ // See MessageHeader fbs definition for more details.
+ monotonic_remote_transmit_time:int64 = -9223372036854775808(id: 6);
}
root_type RemoteData;
diff --git a/aos/network/remote_message.fbs b/aos/network/remote_message.fbs
index 6d2a8d1..4305f51 100644
--- a/aos/network/remote_message.fbs
+++ b/aos/network/remote_message.fbs
@@ -32,6 +32,13 @@
// UUID for this boot.
boot_uuid:[uint8] (id: 9);
+
+ // The time that the message was transmitted on the source node to the
+ // destination node i.e handed to the kernel to be published over the
+ // message bridge.
+ //
+ // See MessageHeader fbs definition for more details.
+ monotonic_remote_transmit_time:int64 = -9223372036854775808(id: 10);
}
root_type RemoteMessage;