Make y2020 multinode!

Update the configs.  It all starts on a roboRIO, and forwards RobotState
between nodes.  Need to finalize deployment for pi's.

Change-Id: I2601419a94ca154e9663c916e9b8987d73ffa814
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 44381f3..f062ad3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1048,6 +1048,12 @@
 
 TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
     int channel) const {
+  // If we didn't find any data for this node, we won't have any mergers. Return
+  // an invalid timestamp in that case.
+  if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
+    TimestampMerger::DeliveryTimestamp result;
+    return result;
+  }
   return timestamp_mergers_[channel].OldestTimestamp();
 }
 
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index ddd3dac..de9d344 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -299,17 +299,29 @@
       replay_configuration_(replay_configuration) {
   MakeRemappedConfig();
 
+  if (replay_configuration) {
+    CHECK_EQ(configuration::MultiNode(configuration()),
+             configuration::MultiNode(replay_configuration))
+        << ": Log file and replay config need to both be multi or single node.";
+  }
+
   if (!configuration::MultiNode(configuration())) {
     states_.emplace_back(std::make_unique<State>());
     State *state = states_[0].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames);
   } else {
+    if (replay_configuration) {
+      CHECK_EQ(configuration()->nodes()->size(),
+               replay_configuration->nodes()->size())
+          << ": Log file and replay config need to have matching nodes lists.";
+    }
     states_.resize(configuration()->nodes()->size());
   }
 }
 
-LogReader::~LogReader() { Deregister();
+LogReader::~LogReader() {
+  Deregister();
   if (offset_fp_ != nullptr) {
     fclose(offset_fp_);
   }
@@ -657,7 +669,7 @@
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
 
-  state->channel_merger->SetNode(event_loop->node());
+  const bool has_data = state->channel_merger->SetNode(event_loop->node());
 
   state->channels.resize(logged_configuration()->channels()->size());
   state->filters.resize(state->channels.size());
@@ -685,9 +697,16 @@
     }
   }
 
+  // If we didn't find any log files with data in them, we won't ever get a
+  // callback or be live.  So skip the rest of the setup.
+  if (!has_data) {
+    return;
+  }
+
   state->timer_handler = event_loop->AddTimer([this, state]() {
     if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
       --live_nodes_;
+      VLOG(1) << "Node down!";
       if (live_nodes_ == 0) {
         event_loop_factory_->Exit();
       }
@@ -860,8 +879,10 @@
 
 void LogReader::MakeRemappedConfig() {
   for (std::unique_ptr<State> &state : states_) {
-    CHECK(!state->event_loop)
-        << ": Can't change the mapping after the events are scheduled.";
+    if (state) {
+      CHECK(!state->event_loop)
+          << ": Can't change the mapping after the events are scheduled.";
+    }
   }
 
   // If no remapping occurred and we are using the original config, then there
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 6b03b2f..1f56e46 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -32,8 +32,29 @@
 
   // Kicks us to re-fetch and schedule the timer.
   void Schedule() {
-    if (fetcher_->context().data == nullptr || sent_) {
-      sent_ = !fetcher_->FetchNext();
+    // Keep pulling messages out of the fetcher until we find one in the future.
+    while (true) {
+      if (fetcher_->context().data == nullptr || sent_) {
+        sent_ = !fetcher_->FetchNext();
+      }
+      if (sent_) {
+        break;
+      }
+      if (fetcher_->context().monotonic_event_time +
+              send_node_factory_->network_delay() +
+              send_node_factory_->send_delay() >
+          fetch_node_factory_->monotonic_now()) {
+        break;
+      }
+
+      // TODO(austin): Not cool.  We want to actually forward these.  This means
+      // we need a more sophisticated concept of what is running.
+      LOG(WARNING) << "Not forwarding message on "
+                   << configuration::CleanedChannelToString(fetcher_->channel())
+                   << " because we aren't running.  Set at "
+                   << fetcher_->context().monotonic_event_time << " now is "
+                   << fetch_node_factory_->monotonic_now();
+      sent_ = true;
     }
 
     if (fetcher_->context().data == nullptr) {