Support nodes rebooting

We have log files which span a reboot.  We want to be able to replay the
timeline across that reboot so we can run simulations and everything
else interesting.

This requires a bunch of stuff, unfortunately.

The most visible one is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it.  There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.

This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.

From there, everything else is just a massive amount of plumbing of the
BootTimestamp through everything just short of the user.  Boot UUIDs
were put in TimeConverter so everything related to rebooting is all
nicely together.

Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 20e38d2..9522351 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -57,8 +57,9 @@
     if (fd_ == -1 && errno == ENOSPC) {
       ran_out_of_space_ = true;
     } else {
-      PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
-      VLOG(1) << "Opened " << filename << " for writing";
+      PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
+                        << " for writing";
+      VLOG(1) << "Opened " << this->filename() << " for writing";
     }
   }
 }
@@ -147,7 +148,7 @@
     }
   }
   fd_ = -1;
-  VLOG(1) << "Closed " << filename_;
+  VLOG(1) << "Closed " << filename();
 }
 
 void DetachedBufferWriter::Flush() {
@@ -651,7 +652,7 @@
      << ", .queue_index=" << m.queue_index
      << ", .monotonic_event_time=" << m.monotonic_event_time
      << ", .realtime_event_time=" << m.realtime_event_time;
-  if (m.remote_queue_index != 0xffffffff) {
+  if (m.remote_queue_index != BootQueueIndex::Invalid()) {
     os << ", .remote_queue_index=" << m.remote_queue_index;
   }
   if (m.monotonic_remote_time != BootTimestamp::min_time()) {
@@ -713,7 +714,9 @@
 
       messages_.insert(Message{
           .channel_index = m.value().message().channel_index(),
-          .queue_index = m.value().message().queue_index(),
+          .queue_index =
+              BootQueueIndex{.boot = parts().boot_count,
+                             .index = m.value().message().queue_index()},
           .timestamp =
               BootTimestamp{
                   .boot = parts().boot_count,
@@ -986,7 +989,7 @@
       .monotonic_event_time = m->timestamp,
       .realtime_event_time = aos::realtime_clock::time_point(
           std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
-      .remote_queue_index = 0xffffffff,
+      .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
       .monotonic_timestamp_time = BootTimestamp::min_time(),
@@ -1070,7 +1073,9 @@
         .monotonic_event_time = m->timestamp,
         .realtime_event_time = aos::realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
-        .remote_queue_index = m->data.message().remote_queue_index(),
+        .remote_queue_index =
+            BootQueueIndex{.boot = m->monotonic_remote_boot,
+                           .index = m->data.message().remote_queue_index()},
         .monotonic_remote_time =
             {m->monotonic_remote_boot,
              monotonic_clock::time_point(std::chrono::nanoseconds(
@@ -1139,8 +1144,9 @@
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
   CHECK(message.data.message().has_remote_queue_index());
-  const uint32_t remote_queue_index =
-      message.data.message().remote_queue_index();
+  const BootQueueIndex remote_queue_index =
+      BootQueueIndex{.boot = message.monotonic_remote_boot,
+                    .index = message.data.message().remote_queue_index()};
 
   CHECK(message.data.message().has_monotonic_remote_time());
   CHECK(message.data.message().has_realtime_remote_time());
@@ -1199,11 +1205,17 @@
   // The algorithm below is constant time with some assumptions.  We need there
   // to be no missing messages in the data stream.  This also assumes a queue
   // hasn't wrapped.  That is conservative, but should let us get started.
-  if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
-      data_queue->size()) {
+  if (data_queue->back().queue_index.boot ==
+          data_queue->front().queue_index.boot &&
+      (data_queue->back().queue_index.index -
+           data_queue->front().queue_index.index + 1u ==
+       data_queue->size())) {
+    CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
     // Pull the data out and confirm that the timestamps match as expected.
-    Message result = std::move(
-        (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+    //
+    // TODO(austin): Move if not reliable.
+    Message result = (*data_queue)[remote_queue_index.index -
+                                   data_queue->front().queue_index.index];
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
@@ -1213,15 +1225,20 @@
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
     // are done.  And all the data is in order.
-    data_queue->erase(data_queue->begin(),
-                      data_queue->begin() + (1 + remote_queue_index -
-                                             data_queue->front().queue_index));
+    data_queue->erase(
+        data_queue->begin(),
+        data_queue->begin() +
+            (remote_queue_index.index - data_queue->front().queue_index.index));
     return result;
   } else {
-    auto it = std::find_if(data_queue->begin(), data_queue->end(),
-                           [remote_queue_index](const Message &m) {
-                             return m.queue_index == remote_queue_index;
-                           });
+    // TODO(austin): Binary search.
+    auto it = std::find_if(
+        data_queue->begin(), data_queue->end(),
+        [remote_queue_index,
+         remote_boot = monotonic_remote_time.boot](const Message &m) {
+          return m.queue_index == remote_queue_index &&
+                 m.timestamp.boot == remote_boot;
+        });
     if (it == data_queue->end()) {
       return Message{
           .channel_index = message.channel_index,
@@ -1241,6 +1258,8 @@
              realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
 
+    // TODO(austin): We still go in order, so we can erase from the beginning to
+    // our iterator minus 1.  That'll keep 1 in the queue.
     data_queue->erase(it);
 
     return result;