Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear once all these adjustments are applied,
we need to redo how events are scheduled. They can't be converted to the
distributed_clock just once; they now need to be converted every time they
are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 03b89d1..44381f3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -291,7 +291,7 @@
chrono::nanoseconds(result.message().monotonic_sent_time()));
newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- VLOG(1) << "Read from " << filename() << " data " << FlatbufferToJson(result);
+ VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
return std::move(result);
}
@@ -435,8 +435,14 @@
}
} else {
if (!NextLogFile()) {
- VLOG(1) << "End of log file.";
+ VLOG(1) << "End of log file " << filenames_.back();
at_end_ = true;
+ for (MessageHeaderQueue *queue : channels_to_write_) {
+ if (queue == nullptr || queue->timestamp_merger == nullptr) {
+ continue;
+ }
+ queue->timestamp_merger->NoticeAtEnd();
+ }
return false;
}
}
@@ -800,15 +806,40 @@
return oldest_timestamp;
}
+TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
+ if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
+ return TimestampMerger::DeliveryTimestamp{};
+ }
+
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_timestamp_reader = timestamp_heap_.front();
+
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ oldest_timestamp = std::get<2>(oldest_timestamp_reader)
+ ->oldest_message(channel_index_, node_index_);
+
+ TimestampMerger::DeliveryTimestamp timestamp;
+ timestamp.monotonic_event_time =
+ monotonic_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_timestamp)->monotonic_sent_time()));
+ timestamp.realtime_event_time = realtime_clock::time_point(
+ chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
+
+ timestamp.monotonic_remote_time =
+ monotonic_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_timestamp)->monotonic_remote_time()));
+ timestamp.realtime_remote_time =
+ realtime_clock::time_point(chrono::nanoseconds(
+ std::get<2>(oldest_timestamp)->realtime_remote_time()));
+
+ timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
+ return timestamp;
+}
+
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
- CHECK_GT(message_heap_.size(), 0u)
- << ": Missing data from source node, no data available to match "
- "timestamp on "
- << configuration::CleanedChannelToString(
- configuration_->channels()->Get(channel_index_));
-
+ // Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_timestamp = PopTimestampHeap();
@@ -830,6 +861,15 @@
chrono::nanoseconds(
std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
+ // See if we have any data. If not, pass the problem up the chain.
+ if (message_heap_.size() == 0u) {
+ VLOG(1) << "No data to match timestamp on "
+ << configuration::CleanedChannelToString(
+ configuration_->channels()->Get(channel_index_));
+ return std::make_tuple(timestamp,
+ std::move(std::get<2>(oldest_timestamp)));
+ }
+
while (true) {
{
// Ok, now try grabbing data until we find one which matches.
@@ -842,16 +882,16 @@
std::get<2>(oldest_message_ref)->monotonic_sent_time()));
if (remote_monotonic_time < remote_timestamp_monotonic_time) {
- LOG(INFO) << "Undelivered message, skipping. Remote time is "
- << remote_monotonic_time << " timestamp is "
- << remote_timestamp_monotonic_time << " on channel "
- << channel_index_;
+ VLOG(1) << "Undelivered message, skipping. Remote time is "
+ << remote_monotonic_time << " timestamp is "
+ << remote_timestamp_monotonic_time << " on channel "
+ << channel_index_;
PopMessageHeap();
continue;
} else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
- LOG(INFO) << "Data not found. Remote time should be "
- << remote_timestamp_monotonic_time << " on channel "
- << channel_index_;
+ VLOG(1) << "Data not found. Remote time should be "
+ << remote_timestamp_monotonic_time << " on channel "
+ << channel_index_;
return std::make_tuple(timestamp,
std::move(std::get<2>(oldest_timestamp)));
}
@@ -902,6 +942,8 @@
}
}
+void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
+
namespace {
std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
const std::vector<std::vector<std::string>> &filenames) {
@@ -997,6 +1039,18 @@
return channel_heap_.front().first;
}
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
+ if (timestamp_heap_.size() == 0u) {
+ return TimestampMerger::DeliveryTimestamp{};
+ }
+ return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
+}
+
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
+ int channel) const {
+ return timestamp_mergers_[channel].OldestTimestamp();
+}
+
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index) {
// Pop and recreate the heap if it has already been pushed. And since we are
@@ -1009,6 +1063,16 @@
}));
std::make_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
+
+ if (timestamp_mergers_[channel_index].has_timestamps()) {
+ timestamp_heap_.erase(std::find_if(
+ timestamp_heap_.begin(), timestamp_heap_.end(),
+ [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
+ return x.second == channel_index;
+ }));
+ std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ ChannelHeapCompare);
+ }
}
channel_heap_.push_back(std::make_pair(timestamp, channel_index));
@@ -1017,22 +1081,40 @@
// put the oldest message first.
std::push_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
+
+ if (timestamp_mergers_[channel_index].has_timestamps()) {
+ timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
+ std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ ChannelHeapCompare);
+ }
}
std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
- CHECK(channel_heap_.size() > 0);
+ CHECK_GT(channel_heap_.size(), 0u);
std::pair<monotonic_clock::time_point, int> oldest_channel_data =
channel_heap_.front();
int channel_index = oldest_channel_data.second;
std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
&ChannelHeapCompare);
channel_heap_.pop_back();
+
timestamp_mergers_[channel_index].set_pushed(false);
TimestampMerger *merger = &timestamp_mergers_[channel_index];
+ if (merger->has_timestamps()) {
+ CHECK_GT(timestamp_heap_.size(), 0u);
+ std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
+ timestamp_heap_.front();
+ CHECK(oldest_timestamp_data == oldest_channel_data)
+ << ": Timestamp heap out of sync.";
+ std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ &ChannelHeapCompare);
+ timestamp_heap_.pop_back();
+ }
+
// Merger handles any queueing needed from here.
std::tuple<TimestampMerger::DeliveryTimestamp,
FlatbufferVector<MessageHeader>>