Teach SimulatedEventLoop to keep track of its buffers
This lets it provide buffer indices like ShmEventLoop can.
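
Each SimulatedChannel now owns a pool of buffer indices sized from the
channel's configured frequency, the configuration's channel_storage_duration,
and a fixed number of scratch buffers. Every SimulatedMessage claims an index
from that pool when it is created and returns it when it is destroyed, and
fetchers now detect when they fall more than a queue's worth of messages
behind. The pingpong example channels declare a frequency so their simulated
queues are sized for what the tests actually send.

Rough sketch of how this is visible to application code (illustrative only,
and assuming Context exposes a buffer_index field the same way it does under
ShmEventLoop; the config path and event loop name below are not part of this
change):

  #include <chrono>
  #include <memory>

  #include "aos/configuration.h"
  #include "aos/events/ping_generated.h"
  #include "aos/events/simulated_event_loop.h"
  #include "glog/logging.h"

  int main() {
    aos::FlatbufferDetachedBuffer<aos::Configuration> config =
        aos::configuration::ReadConfig("aos/events/pingpong.json");
    aos::SimulatedEventLoopFactory factory(&config.message());
    std::unique_ptr<aos::EventLoop> event_loop =
        factory.MakeEventLoop("reader");
    event_loop->MakeWatcher(
        "/test", [&event_loop](const aos::examples::Ping &) {
          // With this change, buffer_index refers to one of the channel's
          // simulated buffers instead of being unset in simulation.
          LOG(INFO) << "Ping arrived in buffer "
                    << event_loop->context().buffer_index;
        });
    factory.RunFor(std::chrono::seconds(1));
  }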
Change-Id: I5673fb1fb140fc0eea257d3e6e4b1bc8d5f3b704
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 0cdba93..db727c6 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -2,11 +2,13 @@
"channels": [
{
"name": "/test",
- "type": "aos.examples.Ping"
+ "type": "aos.examples.Ping",
+ "frequency": 2000
},
{
"name": "/test",
- "type": "aos.examples.Pong"
+ "type": "aos.examples.Pong",
+ "frequency": 2000
}
],
"imports": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ee4e6d6..bff04b9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -3,21 +3,38 @@
#include <algorithm>
#include <deque>
#include <string_view>
+#include <vector>
#include "absl/container/btree_map.h"
+#include "aos/events/aos_logging.h"
#include "aos/events/simulated_network_bridge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
-#include "aos/events/aos_logging.h"
namespace aos {
+class SimulatedEventLoop;
+class SimulatedFetcher;
+class SimulatedChannel;
+
+namespace {
+
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
-struct SimulatedMessage {
+struct SimulatedMessage final {
+ SimulatedMessage(const SimulatedMessage &) = delete;
+ SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+
+ // Creates a SimulatedMessage with size bytes of storage.
+ // This is a shared_ptr so we don't have to implement refcounting or copying.
+ static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+
// Context for the data.
Context context;
+ SimulatedChannel *const channel = nullptr;
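+ // Index of the buffer in channel's pool which holds this message's data;
+ // returned to the pool when the message is destroyed.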
+ int buffer_index;
+
// The data.
char *data(size_t buffer_size) {
return RoundChannelData(&actual_data[0], buffer_size);
@@ -26,12 +43,21 @@
// Then the data, including padding on the end so we can align the buffer we
// actually return from data().
char actual_data[];
+
+ private:
+ SimulatedMessage(SimulatedChannel *channel_in);
+ ~SimulatedMessage();
+
+ static void DestroyAndFree(SimulatedMessage *p) {
+ p->~SimulatedMessage();
+ free(p);
+ }
};
-class SimulatedEventLoop;
-class SimulatedFetcher;
-class SimulatedChannel;
+} // namespace
+// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
+// for some reason...
class SimulatedWatcher : public WatcherState {
public:
SimulatedWatcher(
@@ -65,12 +91,59 @@
class SimulatedChannel {
public:
- explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+ explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler,
+ std::chrono::nanoseconds channel_storage_duration)
: channel_(channel),
scheduler_(scheduler),
- next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+ channel_storage_duration_(channel_storage_duration),
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ available_buffer_indices_.reserve(number_buffers());
+ for (int i = 0; i < number_buffers(); ++i) {
+ available_buffer_indices_.push_back(i);
+ }
+ }
- ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+ ~SimulatedChannel() {
+ latest_message_.reset();
+ CHECK_EQ(static_cast<size_t>(number_buffers()),
+ available_buffer_indices_.size());
+ CHECK_EQ(0u, fetchers_.size());
+ CHECK_EQ(0u, watchers_.size());
+ CHECK_EQ(0, sender_count_);
+ }
+
+ // The number of messages we pretend to have in the queue.
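+ // For example, a 2000 Hz channel stored for 2 seconds pretends to have a
+ // 4000-message queue.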
+ int queue_size() const {
+ return channel()->frequency() *
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ channel_storage_duration_)
+ .count();
+ }
+
+ // The number of extra buffers (beyond the queue) we pretend to have.
+ int number_scratch_buffers() const {
+ // We need to start creating messages before we know how many
+ // senders+readers we'll have, so we need to just pick something which is
+ // always big enough.
+ return 50;
+ }
+
+ int number_buffers() const { return queue_size() + number_scratch_buffers(); }
+
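+ // Removes a free buffer index from the pool and returns it for a new
+ // message to use.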
+ int GetBufferIndex() {
+ CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
+ const int result = available_buffer_indices_.back();
+ available_buffer_indices_.pop_back();
+ return result;
+ }
+
+ void FreeBufferIndex(int i) {
+ DCHECK(std::find(available_buffer_indices_.begin(),
+ available_buffer_indices_.end(),
+ i) == available_buffer_indices_.end())
+ << ": Buffer is not in use: " << i;
+ available_buffer_indices_.push_back(i);
+ }
// Makes a connected raw sender which calls Send below.
::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
@@ -103,6 +176,7 @@
const Channel *channel() const { return channel_; }
void CountSenderCreated() {
+ CheckBufferCount();
if (sender_count_ >= channel()->num_senders()) {
LOG(FATAL) << "Failed to create sender on "
<< configuration::CleanedChannelToString(channel())
@@ -116,7 +190,11 @@
}
private:
- const Channel *channel_;
+ void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+
+ const Channel *const channel_;
+ EventScheduler *const scheduler_;
+ const std::chrono::nanoseconds channel_storage_duration_;
// List of all watchers.
::std::vector<SimulatedWatcher *> watchers_;
@@ -124,24 +202,36 @@
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
std::shared_ptr<SimulatedMessage> latest_message_;
- EventScheduler *scheduler_;
ipc_lib::QueueIndex next_queue_index_;
int sender_count_ = 0;
+
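+ // Buffer indices which are not currently backing a message.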
+ std::vector<uint16_t> available_buffer_indices_;
};
namespace {
-// Creates a SimulatedMessage with size bytes of storage.
-// This is a shared_ptr so we don't have to implement refcounting or copying.
-std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
- SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
+ SimulatedChannel *channel) {
+ const size_t size = channel->max_size();
+ SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
+ new (message) SimulatedMessage(channel);
message->context.size = size;
message->context.data = message->data(size);
- return std::shared_ptr<SimulatedMessage>(message, free);
+ return std::shared_ptr<SimulatedMessage>(message,
+ &SimulatedMessage::DestroyAndFree);
+}
+
+SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
+ : channel(channel_in) {
+ buffer_index = channel->GetBufferIndex();
+}
+
+SimulatedMessage::~SimulatedMessage() {
+ channel->FreeBufferIndex(buffer_index);
}
class SimulatedSender : public RawSender {
@@ -156,7 +246,7 @@
void *data() override {
if (!message_) {
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
}
return message_->data(simulated_channel_->max_size());
}
@@ -196,7 +286,7 @@
// This is wasteful, but since flatbuffers fill from the back end of the
// queue, we need it to be full sized.
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
// Now fill in the message. size is already populated above, and
// queue_index will be populated in simulated_channel_. Put this at the
@@ -230,6 +320,8 @@
return std::make_pair(false, monotonic_clock::min_time);
}
+ CHECK(!fell_behind_) << ": Got behind";
+
SetMsg(msgs_.front());
msgs_.pop_front();
return std::make_pair(true, event_loop()->monotonic_now());
@@ -251,6 +343,7 @@
// latest message from before we started.
SetMsg(msgs_.back());
msgs_.clear();
+ fell_behind_ = false;
return std::make_pair(true, event_loop()->monotonic_now());
}
@@ -275,6 +368,14 @@
// Internal method for Simulation to add a message to the buffer.
void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
msgs_.emplace_back(buffer);
+ if (fell_behind_ ||
+ msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
+ fell_behind_ = true;
+ // Might as well empty out all the intermediate messages now.
+ while (msgs_.size() > 1) {
+ msgs_.pop_front();
+ }
+ }
}
SimulatedChannel *simulated_channel_;
@@ -282,6 +383,9 @@
// Messages queued up but not in use.
::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+ // Whether we're currently "behind", which means a FetchNext call will fail.
+ bool fell_behind_ = false;
};
class SimulatedTimerHandler : public TimerHandler {
@@ -332,8 +436,7 @@
class SimulatedEventLoop : public EventLoop {
public:
explicit SimulatedEventLoop(
- EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -532,8 +635,10 @@
if (it == channels_->end()) {
it = channels_
->emplace(SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(
- new SimulatedChannel(channel, scheduler_)))
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel, scheduler_,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()))))
.first;
}
return it->second.get();