Add multi-node local logging to the logger
This is not yet able to forward messages, but it is able to log messages
that have been forwarded. Create a log file and add a test verifying that
the timestamps are recorded correctly.
Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..6eae9e9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -18,6 +18,9 @@
DEFINE_int32(flush_size, 1000000,
"Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_bool(skip_missing_forwarding_entries, false,
+ "If true, drop any forwarding entries with missing data. If "
+ "false, CHECK.");
namespace aos {
namespace logger {
@@ -86,7 +89,44 @@
polling_period_(polling_period) {
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
- fs.fetcher = event_loop->MakeRawFetcher(channel);
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+ const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
+ channel, event_loop_->node()) &&
+ is_readable;
+
+ const bool log_delivery_times =
+ (event_loop_->node() == nullptr)
+ ? false
+ : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop_->node(), event_loop_->node());
+
+ if (log_message || log_delivery_times) {
+ fs.fetcher = event_loop->MakeRawFetcher(channel);
+ VLOG(1) << "Logging channel "
+ << configuration::CleanedChannelToString(channel);
+
+ if (log_delivery_times) {
+ if (log_message) {
+ VLOG(1) << " Logging message and delivery times";
+ fs.log_type = LogType::kLogMessageAndDeliveryTime;
+ } else {
+ VLOG(1) << " Logging delivery times only";
+ fs.log_type = LogType::kLogDeliveryTimeOnly;
+ }
+ } else {
+ // We don't have a particularly great use case right now for logging a
+ // forwarded message, but either not logging the delivery times, or
+ // logging them on another node. Fail rather than produce bad results.
+ CHECK(configuration::ChannelIsSendableOnNode(channel,
+ event_loop_->node()))
+ << ": Logger only knows how to log remote messages with "
+ "forwarding timestamps.";
+ VLOG(1) << " Logging message only";
+ fs.log_type = LogType::kLogMessage;
+ }
+ }
+
fs.written = false;
fetchers_.emplace_back(std::move(fs));
}
@@ -99,7 +139,9 @@
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
for (FetcherStruct &f : fetchers_) {
- f.written = !f.fetcher->Fetch();
+ if (f.fetcher.get() != nullptr) {
+ f.written = !f.fetcher->Fetch();
+ }
}
// We need to pick a point in time to declare the log file "started". This
@@ -122,10 +164,16 @@
flatbuffers::Offset<flatbuffers::String> string_offset =
fbb.CreateString(network::GetHostname());
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(event_loop_->node(), &fbb);
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
log_file_header_builder.add_name(string_offset);
+ log_file_header_builder.add_node(node_offset);
+
log_file_header_builder.add_configuration(configuration_offset);
// The worst case theoretical out of order is the polling period times 2.
// One message could get logged right after the boundary, but be for right
@@ -157,20 +205,46 @@
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
- int channel_index) {
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
- fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ int channel_index, LogType log_type) {
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+ switch(log_type) {
+ case LogType::kLogMessage:
+ case LogType::kLogMessageAndDeliveryTime:
+ data_offset =
+ fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+ break;
+
+ case LogType::kLogDeliveryTimeOnly:
+ break;
+ }
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_queue_index(context.queue_index);
message_header_builder.add_monotonic_sent_time(
context.monotonic_event_time.time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
context.realtime_event_time.time_since_epoch().count());
- message_header_builder.add_queue_index(context.queue_index);
+ switch (log_type) {
+ case LogType::kLogMessage:
+ message_header_builder.add_data(data_offset);
+ break;
- message_header_builder.add_data(data_offset);
+ case LogType::kLogMessageAndDeliveryTime:
+ message_header_builder.add_data(data_offset);
+ [[fallthrough]];
+
+ case LogType::kLogDeliveryTimeOnly:
+ message_header_builder.add_monotonic_remote_time(
+ context.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ context.realtime_remote_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(context.remote_queue_index);
+ break;
+ }
+
return message_header_builder.Finish();
}
@@ -188,51 +262,46 @@
size_t channel_index = 0;
// Write each channel to disk, one at a time.
for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.fetcher.get() == nullptr) {
- if (!f.fetcher->FetchNext()) {
- VLOG(1) << "No new data on "
- << FlatbufferToJson(f.fetcher->channel());
- break;
- } else {
- f.written = false;
+ // Skip any channels which we aren't supposed to log.
+ if (f.fetcher.get() != nullptr) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
}
- }
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(1) << "No new data on "
- << FlatbufferToJson(f.fetcher->channel());
- break;
+ CHECK(!f.written);
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time <
+ last_synchronized_time_) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(1);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ channel_index, f.log_type));
+
+ VLOG(2) << "Writing data for channel "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ writer_->QueueSizedFlatbuffer(&fbb);
+
+ f.written = true;
} else {
- f.written = false;
+ break;
}
}
-
- CHECK(!f.written);
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
- // Write!
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(1);
-
- fbb.FinishSizePrefixed(
- PackMessage(&fbb, f.fetcher->context(), channel_index));
-
- VLOG(1) << "Writing data for channel "
- << FlatbufferToJson(f.fetcher->channel());
-
- max_header_size_ = std::max(
- max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer_->QueueSizedFlatbuffer(&fbb);
-
- f.written = true;
- } else {
- break;
- }
}
++channel_index;
@@ -373,11 +442,20 @@
queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
}
-const Configuration *LogReader::configuration() {
+const Configuration *LogReader::configuration() const {
return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
->configuration();
}
+const Node *LogReader::node() const {
+ return configuration::GetNode(
+ configuration(),
+ flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+ ->node()
+ ->name()
+ ->string_view());
+}
+
monotonic_clock::time_point LogReader::monotonic_start_time() {
return monotonic_clock::time_point(std::chrono::nanoseconds(
flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
@@ -434,20 +512,32 @@
FlatbufferVector<MessageHeader> front = std::move(channel.front());
- CHECK(front.message().data() != nullptr);
+ if (oldest_channel_index.first > monotonic_start_time() ||
+ event_loop_factory_ != nullptr) {
+ if (!FLAGS_skip_missing_forwarding_entries ||
+ front.message().data() != nullptr) {
+ CHECK(front.message().data() != nullptr)
+ << ": Got a message without data. Forwarding entry which was not "
+ "matched? Use --skip_missing_forwarding_entries to ignore "
+ "this.";
- if (oldest_channel_index.first > monotonic_start_time()) {
- // If we have access to the factory, use it to fix the realtime time.
- if (event_loop_factory_ != nullptr) {
- event_loop_factory_->SetRealtimeOffset(
+ // If we have access to the factory, use it to fix the realtime time.
+ if (event_loop_factory_ != nullptr) {
+ event_loop_factory_->SetRealtimeOffset(
+ monotonic_clock::time_point(
+ chrono::nanoseconds(front.message().monotonic_sent_time())),
+ realtime_clock::time_point(
+ chrono::nanoseconds(front.message().realtime_sent_time())));
+ }
+
+ channel.raw_sender->Send(
+ front.message().data()->Data(), front.message().data()->size(),
monotonic_clock::time_point(
- chrono::nanoseconds(front.message().monotonic_sent_time())),
+ chrono::nanoseconds(front.message().monotonic_remote_time())),
realtime_clock::time_point(
- chrono::nanoseconds(front.message().realtime_sent_time())));
+ chrono::nanoseconds(front.message().realtime_remote_time())),
+ front.message().remote_queue_index());
}
-
- channel.raw_sender->Send(front.message().data()->Data(),
- front.message().data()->size());
} else {
LOG(WARNING) << "Not sending data from before the start of the log file. "
<< oldest_channel_index.first.time_since_epoch().count()