Fix log sorting for good
Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.
When a message is sent from node A -> node B, add support for sending
that timestamp back from node B to node A for logging.
{
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi1",
"frequency": 10,
"max_size": 200,
"destination_nodes": [
{
"name": "pi2",
"priority": 1,
"timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
]
},
This gives us a way to log enough information on node A such that
everything is self contained. We know all the messages we sent to B,
and when they got there, so we can recreate the time offset and replay
the node.
This data is then published over
{ "name": "/aos/remote_timestamps/pi2", "type": ".aos.logger.MessageHeader"}
The logger then treats that channel specially and logs the contents
directly as though the message contents were received on the remote
node.
This (among other things) exposes log sorting problems. Use our fancy
new infinite precision noncausal filter to estimate time precise enough
to actually order events. This gets us down to 2-3 ns of error due to
integer precision.
Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ade82f9..05ee4e0 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -37,6 +37,24 @@
DetachedBufferWriter::~DetachedBufferWriter() {
Flush();
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+ VLOG(1) << "Closed " << filename_;
+}
+
+DetachedBufferWriter::DetachedBufferWriter(
+ DetachedBufferWriter &&other) {
+ *this = std::move(other);
+}
+
+DetachedBufferWriter &DetachedBufferWriter::operator=(
+ DetachedBufferWriter &&other) {
+ Flush();
+ std::swap(filename_, other.filename_);
+ std::swap(fd_, other.fd_);
+ std::swap(queued_size_, other.queued_size_);
+ std::swap(written_size_, other.written_size_);
+ std::swap(queue_, other.queue_);
+ std::swap(iovec_, other.iovec_);
+ return *this;
}
void DetachedBufferWriter::QueueSizedFlatbuffer(
@@ -290,7 +308,7 @@
FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
max_out_of_order_duration_ =
- std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+ chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
VLOG(1) << "Opened " << filename << " as node "
<< FlatbufferToJson(log_file_header()->node());
@@ -323,6 +341,49 @@
// open more of them).
log_file_header_ = message_reader_->raw_log_file_header();
+ for (size_t i = 1; i < filenames_.size(); ++i) {
+ MessageReader message_reader(filenames_[i]);
+
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->realtime_start_time()));
+
+ // There are 2 types of part files. Part files from before time estimation
+ // has started, and part files after. We don't declare a log file "started"
+ // until time estimation is up. And once a log file starts, it should never
+ // stop again, and should remain constant.
+ // To compare both types of headers, we mutate our saved copy of the header
+ // to match the next chunk by updating time if we detect a stopped ->
+ // started transition.
+ if (monotonic_start_time() == monotonic_clock::min_time) {
+ CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
+ // We should only be missing the monotonic start time when logging data
+ // for remote nodes. We don't have a good way to determine the remote
+ // realtime offset, so it shouldn't be filled out.
+ // TODO(austin): If we have a good way, feel free to fill it out. It
+ // probably won't be better than we could do in post though with the same
+ // data.
+ CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
+ if (new_monotonic_start_time != monotonic_clock::min_time) {
+ // If we finally found our start time, update the header. Do this once
+ // because it should never change again.
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ }
+ }
+
+ // Now compare that the headers match.
+ CHECK(CompareFlatBuffer(message_reader.raw_log_file_header(),
+ log_file_header_))
+ << ": Header is different between log file chunks " << filenames_[0]
+ << " and " << filenames_[i] << ", this is not supported.";
+ }
+
// Setup per channel state.
channels_.resize(configuration()->channels()->size());
for (ChannelData &channel_data : channels_) {
@@ -545,10 +606,13 @@
timestamp = channels_[channel_index].data.front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
- channels_[channel_index].data.pop_front();
+ channels_[channel_index].data.PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
- << std::get<0>(timestamp) << " for " << channel_index;
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
+ << std::get<0>(timestamp) << " for "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -558,19 +622,21 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel, int node_index) {
+SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel].timestamps[node_index].front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
- channels_[channel].timestamps[node_index].pop_front();
+ channels_[channel].timestamps[node_index].PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
<< std::get<0>(timestamp) << " for "
<< configuration::StrippedChannelToString(
configuration()->channels()->Get(channel))
- << " on " << node_index;
+ << " on "
+ << configuration()->nodes()->Get(node_index)->name()->string_view()
+ << " (" << node_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -607,7 +673,7 @@
return true;
}
-void SplitMessageReader::MessageHeaderQueue::pop_front() {
+void SplitMessageReader::MessageHeaderQueue::PopFront() {
data_.pop_front();
if (data_.size() != 0u) {
// Yup, new data.
@@ -616,6 +682,15 @@
} else {
timestamp_merger->Update(split_reader, front_timestamp());
}
+ } else {
+ // Poke anyway to update the heap.
+ if (timestamps) {
+ timestamp_merger->UpdateTimestamp(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ } else {
+ timestamp_merger->Update(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ }
}
}
@@ -687,25 +762,32 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == message_heap_.end())
- << ": Pushing message when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == message_heap_.end())
+ << ": Pushing message when it is already in the heap.";
- message_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ message_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
+ std::push_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ }
// If we are just a data merger, don't wait for timestamps.
if (!has_timestamps_) {
- channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!message_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -730,26 +812,33 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == timestamp_heap_.end())
- << ": Pushing timestamp when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == timestamp_heap_.end())
+ << ": Pushing timestamp when it is already in the heap.";
- timestamp_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ timestamp_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- SplitMessageReaderHeapCompare);
+ std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ SplitMessageReaderHeapCompare);
+ }
// If we are a timestamp merger, don't wait for data. Missing data will be
// caught at read time.
if (has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!timestamp_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -832,50 +921,73 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->PopOldest(channel_index_, node_index_);
+ ->PopOldestTimestamp(channel_index_, node_index_);
// Confirm that the time we have recorded matches.
CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
- // TODO(austin): What if we get duplicate timestamps?
+ // Now, keep reading until we have found all duplicates.
+ while (!timestamp_heap_.empty()) {
+ // See if it is a duplicate.
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ next_oldest_timestamp_reader = timestamp_heap_.front();
- return oldest_timestamp;
-}
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ next_oldest_timestamp_time =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->oldest_message(channel_index_, node_index_);
-TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
- if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
- return TimestampMerger::DeliveryTimestamp{};
+ if (std::get<0>(next_oldest_timestamp_time) ==
+ std::get<0>(oldest_timestamp) &&
+ std::get<1>(next_oldest_timestamp_time) ==
+ std::get<1>(oldest_timestamp)) {
+ // Pop the timestamp reader pointer.
+ std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ timestamp_heap_.pop_back();
+
+ // Pop the next oldest timestamp. This re-pushes any messages from the
+ // reader.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ next_oldest_timestamp =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->PopOldestTimestamp(channel_index_, node_index_);
+
+ // And make sure the contents match in their entirety.
+ CHECK(std::get<2>(oldest_timestamp).span() ==
+ std::get<2>(next_oldest_timestamp).span())
+ << ": Data at the same timestamp doesn't match, "
+ << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
+ << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(oldest_timestamp).span().data()),
+ std::get<2>(oldest_timestamp).span().size()))
+ << " vs "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(next_oldest_timestamp).span().data()),
+ std::get<2>(next_oldest_timestamp).span().size()));
+
+ } else {
+ break;
+ }
}
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->oldest_message(channel_index_, node_index_);
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_sent_time()));
- timestamp.realtime_event_time = realtime_clock::time_point(
- chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
-
- timestamp.monotonic_remote_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_remote_time()));
- timestamp.realtime_remote_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->realtime_remote_time()));
-
- timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
- return timestamp;
+ return oldest_timestamp;
}
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
+ VLOG(1) << "Looking for matching timestamp for "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ") "
+ << " at " << std::get<0>(oldest_timestamp());
+
// Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
@@ -944,7 +1056,8 @@
<< " on channel "
<< configuration::StrippedChannelToString(
configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
+ << " (" << channel_index_ << ")"
+ << (VLOG_IS_ON(1) ? DebugString() : "");
return std::make_tuple(timestamp,
std::move(std::get<2>(oldest_timestamp)));
}
@@ -952,6 +1065,10 @@
timestamp.monotonic_remote_time = remote_monotonic_time;
}
+ VLOG(1) << "Found matching data "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ")";
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
@@ -1062,10 +1179,27 @@
<< FlatbufferToJson(reader->node()) << " start_time "
<< monotonic_start_time();
} else {
- // And then make sure all the other files have matching headers.
- CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
- << ": " << FlatbufferToJson(log_file_header()) << " reader "
- << FlatbufferToJson(reader->log_file_header());
+ // Find the earliest start time. That way, if we get a full log file
+ // directly from the node, and a partial later, we start with the
+ // full. Update our header to match that.
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->realtime_start_time()));
+
+ if (monotonic_start_time() == monotonic_clock::min_time ||
+ (new_monotonic_start_time != monotonic_clock::min_time &&
+ new_monotonic_start_time < monotonic_start_time())) {
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ VLOG(1) << "Updated log file " << reader->filename()
+ << " with node " << FlatbufferToJson(reader->node())
+ << " start_time " << new_monotonic_start_time;
+ }
}
}
}
@@ -1105,24 +1239,6 @@
return channel_heap_.front().first;
}
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
- if (timestamp_heap_.empty()) {
- return TimestampMerger::DeliveryTimestamp{};
- }
- return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
-}
-
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
- int channel) const {
- // If we didn't find any data for this node, we won't have any mergers. Return
- // an invalid timestamp in that case.
- if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
- TimestampMerger::DeliveryTimestamp result;
- return result;
- }
- return timestamp_mergers_[channel].OldestTimestamp();
-}
-
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index) {
// Pop and recreate the heap if it has already been pushed. And since we are
@@ -1161,6 +1277,11 @@
}
}
+ if (timestamp == monotonic_clock::min_time) {
+ timestamp_mergers_[channel_index].set_pushed(false);
+ return;
+ }
+
channel_heap_.push_back(std::make_pair(timestamp, channel_index));
// The default sort puts the newest message first. Use a custom comparator to
@@ -1175,6 +1296,32 @@
}
}
+void ChannelMerger::VerifyHeaps() {
+ {
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+ channel_heap_;
+ std::make_heap(channel_heap.begin(), channel_heap.end(),
+ &ChannelHeapCompare);
+
+ for (size_t i = 0; i < channel_heap_.size(); ++i) {
+ CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
+ CHECK_EQ(std::get<0>(channel_heap[i]),
+ timestamp_mergers_[std::get<1>(channel_heap[i])]
+ .channel_merger_time());
+ }
+ }
+ {
+ std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap =
+ timestamp_heap_;
+ std::make_heap(timestamp_heap.begin(), timestamp_heap.end(),
+ &ChannelHeapCompare);
+
+ for (size_t i = 0; i < timestamp_heap_.size(); ++i) {
+ CHECK(timestamp_heap_[i] == timestamp_heap[i]) << ": Heaps diverged...";
+ }
+ }
+}
+
std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
@@ -1210,6 +1357,16 @@
<< ": channel_heap_ was corrupted for " << channel_index << ": "
<< DebugString();
+ CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
+ << ": " << MaybeNodeName(log_file_header()->node())
+ << "Messages came off the queue out of order. " << DebugString();
+ last_popped_time_ = std::get<0>(message).monotonic_event_time;
+
+ VLOG(1) << "Popped " << last_popped_time_ << " "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
+
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
}
@@ -1217,27 +1374,31 @@
std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
std::stringstream ss;
for (size_t i = 0; i < data_.size(); ++i) {
- if (timestamps) {
- ss << " msg: ";
- } else {
- ss << " timestamp: ";
- }
- ss << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_sent_time()))
- << " ("
- << realtime_clock::time_point(
- std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
- << ") " << data_[i].message().queue_index();
- if (timestamps) {
- ss << " <- remote "
- << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_remote_time()))
+ if (i < 5 || i + 5 > data_.size()) {
+ if (timestamps) {
+ ss << " msg: ";
+ } else {
+ ss << " timestamp: ";
+ }
+ ss << monotonic_clock::time_point(
+ chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
<< " ("
- << realtime_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().realtime_remote_time()))
- << ")";
+ << realtime_clock::time_point(
+ chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+ << ") " << data_[i].message().queue_index();
+ if (timestamps) {
+ ss << " <- remote "
+ << monotonic_clock::time_point(chrono::nanoseconds(
+ data_[i].message().monotonic_remote_time()))
+ << " ("
+ << realtime_clock::time_point(chrono::nanoseconds(
+ data_[i].message().realtime_remote_time()))
+ << ")";
+ }
+ ss << "\n";
+ } else if (i == 5) {
+ ss << " ...\n";
}
- ss << "\n";
}
return ss.str();