Merge "Add a utility to print out contents of queue memory"
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8e5e27c..7125bda 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -461,7 +461,7 @@
       }
     } else {
       if (!NextLogFile()) {
-        VLOG(1) << "End of log file " << filenames_.back();
+        VLOG(1) << "No more files, last was " << filenames_.back();
         at_end_ = true;
         for (MessageHeaderQueue *queue : channels_to_write_) {
           if (queue == nullptr || queue->timestamp_merger == nullptr) {
@@ -541,7 +541,8 @@
       std::move(channels_[channel_index].data.front());
   channels_[channel_index].data.pop_front();
 
-  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+          << channel_index;
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -559,7 +560,8 @@
       std::move(channels_[channel].timestamps[node_index].front());
   channels_[channel].timestamps[node_index].pop_front();
 
-  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+          << channel << " on " << node_index;
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -693,7 +695,7 @@
 
   // If we are just a data merger, don't wait for timestamps.
   if (!has_timestamps_) {
-    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
     pushed_ = true;
   }
 }
@@ -737,7 +739,7 @@
   // If we are a timestamp merger, don't wait for data.  Missing data will be
   // caught at read time.
   if (has_timestamps_) {
-    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
     pushed_ = true;
   }
 }
@@ -766,7 +768,7 @@
   CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
 
   // Now, keep reading until we have found all duplicates.
-  while (message_heap_.size() > 0u) {
+  while (!message_heap_.empty()) {
     // See if it is a duplicate.
     std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
         next_oldest_message_reader = message_heap_.front();
@@ -888,7 +890,7 @@
             std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
 
     // See if we have any data.  If not, pass the problem up the chain.
-    if (message_heap_.size() == 0u) {
+    if (message_heap_.empty()) {
       VLOG(1) << "No data to match timestamp on "
               << configuration::CleanedChannelToString(
                      configuration_->channels()->Get(channel_index_));
@@ -1072,14 +1074,14 @@
 }
 
 monotonic_clock::time_point ChannelMerger::OldestMessage() const {
-  if (channel_heap_.size() == 0u) {
+  if (channel_heap_.empty()) {
     return monotonic_clock::max_time;
   }
   return channel_heap_.front().first;
 }
 
 TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
-  if (timestamp_heap_.size() == 0u) {
+  if (timestamp_heap_.empty()) {
     return TimestampMerger::DeliveryTimestamp{};
   }
   return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
@@ -1101,20 +1103,34 @@
   // Pop and recreate the heap if it has already been pushed.  And since we are
   // pushing again, we don't need to clear pushed.
   if (timestamp_mergers_[channel_index].pushed()) {
-    channel_heap_.erase(std::find_if(
+    const auto channel_iterator = std::find_if(
         channel_heap_.begin(), channel_heap_.end(),
         [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
           return x.second == channel_index;
-        }));
+        });
+    DCHECK(channel_iterator != channel_heap_.end());
+    if (std::get<0>(*channel_iterator) == timestamp) {
+      // It's already in the heap, in the correct spot, so nothing
+      // more for us to do here.
+      return;
+    }
+    channel_heap_.erase(channel_iterator);
     std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                    ChannelHeapCompare);
 
     if (timestamp_mergers_[channel_index].has_timestamps()) {
-      timestamp_heap_.erase(std::find_if(
+      const auto timestamp_iterator = std::find_if(
           timestamp_heap_.begin(), timestamp_heap_.end(),
           [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
             return x.second == channel_index;
-          }));
+          });
+      DCHECK(timestamp_iterator != timestamp_heap_.end());
+      if (std::get<0>(*timestamp_iterator) == timestamp) {
+        // It's already in the heap, in the correct spot, so nothing
+        // more for us to do here.
+        return;
+      }
+      timestamp_heap_.erase(timestamp_iterator);
       std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                      ChannelHeapCompare);
     }
@@ -1164,6 +1180,10 @@
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
       message = merger->PopOldest();
+  DCHECK_EQ(std::get<0>(message).monotonic_event_time,
+            oldest_channel_data.first)
+      << ": channel_heap_ was corrupted for " << channel_index << ": "
+      << DebugString();
 
   return std::make_tuple(std::get<0>(message), channel_index,
                          std::move(std::get<1>(message)));
@@ -1247,7 +1267,7 @@
     std::vector<
         std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
         message_heap = message_heap_;
-    while (message_heap.size() > 0u) {
+    while (!message_heap.empty()) {
       std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
           oldest_message_reader = message_heap.front();
 
@@ -1276,7 +1296,7 @@
   ss << "channel_heap {\n";
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
       channel_heap_;
-  while (channel_heap.size() > 0u) {
+  while (!channel_heap.empty()) {
     std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
     ss << "  " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
        << configuration::CleanedChannelToString(
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e767427..8969ee2 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -599,7 +599,9 @@
   // to timestamps on log files where the timestamp log file starts before the
   // data.  In this case, it is reasonable to expect missing data.
   ignore_missing_data_ = true;
+  VLOG(1) << "Running until start time: " << start_time;
   event_loop_factory_->RunFor(start_time.time_since_epoch());
+  VLOG(1) << "At start time";
   // Now that we are running for real, missing data means that the log file is
   // corrupted or went wrong.
   ignore_missing_data_ = false;
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 0cdba93..db727c6 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -2,11 +2,13 @@
   "channels": [
     {
       "name": "/test",
-      "type": "aos.examples.Ping"
+      "type": "aos.examples.Ping",
+      "frequency": 2000
     },
     {
       "name": "/test",
-      "type": "aos.examples.Pong"
+      "type": "aos.examples.Pong",
+      "frequency": 2000
     }
   ],
   "imports": [
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 2383050..d2c9112 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -190,8 +190,7 @@
 
 class SimpleShmFetcher {
  public:
-  explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel,
-                            bool copy_data)
+  explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
       : event_loop_(event_loop),
         channel_(channel),
         lockless_queue_memory_(
@@ -200,10 +199,6 @@
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
                         lockless_queue_memory_.config()) {
-    if (copy_data) {
-      data_storage_.reset(static_cast<char *>(
-          malloc(channel->max_size() + kChannelDataAlignment - 1)));
-    }
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -213,6 +208,13 @@
 
   ~SimpleShmFetcher() {}
 
+  // Sets this object to copy data out of the shared memory into a private
+  // buffer when fetching.
+  void CopyDataOnFetch() {
+    data_storage_.reset(static_cast<char *>(
+        malloc(channel_->max_size() + kChannelDataAlignment - 1)));
+  }
+
   // Points the next message to fetch at the queue index which will be
   // populated next.
   void PointAtNextQueueIndex() {
@@ -228,47 +230,8 @@
   }
 
   bool FetchNext() {
-    // TODO(austin): Get behind and make sure it dies both here and with
-    // Fetch.
-    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
-        actual_queue_index_.index(), &context_.monotonic_event_time,
-        &context_.realtime_event_time, &context_.monotonic_remote_time,
-        &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, data_storage_start());
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
-      context_.queue_index = actual_queue_index_.index();
-      if (context_.remote_queue_index == 0xffffffffu) {
-        context_.remote_queue_index = context_.queue_index;
-      }
-      if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
-        context_.monotonic_remote_time = context_.monotonic_event_time;
-      }
-      if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
-        context_.realtime_remote_time = context_.realtime_event_time;
-      }
-      if (copy_data()) {
-        context_.data = data_storage_start() +
-                        lockless_queue_.message_data_size() - context_.size;
-      } else {
-        context_.data = nullptr;
-      }
-      actual_queue_index_ = actual_queue_index_.Increment();
-    }
-
-    // Make sure the data wasn't modified while we were reading it.  This
-    // can only happen if you are reading the last message *while* it is
-    // being written to, which means you are pretty far behind.
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
-        << ": Got behind while reading and the last message was modified "
-           "out from under us while we were reading it.  Don't get so far "
-           "behind.  "
-        << configuration::CleanedChannelToString(channel_);
-
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
-      event_loop_->SendTimingReport();
-      LOG(FATAL) << "The next message is no longer available.  "
-                 << configuration::CleanedChannelToString(channel_);
-    }
+    const ipc_lib::LocklessQueue::ReadResult read_result =
+        DoFetch(actual_queue_index_);
 
     return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
   }
@@ -287,53 +250,12 @@
       return false;
     }
 
-    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
-        queue_index.index(), &context_.monotonic_event_time,
-        &context_.realtime_event_time, &context_.monotonic_remote_time,
-        &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.size, data_storage_start());
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
-      context_.queue_index = queue_index.index();
-      if (context_.remote_queue_index == 0xffffffffu) {
-        context_.remote_queue_index = context_.queue_index;
-      }
-      if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
-        context_.monotonic_remote_time = context_.monotonic_event_time;
-      }
-      if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
-        context_.realtime_remote_time = context_.realtime_event_time;
-      }
-      if (copy_data()) {
-        context_.data = data_storage_start() +
-                        lockless_queue_.message_data_size() - context_.size;
-      } else {
-        context_.data = nullptr;
-      }
-      actual_queue_index_ = queue_index.Increment();
-    }
-
-    // Make sure the data wasn't modified while we were reading it.  This
-    // can only happen if you are reading the last message *while* it is
-    // being written to, which means you are pretty far behind.
-    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
-        << ": Got behind while reading and the last message was modified "
-           "out from under us while we were reading it.  Don't get so far "
-           "behind."
-        << configuration::CleanedChannelToString(channel_);
+    const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
 
     CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
         << ": Queue index went backwards.  This should never happen.  "
         << configuration::CleanedChannelToString(channel_);
 
-    // We fell behind between when we read the index and read the value.
-    // This isn't worth recovering from since this means we went to sleep
-    // for a long time in the middle of this function.
-    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
-      event_loop_->SendTimingReport();
-      LOG(FATAL) << "The next message is no longer available.  "
-                 << configuration::CleanedChannelToString(channel_);
-    }
-
     return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
   }
 
@@ -350,17 +272,83 @@
   }
 
   absl::Span<char> GetPrivateMemory() const {
-    CHECK(copy_data());
+    // Can't usefully expose this for pinning, because the buffer changes
+    // address for each message. Callers who want to work with that should just
+    // grab the whole shared memory buffer instead.
     return absl::Span<char>(
         const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
         lockless_queue_.message_data_size());
   }
 
  private:
-  char *data_storage_start() {
-    if (!copy_data()) return nullptr;
+  ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+    // TODO(austin): Get behind and make sure it dies.
+    char *copy_buffer = nullptr;
+    if (copy_data()) {
+      copy_buffer = data_storage_start();
+    }
+    ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+        queue_index.index(), &context_.monotonic_event_time,
+        &context_.realtime_event_time, &context_.monotonic_remote_time,
+        &context_.realtime_remote_time, &context_.remote_queue_index,
+        &context_.size, copy_buffer);
+
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+      context_.queue_index = queue_index.index();
+      if (context_.remote_queue_index == 0xffffffffu) {
+        context_.remote_queue_index = context_.queue_index;
+      }
+      if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
+        context_.monotonic_remote_time = context_.monotonic_event_time;
+      }
+      if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
+        context_.realtime_remote_time = context_.realtime_event_time;
+      }
+      const char *const data = DataBuffer();
+      if (data) {
+        context_.data =
+            data + lockless_queue_.message_data_size() - context_.size;
+      } else {
+        context_.data = nullptr;
+      }
+      actual_queue_index_ = queue_index.Increment();
+    }
+
+    // Make sure the data wasn't modified while we were reading it.  This
+    // can only happen if you are reading the last message *while* it is
+    // being written to, which means you are pretty far behind.
+    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+        << ": Got behind while reading and the last message was modified "
+           "out from under us while we were reading it.  Don't get so far "
+           "behind on: "
+        << configuration::CleanedChannelToString(channel_);
+
+    // We fell behind between when we read the index and read the value.
+    // This isn't worth recovering from since this means we went to sleep
+    // for a long time in the middle of this function.
+    if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+      event_loop_->SendTimingReport();
+      LOG(FATAL) << "The next message is no longer available.  "
+                 << configuration::CleanedChannelToString(channel_);
+    }
+
+    return read_result;
+  }
+
+  char *data_storage_start() const {
+    CHECK(copy_data());
     return RoundChannelData(data_storage_.get(), channel_->max_size());
   }
+
+  // Note that for some modes the return value will change as new messages are
+  // read.
+  const char *DataBuffer() const {
+    if (copy_data()) {
+      return data_storage_start();
+    }
+    return nullptr;
+  }
+
   bool copy_data() const { return static_cast<bool>(data_storage_); }
 
   aos::ShmEventLoop *event_loop_;
@@ -381,7 +369,9 @@
  public:
   explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
       : RawFetcher(event_loop, channel),
-        simple_shm_fetcher_(event_loop, channel, true) {}
+        simple_shm_fetcher_(event_loop, channel) {
+    simple_shm_fetcher_.CopyDataOnFetch();
+  }
 
   ~ShmFetcher() { context_.data = nullptr; }
 
@@ -487,7 +477,11 @@
       : WatcherState(event_loop, channel, std::move(fn)),
         event_loop_(event_loop),
         event_(this),
-        simple_shm_fetcher_(event_loop, channel, copy_data) {}
+        simple_shm_fetcher_(event_loop, channel) {
+    if (copy_data) {
+      simple_shm_fetcher_.CopyDataOnFetch();
+    }
+  }
 
   ~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
 
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ee4e6d6..bff04b9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -3,21 +3,38 @@
 #include <algorithm>
 #include <deque>
 #include <string_view>
+#include <vector>
 
 #include "absl/container/btree_map.h"
+#include "aos/events/aos_logging.h"
 #include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
-#include "aos/events/aos_logging.h"
 
 namespace aos {
 
+class SimulatedEventLoop;
+class SimulatedFetcher;
+class SimulatedChannel;
+
+namespace {
+
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
-struct SimulatedMessage {
+struct SimulatedMessage final {
+  SimulatedMessage(const SimulatedMessage &) = delete;
+  SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+
+  // Creates a SimulatedMessage with channel->max_size() bytes of storage.
+  // This is a shared_ptr so we don't have to implement refcounting or copying.
+  static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+
   // Context for the data.
   Context context;
 
+  SimulatedChannel *const channel = nullptr;
+  int buffer_index;
+
   // The data.
   char *data(size_t buffer_size) {
     return RoundChannelData(&actual_data[0], buffer_size);
@@ -26,12 +43,21 @@
   // Then the data, including padding on the end so we can align the buffer we
   // actually return from data().
   char actual_data[];
+
+ private:
+  SimulatedMessage(SimulatedChannel *channel_in);
+  ~SimulatedMessage();
+
+  static void DestroyAndFree(SimulatedMessage *p) {
+    p->~SimulatedMessage();
+    free(p);
+  }
 };
 
-class SimulatedEventLoop;
-class SimulatedFetcher;
-class SimulatedChannel;
+}  // namespace
 
+// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
+// for some reason...
 class SimulatedWatcher : public WatcherState {
  public:
   SimulatedWatcher(
@@ -65,12 +91,59 @@
 
 class SimulatedChannel {
  public:
-  explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+  explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler,
+                            std::chrono::nanoseconds channel_storage_duration)
       : channel_(channel),
         scheduler_(scheduler),
-        next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+        channel_storage_duration_(channel_storage_duration),
+        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+    available_buffer_indices_.reserve(number_buffers());
+    for (int i = 0; i < number_buffers(); ++i) {
+      available_buffer_indices_.push_back(i);
+    }
+  }
 
-  ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+  ~SimulatedChannel() {
+    latest_message_.reset();
+    CHECK_EQ(static_cast<size_t>(number_buffers()),
+             available_buffer_indices_.size());
+    CHECK_EQ(0u, fetchers_.size());
+    CHECK_EQ(0u, watchers_.size());
+    CHECK_EQ(0, sender_count_);
+  }
+
+  // The number of messages we pretend to have in the queue.
+  int queue_size() const {
+    return channel()->frequency() *
+           std::chrono::duration_cast<std::chrono::duration<double>>(
+               channel_storage_duration_)
+               .count();
+  }
+
+  // The number of extra buffers (beyond the queue) we pretend to have.
+  int number_scratch_buffers() const {
+    // We need to start creating messages before we know how many
+    // senders+readers we'll have, so we need to just pick something which is
+    // always big enough.
+    return 50;
+  }
+
+  int number_buffers() const { return queue_size() + number_scratch_buffers(); }
+
+  int GetBufferIndex() {
+    CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
+    const int result = available_buffer_indices_.back();
+    available_buffer_indices_.pop_back();
+    return result;
+  }
+
+  void FreeBufferIndex(int i) {
+    DCHECK(std::find(available_buffer_indices_.begin(),
+                     available_buffer_indices_.end(),
+                     i) == available_buffer_indices_.end())
+        << ": Buffer is not in use: " << i;
+    available_buffer_indices_.push_back(i);
+  }
 
   // Makes a connected raw sender which calls Send below.
   ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
@@ -103,6 +176,7 @@
   const Channel *channel() const { return channel_; }
 
   void CountSenderCreated() {
+    CheckBufferCount();
     if (sender_count_ >= channel()->num_senders()) {
       LOG(FATAL) << "Failed to create sender on "
                  << configuration::CleanedChannelToString(channel())
@@ -116,7 +190,11 @@
   }
 
  private:
-  const Channel *channel_;
+  void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+
+  const Channel *const channel_;
+  EventScheduler *const scheduler_;
+  const std::chrono::nanoseconds channel_storage_duration_;
 
   // List of all watchers.
   ::std::vector<SimulatedWatcher *> watchers_;
@@ -124,24 +202,36 @@
   // List of all fetchers.
   ::std::vector<SimulatedFetcher *> fetchers_;
   std::shared_ptr<SimulatedMessage> latest_message_;
-  EventScheduler *scheduler_;
 
   ipc_lib::QueueIndex next_queue_index_;
 
   int sender_count_ = 0;
+
+  std::vector<uint16_t> available_buffer_indices_;
 };
 
 namespace {
 
-// Creates a SimulatedMessage with size bytes of storage.
-// This is a shared_ptr so we don't have to implement refcounting or copying.
-std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
-  SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
+    SimulatedChannel *channel) {
+  const size_t size = channel->max_size();
+  SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
       malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
+  new (message) SimulatedMessage(channel);
   message->context.size = size;
   message->context.data = message->data(size);
 
-  return std::shared_ptr<SimulatedMessage>(message, free);
+  return std::shared_ptr<SimulatedMessage>(message,
+                                           &SimulatedMessage::DestroyAndFree);
+}
+
+SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
+    : channel(channel_in) {
+  buffer_index = channel->GetBufferIndex();
+}
+
+SimulatedMessage::~SimulatedMessage() {
+  channel->FreeBufferIndex(buffer_index);
 }
 
 class SimulatedSender : public RawSender {
@@ -156,7 +246,7 @@
 
   void *data() override {
     if (!message_) {
-      message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+      message_ = SimulatedMessage::Make(simulated_channel_);
     }
     return message_->data(simulated_channel_->max_size());
   }
@@ -196,7 +286,7 @@
 
     // This is wasteful, but since flatbuffers fill from the back end of the
     // queue, we need it to be full sized.
-    message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+    message_ = SimulatedMessage::Make(simulated_channel_);
 
     // Now fill in the message.  size is already populated above, and
     // queue_index will be populated in simulated_channel_.  Put this at the
@@ -230,6 +320,8 @@
       return std::make_pair(false, monotonic_clock::min_time);
     }
 
+    CHECK(!fell_behind_) << ": Got behind";
+
     SetMsg(msgs_.front());
     msgs_.pop_front();
     return std::make_pair(true, event_loop()->monotonic_now());
@@ -251,6 +343,7 @@
     // latest message from before we started.
     SetMsg(msgs_.back());
     msgs_.clear();
+    fell_behind_ = false;
     return std::make_pair(true, event_loop()->monotonic_now());
   }
 
@@ -275,6 +368,14 @@
   // Internal method for Simulation to add a message to the buffer.
   void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
     msgs_.emplace_back(buffer);
+    if (fell_behind_ ||
+        msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
+      fell_behind_ = true;
+      // Might as well empty out all the intermediate messages now.
+      while (msgs_.size() > 1) {
+        msgs_.pop_front();
+      }
+    }
   }
 
   SimulatedChannel *simulated_channel_;
@@ -282,6 +383,9 @@
 
   // Messages queued up but not in use.
   ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+  // Whether we're currently "behind", which means a FetchNext call will fail.
+  bool fell_behind_ = false;
 };
 
 class SimulatedTimerHandler : public TimerHandler {
@@ -332,8 +436,7 @@
 class SimulatedEventLoop : public EventLoop {
  public:
   explicit SimulatedEventLoop(
-      EventScheduler *scheduler,
-      NodeEventLoopFactory *node_event_loop_factory,
+      EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -532,8 +635,10 @@
   if (it == channels_->end()) {
     it = channels_
              ->emplace(SimpleChannel(channel),
-                       std::unique_ptr<SimulatedChannel>(
-                           new SimulatedChannel(channel, scheduler_)))
+                       std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+                           channel, scheduler_,
+                           std::chrono::nanoseconds(
+                               configuration()->channel_storage_duration()))))
              .first;
   }
   return it->second.get();
diff --git a/aos/flatbuffers.cc b/aos/flatbuffers.cc
index 06d11f4..4f6497e 100644
--- a/aos/flatbuffers.cc
+++ b/aos/flatbuffers.cc
@@ -4,11 +4,12 @@
 
 namespace aos {
 
-uint8_t *FixedAllocatorBase::allocate(size_t) {
+uint8_t *FixedAllocatorBase::allocate(size_t allocated_size) {
   if (is_allocated_) {
     LOG(FATAL) << "Can't allocate more memory with a fixed size allocator.  "
                   "Increase the memory reserved.";
   }
+  CHECK_LE(allocated_size, size());
 
   is_allocated_ = true;
   return data();
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4aa0ff8..e016770 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -19,7 +19,12 @@
   // TODO(austin): Read the contract for these.
   uint8_t *allocate(size_t) override;
 
-  void deallocate(uint8_t *, size_t) override { is_allocated_ = false; }
+  void deallocate(uint8_t *allocated_data, size_t allocated_size) override {
+    DCHECK_LE(allocated_size, size());
+    DCHECK_EQ(allocated_data, data());
+    CHECK(is_allocated_);
+    is_allocated_ = false;
+  }
 
   uint8_t *reallocate_downward(uint8_t *, size_t, size_t, size_t,
                                size_t) override;
@@ -28,7 +33,10 @@
   virtual uint8_t *data() = 0;
   virtual size_t size() const = 0;
 
-  void Reset() { is_allocated_ = false; }
+  void Reset() {
+    CHECK(!is_allocated_);
+    is_allocated_ = false;
+  }
   bool is_allocated() const { return is_allocated_; }
 
   bool allocated() { return is_allocated_; }
@@ -241,31 +249,38 @@
 template <typename T, size_t Size>
 class FlatbufferFixedAllocatorArray final : public Flatbuffer<T> {
  public:
-  FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {
-    builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
-    builder_.ForceDefaults(true);
+  FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {}
+
+  FlatbufferFixedAllocatorArray(const FlatbufferFixedAllocatorArray &) = delete;
+  void operator=(const Flatbuffer<T> &) = delete;
+
+  void CopyFrom(const Flatbuffer<T> &other) {
+    CHECK(!allocator_.is_allocated()) << ": May not overwrite while building";
+    memcpy(buffer_.begin(), other.data(), other.size());
+    data_ = buffer_.begin();
+    size_ = other.size();
   }
 
   void Reset() {
-    allocator_.Reset();
+    CHECK(!allocator_.is_allocated()) << ": May not reset while building";
     builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
     builder_.ForceDefaults(true);
   }
 
   flatbuffers::FlatBufferBuilder *Builder() {
-    if (allocator_.allocated()) {
-      LOG(FATAL) << "Array backed flatbuffer can only be built once";
-    }
+    CHECK(!allocator_.allocated())
+        << ": Array backed flatbuffer can only be built once";
+    builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
+    builder_.ForceDefaults(true);
     return &builder_;
   }
 
   void Finish(flatbuffers::Offset<T> root) {
-    if (!allocator_.allocated()) {
-      LOG(FATAL) << "Cannot finish if never building";
-    }
+    CHECK(allocator_.allocated()) << ": Cannot finish if not building";
     builder_.Finish(root);
     data_ = builder_.GetBufferPointer();
     size_ = builder_.GetSize();
+    DCHECK_LE(size_, Size);
   }
 
   const uint8_t *data() const override {
@@ -284,8 +299,6 @@
   flatbuffers::FlatBufferBuilder builder_;
   uint8_t *data_ = nullptr;
   size_t size_ = 0;
-
-  DISALLOW_COPY_AND_ASSIGN(FlatbufferFixedAllocatorArray);
 };
 
 // This object associates the message type with the memory storing the
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index a450560..d9a1a71 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,7 +38,9 @@
   LocklessQueueMemory *const memory_;
 };
 
-void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+// Returns true if it succeeded. Returns false if another sender died in the
+// middle.
+bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
   // Make sure we start looking at shared memory fresh right now. We'll handle
   // people dying partway through by either cleaning up after them or not, but
   // we want to ensure we clean up after anybody who has already died when we
@@ -66,6 +68,8 @@
   // queue is active while we do this, it may take a couple of go arounds to see
   // everything.
 
+  ::std::vector<bool> need_recovery(num_senders, false);
+
   // Do the easy case.  Find all senders who have died.  See if they are either
   // consistent already, or if they have copied over to_replace to the scratch
   // index, but haven't cleared to_replace.  Count them.
@@ -74,65 +78,71 @@
     Sender *sender = memory->GetSender(i);
     const uint32_t tid =
         __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
-    if (tid & FUTEX_OWNER_DIED) {
-      VLOG(3) << "Found an easy death for sender " << i;
-      // We can do a relaxed load here because we're the only person touching
-      // this sender at this point.
-      const Index to_replace = sender->to_replace.RelaxedLoad();
-      const Index scratch_index = sender->scratch_index.Load();
-
-      // I find it easiest to think about this in terms of the set of observable
-      // states.  The main code progresses through the following states:
-
-      // 1) scratch_index = xxx
-      //    to_replace = invalid
-      // This is unambiguous.  Already good.
-
-      // 2) scratch_index = xxx
-      //    to_replace = yyy
-      // Very ambiguous.  Is xxx or yyy the correct one?  Need to either roll
-      // this forwards or backwards.
-
-      // 3) scratch_index = yyy
-      //    to_replace = yyy
-      // We are in the act of moving to_replace to scratch_index, but didn't
-      // finish.  Easy.
-
-      // 4) scratch_index = yyy
-      //    to_replace = invalid
-      // Finished, but died.  Looks like 1)
-
-      // Any cleanup code needs to follow the same set of states to be robust to
-      // death, so death can be restarted.
-
-      // Could be 2) or 3).
-      if (to_replace.valid()) {
-        // 3)
-        if (to_replace == scratch_index) {
-          // Just need to invalidate to_replace to finish.
-          sender->to_replace.Invalidate();
-
-          // And mark that we succeeded.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
-          ++valid_senders;
-        }
-      } else {
-        // 1) or 4).  Make sure we aren't corrupted and declare victory.
-        CHECK(scratch_index.valid());
-
-        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
-        ++valid_senders;
-      }
-    } else {
+    if (!(tid & FUTEX_OWNER_DIED)) {
       // Not dead.
       ++valid_senders;
+      continue;
     }
+    VLOG(3) << "Found an easy death for sender " << i;
+    // We can do a relaxed load here because we're the only person touching
+    // this sender at this point.
+    const Index to_replace = sender->to_replace.RelaxedLoad();
+    const Index scratch_index = sender->scratch_index.Load();
+
+    // I find it easiest to think about this in terms of the set of observable
+    // states.  The main code progresses through the following states:
+
+    // 1) scratch_index = xxx
+    //    to_replace = invalid
+    // This is unambiguous.  Already good.
+
+    // 2) scratch_index = xxx
+    //    to_replace = yyy
+    // Very ambiguous.  Is xxx or yyy the correct one?  Need to either roll
+    // this forwards or backwards.
+
+    // 3) scratch_index = yyy
+    //    to_replace = yyy
+    // We are in the act of moving to_replace to scratch_index, but didn't
+    // finish.  Easy.
+
+    // 4) scratch_index = yyy
+    //    to_replace = invalid
+    // Finished, but died.  Looks like 1)
+
+    // Any cleanup code needs to follow the same set of states to be robust to
+    // death, so death can be restarted.
+
+    if (!to_replace.valid()) {
+      // 1) or 4).  Make sure we aren't corrupted and declare victory.
+      CHECK(scratch_index.valid());
+
+      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+      ++valid_senders;
+      continue;
+    }
+
+    // Could be 2) or 3) at this point.
+
+    if (to_replace == scratch_index) {
+      // 3) for sure.
+      // Just need to invalidate to_replace to finish.
+      sender->to_replace.Invalidate();
+
+      // And mark that we succeeded.
+      __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+      ++valid_senders;
+      continue;
+    }
+
+    // Must be 2). Mark it for later.
+    need_recovery[i] = true;
   }
 
   // If all the senders are (or were made) good, there is no need to do the hard
   // case.
   if (valid_senders == num_senders) {
-    return;
+    return true;
   }
 
   VLOG(3) << "Starting hard cleanup";
@@ -148,8 +158,12 @@
       const uint32_t tid =
           __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
       if (tid & FUTEX_OWNER_DIED) {
+        if (!need_recovery[i]) {
+          return false;
+        }
         ++num_missing;
       } else {
+        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
         // We can do a relaxed load here because we're the only person touching
         // this sender at this point, if it matters. If it's not a dead sender,
         // then any message it every has will already be accounted for, so this
@@ -170,6 +184,8 @@
       }
       accounted_for[index.message_index()] = true;
     }
+
+    CHECK_LE(num_accounted_for + num_missing, num_messages);
   }
 
   while (num_missing != 0) {
@@ -178,67 +194,83 @@
       Sender *sender = memory->GetSender(i);
       const uint32_t tid =
           __atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
-      if (tid & FUTEX_OWNER_DIED) {
-        // We can do relaxed loads here because we're the only person touching
-        // this sender at this point.
-        const Index scratch_index = sender->scratch_index.RelaxedLoad();
-        const Index to_replace = sender->to_replace.RelaxedLoad();
+      if (!(tid & FUTEX_OWNER_DIED)) {
+        CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+        continue;
+      }
+      if (!need_recovery[i]) {
+        return false;
+      }
+      // We can do relaxed loads here because we're the only person touching
+      // this sender at this point.
+      const Index scratch_index = sender->scratch_index.RelaxedLoad();
+      const Index to_replace = sender->to_replace.RelaxedLoad();
 
-        // Candidate.
-        if (to_replace.valid()) {
-          CHECK_LE(to_replace.message_index(), accounted_for.size());
-        }
-        if (scratch_index.valid()) {
-          CHECK_LE(scratch_index.message_index(), accounted_for.size());
-        }
-        if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
-          CHECK(scratch_index.valid());
-          VLOG(3) << "Sender " << i
-                  << " died, to_replace is already accounted for";
-          // If both are accounted for, we are corrupt...
-          CHECK(!accounted_for[scratch_index.message_index()]);
+      // Candidate.  Both indices are used to index accounted_for below.
+      if (to_replace.valid()) {
+        CHECK_LT(to_replace.message_index(), accounted_for.size());
+      }
+      if (scratch_index.valid()) {
+        CHECK_LT(scratch_index.message_index(), accounted_for.size());
+      }
+      if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
+        CHECK(scratch_index.valid());
+        VLOG(3) << "Sender " << i
+                << " died, to_replace is already accounted for";
+        // If both are accounted for, we are corrupt...
+        CHECK(!accounted_for[scratch_index.message_index()]);
 
-          // to_replace is already accounted for.  This means that we didn't
-          // atomically insert scratch_index into the queue yet.  So
-          // invalidate to_replace.
-          sender->to_replace.Invalidate();
+        // to_replace is already accounted for.  This means that we didn't
+        // atomically insert scratch_index into the queue yet.  So
+        // invalidate to_replace.
+        sender->to_replace.Invalidate();
 
-          // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        // And then mark this sender clean.
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        need_recovery[i] = false;
 
-          // And account for scratch_index.
-          accounted_for[scratch_index.message_index()] = true;
-          --num_missing;
-          ++num_accounted_for;
-        } else if (!scratch_index.valid() ||
-                   accounted_for[scratch_index.message_index()]) {
-          VLOG(3) << "Sender " << i
-                  << " died, scratch_index is already accounted for";
-          // scratch_index is accounted for.  That means we did the insert,
-          // but didn't record it.
-          CHECK(to_replace.valid());
-          // Finish the transaction.  Copy to_replace, then clear it.
+        // And account for scratch_index.
+        accounted_for[scratch_index.message_index()] = true;
+        --num_missing;
+        ++num_accounted_for;
+      } else if (!scratch_index.valid() ||
+                 accounted_for[scratch_index.message_index()]) {
+        VLOG(3) << "Sender " << i
+                << " died, scratch_index is already accounted for";
+        // scratch_index is accounted for.  That means we did the insert,
+        // but didn't record it.
+        CHECK(to_replace.valid());
+        // Finish the transaction.  Copy to_replace, then clear it.
 
-          sender->scratch_index.Store(to_replace);
-          sender->to_replace.Invalidate();
+        sender->scratch_index.Store(to_replace);
+        sender->to_replace.Invalidate();
 
-          // And then mark this sender clean.
-          __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        // And then mark this sender clean.
+        __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+        need_recovery[i] = false;
 
-          // And account for to_replace.
-          accounted_for[to_replace.message_index()] = true;
-          --num_missing;
-          ++num_accounted_for;
-        } else {
-          VLOG(3) << "Sender " << i << " died, neither is accounted for";
-          // Ambiguous.  There will be an unambiguous one somewhere that we
-          // can do first.
-        }
+        // And account for to_replace.
+        accounted_for[to_replace.message_index()] = true;
+        --num_missing;
+        ++num_accounted_for;
+      } else {
+        VLOG(3) << "Sender " << i << " died, neither is accounted for";
+        // Ambiguous.  There will be an unambiguous one somewhere that we
+        // can do first.
       }
     }
     // CHECK that we are making progress.
     CHECK_NE(num_missing, starting_num_missing);
   }
+  return true;
+}
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &lock) {
+  // The number of iterations is bounded here because there are only a finite
+  // number of senders in existence which could die, and no new ones can be
+  // created while we're in here holding the lock.
+  while (!DoCleanup(memory, lock)) {
+  }
 }
 
 // Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target