fixed some queue bugs when ReadMessageIndex fell behind
There were bugs with detecting when it needed to do something special
and what it did about it.
This includes adding an assert to catch when it tries reading from a
freed message rather than failing when somebody attempts to double-free
it later and a test for doing it wrong.
This was what made running log_streamer and binary_log_writer at
the same time fail.
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 8a86b7f..6c8b046 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -6,6 +6,7 @@
#include <assert.h>
#include <memory>
+#include <algorithm>
#include "aos/common/logging/logging.h"
#include "aos/common/type_traits.h"
@@ -319,7 +320,8 @@
}
}
if (kReadDebug) {
- printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+ printf("queue: %p->read(%p) start=%d end=%d\n", this, index, data_start_,
+ data_end_);
}
return true;
}
@@ -390,7 +392,6 @@
printf("queue: %p reading from d2: %d\n", this, data_start_);
}
msg = data_[data_start_];
- // TODO(brians): Doesn't this need to increment the ref count?
data_start_ = (data_start_ + 1) % data_length_;
}
}
@@ -419,20 +420,23 @@
// TODO(parker): Handle integer wrap on the index.
- // How many unread messages we have.
- const int offset = messages_ - *index;
// Where we're going to start reading.
- int my_start = data_end_ - offset;
- if (my_start < 0) { // If we want to read off the end of the buffer.
- // Unwrap it.
- my_start += data_length_;
- }
- if (offset >= data_length_) { // If we're behind the available messages.
+ int my_start;
+
+ const int unread_messages = messages_ - *index;
+ const int current_messages = ::std::abs(data_start_ - data_end_);
+ if (unread_messages > current_messages) { // If we're behind the available messages.
// Catch index up to the last available message.
- *index += data_start_ - my_start;
+ *index = messages_ - current_messages;
// And that's the one we're going to read.
my_start = data_start_;
+ } else {
+ // Just start reading at the first available message that we haven't yet
+ // read.
+ my_start = (data_start_ + unread_messages - 1) % data_length_;
}
+
+
if (options & kPeek) {
msg = ReadPeek(options, my_start);
} else {
@@ -453,6 +457,10 @@
if (kReadDebug) {
printf("queue: %p reading from d1: %d\n", this, my_start);
}
+ // This assert checks that we're either within both endpoints (duh) or
+ // outside of both of them (if the queue is wrapped around).
+ assert((my_start >= data_start_ && my_start < data_end_) ||
+ (my_start > data_end_ && my_start <= data_start_));
msg = data_[my_start];
++(*index);
}
diff --git a/aos/linux_code/ipc_lib/queue.h b/aos/linux_code/ipc_lib/queue.h
index a58b65e..4d00cde 100644
--- a/aos/linux_code/ipc_lib/queue.h
+++ b/aos/linux_code/ipc_lib/queue.h
@@ -79,7 +79,7 @@
// Writes a message into the queue.
// This function takes ownership of msg.
// NOTE: msg must point to a valid message from this queue
- // Returns truen on success.
+ // Returns true on success.
bool WriteMessage(void *msg, int options);
// Reads a message out of the queue.
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
index 0a982cc..65527c8 100644
--- a/aos/linux_code/ipc_lib/queue_test.cc
+++ b/aos/linux_code/ipc_lib/queue_test.cc
@@ -381,6 +381,31 @@
// TODO(brians) finish this
}
+// There used to be a bug where reading first without an index and then with an
+// index would crash. This test makes sure that's fixed.
+TEST_F(QueueTest, ReadIndexAndNot) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
+
+ // Write a message, read it (with ReadMessage), and then write another
+ // message (before freeing the read one so the queue allocates a distinct
+ // message to use for it).
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, 0));
+ const void *read_msg = queue->ReadMessage(0);
+ EXPECT_NE(nullptr, read_msg);
+ msg = static_cast<TestMessage *>(queue->GetMessage());
+ queue->FreeMessage(read_msg);
+ ASSERT_NE(nullptr, msg);
+ ASSERT_TRUE(queue->WriteMessage(msg, 0));
+
+ int index = 0;
+ const void *second_read_msg = queue->ReadMessageIndex(0, &index);
+ EXPECT_NE(nullptr, second_read_msg);
+ EXPECT_NE(read_msg, second_read_msg)
+ << "We already took that message out of the queue.";
+}
+
TEST_F(QueueTest, Recycle) {
// TODO(brians) basic test of recycle queue
// include all of the ways a message can get into the recycle queue