Queue messages in LogReader::State
Queueing messages inside LogReader::State gives us an interface for running the noncausal time offset filter.
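
A rough sketch of the new call pattern (all names come from this patch;
the surrounding replay loop is elided):

    // State now buffers ~2 seconds of sorted messages up front and hands
    // them out one at a time instead of exposing the ChannelMerger.
    state->SeedSortedMessages();
    bool update_time = false;  // Hook for the noncausal filter; unused so far.
    std::tie(channel_timestamp, channel_index, channel_data) =
        state->PopOldest(&update_time);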
Change-Id: I251714f3a6bcd78dd5cae8fd7089ca078e1663c4
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index abab5f6..242638c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -306,10 +306,8 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(std::make_unique<State>());
- State *state = states_[0].get();
-
- state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+ states_.emplace_back(
+ std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -370,7 +368,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->channel_merger->monotonic_start_time();
+ return state->monotonic_start_time();
}
realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
@@ -378,7 +376,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->channel_merger->realtime_start_time();
+ return state->realtime_start_time();
}
void LogReader::Register() {
@@ -393,17 +391,12 @@
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- states_[node_index] = std::make_unique<State>();
+ states_[node_index] =
+ std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
State *state = states_[node_index].get();
- state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
-
- state->node_event_loop_factory =
- event_loop_factory_->GetNodeEventLoopFactory(node);
- state->event_loop_unique_ptr =
- event_loop_factory->MakeEventLoop("log_reader", node);
-
- Register(state->event_loop_unique_ptr.get());
+ Register(state->SetNodeEventLoopFactory(
+ event_loop_factory_->GetNodeEventLoopFactory(node)));
}
if (live_nodes_ == 0) {
LOG(FATAL)
@@ -479,7 +472,7 @@
for (std::unique_ptr<State> &state : states_) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
TimestampMerger::DeliveryTimestamp timestamp =
- state->channel_merger->OldestTimestampForChannel(i);
+ state->OldestTimestampForChannel(i);
if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
CHECK(state->MaybeUpdateTimestamp(timestamp, i));
}
@@ -548,13 +541,17 @@
for (std::unique_ptr<State> &state : states_) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
TimestampMerger::DeliveryTimestamp timestamp =
- state->channel_merger->OldestTimestampForChannel(i);
+ state->OldestTimestampForChannel(i);
if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
CHECK(state->MaybeUpdateTimestamp(timestamp, i));
}
}
}
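+ // Queue up ~2 seconds of sorted messages on each node before computing
+ // the initial offsets.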
+ for (std::unique_ptr<State> &state : states_) {
+ state->SeedSortedMessages();
+ }
+
UpdateOffsets();
// We want to start the log file at the last start time of the log files from
@@ -564,14 +561,12 @@
for (std::unique_ptr<State> &state : states_) {
// Setup the realtime clock to have something sane in it now.
- state->node_event_loop_factory->SetRealtimeOffset(
- state->channel_merger->monotonic_start_time(),
- state->channel_merger->realtime_start_time());
+ state->SetRealtimeOffset(state->monotonic_start_time(),
+ state->realtime_start_time());
// And start computing the start time on the distributed clock now that that
// works.
- start_time = std::max(start_time,
- state->node_event_loop_factory->ToDistributedClock(
- state->channel_merger->monotonic_start_time()));
+ start_time = std::max(
+ start_time, state->ToDistributedClock(state->monotonic_start_time()));
}
CHECK_GE(start_time, distributed_clock::epoch());
@@ -589,7 +584,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
- RemapChannel(state->event_loop, channel);
+ RemapChannel(state->event_loop(), channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
@@ -618,8 +613,7 @@
size_t node_index = 0;
for (std::unique_ptr<State> &state : states_) {
- state->node_event_loop_factory->SetDistributedOffset(-offset(node_index),
- 1.0);
+ state->SetDistributedOffset(-offset(node_index), 1.0);
++node_index;
}
}
@@ -664,21 +658,22 @@
const TimestampMerger::DeliveryTimestamp &channel_timestamp,
int channel_index) {
if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
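+ // No remote time means this message wasn't forwarded, so there should be
+ // no filter for this channel.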
+ CHECK(std::get<0>(filters_[channel_index]) == nullptr);
return false;
}
// Got a forwarding timestamp!
- CHECK(std::get<0>(filters[channel_index]) != nullptr);
+ CHECK(std::get<0>(filters_[channel_index]) != nullptr);
// Call the correct method depending on if we are the forward or reverse
// direction here.
- if (std::get<1>(filters[channel_index])) {
- std::get<0>(filters[channel_index])
+ if (std::get<1>(filters_[channel_index])) {
+ std::get<0>(filters_[channel_index])
->FwdSample(channel_timestamp.monotonic_event_time,
channel_timestamp.monotonic_event_time -
channel_timestamp.monotonic_remote_time);
} else {
- std::get<0>(filters[channel_index])
+ std::get<0>(filters_[channel_index])
->RevSample(channel_timestamp.monotonic_event_time,
channel_timestamp.monotonic_event_time -
channel_timestamp.monotonic_remote_time);
@@ -691,7 +686,7 @@
states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
.get();
- state->event_loop = event_loop;
+ state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
@@ -699,32 +694,33 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- const bool has_data = state->channel_merger->SetNode(event_loop->node());
+ const bool has_data = state->SetNode();
- state->channels.resize(logged_configuration()->channels()->size());
- state->filters.resize(state->channels.size());
+ state->SetChannelCount(logged_configuration()->channels()->size());
- state->channel_target_event_loop_factory.resize(state->channels.size());
-
- for (size_t i = 0; i < state->channels.size(); ++i) {
+ for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel =
RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- state->channels[i] = event_loop->MakeRawSender(channel);
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
+ std::make_tuple(nullptr, false);
- state->filters[i] = std::make_tuple(nullptr, false);
+ NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
const Node *target_node = configuration::GetNode(
event_loop->configuration(), channel->source_node()->string_view());
- state->filters[i] = GetFilter(event_loop->node(), target_node);
+ filter = GetFilter(event_loop->node(), target_node);
if (event_loop_factory_ != nullptr) {
- state->channel_target_event_loop_factory[i] =
+ channel_target_event_loop_factory =
event_loop_factory_->GetNodeEventLoopFactory(target_node);
}
}
+
+ state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
+ channel_target_event_loop_factory);
}
// If we didn't find any log files with data in them, we won't ever get a
@@ -733,8 +729,8 @@
return;
}
- state->timer_handler = event_loop->AddTimer([this, state]() {
- if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+ state->set_timer_handler(event_loop->AddTimer([this, state]() {
+ if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
VLOG(1) << "Node down!";
if (live_nodes_ == 0) {
@@ -748,22 +744,23 @@
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
+ bool dummy_update_time = false;
std::tie(channel_timestamp, channel_index, channel_data) =
- state->channel_merger->PopOldest();
+ state->PopOldest(&dummy_update_time);
const monotonic_clock::time_point monotonic_now =
- state->event_loop->context().monotonic_event_time;
+ state->event_loop()->context().monotonic_event_time;
CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
- << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
<< channel_timestamp.monotonic_event_time << " failure "
- << state->channel_merger->DebugString();
+ << state->DebugString();
if (channel_timestamp.monotonic_event_time >
- state->channel_merger->monotonic_start_time() ||
+ state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
- !state->channel_merger->at_end()) ||
+ !state->at_end()) ||
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
@@ -775,8 +772,7 @@
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->channel_target_event_loop_factory[channel_index]
- ->monotonic_now());
+ state->monotonic_remote_now(channel_index));
update_offsets = true;
@@ -803,31 +799,24 @@
}
fprintf(offset_fp_, "\n");
}
-
- } else {
- CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
}
// If we have access to the factory, use it to fix the realtime time.
- if (state->node_event_loop_factory != nullptr) {
- state->node_event_loop_factory->SetRealtimeOffset(
- channel_timestamp.monotonic_event_time,
- channel_timestamp.realtime_event_time);
- }
+ state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
+ channel_timestamp.realtime_event_time);
- state->channels[channel_index]->Send(
- channel_data.message().data()->Data(),
- channel_data.message().data()->size(),
- channel_timestamp.monotonic_remote_time,
- channel_timestamp.realtime_remote_time,
- channel_timestamp.remote_queue_index);
- } else if (state->channel_merger->at_end()) {
+ state->Send(channel_index, channel_data.message().data()->Data(),
+ channel_data.message().data()->size(),
+ channel_timestamp.monotonic_remote_time,
+ channel_timestamp.realtime_remote_time,
+ channel_timestamp.remote_queue_index);
+ } else if (state->at_end()) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want to
// replay partial data.
- while (state->channel_merger->OldestMessage() !=
- monotonic_clock::max_time) {
- state->channel_merger->PopOldest();
+ while (state->OldestMessageTime() != monotonic_clock::max_time) {
+ bool update_time_dummy;
+ state->PopOldest(&update_time_dummy);
}
}
@@ -839,17 +828,15 @@
<< " " << FlatbufferToJson(channel_data);
}
- const monotonic_clock::time_point next_time =
- state->channel_merger->OldestMessage();
+ const monotonic_clock::time_point next_time = state->OldestMessageTime();
if (next_time != monotonic_clock::max_time) {
- state->timer_handler->Setup(next_time);
+ state->Setup(next_time);
} else {
// Set a timer up immediately after now to die. If we don't do this, then
// the senders waiting on the message we just read will never get called.
if (event_loop_factory_ != nullptr) {
- state->timer_handler->Setup(monotonic_now +
- event_loop_factory_->send_delay() +
- std::chrono::nanoseconds(1));
+ state->Setup(monotonic_now + event_loop_factory_->send_delay() +
+ std::chrono::nanoseconds(1));
}
}
@@ -859,14 +846,12 @@
if (update_offsets) {
UpdateOffsets();
}
- });
+ }));
++live_nodes_;
- if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
- event_loop->OnRun([state]() {
- state->timer_handler->Setup(state->channel_merger->OldestMessage());
- });
+ if (state->OldestMessageTime() != monotonic_clock::max_time) {
+ event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
}
}
@@ -874,12 +859,7 @@
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < state->channels.size(); ++i) {
- state->channels[i].reset();
- }
- state->event_loop_unique_ptr.reset();
- state->event_loop = nullptr;
- state->node_event_loop_factory = nullptr;
+ state->Deregister();
}
event_loop_factory_unique_ptr_.reset();
@@ -910,7 +890,7 @@
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
- CHECK(!state->event_loop)
+ CHECK(!state->event_loop())
<< ": Can't change the mapping after the events are scheduled.";
}
}
@@ -1021,5 +1001,103 @@
return remapped_channel;
}
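+// State wraps the ChannelMerger for a single node and owns the queue of
+// sorted messages pulled out of it.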
+LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
+ : channel_merger_(std::move(channel_merger)) {}
+
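+// Stores the node's event loop factory and builds the replay event loop,
+// returning the raw pointer so the caller can Register() it.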
+EventLoop *LogReader::State::SetNodeEventLoopFactory(
+ NodeEventLoopFactory *node_event_loop_factory) {
+ node_event_loop_factory_ = node_event_loop_factory;
+ event_loop_unique_ptr_ =
+ node_event_loop_factory_->MakeEventLoop("log_reader");
+ return event_loop_unique_ptr_.get();
+}
+
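+// Sizes the per-channel vectors: senders, timestamp filters, and target
+// node factories.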
+void LogReader::State::SetChannelCount(size_t count) {
+ channels_.resize(count);
+ filters_.resize(count);
+ channel_target_event_loop_factory_.resize(count);
+}
+
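+// Fills in the per-channel state configured by Register().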
+void LogReader::State::SetChannel(
+ size_t channel, std::unique_ptr<RawSender> sender,
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ NodeEventLoopFactory *channel_target_event_loop_factory) {
+ channels_[channel] = std::move(sender);
+ filters_[channel] = filter;
+ channel_target_event_loop_factory_[channel] =
+ channel_target_event_loop_factory;
+}
+
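+// Returns the oldest queued message and refills the queue. *update_time is
+// always false for now; it is the hook for the noncausal time offset filter.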
+std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+LogReader::State::PopOldest(bool *update_time) {
+ CHECK_GT(sorted_messages_.size(), 0u);
+
+ std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ result = std::move(sorted_messages_.front());
+ VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ << std::get<0>(result).monotonic_event_time;
+ sorted_messages_.pop_front();
+ SeedSortedMessages();
+
+ *update_time = false;
+ return std::make_tuple(std::get<0>(result), std::get<1>(result),
+ std::move(std::get<2>(result)));
+}
+
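+// Timestamp of the next message to replay: the front of the queue if
+// anything is queued, otherwise whatever the merger has left.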
+monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+ if (sorted_messages_.size() > 0) {
+ VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+ << std::get<0>(sorted_messages_.front()).monotonic_event_time;
+ return std::get<0>(sorted_messages_.front()).monotonic_event_time;
+ }
+
+ return channel_merger_->OldestMessageTime();
+}
+
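+// Pulls messages out of the ChannelMerger until roughly 2 seconds of data,
+// measured from the front of the queue (or the log start), is buffered.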
+void LogReader::State::SeedSortedMessages() {
+ const aos::monotonic_clock::time_point end_queue_time =
+ (sorted_messages_.size() > 0
+ ? std::get<0>(sorted_messages_.front()).monotonic_event_time
+ : channel_merger_->monotonic_start_time()) +
+ std::chrono::seconds(2);
+
+ while (true) {
+ if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+ return;
+ }
+ if (sorted_messages_.size() > 0) {
+ // Stop placing sorted messages on the list once we have 2 seconds
+ // queued up (but queue at least until the log starts).
+ if (end_queue_time <
+ std::get<0>(sorted_messages_.back()).monotonic_event_time) {
+ return;
+ }
+ }
+
+ TimestampMerger::DeliveryTimestamp channel_timestamp;
+ int channel_index;
+ FlatbufferVector<MessageHeader> channel_data =
+ FlatbufferVector<MessageHeader>::Empty();
+
+ std::tie(channel_timestamp, channel_index, channel_data) =
+ channel_merger_->PopOldest();
+
+ sorted_messages_.emplace_back(channel_timestamp, channel_index,
+ std::move(channel_data));
+ }
+}
+
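+// Tears things down in the correct order: senders first, then the event
+// loop, then the bare pointers into the factory.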
+void LogReader::State::Deregister() {
+ for (size_t i = 0; i < channels_.size(); ++i) {
+ channels_[i].reset();
+ }
+ event_loop_unique_ptr_.reset();
+ event_loop_ = nullptr;
+ timer_handler_ = nullptr;
+ node_event_loop_factory_ = nullptr;
+}
+
} // namespace logger
} // namespace aos