Switch LogReader to the new API, and remove the old
Now that we have a fancy new log sorter, let's flip over to using it.
This doesn't yet simplify anything on the log reader side to take
advantage of the simpler code; that cleanup will come later.
While we are here, the new API really wants LogFile objects instead of
vectors of strings, so convert all the callers over to the new LogFile API.
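
For example, a caller that used to hand LogReader a vector of filename
strings now sorts them into LogFile objects first. A minimal sketch (the
filename here is made up, and the replay configuration is assumed to
still default to nullptr):

    // Before: LogReader took vectors of filename strings.
    //   LogReader reader({"/tmp/log.part0.bfbs"});

    // After: SortParts groups the filenames into LogFile objects.
    std::vector<aos::logger::LogFile> log_files =
        aos::logger::SortParts({"/tmp/log.part0.bfbs"});
    aos::logger::LogReader reader(std::move(log_files));
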
The log file was updated to add UUIDs. They are required with
multi-node log files, and it doesn't seem worth changing that
requirement for this old log.
Change-Id: I84bd63c7339ec43ed01c106131153e1cb6d213bb
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b32c748..5a6bb8f 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -38,11 +38,12 @@
namespace {
// Helper to safely read a header, or CHECK.
SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
- const std::vector<std::vector<std::string>> &filenames) {
- CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
- CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
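+ // Each LogFile groups LogParts, and each LogParts lists the files for
+ // that part, so the header lives in the first file of the first part.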
+ const std::vector<LogFile> &log_files) {
+ CHECK_GE(log_files.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts[0].parts.size(), 1u) << ": Empty filenames list";
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
- ReadHeader(filenames[0][0]);
+ ReadHeader(log_files[0].parts[0].parts[0]);
CHECK(result);
return result.value();
}
@@ -717,24 +718,12 @@
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::string>{std::string(filename)},
- replay_configuration) {}
+ : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
-LogReader::LogReader(const std::vector<std::string> &filenames,
+LogReader::LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::vector<std::string>>{filenames},
- replay_configuration) {}
-
-// TODO(austin): Make this the base and kill the others. This has much better
-// context for sorting.
-LogReader::LogReader(const std::vector<LogFile> &log_files,
- const Configuration *replay_configuration)
- : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
-
-LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
- const Configuration *replay_configuration)
- : filenames_(filenames),
- log_file_header_(MaybeReadHeaderOrDie(filenames)),
+ : log_files_(std::move(log_files)),
+ log_file_header_(MaybeReadHeaderOrDie(log_files_)),
replay_configuration_(replay_configuration) {
MakeRemappedConfig();
@@ -762,8 +751,8 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
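+ // Single-node logs have no named nodes, so filtering with the empty
+ // node name hands every part to the one TimestampMapper.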
+ states_.emplace_back(std::make_unique<State>(
+ std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -851,8 +840,12 @@
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- states_[node_index] =
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
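+ // Build one TimestampMapper per node from that node's parts; a node
+ // with no parts in this log gets a null mapper instead.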
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
State *state = states_[node_index].get();
state->set_event_loop(state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node)));
@@ -860,6 +853,20 @@
state->SetChannelCount(logged_configuration()->channels()->size());
}
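+ // Now that every State exists, wire each pair up as peers so
+ // TimestampMapper can match timestamps with the remote node's data.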
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+
// Register after making all the State objects so we can build references
// between them.
for (const Node *node : configuration::GetNodes(configuration())) {
@@ -971,6 +978,9 @@
VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
<< MaybeNodeName(state->event_loop()->node()) << "now "
<< state->monotonic_now();
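+ // A node that logged no data reports a min_time start time; it can't
+ // constrain the start time on the distributed clock, so skip it.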
+ if (state->monotonic_start_time() == monotonic_clock::min_time) {
+ continue;
+ }
// And start computing the start time on the distributed clock now that
// that works.
start_time = std::max(
@@ -1221,8 +1231,6 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- const bool has_data = state->SetNode();
-
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
@@ -1267,7 +1275,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (!has_data) {
+ if (state->OldestMessageTime() == monotonic_clock::max_time) {
return;
}
@@ -1283,45 +1291,39 @@
}
return;
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
if (VLOG_IS_ON(1)) {
LogFit("Offset was");
}
bool update_time;
- std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&update_time);
+ TimestampedMessage timestamped_message = state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time)
<< ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+ } else if (monotonic_now != timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
- "channel_timestamp.monotonic_event_time) ("
+ "timestamped_message.monotonic_event_time) ("
<< monotonic_now << " vs. "
- << channel_timestamp.monotonic_event_time
+ << timestamped_message.monotonic_event_time
<< "): " << FlatbufferToJson(state->event_loop()->node())
<< " Now " << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
}
- if (channel_timestamp.monotonic_event_time >
+ if (timestamped_message.monotonic_event_time >
state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
!state->at_end()) ||
- channel_data.message().data() != nullptr) {
- CHECK(channel_data.message().data() != nullptr)
+ timestamped_message.data.span().size() != 0u) {
+ CHECK_NE(timestamped_message.data.span().size(), 0u)
<< ": Got a message without data. Forwarding entry which was "
"not matched? Use --skip_missing_forwarding_entries to "
"ignore this.";
@@ -1331,28 +1333,38 @@
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
if (!FLAGS_skip_order_validation) {
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index))
+ CHECK_LT(
+ timestamped_message.monotonic_remote_time,
+ state->monotonic_remote_now(timestamped_message.channel_index))
<< state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
<< " " << state->DebugString();
- } else if (channel_timestamp.monotonic_remote_time >=
- state->monotonic_remote_now(channel_index)) {
+ } else if (timestamped_message.monotonic_remote_time >=
+ state->monotonic_remote_now(
+ timestamped_message.channel_index)) {
LOG(WARNING)
- << "Check failed: channel_timestamp.monotonic_remote_time < "
- "state->monotonic_remote_now(channel_index) ("
- << channel_timestamp.monotonic_remote_time << " vs. "
- << state->monotonic_remote_now(channel_index) << ") "
- << state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
- << " currently " << channel_timestamp.monotonic_event_time
+ << "Check failed: timestamped_message.monotonic_remote_time < "
+ "state->monotonic_remote_now(timestamped_message.channel_"
+ "index) ("
+ << timestamped_message.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(
+ timestamped_message.channel_index)
+ << ") " << state->event_loop()->node()->name()->string_view()
+ << " to "
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
+ << " currently " << timestamped_message.monotonic_event_time
<< " ("
<< state->ToDistributedClock(
- channel_timestamp.monotonic_event_time)
+ timestamped_message.monotonic_event_time)
<< ") remote event time "
- << channel_timestamp.monotonic_remote_time << " ("
+ << timestamped_message.monotonic_remote_time << " ("
<< state->RemoteToDistributedClock(
- channel_index, channel_timestamp.monotonic_remote_time)
+ timestamped_message.channel_index,
+ timestamped_message.monotonic_remote_time)
<< ") " << state->DebugString();
}
@@ -1362,12 +1374,12 @@
fprintf(
offset_fp_,
"# time_since_start, offset node 0, offset node 1, ...\n");
- first_time_ = channel_timestamp.realtime_event_time;
+ first_time_ = timestamped_message.realtime_event_time;
}
fprintf(offset_fp_, "%.9f",
std::chrono::duration_cast<std::chrono::duration<double>>(
- channel_timestamp.realtime_event_time - first_time_)
+ timestamped_message.realtime_event_time - first_time_)
.count());
for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
fprintf(offset_fp_, ", %.9f",
@@ -1383,15 +1395,14 @@
}
// If we have access to the factory, use it to fix the realtime time.
- state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
- channel_timestamp.realtime_event_time);
+ state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
+ timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << channel_timestamp.monotonic_event_time;
+ << timestamped_message.monotonic_event_time;
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
- state->Send(channel_index, channel_data.message().data()->Data(),
- channel_data.message().data()->size(), channel_timestamp);
+ state->Send(std::move(timestamped_message));
} else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want
@@ -1401,15 +1412,15 @@
state->PopOldest(&update_time_dummy);
}
} else {
- CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
+ CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
}
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
- << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+ << timestamped_message.monotonic_event_time.time_since_epoch().count()
<< " start " << monotonic_start_time().time_since_epoch().count()
<< " "
- << FlatbufferToJson(channel_data,
+ << FlatbufferToJson(timestamped_message.data,
{.multi_line = false, .max_vector_size = 100});
}
@@ -1742,8 +1753,14 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
- : channel_merger_(std::move(channel_merger)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
+ : timestamp_mapper_(std::move(timestamp_mapper)) {}
+
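+// Peers are the States for the other nodes; TimestampMapper uses them to
+// find the remote message that matches a forwarded timestamp.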
+void LogReader::State::AddPeer(State *peer) {
+ if (timestamp_mapper_ && peer->timestamp_mapper_) {
+ timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
+ }
+}
EventLoop *LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory) {
@@ -1783,22 +1800,20 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
-bool LogReader::State::Send(
- size_t channel_index, const void *data, size_t size,
- const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
- aos::RawSender *sender = channels_[channel_index].get();
+bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
+ aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
uint32_t remote_queue_index = 0xffffffff;
- if (remote_timestamp_senders_[channel_index] != nullptr) {
- std::vector<SentTimestamp> *queue_index_map =
- CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
- ->queue_index_map_[channel_index]
- .get());
+ if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+ std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
+ ->queue_index_map_[timestamped_message.channel_index]
+ .get());
SentTimestamp search;
- search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
- search.realtime_event_time = delivery_timestamp.realtime_remote_time;
- search.queue_index = delivery_timestamp.remote_queue_index;
+ search.monotonic_event_time = timestamped_message.monotonic_remote_time;
+ search.realtime_event_time = timestamped_message.realtime_remote_time;
+ search.queue_index = timestamped_message.remote_queue_index;
// Find the sent time if available.
auto element = std::lower_bound(
@@ -1828,10 +1843,10 @@
// receive time.
if (element != queue_index_map->end()) {
CHECK_EQ(element->monotonic_event_time,
- delivery_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_remote_time);
CHECK_EQ(element->realtime_event_time,
- delivery_timestamp.realtime_remote_time);
- CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+ timestamped_message.realtime_remote_time);
+ CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
remote_queue_index = element->actual_queue_index;
}
@@ -1839,27 +1854,32 @@
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
- const bool sent =
- sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
- delivery_timestamp.realtime_remote_time, remote_queue_index);
+ const bool sent = sender->Send(
+ timestamped_message.data.message().data()->Data(),
+ timestamped_message.data.message().data()->size(),
+ timestamped_message.monotonic_remote_time,
+ timestamped_message.realtime_remote_time, remote_queue_index);
if (!sent) return false;
- if (queue_index_map_[channel_index]) {
+ if (queue_index_map_[timestamped_message.channel_index]) {
SentTimestamp timestamp;
- timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
- timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
- timestamp.queue_index = delivery_timestamp.queue_index;
+ timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
+ timestamp.realtime_event_time = timestamped_message.realtime_event_time;
+ timestamp.queue_index = timestamped_message.queue_index;
timestamp.actual_queue_index = sender->sent_queue_index();
- queue_index_map_[channel_index]->emplace_back(timestamp);
- } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+ queue_index_map_[timestamped_message.channel_index]->emplace_back(
+ timestamp);
+ } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
+ nullptr) {
aos::Sender<MessageHeader>::Builder builder =
- remote_timestamp_senders_[channel_index]->MakeBuilder();
+ remote_timestamp_senders_[timestamped_message.channel_index]
+ ->MakeBuilder();
logger::MessageHeader::Builder message_header_builder =
builder.MakeBuilder<logger::MessageHeader>();
message_header_builder.add_channel_index(
- factory_channel_index_[channel_index]);
+ factory_channel_index_[timestamped_message.channel_index]);
// Swap the remote and sent metrics. They are from the sender's
// perspective, not the receiver's perspective.
@@ -1870,9 +1890,9 @@
message_header_builder.add_queue_index(sender->sent_queue_index());
message_header_builder.add_monotonic_remote_time(
- delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+ timestamped_message.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_remote_time(
- delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+ timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
@@ -1899,28 +1919,23 @@
return &(sender->second);
}
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
-LogReader::State::PopOldest(bool *update_time) {
+TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
CHECK_GT(sorted_messages_.size(), 0u);
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>,
- message_bridge::NoncausalOffsetEstimator *>
+ std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- if (std::get<3>(result) != nullptr) {
- *update_time = std::get<3>(result)->Pop(
+ if (std::get<1>(result) != nullptr) {
+ *update_time = std::get<1>(result)->Pop(
event_loop_->node(), std::get<0>(result).monotonic_event_time);
} else {
*update_time = false;
}
- return std::make_tuple(std::get<0>(result), std::get<1>(result),
- std::move(std::get<2>(result)));
+ return std::move(std::get<0>(result));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
@@ -1930,18 +1945,25 @@
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
- return channel_merger_->OldestMessageTime();
+ TimestampedMessage *m =
+ timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
+ if (m == nullptr) {
+ return monotonic_clock::max_time;
+ }
+ return m->monotonic_event_time;
}
void LogReader::State::SeedSortedMessages() {
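+ // A node with no log files has no mapper, and nothing to seed.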
+ if (!timestamp_mapper_) return;
const aos::monotonic_clock::time_point end_queue_time =
(sorted_messages_.size() > 0
? std::get<0>(sorted_messages_.front()).monotonic_event_time
- : channel_merger_->monotonic_start_time()) +
+ : timestamp_mapper_->monotonic_start_time()) +
std::chrono::seconds(2);
while (true) {
- if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+ TimestampedMessage *m = timestamp_mapper_->Front();
+ if (m == nullptr) {
return;
}
if (sorted_messages_.size() > 0) {
@@ -1953,31 +1975,25 @@
}
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- std::tie(channel_timestamp, channel_index, channel_data) =
- channel_merger_->PopOldest();
+ TimestampedMessage timestamped_message = std::move(*m);
+ timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
// Got a forwarding timestamp!
- filter = filters_[channel_index];
+ filter = filters_[timestamped_message.channel_index];
CHECK(filter != nullptr);
// Call the correct method depending on if we are the forward or
// reverse direction here.
filter->Sample(event_loop_->node(),
- channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_event_time,
+ timestamped_message.monotonic_remote_time);
}
- sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data), filter);
+ sorted_messages_.emplace_back(std::move(timestamped_message), filter);
}
}