Queue messages in LogReader::State
This gives us an interface to run the noncausal time offset filter.
Change-Id: I251714f3a6bcd78dd5cae8fd7089ca078e1663c4
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 6dbc36f..ade82f9 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1098,7 +1098,7 @@
return true;
}
-monotonic_clock::time_point ChannelMerger::OldestMessage() const {
+monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
if (channel_heap_.empty()) {
return monotonic_clock::max_time;
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 23dadc8..2b08b59 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -570,7 +570,7 @@
// Everything else needs the node set before it works.
// Returns a timestamp for the oldest message in this group of logfiles.
- monotonic_clock::time_point OldestMessage() const;
+ monotonic_clock::time_point OldestMessageTime() const;
// Pops the oldest message.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index abab5f6..242638c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -306,10 +306,8 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(std::make_unique<State>());
- State *state = states_[0].get();
-
- state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+ states_.emplace_back(
+ std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -370,7 +368,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->channel_merger->monotonic_start_time();
+ return state->monotonic_start_time();
}
realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
@@ -378,7 +376,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->channel_merger->realtime_start_time();
+ return state->realtime_start_time();
}
void LogReader::Register() {
@@ -393,17 +391,12 @@
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- states_[node_index] = std::make_unique<State>();
+ states_[node_index] =
+ std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
State *state = states_[node_index].get();
- state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
-
- state->node_event_loop_factory =
- event_loop_factory_->GetNodeEventLoopFactory(node);
- state->event_loop_unique_ptr =
- event_loop_factory->MakeEventLoop("log_reader", node);
-
- Register(state->event_loop_unique_ptr.get());
+ Register(state->SetNodeEventLoopFactory(
+ event_loop_factory_->GetNodeEventLoopFactory(node)));
}
if (live_nodes_ == 0) {
LOG(FATAL)
@@ -479,7 +472,7 @@
for (std::unique_ptr<State> &state : states_) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
TimestampMerger::DeliveryTimestamp timestamp =
- state->channel_merger->OldestTimestampForChannel(i);
+ state->OldestTimestampForChannel(i);
if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
CHECK(state->MaybeUpdateTimestamp(timestamp, i));
}
@@ -548,13 +541,17 @@
for (std::unique_ptr<State> &state : states_) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
TimestampMerger::DeliveryTimestamp timestamp =
- state->channel_merger->OldestTimestampForChannel(i);
+ state->OldestTimestampForChannel(i);
if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
CHECK(state->MaybeUpdateTimestamp(timestamp, i));
}
}
}
+ for (std::unique_ptr<State> &state : states_) {
+ state->SeedSortedMessages();
+ }
+
UpdateOffsets();
// We want to start the log file at the last start time of the log files from
@@ -564,14 +561,12 @@
for (std::unique_ptr<State> &state : states_) {
// Setup the realtime clock to have something sane in it now.
- state->node_event_loop_factory->SetRealtimeOffset(
- state->channel_merger->monotonic_start_time(),
- state->channel_merger->realtime_start_time());
+ state->SetRealtimeOffset(state->monotonic_start_time(),
+ state->realtime_start_time());
// And start computing the start time on the distributed clock now that that
// works.
- start_time = std::max(start_time,
- state->node_event_loop_factory->ToDistributedClock(
- state->channel_merger->monotonic_start_time()));
+ start_time = std::max(
+ start_time, state->ToDistributedClock(state->monotonic_start_time()));
}
CHECK_GE(start_time, distributed_clock::epoch());
@@ -589,7 +584,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
- RemapChannel(state->event_loop, channel);
+ RemapChannel(state->event_loop(), channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
@@ -618,8 +613,7 @@
size_t node_index = 0;
for (std::unique_ptr<State> &state : states_) {
- state->node_event_loop_factory->SetDistributedOffset(-offset(node_index),
- 1.0);
+ state->SetDistributedOffset(-offset(node_index), 1.0);
++node_index;
}
}
@@ -664,21 +658,22 @@
const TimestampMerger::DeliveryTimestamp &channel_timestamp,
int channel_index) {
if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
+ CHECK(std::get<0>(filters_[channel_index]) == nullptr);
return false;
}
// Got a forwarding timestamp!
- CHECK(std::get<0>(filters[channel_index]) != nullptr);
+ CHECK(std::get<0>(filters_[channel_index]) != nullptr);
// Call the correct method depending on if we are the forward or reverse
// direction here.
- if (std::get<1>(filters[channel_index])) {
- std::get<0>(filters[channel_index])
+ if (std::get<1>(filters_[channel_index])) {
+ std::get<0>(filters_[channel_index])
->FwdSample(channel_timestamp.monotonic_event_time,
channel_timestamp.monotonic_event_time -
channel_timestamp.monotonic_remote_time);
} else {
- std::get<0>(filters[channel_index])
+ std::get<0>(filters_[channel_index])
->RevSample(channel_timestamp.monotonic_event_time,
channel_timestamp.monotonic_event_time -
channel_timestamp.monotonic_remote_time);
@@ -691,7 +686,7 @@
states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
.get();
- state->event_loop = event_loop;
+ state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
@@ -699,32 +694,33 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- const bool has_data = state->channel_merger->SetNode(event_loop->node());
+ const bool has_data = state->SetNode();
- state->channels.resize(logged_configuration()->channels()->size());
- state->filters.resize(state->channels.size());
+ state->SetChannelCount(logged_configuration()->channels()->size());
- state->channel_target_event_loop_factory.resize(state->channels.size());
-
- for (size_t i = 0; i < state->channels.size(); ++i) {
+ for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel =
RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- state->channels[i] = event_loop->MakeRawSender(channel);
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
+ std::make_tuple(nullptr, false);
- state->filters[i] = std::make_tuple(nullptr, false);
+ NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
const Node *target_node = configuration::GetNode(
event_loop->configuration(), channel->source_node()->string_view());
- state->filters[i] = GetFilter(event_loop->node(), target_node);
+ filter = GetFilter(event_loop->node(), target_node);
if (event_loop_factory_ != nullptr) {
- state->channel_target_event_loop_factory[i] =
+ channel_target_event_loop_factory =
event_loop_factory_->GetNodeEventLoopFactory(target_node);
}
}
+
+ state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
+ channel_target_event_loop_factory);
}
// If we didn't find any log files with data in them, we won't ever get a
@@ -733,8 +729,8 @@
return;
}
- state->timer_handler = event_loop->AddTimer([this, state]() {
- if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+ state->set_timer_handler(event_loop->AddTimer([this, state]() {
+ if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
VLOG(1) << "Node down!";
if (live_nodes_ == 0) {
@@ -748,22 +744,23 @@
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
+ bool dummy_update_time = false;
std::tie(channel_timestamp, channel_index, channel_data) =
- state->channel_merger->PopOldest();
+ state->PopOldest(&dummy_update_time);
const monotonic_clock::time_point monotonic_now =
- state->event_loop->context().monotonic_event_time;
+ state->event_loop()->context().monotonic_event_time;
CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
- << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
<< channel_timestamp.monotonic_event_time << " failure "
- << state->channel_merger->DebugString();
+ << state->DebugString();
if (channel_timestamp.monotonic_event_time >
- state->channel_merger->monotonic_start_time() ||
+ state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
- !state->channel_merger->at_end()) ||
+ !state->at_end()) ||
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
@@ -775,8 +772,7 @@
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->channel_target_event_loop_factory[channel_index]
- ->monotonic_now());
+ state->monotonic_remote_now(channel_index));
update_offsets = true;
@@ -803,31 +799,24 @@
}
fprintf(offset_fp_, "\n");
}
-
- } else {
- CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
}
// If we have access to the factory, use it to fix the realtime time.
- if (state->node_event_loop_factory != nullptr) {
- state->node_event_loop_factory->SetRealtimeOffset(
- channel_timestamp.monotonic_event_time,
- channel_timestamp.realtime_event_time);
- }
+ state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
+ channel_timestamp.realtime_event_time);
- state->channels[channel_index]->Send(
- channel_data.message().data()->Data(),
- channel_data.message().data()->size(),
- channel_timestamp.monotonic_remote_time,
- channel_timestamp.realtime_remote_time,
- channel_timestamp.remote_queue_index);
- } else if (state->channel_merger->at_end()) {
+ state->Send(channel_index, channel_data.message().data()->Data(),
+ channel_data.message().data()->size(),
+ channel_timestamp.monotonic_remote_time,
+ channel_timestamp.realtime_remote_time,
+ channel_timestamp.remote_queue_index);
+ } else if (state->at_end()) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want to
// replay partial data.
- while (state->channel_merger->OldestMessage() !=
- monotonic_clock::max_time) {
- state->channel_merger->PopOldest();
+ while (state->OldestMessageTime() != monotonic_clock::max_time) {
+ bool update_time_dummy;
+ state->PopOldest(&update_time_dummy);
}
}
@@ -839,17 +828,15 @@
<< " " << FlatbufferToJson(channel_data);
}
- const monotonic_clock::time_point next_time =
- state->channel_merger->OldestMessage();
+ const monotonic_clock::time_point next_time = state->OldestMessageTime();
if (next_time != monotonic_clock::max_time) {
- state->timer_handler->Setup(next_time);
+ state->Setup(next_time);
} else {
// Set a timer up immediately after now to die. If we don't do this, then
// the senders waiting on the message we just read will never get called.
if (event_loop_factory_ != nullptr) {
- state->timer_handler->Setup(monotonic_now +
- event_loop_factory_->send_delay() +
- std::chrono::nanoseconds(1));
+ state->Setup(monotonic_now + event_loop_factory_->send_delay() +
+ std::chrono::nanoseconds(1));
}
}
@@ -859,14 +846,12 @@
if (update_offsets) {
UpdateOffsets();
}
- });
+ }));
++live_nodes_;
- if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
- event_loop->OnRun([state]() {
- state->timer_handler->Setup(state->channel_merger->OldestMessage());
- });
+ if (state->OldestMessageTime() != monotonic_clock::max_time) {
+ event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
}
}
@@ -874,12 +859,7 @@
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < state->channels.size(); ++i) {
- state->channels[i].reset();
- }
- state->event_loop_unique_ptr.reset();
- state->event_loop = nullptr;
- state->node_event_loop_factory = nullptr;
+ state->Deregister();
}
event_loop_factory_unique_ptr_.reset();
@@ -910,7 +890,7 @@
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
- CHECK(!state->event_loop)
+ CHECK(!state->event_loop())
<< ": Can't change the mapping after the events are scheduled.";
}
}
@@ -1021,5 +1001,103 @@
return remapped_channel;
}
+LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
+ : channel_merger_(std::move(channel_merger)) {}
+
+EventLoop *LogReader::State::SetNodeEventLoopFactory(
+ NodeEventLoopFactory *node_event_loop_factory) {
+ node_event_loop_factory_ = node_event_loop_factory;
+ event_loop_unique_ptr_ =
+ node_event_loop_factory_->MakeEventLoop("log_reader");
+ return event_loop_unique_ptr_.get();
+}
+
+void LogReader::State::SetChannelCount(size_t count) {
+ channels_.resize(count);
+ filters_.resize(count);
+ channel_target_event_loop_factory_.resize(count);
+}
+
+void LogReader::State::SetChannel(
+ size_t channel, std::unique_ptr<RawSender> sender,
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ NodeEventLoopFactory *channel_target_event_loop_factory) {
+ channels_[channel] = std::move(sender);
+ filters_[channel] = filter;
+ channel_target_event_loop_factory_[channel] =
+ channel_target_event_loop_factory;
+}
+
+std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+LogReader::State::PopOldest(bool *update_time) {
+ CHECK_GT(sorted_messages_.size(), 0u);
+
+ std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ result = std::move(sorted_messages_.front());
+ VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ << std::get<0>(result).monotonic_event_time;
+ sorted_messages_.pop_front();
+ SeedSortedMessages();
+
+ *update_time = false;
+ return std::make_tuple(std::get<0>(result), std::get<1>(result),
+ std::move(std::get<2>(result)));
+}
+
+monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+ if (sorted_messages_.size() > 0) {
+ VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+ << std::get<0>(sorted_messages_.front()).monotonic_event_time;
+ return std::get<0>(sorted_messages_.front()).monotonic_event_time;
+ }
+
+ return channel_merger_->OldestMessageTime();
+}
+
+void LogReader::State::SeedSortedMessages() {
+ const aos::monotonic_clock::time_point end_queue_time =
+ (sorted_messages_.size() > 0
+ ? std::get<0>(sorted_messages_.front()).monotonic_event_time
+ : channel_merger_->monotonic_start_time()) +
+ std::chrono::seconds(2);
+
+ while (true) {
+ if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+ return;
+ }
+ if (sorted_messages_.size() > 0) {
+ // Stop placing sorted messages on the list once we have 2 seconds
+ // queued up (but queue at least until the log starts.
+ if (end_queue_time <
+ std::get<0>(sorted_messages_.back()).monotonic_event_time) {
+ return;
+ }
+ }
+
+ TimestampMerger::DeliveryTimestamp channel_timestamp;
+ int channel_index;
+ FlatbufferVector<MessageHeader> channel_data =
+ FlatbufferVector<MessageHeader>::Empty();
+
+ std::tie(channel_timestamp, channel_index, channel_data) =
+ channel_merger_->PopOldest();
+
+ sorted_messages_.emplace_back(channel_timestamp, channel_index,
+ std::move(channel_data));
+ }
+}
+
+void LogReader::State::Deregister() {
+ for (size_t i = 0; i < channels_.size(); ++i) {
+ channels_[i].reset();
+ }
+ event_loop_unique_ptr_.reset();
+ event_loop_ = nullptr;
+ timer_handler_ = nullptr;
+ node_event_loop_factory_ = nullptr;
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index c08d6d4..a123dcc 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -374,19 +374,23 @@
Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
// State per node.
- struct State {
- // Log file.
- std::unique_ptr<ChannelMerger> channel_merger;
- // Senders.
- std::vector<std::unique_ptr<RawSender>> channels;
+ class State {
+ public:
+ State(std::unique_ptr<ChannelMerger> channel_merger);
- // Factory (if we are in sim) that this loop was created on.
- NodeEventLoopFactory *node_event_loop_factory = nullptr;
- std::unique_ptr<EventLoop> event_loop_unique_ptr;
- // Event loop.
- EventLoop *event_loop = nullptr;
- // And timer used to send messages.
- TimerHandler *timer_handler;
+ // Returns the timestamps, channel_index, and message from a channel.
+ // update_time (will be) set to true when popping this message causes the
+ // filter to change the time offset estimation function.
+ std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>
+ PopOldest(bool *update_time);
+
+ // Returns the monotonic time of the oldest message.
+ monotonic_clock::time_point OldestMessageTime() const;
+
+ // Primes the queues inside State. Should be called before calling
+ // OldestMessageTime.
+ void SeedSortedMessages();
// Updates the timestamp filter with the timestamp. Returns true if the
// provided timestamp was actually a forwarding timestamp and used, and
@@ -395,16 +399,131 @@
const TimestampMerger::DeliveryTimestamp &channel_timestamp,
int channel_index);
+ // Returns the starting time for this node.
+ monotonic_clock::time_point monotonic_start_time() const {
+ return channel_merger_->monotonic_start_time();
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return channel_merger_->realtime_start_time();
+ }
+
+ // Sets the node event loop factory for replaying into a
+ // SimulatedEventLoopFactory. Returns the EventLoop to use.
+ EventLoop *SetNodeEventLoopFactory(
+ NodeEventLoopFactory *node_event_loop_factory);
+
+ // Sets and gets the event loop to use.
+ void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
+ EventLoop *event_loop() { return event_loop_; }
+
+ // Returns the oldest timestamp for the provided channel. This should only
+ // be called before SeedSortedMessages();
+ TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
+ size_t channel) {
+ return channel_merger_->OldestTimestampForChannel(channel);
+ }
+
+ // Sets the current realtime offset from the monotonic clock for this node
+ // (if we are on a simulated event loop).
+ void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
+ realtime_clock::time_point realtime_time) {
+ if (node_event_loop_factory_ != nullptr) {
+ node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
+ realtime_time);
+ }
+ }
+
+ // Converts a timestamp from the monotonic clock on this node to the
+ // distributed clock.
+ distributed_clock::time_point ToDistributedClock(
+ monotonic_clock::time_point time) {
+ return node_event_loop_factory_->ToDistributedClock(time);
+ }
+
+ // Sets the offset (and slope) from the distributed clock.
+ void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
+ double distributed_slope) {
+ node_event_loop_factory_->SetDistributedOffset(distributed_offset,
+ distributed_slope);
+ }
+
+ // Returns the current time on the remote node which sends messages on
+ // channel_index.
+ monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
+ return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+ }
+
+ // Sets the node we will be merging as, and returns true if there is any
+ // data on it.
+ bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
+
+ // Sets the number of channels.
+ void SetChannelCount(size_t count);
+
+ // Sets the sender, filter, and target factory for a channel.
+ void SetChannel(
+ size_t channel, std::unique_ptr<RawSender> sender,
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ NodeEventLoopFactory *channel_target_event_loop_factory);
+
+ // Returns if we have read all the messages from all the logs.
+ bool at_end() const { return channel_merger_->at_end(); }
+
+ // Unregisters everything so we can destory the event loop.
+ void Deregister();
+
+ // Sets the current TimerHandle for the replay callback.
+ void set_timer_handler(TimerHandler *timer_handler) {
+ timer_handler_ = timer_handler;
+ }
+
+ // Sets the next wakeup time on the replay callback.
+ void Setup(monotonic_clock::time_point next_time) {
+ timer_handler_->Setup(next_time);
+ }
+
+ // Sends a buffer on the provided channel index.
+ bool Send(size_t channel_index, const void *data, size_t size,
+ aos::monotonic_clock::time_point monotonic_remote_time,
+ aos::realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index) {
+ return channels_[channel_index]->Send(data, size, monotonic_remote_time,
+ realtime_remote_time,
+ remote_queue_index);
+ }
+
+ // Returns a debug string for the channel merger.
+ std::string DebugString() const { return channel_merger_->DebugString(); }
+
+ private:
+ // Log file.
+ std::unique_ptr<ChannelMerger> channel_merger_;
+
+ std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
+ FlatbufferVector<MessageHeader>>>
+ sorted_messages_;
+
+ // Senders.
+ std::vector<std::unique_ptr<RawSender>> channels_;
+
+ // Factory (if we are in sim) that this loop was created on.
+ NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
+ std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+ // Event loop.
+ EventLoop *event_loop_ = nullptr;
+ // And timer used to send messages.
+ TimerHandler *timer_handler_;
+
// Filters (or nullptr if it isn't a forwarded channel) for each channel.
// This corresponds to the object which is shared among all the channels
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
- filters;
+ filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
- std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
+ std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
};
// Node index -> State.