Add a sent-too-fast check for simulation and shm
Send() now returns an error if more than queue_size (frequency *
channel_storage_duration) messages are sent within one
channel_storage_duration.
Signed-off-by: Eric Schmiedeberg <eric.schmiedeberg@bluerivertech.com>
Change-Id: Ie41205ba37b66930d8a9082f2d85d7dc3388e3bf
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index fe86f6c..5f12423 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -4,6 +4,7 @@
#include <sys/types.h>
#include <syscall.h>
#include <unistd.h>
+
#include <algorithm>
#include <iomanip>
#include <iostream>
@@ -503,7 +504,7 @@
bool bad = false;
- for (size_t i = 0; i < redzone.size(); ++i) {
+ for (size_t i = 0; i < redzone.size() && !bad; ++i) {
if (memcmp(&redzone[i], &redzone_value, 1)) {
bad = true;
}
@@ -603,6 +604,7 @@
Message *const message =
memory->GetMessage(Index(QueueIndex::Zero(memory->queue_size()), i));
message->header.queue_index.Invalidate();
+ message->header.monotonic_sent_time = monotonic_clock::min_time;
FillRedzone(memory, message->PreRedzone(memory->message_data_size()));
FillRedzone(memory, message->PostRedzone(memory->message_data_size(),
memory->message_size()));
@@ -831,8 +833,16 @@
return count;
}
-LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
- : memory_(memory) {
+// Streams a LocklessQueueSender::Result as the int it is static_cast to.
+std::ostream &operator<<(std::ostream &os,
+                         const LocklessQueueSender::Result r) {
+  return os << static_cast<int>(r);
+}
+
+LocklessQueueSender::LocklessQueueSender(
+ LocklessQueueMemory *memory,
+ monotonic_clock::duration channel_storage_duration)
+ : memory_(memory), channel_storage_duration_(channel_storage_duration) {
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
@@ -877,9 +887,9 @@
}
std::optional<LocklessQueueSender> LocklessQueueSender::Make(
- LocklessQueue queue) {
+ LocklessQueue queue, monotonic_clock::duration channel_storage_duration) {
queue.Initialize();
- LocklessQueueSender result(queue.memory());
+ LocklessQueueSender result(queue.memory(), channel_storage_duration);
if (result.sender_index_ != -1) {
return std::move(result);
} else {
@@ -904,7 +914,7 @@
return message->data(memory_->message_data_size());
}
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
@@ -921,7 +931,7 @@
realtime_sent_time, queue_index);
}
-bool LocklessQueueSender::Send(
+LocklessQueueSender::Result LocklessQueueSender::Send(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid,
@@ -936,7 +946,7 @@
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
if (CheckBothRedzones(memory_, message)) {
- return false;
+ return Result::INVALID_REDZONE;
}
// We should have invalidated this when we first got the buffer. Verify that
@@ -995,11 +1005,14 @@
// This is just a best-effort check to skip reading the clocks if possible.
// If this fails, then the compare-exchange below definitely would, so we
// can bail out now.
+ const Message *message_to_replace = memory_->GetMessage(to_replace);
+ bool is_previous_index_valid = false;
{
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)
- ->header.queue_index.RelaxedLoad(queue_size);
- if (previous_index != decremented_queue_index && previous_index.valid()) {
+ message_to_replace->header.queue_index.RelaxedLoad(queue_size);
+ is_previous_index_valid = previous_index.valid();
+ if (previous_index != decremented_queue_index &&
+ is_previous_index_valid) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
"Retrying. Previous index was "
@@ -1011,6 +1024,7 @@
message->header.monotonic_sent_time = ::aos::monotonic_clock::now();
message->header.realtime_sent_time = ::aos::realtime_clock::now();
+
if (monotonic_sent_time != nullptr) {
*monotonic_sent_time = message->header.monotonic_sent_time;
}
@@ -1021,8 +1035,48 @@
*queue_index = next_queue_index.index();
}
+ const auto to_replace_monotonic_sent_time =
+ message_to_replace->header.monotonic_sent_time;
+
+    // If we are overwriting a message sent in the last
+    // channel_storage_duration_, that means that we would be sending more than
+    // queue_size messages and would therefore be sending too fast. If the
+    // previous index is not valid then the message hasn't been filled out yet
+    // so we aren't sending too fast. And, if the replaced message's sent time
+    // is not less than the sent time of the message that we are going to
+    // write, someone else beat us and the compare and exchange below will fail.
+ if (is_previous_index_valid &&
+ (to_replace_monotonic_sent_time <
+ message->header.monotonic_sent_time) &&
+ (message->header.monotonic_sent_time - to_replace_monotonic_sent_time <
+ channel_storage_duration_)) {
+ // There is a possibility that another context beat us to writing out the
+ // message in the queue, but we beat that context to acquiring the sent
+ // time. In this case our sent time is *greater than* the other context's
+ // sent time. Therefore, we can check if we got beat filling out this
+ // message *after* doing the above check to determine if we hit this edge
+ // case. Otherwise, messages are being sent too fast.
+ const QueueIndex previous_index =
+ message_to_replace->header.queue_index.Load(queue_size);
+ if (previous_index != decremented_queue_index && previous_index.valid()) {
+        VLOG(3) << "Got beat during check for messages being sent too fast. "
+                   "Retrying.";
+ continue;
+ } else {
+ VLOG(3) << "Messages sent too fast. Returning. Attempted index: "
+ << decremented_queue_index.index()
+ << " message sent time: " << message->header.monotonic_sent_time
+ << " message to replace sent time: "
+ << to_replace_monotonic_sent_time;
+ // Since we are not using the message obtained from scratch_index
+ // and we are not retrying, we need to invalidate its queue_index.
+ message->header.queue_index.Invalidate();
+ return Result::MESSAGES_SENT_TOO_FAST;
+ }
+ }
+
// Before we are fully done filling out the message, update the Sender state
- // with the new index to write. This re-uses the barrier for the
+ // with the new index to write. This re-uses the barrier for the
// queue_index store.
const Index index_to_write(next_queue_index, scratch_index.message_index());
@@ -1085,7 +1139,7 @@
// If anybody is looking at this message (they shouldn't be), then try telling
// them about it (best-effort).
memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
- return true;
+ return Result::GOOD;
}
int LocklessQueueSender::buffer_index() const {