Teach SimulatedEventLoop to keep track of its buffers

This lets it provide indices like ShmEventLoop can.

Change-Id: I5673fb1fb140fc0eea257d3e6e4b1bc8d5f3b704
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 0cdba93..db727c6 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -2,11 +2,13 @@
   "channels": [
     {
       "name": "/test",
-      "type": "aos.examples.Ping"
+      "type": "aos.examples.Ping",
+      "frequency": 2000
     },
     {
       "name": "/test",
-      "type": "aos.examples.Pong"
+      "type": "aos.examples.Pong",
+      "frequency": 2000
     }
   ],
   "imports": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ee4e6d6..bff04b9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -3,21 +3,38 @@
 #include <algorithm>
 #include <deque>
 #include <string_view>
+#include <vector>
 
 #include "absl/container/btree_map.h"
+#include "aos/events/aos_logging.h"
 #include "aos/events/simulated_network_bridge.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/util/phased_loop.h"
-#include "aos/events/aos_logging.h"
 
 namespace aos {
 
+class SimulatedEventLoop;
+class SimulatedFetcher;
+class SimulatedChannel;
+
+namespace {
+
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
-struct SimulatedMessage {
+struct SimulatedMessage final {
+  SimulatedMessage(const SimulatedMessage &) = delete;
+  SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+
+  // Creates a SimulatedMessage with size bytes of storage.
+  // This is a shared_ptr so we don't have to implement refcounting or copying.
+  static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+
   // Context for the data.
   Context context;
 
+  SimulatedChannel *const channel = nullptr;
+  int buffer_index;
+
   // The data.
   char *data(size_t buffer_size) {
     return RoundChannelData(&actual_data[0], buffer_size);
@@ -26,12 +43,21 @@
   // Then the data, including padding on the end so we can align the buffer we
   // actually return from data().
   char actual_data[];
+
+ private:
+  SimulatedMessage(SimulatedChannel *channel_in);
+  ~SimulatedMessage();
+
+  static void DestroyAndFree(SimulatedMessage *p) {
+    p->~SimulatedMessage();
+    free(p);
+  }
 };
 
-class SimulatedEventLoop;
-class SimulatedFetcher;
-class SimulatedChannel;
+}  // namespace
 
+// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
+// for some reason...
 class SimulatedWatcher : public WatcherState {
  public:
   SimulatedWatcher(
@@ -65,12 +91,59 @@
 
 class SimulatedChannel {
  public:
-  explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+  explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler,
+                            std::chrono::nanoseconds channel_storage_duration)
       : channel_(channel),
         scheduler_(scheduler),
-        next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+        channel_storage_duration_(channel_storage_duration),
+        next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+    available_buffer_indices_.reserve(number_buffers());
+    for (int i = 0; i < number_buffers(); ++i) {
+      available_buffer_indices_.push_back(i);
+    }
+  }
 
-  ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+  ~SimulatedChannel() {
+    latest_message_.reset();
+    CHECK_EQ(static_cast<size_t>(number_buffers()),
+             available_buffer_indices_.size());
+    CHECK_EQ(0u, fetchers_.size());
+    CHECK_EQ(0u, watchers_.size());
+    CHECK_EQ(0, sender_count_);
+  }
+
+  // The number of messages we pretend to have in the queue.
+  int queue_size() const {
+    return channel()->frequency() *
+           std::chrono::duration_cast<std::chrono::duration<double>>(
+               channel_storage_duration_)
+               .count();
+  }
+
+  // The number of extra buffers (beyond the queue) we pretend to have.
+  int number_scratch_buffers() const {
+    // We need to start creating messages before we know how many
+    // senders+readers we'll have, so we need to just pick something which is
+    // always big enough.
+    return 50;
+  }
+
+  int number_buffers() const { return queue_size() + number_scratch_buffers(); }
+
+  int GetBufferIndex() {
+    CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
+    const int result = available_buffer_indices_.back();
+    available_buffer_indices_.pop_back();
+    return result;
+  }
+
+  void FreeBufferIndex(int i) {
+    DCHECK(std::find(available_buffer_indices_.begin(),
+                     available_buffer_indices_.end(),
+                     i) == available_buffer_indices_.end())
+        << ": Buffer is not in use: " << i;
+    available_buffer_indices_.push_back(i);
+  }
 
   // Makes a connected raw sender which calls Send below.
   ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
@@ -103,6 +176,7 @@
   const Channel *channel() const { return channel_; }
 
   void CountSenderCreated() {
+    CheckBufferCount();
     if (sender_count_ >= channel()->num_senders()) {
       LOG(FATAL) << "Failed to create sender on "
                  << configuration::CleanedChannelToString(channel())
@@ -116,7 +190,11 @@
   }
 
  private:
-  const Channel *channel_;
+  void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+
+  const Channel *const channel_;
+  EventScheduler *const scheduler_;
+  const std::chrono::nanoseconds channel_storage_duration_;
 
   // List of all watchers.
   ::std::vector<SimulatedWatcher *> watchers_;
@@ -124,24 +202,36 @@
   // List of all fetchers.
   ::std::vector<SimulatedFetcher *> fetchers_;
   std::shared_ptr<SimulatedMessage> latest_message_;
-  EventScheduler *scheduler_;
 
   ipc_lib::QueueIndex next_queue_index_;
 
   int sender_count_ = 0;
+
+  std::vector<uint16_t> available_buffer_indices_;
 };
 
 namespace {
 
-// Creates a SimulatedMessage with size bytes of storage.
-// This is a shared_ptr so we don't have to implement refcounting or copying.
-std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
-  SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
+    SimulatedChannel *channel) {
+  const size_t size = channel->max_size();
+  SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
       malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
+  new (message) SimulatedMessage(channel);
   message->context.size = size;
   message->context.data = message->data(size);
 
-  return std::shared_ptr<SimulatedMessage>(message, free);
+  return std::shared_ptr<SimulatedMessage>(message,
+                                           &SimulatedMessage::DestroyAndFree);
+}
+
+SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
+    : channel(channel_in) {
+  buffer_index = channel->GetBufferIndex();
+}
+
+SimulatedMessage::~SimulatedMessage() {
+  channel->FreeBufferIndex(buffer_index);
 }
 
 class SimulatedSender : public RawSender {
@@ -156,7 +246,7 @@
 
   void *data() override {
     if (!message_) {
-      message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+      message_ = SimulatedMessage::Make(simulated_channel_);
     }
     return message_->data(simulated_channel_->max_size());
   }
@@ -196,7 +286,7 @@
 
     // This is wasteful, but since flatbuffers fill from the back end of the
     // queue, we need it to be full sized.
-    message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+    message_ = SimulatedMessage::Make(simulated_channel_);
 
     // Now fill in the message.  size is already populated above, and
     // queue_index will be populated in simulated_channel_.  Put this at the
@@ -230,6 +320,8 @@
       return std::make_pair(false, monotonic_clock::min_time);
     }
 
+    CHECK(!fell_behind_) << ": Got behind";
+
     SetMsg(msgs_.front());
     msgs_.pop_front();
     return std::make_pair(true, event_loop()->monotonic_now());
@@ -251,6 +343,7 @@
     // latest message from before we started.
     SetMsg(msgs_.back());
     msgs_.clear();
+    fell_behind_ = false;
     return std::make_pair(true, event_loop()->monotonic_now());
   }
 
@@ -275,6 +368,14 @@
   // Internal method for Simulation to add a message to the buffer.
   void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
     msgs_.emplace_back(buffer);
+    if (fell_behind_ ||
+        msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
+      fell_behind_ = true;
+      // Might as well empty out all the intermediate messages now.
+      while (msgs_.size() > 1) {
+        msgs_.pop_front();
+      }
+    }
   }
 
   SimulatedChannel *simulated_channel_;
@@ -282,6 +383,9 @@
 
   // Messages queued up but not in use.
   ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+  // Whether we're currently "behind", which means a FetchNext call will fail.
+  bool fell_behind_ = false;
 };
 
 class SimulatedTimerHandler : public TimerHandler {
@@ -332,8 +436,7 @@
 class SimulatedEventLoop : public EventLoop {
  public:
   explicit SimulatedEventLoop(
-      EventScheduler *scheduler,
-      NodeEventLoopFactory *node_event_loop_factory,
+      EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
       absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
           *channels,
       const Configuration *configuration,
@@ -532,8 +635,10 @@
   if (it == channels_->end()) {
     it = channels_
              ->emplace(SimpleChannel(channel),
-                       std::unique_ptr<SimulatedChannel>(
-                           new SimulatedChannel(channel, scheduler_)))
+                       std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+                           channel, scheduler_,
+                           std::chrono::nanoseconds(
+                               configuration()->channel_storage_duration()))))
              .first;
   }
   return it->second.get();