Add remote_boot_uuid to Context
This lets us track which boot a message came from and finally fix the
logger relying on ServerStatistics having all the required information
needed to build up the logfile header.
Change-Id: I17fc4c5718d5d69c7a1e154afdd83b1ccb388a8f
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 22f7c6f..cc5a356 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -75,6 +75,10 @@
// the caller has access to this context, which makes this pretty useless.
int buffer_index;
+ // UUID of the remote node which sent this message, or this node in the case
+ // of events which are local to this node.
+ UUID remote_boot_uuid = UUID::Zero();
+
// Efficiently copies the flatbuffer into a FlatbufferVector, allocating
// memory in the process. It is vital that T matches the type of the
// underlying flatbuffer.
@@ -150,7 +154,7 @@
bool Send(size_t size);
bool Send(size_t size, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index);
+ uint32_t remote_queue_index, const UUID &remote_boot_uuid);
// Sends a single block of data by copying it.
// The remote arguments have the same meaning as in Send above.
@@ -158,7 +162,7 @@
bool Send(const void *data, size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index);
+ uint32_t remote_queue_index, const UUID &remote_boot_uuid);
const Channel *channel() const { return channel_; }
@@ -195,13 +199,15 @@
friend class EventLoop;
virtual bool DoSend(const void *data, size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) = 0;
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) = 0;
virtual bool DoSend(size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) = 0;
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) = 0;
EventLoop *const event_loop_;
const Channel *const channel_;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index e0c6024..ee77b8a 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -274,6 +274,7 @@
EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().remote_boot_uuid, UUID::Zero());
EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
EXPECT_EQ(fetcher.context().size, 0u);
EXPECT_EQ(fetcher.context().data, nullptr);
@@ -301,6 +302,7 @@
EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
+ EXPECT_EQ(fetcher.context().remote_boot_uuid, loop2->boot_uuid());
EXPECT_EQ(fetcher.context().queue_index, 0x0u);
EXPECT_EQ(fetcher.context().size, 20u);
EXPECT_NE(fetcher.context().data, nullptr);
@@ -955,6 +957,7 @@
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
+ EXPECT_EQ(loop->context().remote_boot_uuid, loop->boot_uuid());
EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
EXPECT_EQ(loop->context().size, 0u);
EXPECT_EQ(loop->context().data, nullptr);
@@ -1249,6 +1252,7 @@
loop1->context().monotonic_event_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
+ EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
const aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -1296,6 +1300,7 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
<< ": Got "
@@ -1348,6 +1353,7 @@
loop1->context().monotonic_event_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
+ EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
const aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -1381,6 +1387,7 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
+ EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
<< ": Got "
@@ -1438,6 +1445,7 @@
EXPECT_EQ(loop1->context().monotonic_remote_time,
monotonic_clock::min_time);
+ EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
EXPECT_EQ(loop1->context().realtime_event_time,
realtime_clock::min_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
@@ -1582,13 +1590,13 @@
FlatbufferDetachedBuffer<timing::Report> primary_report =
FlatbufferDetachedBuffer<timing::Report>::Empty();
while (report_fetcher.FetchNext()) {
- LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+ VLOG(1) << "Report " << FlatbufferToJson(report_fetcher.get());
if (report_fetcher->name()->string_view() == "primary") {
primary_report = CopyFlatBuffer(report_fetcher.get());
}
}
- LOG(INFO) << FlatbufferToJson(primary_report, {.multi_line = true});
+ VLOG(1) << FlatbufferToJson(primary_report, {.multi_line = true});
EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
@@ -1849,6 +1857,7 @@
const aos::realtime_clock::time_point realtime_remote_time =
aos::realtime_clock::time_point(chrono::seconds(3132));
const uint32_t remote_queue_index = 0x254971;
+ const UUID remote_boot_uuid = UUID::Random();
std::unique_ptr<aos::RawSender> sender =
loop1->MakeRawSender(configuration::GetChannel(
@@ -1860,19 +1869,21 @@
loop2->OnRun([&]() {
EXPECT_TRUE(sender->Send(kData.data(), kData.size(), monotonic_remote_time,
- realtime_remote_time, remote_queue_index));
+ realtime_remote_time, remote_queue_index,
+ remote_boot_uuid));
});
bool happened = false;
loop2->MakeRawWatcher(
configuration::GetChannel(loop2->configuration(), "/test",
"aos.TestMessage", "", nullptr),
- [this, monotonic_remote_time, realtime_remote_time,
+ [this, monotonic_remote_time, realtime_remote_time, remote_boot_uuid,
remote_queue_index, &fetcher,
&happened](const Context &context, const void * /*message*/) {
happened = true;
EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time);
EXPECT_EQ(realtime_remote_time, context.realtime_remote_time);
+ EXPECT_EQ(remote_boot_uuid, context.remote_boot_uuid);
EXPECT_EQ(remote_queue_index, context.remote_queue_index);
ASSERT_TRUE(fetcher->Fetch());
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 74d3493..d6922de 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -131,15 +131,15 @@
inline bool RawSender::Send(size_t size) {
return Send(size, monotonic_clock::min_time, realtime_clock::min_time,
- 0xffffffffu);
+ 0xffffffffu, event_loop_->boot_uuid());
}
inline bool RawSender::Send(
size_t size, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
+ uint32_t remote_queue_index, const UUID &uuid) {
if (DoSend(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index)) {
+ remote_queue_index, uuid)) {
timing_.size.Add(size);
timing_.sender->mutate_count(timing_.sender->count() + 1);
ftrace_.FormatMessage(
@@ -154,16 +154,16 @@
inline bool RawSender::Send(const void *data, size_t size) {
return Send(data, size, monotonic_clock::min_time, realtime_clock::min_time,
- 0xffffffffu);
+ 0xffffffffu, event_loop_->boot_uuid());
}
inline bool RawSender::Send(
const void *data, size_t size,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
+ uint32_t remote_queue_index, const UUID &uuid) {
if (DoSend(data, size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index)) {
+ remote_queue_index, uuid)) {
timing_.size.Add(size);
timing_.sender->mutate_count(timing_.sender->count() + 1);
ftrace_.FormatMessage(
@@ -190,6 +190,7 @@
event_loop_->context_.size = 0;
event_loop_->context_.data = nullptr;
event_loop_->context_.buffer_index = -1;
+ event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
ftrace_.FormatMessage(
"timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -235,6 +236,7 @@
event_loop_->context_.size = 0;
event_loop_->context_.data = nullptr;
event_loop_->context_.buffer_index = -1;
+ event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
// Compute how many cycles elapsed and schedule the next wakeup.
Reschedule(schedule, monotonic_start_time);
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 07b423f..098b78c 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -315,7 +315,8 @@
for (size_t i = 1; i < filtered_parts.size(); ++i) {
CHECK_EQ(filtered_parts[i].source_boot_uuid,
filtered_parts[0].source_boot_uuid)
- << ": Found parts from different boots "
+ << ": Found parts from different boots for node "
+ << node->name()->string_view() << " "
<< LogFileVectorToString(log_files_);
}
if (!filtered_parts[0].source_boot_uuid.empty()) {
@@ -1125,7 +1126,12 @@
timestamped_message.data.message().data()->Data(),
timestamped_message.data.message().data()->size(),
timestamped_message.monotonic_remote_time,
- timestamped_message.realtime_remote_time, remote_queue_index);
+ timestamped_message.realtime_remote_time, remote_queue_index,
+ (channel_source_state_[timestamped_message.channel_index] != nullptr
+ ? CHECK_NOTNULL(
+ channel_source_state_[timestamped_message.channel_index])
+ ->event_loop_->boot_uuid()
+ : event_loop_->boot_uuid()));
if (!sent) return false;
if (queue_index_map_[timestamped_message.channel_index]) {
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 61a4149..63e1cb9 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -63,7 +63,7 @@
}
std::string ShmPath(std::string_view shm_base, const Channel *channel) {
CHECK(channel->has_type());
- return ShmFolder(shm_base, channel) + channel->type()->str() + ".v3";
+ return ShmFolder(shm_base, channel) + channel->type()->str() + ".v4";
}
void PageFaultDataWrite(char *data, size_t size) {
@@ -372,7 +372,7 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, copy_buffer);
+ &context_.remote_boot_uuid, &context_.size, copy_buffer);
if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
@@ -535,13 +535,15 @@
bool DoSend(size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) override {
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) override {
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
- CHECK(lockless_queue_sender_.Send(
- length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
- &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
+ CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
+ realtime_remote_time, remote_queue_index,
+ remote_boot_uuid, &monotonic_sent_time_,
+ &realtime_sent_time_, &sent_queue_index_))
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
@@ -552,14 +554,15 @@
bool DoSend(const void *msg, size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) override {
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) override {
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
CHECK(lockless_queue_sender_.Send(
reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
- realtime_remote_time, remote_queue_index, &monotonic_sent_time_,
- &realtime_sent_time_, &sent_queue_index_))
+ realtime_remote_time, remote_queue_index, remote_boot_uuid,
+ &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
wake_upper_.Wakeup(event_loop()->priority());
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index cd91ab3..2a2b706 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -26,12 +26,12 @@
}
// Clean up anything left there before.
- unlink((FLAGS_shm_base + "/test/aos.TestMessage.v3").c_str());
- unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v3").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v3").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v3").c_str());
+ unlink((FLAGS_shm_base + "/test/aos.TestMessage.v4").c_str());
+ unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v4").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v4").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v4").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v4").c_str());
}
~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index b38829e..9d431b7 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -294,12 +294,14 @@
bool DoSend(size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) override;
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) override;
bool DoSend(const void *msg, size_t size,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) override;
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) override;
int buffer_index() override {
// First, ensure message_ is allocated.
@@ -834,10 +836,11 @@
simulated_channel_->CountSenderDestroyed();
}
-bool SimulatedSender::DoSend(
- size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
+bool SimulatedSender::DoSend(size_t length,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) {
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
@@ -847,6 +850,7 @@
message_->context.remote_queue_index = remote_queue_index;
message_->context.realtime_event_time = event_loop_->realtime_now();
message_->context.realtime_remote_time = realtime_remote_time;
+ message_->context.remote_boot_uuid = remote_boot_uuid;
CHECK_LE(length, message_->context.size);
message_->context.size = length;
@@ -862,11 +866,11 @@
return true;
}
-bool SimulatedSender::DoSend(
- const void *msg, size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
+bool SimulatedSender::DoSend(const void *msg, size_t size,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &remote_boot_uuid) {
CHECK_LE(size, this->size())
<< ": Attempting to send too big a message on "
<< configuration::CleanedChannelToString(simulated_channel_->channel());
@@ -883,7 +887,7 @@
msg, size);
return DoSend(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index);
+ remote_queue_index, remote_boot_uuid);
}
SimulatedTimerHandler::SimulatedTimerHandler(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 60731c1..22d4028 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1485,18 +1485,28 @@
std::unique_ptr<EventLoop> pi1_remote_timestamp =
simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
- std::string expected_boot_uuid(
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
- ->boot_uuid()
- .ToString());
+ UUID expected_boot_uuid =
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
int timestamp_count = 0;
pi1_remote_timestamp->MakeWatcher(
+ "/pi2/aos", [&expected_boot_uuid,
+ &pi1_remote_timestamp](const message_bridge::Timestamp &) {
+ EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+ expected_boot_uuid);
+ });
+ pi1_remote_timestamp->MakeWatcher(
+ "/test",
+ [&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
+ EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+ expected_boot_uuid);
+ });
+ pi1_remote_timestamp->MakeWatcher(
shared() ? "/pi1/aos/remote_timestamps/pi2"
: "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
[×tamp_count, &expected_boot_uuid](const RemoteMessage &header) {
EXPECT_TRUE(header.has_boot_uuid());
- EXPECT_EQ(header.boot_uuid()->string_view(), expected_boot_uuid);
+ EXPECT_EQ(UUID::FromString(header.boot_uuid()), expected_boot_uuid);
VLOG(1) << aos::FlatbufferToJson(&header);
++timestamp_count;
});
@@ -1511,7 +1521,7 @@
EXPECT_TRUE(connection->has_boot_uuid());
if (connection->node()->name()->string_view() == "pi2") {
EXPECT_EQ(expected_boot_uuid,
- connection->boot_uuid()->string_view())
+ UUID::FromString(connection->boot_uuid()))
<< " : Got " << aos::FlatbufferToJson(&stats);
++pi1_server_statistics_count;
}
@@ -1527,15 +1537,12 @@
// Confirm that reboot changes the UUID.
simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->Reboot();
- EXPECT_NE(expected_boot_uuid,
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
- ->boot_uuid()
- .ToString());
+ EXPECT_NE(
+ expected_boot_uuid,
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid());
expected_boot_uuid =
- simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
- ->boot_uuid()
- .ToString();
+ simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)->boot_uuid();
timestamp_count = 0;
pi1_server_statistics_count = 0;
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index e5eb4cb..eb7f243 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -136,7 +136,8 @@
sender_->Send(fetcher_->context().data, fetcher_->context().size,
fetcher_->context().monotonic_event_time,
fetcher_->context().realtime_event_time,
- fetcher_->context().queue_index);
+ fetcher_->context().queue_index,
+ fetcher_->context().remote_boot_uuid);
// And simulate message_bridge's offset recovery.
client_status_->SampleFilter(client_index_,