Handle log files not starting at the same time.
The monotonic clocks were assumed to be in sync. That isn't realistic.
This assumption leaked into how we kept the queues primed, and how the
event loop was initialized.
This isn't enough to actually replay in sync. We are assuming that the
realtime clocks are in sync and the monotonic clocks don't drift from
each other. That'll be good enough to get started, but not for long.
Change-Id: Ic18e31598f1a76edee0b0d5a2d7936deee1fbfec
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 144890a..03b89d1 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -271,9 +271,11 @@
// And copy the config so we have it forever.
configuration_ = std::vector<uint8_t>(config_data.begin(), config_data.end());
- max_out_of_order_duration_ = std::chrono::nanoseconds(
- flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
- ->max_out_of_order_duration());
+ max_out_of_order_duration_ =
+ std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+
+ VLOG(1) << "Opened " << filename << " as node "
+ << FlatbufferToJson(log_file_header()->node());
}
std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
@@ -289,8 +291,7 @@
chrono::nanoseconds(result.message().monotonic_sent_time()));
newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- VLOG(1) << "Read from " << filename().substr(130) << " data "
- << FlatbufferToJson(result);
+ VLOG(1) << "Read from " << filename() << " data " << FlatbufferToJson(result);
return std::move(result);
}
@@ -361,16 +362,56 @@
}
bool SplitMessageReader::QueueMessages(
- monotonic_clock::time_point oldest_message_time) {
+ monotonic_clock::time_point last_dequeued_time) {
// TODO(austin): Once we are happy that everything works, read a 256kb chunk
// to reduce the need to re-heap down below.
+
+ // Special case no more data. Otherwise we blow up on the CHECK statement
+ // confirming that we have enough data queued.
+ if (at_end_) {
+ return false;
+ }
+
+ // If this isn't the first time around, confirm that we had enough data queued
+ // to follow the contract.
+ if (time_to_queue_ != monotonic_clock::min_time) {
+ CHECK_LE(last_dequeued_time,
+ newest_timestamp() - max_out_of_order_duration())
+ << " node " << FlatbufferToJson(node()) << " on " << this;
+
+ // Bail if there is enough data already queued.
+ if (last_dequeued_time < time_to_queue_) {
+ VLOG(1) << "All up to date on " << this << ", dequeued "
+ << last_dequeued_time << " queue time " << time_to_queue_;
+ return true;
+ }
+ } else {
+ // Startup takes a special dance. We want to queue up until the start time,
+ // but we then want to find the next message to read. The conservative
+ // answer is to immediately trigger a second requeue to get things moving.
+ time_to_queue_ = monotonic_start_time();
+ QueueMessages(time_to_queue_);
+ }
+
+ // If we are asked to queue, queue for at least max_out_of_order_duration past
+ // the last known time in the log file (ie the newest timestamp read). As long
+ // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
+ // are safe. And since we pop in order, that works.
+ //
+ // Special case the start of the log file. There should be at most 1 message
+ // from each channel at the start of the log file. So always force the start
+ // of the log file to just be read.
+ time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
+ VLOG(1) << "Queueing, going until " << time_to_queue_ << " " << filename();
+
+ bool was_emplaced = false;
while (true) {
- // Don't queue if we have enough data already.
- // When a log file starts, there should be a message from each channel.
- // Those messages might be very old. Make sure to read a chunk past the
- // starting time.
- if (queued_messages_ > 0 &&
- message_reader_->queue_data_time() > oldest_message_time) {
+ // Stop if we have enough.
+ if (newest_timestamp() >
+ time_to_queue_ + max_out_of_order_duration() &&
+ was_emplaced) {
+ VLOG(1) << "Done queueing on " << this << ", queued to "
+ << newest_timestamp() << " with requeue time " << time_to_queue_;
return true;
}
@@ -378,12 +419,24 @@
message_reader_->ReadMessage()) {
const MessageHeader &header = msg.value().message();
- const int channel_index = header.channel_index();
- channels_to_write_[channel_index]->emplace_back(std::move(msg.value()));
+ const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
+ chrono::nanoseconds(header.monotonic_sent_time()));
- ++queued_messages_;
+ VLOG(1) << "Queued " << this << " " << filename()
+ << " ttq: " << time_to_queue_ << " now "
+ << newest_timestamp() << " start time "
+ << monotonic_start_time() << " " << FlatbufferToJson(&header);
+
+ const int channel_index = header.channel_index();
+ was_emplaced = channels_to_write_[channel_index]->emplace_back(
+ std::move(msg.value()));
+ if (was_emplaced) {
+ newest_timestamp_ = std::max(newest_timestamp_, timestamp);
+ }
} else {
if (!NextLogFile()) {
+ VLOG(1) << "End of log file.";
+ at_end_ = true;
return false;
}
}
@@ -398,26 +451,41 @@
const Channel *const channel =
configuration()->channels()->Get(channel_index);
+ VLOG(1) << " Configuring merger " << this << " for channel " << channel_index
+ << " "
+ << configuration::CleanedChannelToString(
+ configuration()->channels()->Get(channel_index));
+
MessageHeaderQueue *message_header_queue = nullptr;
// Figure out if this log file is from our point of view, or the other node's
// point of view.
if (node() == reinterpreted_target_node) {
- if (channels_to_write_[channel_index] != nullptr) {
- // We already have deduced which is the right channel. Use
- // channels_to_write_ here.
- message_header_queue = channels_to_write_[channel_index];
+ VLOG(1) << " Replaying as logged node " << filename();
+
+ if (configuration::ChannelIsSendableOnNode(channel, node())) {
+ VLOG(1) << " Data on node";
+ message_header_queue = &(channels_[channel_index].data);
+ } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
+ VLOG(1) << " Timestamps on node";
+ message_header_queue =
+ &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
+ configuration(), node())]);
} else {
- // This means this is data from another node, and will be ignored.
+ VLOG(1) << " Dropping";
}
} else {
+ VLOG(1) << " Replaying as other node " << filename();
// We are replaying from another node's point of view. The only interesting
- // data is data that is forwarded to our node, ie was sent on the other
- // node.
- if (configuration::ChannelIsSendableOnNode(channel, node())) {
+ // data is data that is sent from our node and received on theirs.
+ if (configuration::ChannelIsReadableOnNode(channel,
+ reinterpreted_target_node) &&
+ configuration::ChannelIsSendableOnNode(channel, node())) {
+ VLOG(1) << " Readable on target node";
// Data from another node.
message_header_queue = &(channels_[channel_index].data);
} else {
+ VLOG(1) << " Dropping";
// This is either not sendable on the other node, or is a timestamp and
// therefore not interesting.
}
@@ -435,12 +503,15 @@
FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel_index) {
CHECK_GT(channels_[channel_index].data.size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
- channels_[channel_index].data.front_timestamp();
+ const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ timestamp = channels_[channel_index].data.front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
channels_[channel_index].data.pop_front();
- --queued_messages_;
+
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+
+ QueueMessages(std::get<0>(timestamp));
return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
std::move(front));
@@ -450,18 +521,21 @@
FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
- channels_[channel].timestamps[node_index].front_timestamp();
+ const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ timestamp = channels_[channel].timestamps[node_index].front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
channels_[channel].timestamps[node_index].pop_front();
- --queued_messages_;
+
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+
+ QueueMessages(std::get<0>(timestamp));
return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
std::move(front));
}
-void SplitMessageReader::MessageHeaderQueue::emplace_back(
+bool SplitMessageReader::MessageHeaderQueue::emplace_back(
FlatbufferVector<MessageHeader> &&msg) {
CHECK(split_reader != nullptr);
@@ -469,7 +543,7 @@
// the message. This happens when a log file from another node is replayed,
// and the timestamp mergers down stream just don't care.
if (timestamp_merger == nullptr) {
- return;
+ return false;
}
CHECK(timestamps != msg.message().has_data())
@@ -486,6 +560,8 @@
timestamp_merger->Update(split_reader, front_timestamp());
}
}
+
+ return true;
}
void SplitMessageReader::MessageHeaderQueue::pop_front() {
@@ -550,6 +626,8 @@
: -1),
channel_merger_(channel_merger) {
// Tell the readers we care so they know who to notify.
+ VLOG(1) << "Configuring channel " << channel_index << " target node "
+ << FlatbufferToJson(target_node);
for (SplitMessageReader *reader : split_message_readers_) {
reader->SetTimestampMerger(this, channel_index, target_node);
}
@@ -563,7 +641,8 @@
}
void TimestampMerger::PushMessageHeap(
- std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ timestamp,
SplitMessageReader *split_message_reader) {
DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
[split_message_reader](
@@ -587,8 +666,26 @@
}
}
+std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+TimestampMerger::oldest_message() const {
+ CHECK_GT(message_heap_.size(), 0u);
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_message_reader = message_heap_.front();
+ return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+TimestampMerger::oldest_timestamp() const {
+ CHECK_GT(timestamp_heap_.size(), 0u);
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_message_reader = timestamp_heap_.front();
+ return std::get<2>(oldest_message_reader)
+ ->oldest_message(channel_index_, node_index_);
+}
+
void TimestampMerger::PushTimestampHeap(
- std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ timestamp,
SplitMessageReader *split_message_reader) {
DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
[split_message_reader](
@@ -642,8 +739,9 @@
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
next_oldest_message_reader = message_heap_.front();
- std::tuple<monotonic_clock::time_point, uint32_t> next_oldest_message_time =
- std::get<2>(next_oldest_message_reader)->oldest_message(channel_index_);
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ next_oldest_message_time = std::get<2>(next_oldest_message_reader)
+ ->oldest_message(channel_index_);
if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
@@ -733,33 +831,52 @@
std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
while (true) {
- // Ok, now try grabbing data until we find one which matches.
+ {
+ // Ok, now try grabbing data until we find one which matches.
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ oldest_message_ref = oldest_message();
+
+ // Time at which the message was sent (this message is written from the
+ // sending node's perspective).
+ monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
+ std::get<2>(oldest_message_ref)->monotonic_sent_time()));
+
+ if (remote_monotonic_time < remote_timestamp_monotonic_time) {
+ LOG(INFO) << "Undelivered message, skipping. Remote time is "
+ << remote_monotonic_time << " timestamp is "
+ << remote_timestamp_monotonic_time << " on channel "
+ << channel_index_;
+ PopMessageHeap();
+ continue;
+ } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
+ LOG(INFO) << "Data not found. Remote time should be "
+ << remote_timestamp_monotonic_time << " on channel "
+ << channel_index_;
+ return std::make_tuple(timestamp,
+ std::move(std::get<2>(oldest_timestamp)));
+ }
+
+ timestamp.monotonic_remote_time = remote_monotonic_time;
+ }
+
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
- // Time at which the message was sent (this message is written from the
- // sending node's perspective.
- monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
- std::get<2>(oldest_message).message().monotonic_sent_time()));
-
- if (remote_monotonic_time < remote_timestamp_monotonic_time) {
- LOG(INFO) << "Undelivered message, skipping. Remote time is "
- << remote_monotonic_time << " timestamp is "
- << remote_timestamp_monotonic_time << " on channel "
- << channel_index_;
- continue;
- }
-
- timestamp.monotonic_remote_time = remote_monotonic_time;
timestamp.realtime_remote_time =
realtime_clock::time_point(chrono::nanoseconds(
std::get<2>(oldest_message).message().realtime_sent_time()));
timestamp.remote_queue_index =
std::get<2>(oldest_message).message().queue_index();
- CHECK_EQ(remote_monotonic_time, remote_timestamp_monotonic_time);
- CHECK_EQ(timestamp.remote_queue_index, std::get<1>(oldest_timestamp));
+ CHECK_EQ(timestamp.monotonic_remote_time,
+ remote_timestamp_monotonic_time);
+
+ CHECK_EQ(timestamp.remote_queue_index,
+ std::get<2>(oldest_timestamp).message().remote_queue_index())
+ << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
+ << " data "
+ << FlatbufferToJson(&std::get<2>(oldest_message).message());
return std::make_tuple(timestamp, std::get<2>(oldest_message));
}
@@ -833,10 +950,14 @@
if (!found_node) {
found_node = true;
log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+ VLOG(1) << "Found log file " << reader->filename() << " with node "
+ << FlatbufferToJson(reader->node()) << " start_time "
+ << monotonic_start_time();
} else {
// And then make sure all the other files have matching headers.
- CHECK(
- CompareFlatBuffer(log_file_header(), reader->log_file_header()));
+ CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
+ << ": " << FlatbufferToJson(log_file_header()) << " reader "
+ << FlatbufferToJson(reader->log_file_header());
}
}
}
@@ -859,18 +980,10 @@
}
// And prime everything.
- size_t split_message_reader_index = 0;
for (std::unique_ptr<SplitMessageReader> &split_message_reader :
split_message_readers_) {
- if (split_message_reader->QueueMessages(
- split_message_reader->monotonic_start_time())) {
- split_message_reader_heap_.push_back(std::make_pair(
- split_message_reader->queue_data_time(), split_message_reader_index));
-
- std::push_heap(split_message_reader_heap_.begin(),
- split_message_reader_heap_.end(), ChannelHeapCompare);
- }
- ++split_message_reader_index;
+ split_message_reader->QueueMessages(
+ split_message_reader->monotonic_start_time());
}
node_ = configuration::GetNodeOrDie(configuration(), target_node);
@@ -920,51 +1033,138 @@
TimestampMerger *merger = &timestamp_mergers_[channel_index];
- // Merger auto-pushes from here, but doesn't fetch anything new from the log
- // file.
+ // Merger handles any queueing needed from here.
std::tuple<TimestampMerger::DeliveryTimestamp,
FlatbufferVector<MessageHeader>>
message = merger->PopOldest();
- QueueMessages(OldestMessage());
-
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
}
-void ChannelMerger::QueueMessages(
- monotonic_clock::time_point oldest_message_time) {
- // Pop and re-queue readers until they are all caught up.
- while (true) {
- if (split_message_reader_heap_.size() == 0) {
- return;
+std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
+ std::stringstream ss;
+ for (size_t i = 0; i < data_.size(); ++i) {
+ if (timestamps) {
+ ss << " msg: ";
+ } else {
+ ss << " timestamp: ";
}
- std::pair<monotonic_clock::time_point, int> oldest_channel_data =
- split_message_reader_heap_.front();
-
- // No work to do, bail.
- if (oldest_channel_data.first > oldest_message_time) {
- return;
+ ss << monotonic_clock::time_point(std::chrono::nanoseconds(
+ data_[i].message().monotonic_sent_time()))
+ << " ("
+ << realtime_clock::time_point(
+ std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+ << ") " << data_[i].message().queue_index();
+ if (timestamps) {
+ ss << " <- remote "
+ << monotonic_clock::time_point(std::chrono::nanoseconds(
+ data_[i].message().monotonic_remote_time()))
+ << " ("
+ << realtime_clock::time_point(std::chrono::nanoseconds(
+ data_[i].message().realtime_remote_time()))
+ << ")";
}
+ ss << "\n";
+ }
- // Drop it off the heap.
- std::pop_heap(split_message_reader_heap_.begin(),
- split_message_reader_heap_.end(), &ChannelHeapCompare);
- split_message_reader_heap_.pop_back();
+ return ss.str();
+}
- // And if there is data left in the log file, push it back on the heap with
- // the updated time.
- const int split_message_reader_index = oldest_channel_data.second;
- if (split_message_readers_[split_message_reader_index]->QueueMessages(
- oldest_message_time)) {
- split_message_reader_heap_.push_back(std::make_pair(
- split_message_readers_[split_message_reader_index]->queue_data_time(),
- split_message_reader_index));
+std::string SplitMessageReader::DebugString(int channel) const {
+ std::stringstream ss;
+ ss << "[\n";
+ ss << channels_[channel].data.DebugString();
+ ss << " ]";
+ return ss.str();
+}
- std::push_heap(split_message_reader_heap_.begin(),
- split_message_reader_heap_.end(), ChannelHeapCompare);
+std::string SplitMessageReader::DebugString(int channel, int node_index) const {
+ std::stringstream ss;
+ ss << "[\n";
+ ss << channels_[channel].timestamps[node_index].DebugString();
+ ss << " ]";
+ return ss.str();
+}
+
+std::string TimestampMerger::DebugString() const {
+ std::stringstream ss;
+
+ if (timestamp_heap_.size() > 0) {
+ ss << " timestamp_heap {\n";
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ timestamp_heap = timestamp_heap_;
+ while (timestamp_heap.size() > 0u) {
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_timestamp_reader = timestamp_heap.front();
+
+ ss << " " << std::get<2>(oldest_timestamp_reader) << " "
+ << std::get<0>(oldest_timestamp_reader) << " queue_index ("
+ << std::get<1>(oldest_timestamp_reader) << ") ttq "
+ << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
+ << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
+ << std::get<2>(oldest_timestamp_reader)
+ ->DebugString(channel_index_, node_index_)
+ << "\n";
+
+ std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
+ &SplitMessageReaderHeapCompare);
+ timestamp_heap.pop_back();
+ }
+ ss << " }\n";
+ }
+
+ ss << " message_heap {\n";
+ {
+ std::vector<
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+ message_heap = message_heap_;
+ while (message_heap.size() > 0u) {
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ oldest_message_reader = message_heap.front();
+
+ ss << " " << std::get<2>(oldest_message_reader) << " "
+ << std::get<0>(oldest_message_reader) << " queue_index ("
+ << std::get<1>(oldest_message_reader) << ") ttq "
+ << std::get<2>(oldest_message_reader)->time_to_queue() << " "
+ << std::get<2>(oldest_message_reader)->filename() << " -> "
+ << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
+ << "\n";
+
+ std::pop_heap(message_heap.begin(), message_heap.end(),
+ &SplitMessageReaderHeapCompare);
+ message_heap.pop_back();
}
}
+ ss << " }";
+
+ return ss.str();
+}
+
+std::string ChannelMerger::DebugString() const {
+ std::stringstream ss;
+ ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
+ << "\n";
+ ss << "channel_heap {\n";
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+ channel_heap_;
+ while (channel_heap.size() > 0u) {
+ std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
+ ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
+ << configuration::CleanedChannelToString(
+ configuration()->channels()->Get(std::get<1>(channel)))
+ << "\n";
+
+ ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
+
+ std::pop_heap(channel_heap.begin(), channel_heap.end(),
+ &ChannelHeapCompare);
+ channel_heap.pop_back();
+ }
+ ss << "}";
+
+ return ss.str();
}
} // namespace logger