Make y2020 multinode!
Update the configs. Everything starts on a roboRIO, which forwards
RobotState between nodes. Still need to finalize deployment for the Pis.
Change-Id: I2601419a94ca154e9663c916e9b8987d73ffa814
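For context on the config change described above: the shape being set up is a single sourced channel fanned out from the roboRIO to every other node. A minimal, self-contained sketch of that constraint, assuming a simplified model of the config (Config, ChannelConfig, and the "/aos/robot_state" channel name here are hypothetical stand-ins, not the real AOS flatbuffer schema):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real flatbuffer-backed config types.
struct ChannelConfig {
  std::string name;
  std::string source_node;                     // e.g. "roborio"
  std::vector<std::string> destination_nodes;  // e.g. the Pis
};

struct Config {
  std::vector<std::string> nodes;
  std::vector<ChannelConfig> channels;
};

// Mirrors the intent of the commit: RobotState originates on the roboRIO and
// is forwarded to every other node in the config.
bool RobotStateReachesAllNodes(const Config &config) {
  for (const ChannelConfig &channel : config.channels) {
    if (channel.name != "/aos/robot_state") continue;  // hypothetical channel name
    for (const std::string &node : config.nodes) {
      if (node == channel.source_node) continue;
      bool forwarded = false;
      for (const std::string &destination : channel.destination_nodes) {
        if (destination == node) forwarded = true;
      }
      if (!forwarded) return false;
    }
    return true;
  }
  return false;
}

int main() {
  Config config{{"roborio", "pi1", "pi2"},
                {{"/aos/robot_state", "roborio", {"pi1", "pi2"}}}};
  std::cout << RobotStateReachesAllNodes(config) << "\n";  // prints 1
}
```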
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 44381f3..f062ad3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1048,6 +1048,12 @@
TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
int channel) const {
+ // If we didn't find any data for this node, we won't have any mergers. Return
+ // an invalid timestamp in that case.
+ if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
+ TimestampMerger::DeliveryTimestamp result;
+ return result;
+ }
return timestamp_mergers_[channel].OldestTimestamp();
}
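The guard above relies on a default-constructed DeliveryTimestamp acting as an "invalid" sentinel. A minimal sketch of that pattern, assuming the struct defaults its event time to the clock's minimum (DeliveryTimestamp and OldestTimestampForChannel below are simplified stand-ins, not the real TimestampMerger types):

```cpp
#include <chrono>
#include <iostream>
#include <vector>

using monotonic_clock = std::chrono::steady_clock;

// Hypothetical stand-in: defaults to min() so a default-constructed value is
// distinguishable from any real delivery time.
struct DeliveryTimestamp {
  monotonic_clock::time_point monotonic_event_time =
      monotonic_clock::time_point::min();
};

DeliveryTimestamp OldestTimestampForChannel(
    const std::vector<DeliveryTimestamp> &mergers, int channel) {
  // Out-of-range channel means no data was logged for this node; return the
  // sentinel instead of indexing past the end.
  if (mergers.size() <= static_cast<size_t>(channel)) {
    return DeliveryTimestamp{};
  }
  return mergers[channel];
}

int main() {
  std::vector<DeliveryTimestamp> mergers;  // no data for this node
  DeliveryTimestamp result = OldestTimestampForChannel(mergers, 3);
  std::cout << (result.monotonic_event_time ==
                monotonic_clock::time_point::min())
            << "\n";  // prints 1: caller can treat this as "no data"
}
```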
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index ddd3dac..de9d344 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -299,17 +299,29 @@
replay_configuration_(replay_configuration) {
MakeRemappedConfig();
+ if (replay_configuration) {
+ CHECK_EQ(configuration::MultiNode(configuration()),
+ configuration::MultiNode(replay_configuration))
+ << ": Log file and replay config need to both be multi or single node.";
+ }
+
if (!configuration::MultiNode(configuration())) {
states_.emplace_back(std::make_unique<State>());
State *state = states_[0].get();
state->channel_merger = std::make_unique<ChannelMerger>(filenames);
} else {
+ if (replay_configuration) {
+ CHECK_EQ(configuration()->nodes()->size(),
+ replay_configuration->nodes()->size())
+ << ": Log file and replay config need to have matching nodes lists.";
+ }
states_.resize(configuration()->nodes()->size());
}
}
-LogReader::~LogReader() { Deregister();
+LogReader::~LogReader() {
+ Deregister();
if (offset_fp_ != nullptr) {
fclose(offset_fp_);
}
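The new CHECK_EQs fail fast when the log's config and the replay config disagree on single- vs multi-node, or on the number of nodes. A small standalone sketch of that validation, assuming MultiNode() just means "the config has a nodes list" (Config and ValidateReplayConfig are hypothetical stand-ins):

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for the flatbuffer Configuration.
struct Config {
  std::optional<std::vector<std::string>> nodes;  // unset => single node
};

bool MultiNode(const Config &config) { return config.nodes.has_value(); }

void ValidateReplayConfig(const Config &logged, const Config *replay) {
  if (replay == nullptr) return;  // no replay config supplied
  // Both must agree on single- vs multi-node.
  assert(MultiNode(logged) == MultiNode(*replay));
  // In the multi-node case the node lists must line up one-to-one.
  if (MultiNode(logged)) {
    assert(logged.nodes->size() == replay->nodes->size());
  }
}

int main() {
  Config logged;
  logged.nodes = std::vector<std::string>{"roborio", "pi1"};
  Config replay = logged;
  ValidateReplayConfig(logged, &replay);  // passes: both multi-node, same count
}
```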
@@ -657,7 +669,7 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- state->channel_merger->SetNode(event_loop->node());
+ const bool has_data = state->channel_merger->SetNode(event_loop->node());
state->channels.resize(logged_configuration()->channels()->size());
state->filters.resize(state->channels.size());
@@ -685,9 +697,16 @@
}
}
+ // If we didn't find any log files with data in them, we won't ever get a
+ // callback or be live. So skip the rest of the setup.
+ if (!has_data) {
+ return;
+ }
+
state->timer_handler = event_loop->AddTimer([this, state]() {
if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
--live_nodes_;
+ VLOG(1) << "Node down!";
if (live_nodes_ == 0) {
event_loop_factory_->Exit();
}
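The has_data early return and the live_nodes_ decrement work together: only nodes whose log files actually contain data count as live, and replay exits once the last live node runs out of messages. A compact sketch of that countdown with the event-loop machinery replaced by a plain callback (LivenessTracker is an illustrative stand-in, not a real LogReader member):

```cpp
#include <functional>
#include <iostream>

// Illustrative model: each node with data registers once; when its data runs
// out it reports itself down, and the last one down stops the simulation.
class LivenessTracker {
 public:
  void RegisterNodeWithData() { ++live_nodes_; }

  void NodeRanOutOfData(const std::function<void()> &exit_factory) {
    --live_nodes_;
    std::cout << "Node down! live_nodes_=" << live_nodes_ << "\n";
    if (live_nodes_ == 0) {
      exit_factory();
    }
  }

 private:
  int live_nodes_ = 0;
};

int main() {
  LivenessTracker tracker;
  tracker.RegisterNodeWithData();  // roborio log had data
  tracker.RegisterNodeWithData();  // pi1 log had data
  // A node whose log had no data never registers, so it can't hold replay open.

  bool exited = false;
  tracker.NodeRanOutOfData([&] { exited = true; });  // roborio finishes
  tracker.NodeRanOutOfData([&] { exited = true; });  // pi1 finishes -> exit
  std::cout << "exited=" << exited << "\n";          // prints exited=1
}
```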
@@ -860,8 +879,10 @@
void LogReader::MakeRemappedConfig() {
for (std::unique_ptr<State> &state : states_) {
- CHECK(!state->event_loop)
- << ": Can't change the mapping after the events are scheduled.";
+ if (state) {
+ CHECK(!state->event_loop)
+ << ": Can't change the mapping after the events are scheduled.";
+ }
}
// If no remapping occurred and we are using the original config, then there
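The null check is needed because the multi-node path above only resizes states_, leaving empty unique_ptrs in the slots until Register() fills them in. A tiny sketch of why iterating without the guard would dereference null (SimplifiedState is a hypothetical stand-in):

```cpp
#include <iostream>
#include <memory>
#include <vector>

struct SimplifiedState {
  bool event_loop_started = false;
};

int main() {
  // Multi-node path: resize() leaves null unique_ptrs until each node registers.
  std::vector<std::unique_ptr<SimplifiedState>> states;
  states.resize(3);
  states[0] = std::make_unique<SimplifiedState>();  // only one node registered so far

  for (const std::unique_ptr<SimplifiedState> &state : states) {
    // Without the null guard, dereferencing the empty slots would be undefined
    // behavior; with it, unregistered nodes are simply skipped.
    if (state) {
      std::cout << "registered, event_loop_started="
                << state->event_loop_started << "\n";
    } else {
      std::cout << "slot not registered yet\n";
    }
  }
}
```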
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 6b03b2f..1f56e46 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,8 +32,29 @@
// Kicks us to re-fetch and schedule the timer.
void Schedule() {
- if (fetcher_->context().data == nullptr || sent_) {
- sent_ = !fetcher_->FetchNext();
+ // Keep pulling messages out of the fetcher until we find one in the future.
+ while (true) {
+ if (fetcher_->context().data == nullptr || sent_) {
+ sent_ = !fetcher_->FetchNext();
+ }
+ if (sent_) {
+ break;
+ }
+ if (fetcher_->context().monotonic_event_time +
+ send_node_factory_->network_delay() +
+ send_node_factory_->send_delay() >
+ fetch_node_factory_->monotonic_now()) {
+ break;
+ }
+
+ // TODO(austin): Not cool. We want to actually forward these. This means
+ // we need a more sophisticated concept of what is running.
+ LOG(WARNING) << "Not forwarding message on "
+ << configuration::CleanedChannelToString(fetcher_->channel())
+ << " because we aren't running. Set at "
+ << fetcher_->context().monotonic_event_time << " now is "
+ << fetch_node_factory_->monotonic_now();
+ sent_ = true;
}
if (fetcher_->context().data == nullptr) {
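The heart of the new Schedule() loop is the time arithmetic: a fetched message is only schedulable if its event time plus the send and network delays still lands after the fetching node's current time; anything older is logged and dropped for now, per the TODO. A standalone sketch of that catch-up loop, with the fetcher and node factories reduced to hypothetical plain values:

```cpp
#include <chrono>
#include <iostream>
#include <optional>
#include <queue>

using namespace std::chrono_literals;
using monotonic_time = std::chrono::nanoseconds;  // time since boot, stand-in for a time_point

int main() {
  // Hypothetical stand-ins for the fetcher and the two node factories.
  const auto send_delay = 50us;
  const auto network_delay = 100us;
  const monotonic_time now = 10ms;  // fetch-side node's current time

  std::queue<monotonic_time> fetched_event_times;  // what FetchNext() would yield
  fetched_event_times.push(1ms);   // stale: replay is already well past this
  fetched_event_times.push(2ms);   // stale
  fetched_event_times.push(11ms);  // still in the future: schedulable

  std::optional<monotonic_time> to_schedule;
  // Keep pulling messages until one's delivery time (event time plus send and
  // network delay) is still ahead of "now"; older ones are skipped with a
  // warning instead of being forwarded.
  while (!fetched_event_times.empty()) {
    const monotonic_time event_time = fetched_event_times.front();
    if (event_time + send_delay + network_delay > now) {
      to_schedule = event_time;
      break;
    }
    std::cout << "Not forwarding stale message from t=" << event_time / 1us
              << "us\n";
    fetched_event_times.pop();
  }

  if (to_schedule) {
    std::cout << "Scheduling delivery of message from t=" << *to_schedule / 1us
              << "us\n";
  }
}
```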