Add a sent too fast check for simulation and shm
Returns an error if more than queue_size (frequency *
channel_storage_duration) messages were sent in one
channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index a1ef95d..69e6638 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -151,10 +151,12 @@
class SimulatedChannel {
public:
explicit SimulatedChannel(const Channel *channel,
- std::chrono::nanoseconds channel_storage_duration)
+ std::chrono::nanoseconds channel_storage_duration,
+ const EventScheduler *scheduler)
: channel_(channel),
channel_storage_duration_(channel_storage_duration),
- next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
+ scheduler_(scheduler) {
available_buffer_indices_.resize(number_buffers());
for (int i = 0; i < number_buffers(); ++i) {
available_buffer_indices_[i] = i;
@@ -296,6 +298,11 @@
int sender_count_ = 0;
std::vector<uint16_t> available_buffer_indices_;
+
+ const EventScheduler *scheduler_;
+
+ // Queue of all the message send times in the last channel_storage_duration_
+ std::queue<monotonic_clock::time_point> last_times_;
};
namespace {
@@ -782,14 +789,14 @@
const Channel *channel) {
auto it = channels_->find(SimpleChannel(channel));
if (it == channels_->end()) {
- it =
- channels_
- ->emplace(
- SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
- channel, std::chrono::nanoseconds(
- configuration()->channel_storage_duration()))))
- .first;
+ it = channels_
+ ->emplace(SimpleChannel(channel),
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()),
+ scheduler_)))
+ .first;
}
return it->second.get();
}
@@ -930,7 +937,21 @@
std::optional<uint32_t> SimulatedChannel::Send(
std::shared_ptr<SimulatedMessage> message) {
- std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ const auto now = scheduler_->monotonic_now();
+ // Remove times that are greater than or equal to a channel_storage_duration_
+ // ago
+ while (!last_times_.empty() &&
+ (now - last_times_.front() >= channel_storage_duration_)) {
+ last_times_.pop();
+ }
+
+ // Check that we are not sending messages too fast
+ if (static_cast<int>(last_times_.size()) >= queue_size()) {
+ return std::nullopt;
+ }
+
+ const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
+ last_times_.push(now);
message->context.queue_index = *queue_index;
// Points to the actual data depending on the size set in context. Data may