Merge changes Iead5bb1f,I5673fb1f,I6bc0937b

* changes:
  Fix Recovery when senders die while it's in progress
  Teach SimulatedEventLoop to keep track of its buffers
  Refactor SimpleShmFetcher
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 0cdba93..db727c6 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -2,11 +2,13 @@
"channels": [
{
"name": "/test",
- "type": "aos.examples.Ping"
+ "type": "aos.examples.Ping",
+ "frequency": 2000
},
{
"name": "/test",
- "type": "aos.examples.Pong"
+ "type": "aos.examples.Pong",
+ "frequency": 2000
}
],
"imports": [
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 2383050..d2c9112 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -190,8 +190,7 @@
class SimpleShmFetcher {
public:
- explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel,
- bool copy_data)
+ explicit SimpleShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: event_loop_(event_loop),
channel_(channel),
lockless_queue_memory_(
@@ -200,10 +199,6 @@
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()) {
- if (copy_data) {
- data_storage_.reset(static_cast<char *>(
- malloc(channel->max_size() + kChannelDataAlignment - 1)));
- }
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -213,6 +208,13 @@
~SimpleShmFetcher() {}
+ // Sets this object to copy data out of the shared memory into a private
+ // buffer when fetching.
+ void CopyDataOnFetch() {
+ data_storage_.reset(static_cast<char *>(
+ malloc(channel_->max_size() + kChannelDataAlignment - 1)));
+ }
+
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
@@ -228,47 +230,8 @@
}
bool FetchNext() {
- // TODO(austin): Get behind and make sure it dies both here and with
- // Fetch.
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
- actual_queue_index_.index(), &context_.monotonic_event_time,
- &context_.realtime_event_time, &context_.monotonic_remote_time,
- &context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, data_storage_start());
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
- context_.queue_index = actual_queue_index_.index();
- if (context_.remote_queue_index == 0xffffffffu) {
- context_.remote_queue_index = context_.queue_index;
- }
- if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
- context_.monotonic_remote_time = context_.monotonic_event_time;
- }
- if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
- context_.realtime_remote_time = context_.realtime_event_time;
- }
- if (copy_data()) {
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
- } else {
- context_.data = nullptr;
- }
- actual_queue_index_ = actual_queue_index_.Increment();
- }
-
- // Make sure the data wasn't modified while we were reading it. This
- // can only happen if you are reading the last message *while* it is
- // being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
- << ": Got behind while reading and the last message was modified "
- "out from under us while we were reading it. Don't get so far "
- "behind. "
- << configuration::CleanedChannelToString(channel_);
-
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
- event_loop_->SendTimingReport();
- LOG(FATAL) << "The next message is no longer available. "
- << configuration::CleanedChannelToString(channel_);
- }
+ const ipc_lib::LocklessQueue::ReadResult read_result =
+ DoFetch(actual_queue_index_);
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
@@ -287,53 +250,12 @@
return false;
}
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
- queue_index.index(), &context_.monotonic_event_time,
- &context_.realtime_event_time, &context_.monotonic_remote_time,
- &context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, data_storage_start());
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
- context_.queue_index = queue_index.index();
- if (context_.remote_queue_index == 0xffffffffu) {
- context_.remote_queue_index = context_.queue_index;
- }
- if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
- context_.monotonic_remote_time = context_.monotonic_event_time;
- }
- if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
- context_.realtime_remote_time = context_.realtime_event_time;
- }
- if (copy_data()) {
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
- } else {
- context_.data = nullptr;
- }
- actual_queue_index_ = queue_index.Increment();
- }
-
- // Make sure the data wasn't modified while we were reading it. This
- // can only happen if you are reading the last message *while* it is
- // being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
- << ": Got behind while reading and the last message was modified "
- "out from under us while we were reading it. Don't get so far "
- "behind."
- << configuration::CleanedChannelToString(channel_);
+ const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
<< configuration::CleanedChannelToString(channel_);
- // We fell behind between when we read the index and read the value.
- // This isn't worth recovering from since this means we went to sleep
- // for a long time in the middle of this function.
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
- event_loop_->SendTimingReport();
- LOG(FATAL) << "The next message is no longer available. "
- << configuration::CleanedChannelToString(channel_);
- }
-
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
@@ -350,17 +272,83 @@
}
absl::Span<char> GetPrivateMemory() const {
- CHECK(copy_data());
+ // Can't usefully expose this for pinning, because the buffer changes
+ // address for each message. Callers who want to work with that should just
+ // grab the whole shared memory buffer instead.
return absl::Span<char>(
const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
lockless_queue_.message_data_size());
}
private:
- char *data_storage_start() {
- if (!copy_data()) return nullptr;
+ ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+ // TODO(austin): Get behind and make sure it dies.
+ char *copy_buffer = nullptr;
+ if (copy_data()) {
+ copy_buffer = data_storage_start();
+ }
+ ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ queue_index.index(), &context_.monotonic_event_time,
+ &context_.realtime_event_time, &context_.monotonic_remote_time,
+ &context_.realtime_remote_time, &context_.remote_queue_index,
+ &context_.size, copy_buffer);
+
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ context_.queue_index = queue_index.index();
+ if (context_.remote_queue_index == 0xffffffffu) {
+ context_.remote_queue_index = context_.queue_index;
+ }
+ if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
+ context_.monotonic_remote_time = context_.monotonic_event_time;
+ }
+ if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
+ context_.realtime_remote_time = context_.realtime_event_time;
+ }
+ const char *const data = DataBuffer();
+ if (data) {
+ context_.data =
+ data + lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
+ actual_queue_index_ = queue_index.Increment();
+ }
+
+ // Make sure the data wasn't modified while we were reading it. This
+ // can only happen if you are reading the last message *while* it is
+ // being written to, which means you are pretty far behind.
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ << ": Got behind while reading and the last message was modified "
+ "out from under us while we were reading it. Don't get so far "
+ "behind on: "
+ << configuration::CleanedChannelToString(channel_);
+
+ // We fell behind between when we read the index and read the value.
+ // This isn't worth recovering from since this means we went to sleep
+ // for a long time in the middle of this function.
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+ event_loop_->SendTimingReport();
+ LOG(FATAL) << "The next message is no longer available. "
+ << configuration::CleanedChannelToString(channel_);
+ }
+
+ return read_result;
+ }
+
+ char *data_storage_start() const {
+ CHECK(copy_data());
return RoundChannelData(data_storage_.get(), channel_->max_size());
}
+
+ // Note that for some modes the return value will change as new messages are
+ // read.
+ const char *DataBuffer() const {
+ if (copy_data()) {
+ return data_storage_start();
+ }
+ return nullptr;
+ }
+
bool copy_data() const { return static_cast<bool>(data_storage_); }
aos::ShmEventLoop *event_loop_;
@@ -381,7 +369,9 @@
public:
explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
- simple_shm_fetcher_(event_loop, channel, true) {}
+ simple_shm_fetcher_(event_loop, channel) {
+ simple_shm_fetcher_.CopyDataOnFetch();
+ }
~ShmFetcher() { context_.data = nullptr; }
@@ -487,7 +477,11 @@
: WatcherState(event_loop, channel, std::move(fn)),
event_loop_(event_loop),
event_(this),
- simple_shm_fetcher_(event_loop, channel, copy_data) {}
+ simple_shm_fetcher_(event_loop, channel) {
+ if (copy_data) {
+ simple_shm_fetcher_.CopyDataOnFetch();
+ }
+ }
~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index ee4e6d6..bff04b9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -3,21 +3,38 @@
#include <algorithm>
#include <deque>
#include <string_view>
+#include <vector>
#include "absl/container/btree_map.h"
+#include "aos/events/aos_logging.h"
#include "aos/events/simulated_network_bridge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
-#include "aos/events/aos_logging.h"
namespace aos {
+class SimulatedEventLoop;
+class SimulatedFetcher;
+class SimulatedChannel;
+
+namespace {
+
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
-struct SimulatedMessage {
+struct SimulatedMessage final {
+ SimulatedMessage(const SimulatedMessage &) = delete;
+ SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+
+ // Creates a SimulatedMessage with channel->max_size() bytes of storage.
+ // This is a shared_ptr so we don't have to implement refcounting or copying.
+ static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+
// Context for the data.
Context context;
+ SimulatedChannel *const channel = nullptr;
+ int buffer_index;
+
// The data.
char *data(size_t buffer_size) {
return RoundChannelData(&actual_data[0], buffer_size);
@@ -26,12 +43,21 @@
// Then the data, including padding on the end so we can align the buffer we
// actually return from data().
char actual_data[];
+
+ private:
+ SimulatedMessage(SimulatedChannel *channel_in);
+ ~SimulatedMessage();
+
+ static void DestroyAndFree(SimulatedMessage *p) {
+ p->~SimulatedMessage();
+ free(p);
+ }
};
-class SimulatedEventLoop;
-class SimulatedFetcher;
-class SimulatedChannel;
+} // namespace
+// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
+// for some reason...
class SimulatedWatcher : public WatcherState {
public:
SimulatedWatcher(
@@ -65,12 +91,59 @@
class SimulatedChannel {
public:
- explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+ explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler,
+ std::chrono::nanoseconds channel_storage_duration)
: channel_(channel),
scheduler_(scheduler),
- next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+ channel_storage_duration_(channel_storage_duration),
+ next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
+ available_buffer_indices_.reserve(number_buffers());
+ for (int i = 0; i < number_buffers(); ++i) {
+ available_buffer_indices_.push_back(i);
+ }
+ }
- ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+ ~SimulatedChannel() {
+ latest_message_.reset();
+ CHECK_EQ(static_cast<size_t>(number_buffers()),
+ available_buffer_indices_.size());
+ CHECK_EQ(0u, fetchers_.size());
+ CHECK_EQ(0u, watchers_.size());
+ CHECK_EQ(0, sender_count_);
+ }
+
+ // The number of messages we pretend to have in the queue.
+ int queue_size() const {
+ return channel()->frequency() *
+ std::chrono::duration_cast<std::chrono::duration<double>>(
+ channel_storage_duration_)
+ .count();
+ }
+
+ // The number of extra buffers (beyond the queue) we pretend to have.
+ int number_scratch_buffers() const {
+ // We need to start creating messages before we know how many
+ // senders+readers we'll have, so we need to just pick something which is
+ // always big enough.
+ return 50;
+ }
+
+ int number_buffers() const { return queue_size() + number_scratch_buffers(); }
+
+ int GetBufferIndex() {
+ CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
+ const int result = available_buffer_indices_.back();
+ available_buffer_indices_.pop_back();
+ return result;
+ }
+
+ void FreeBufferIndex(int i) {
+ DCHECK(std::find(available_buffer_indices_.begin(),
+ available_buffer_indices_.end(),
+ i) == available_buffer_indices_.end())
+ << ": Buffer is not in use: " << i;
+ available_buffer_indices_.push_back(i);
+ }
// Makes a connected raw sender which calls Send below.
::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
@@ -103,6 +176,7 @@
const Channel *channel() const { return channel_; }
void CountSenderCreated() {
+ CheckBufferCount();
if (sender_count_ >= channel()->num_senders()) {
LOG(FATAL) << "Failed to create sender on "
<< configuration::CleanedChannelToString(channel())
@@ -116,7 +190,11 @@
}
private:
- const Channel *channel_;
+ void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+
+ const Channel *const channel_;
+ EventScheduler *const scheduler_;
+ const std::chrono::nanoseconds channel_storage_duration_;
// List of all watchers.
::std::vector<SimulatedWatcher *> watchers_;
@@ -124,24 +202,36 @@
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
std::shared_ptr<SimulatedMessage> latest_message_;
- EventScheduler *scheduler_;
ipc_lib::QueueIndex next_queue_index_;
int sender_count_ = 0;
+
+ std::vector<uint16_t> available_buffer_indices_;
};
namespace {
-// Creates a SimulatedMessage with size bytes of storage.
-// This is a shared_ptr so we don't have to implement refcounting or copying.
-std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
- SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
+ SimulatedChannel *channel) {
+ const size_t size = channel->max_size();
+ SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
+ new (message) SimulatedMessage(channel);
message->context.size = size;
message->context.data = message->data(size);
- return std::shared_ptr<SimulatedMessage>(message, free);
+ return std::shared_ptr<SimulatedMessage>(message,
+ &SimulatedMessage::DestroyAndFree);
+}
+
+SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
+ : channel(channel_in) {
+ buffer_index = channel->GetBufferIndex();
+}
+
+SimulatedMessage::~SimulatedMessage() {
+ channel->FreeBufferIndex(buffer_index);
}
class SimulatedSender : public RawSender {
@@ -156,7 +246,7 @@
void *data() override {
if (!message_) {
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
}
return message_->data(simulated_channel_->max_size());
}
@@ -196,7 +286,7 @@
// This is wasteful, but since flatbuffers fill from the back end of the
// queue, we need it to be full sized.
- message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_);
// Now fill in the message. size is already populated above, and
// queue_index will be populated in simulated_channel_. Put this at the
@@ -230,6 +320,8 @@
return std::make_pair(false, monotonic_clock::min_time);
}
+ CHECK(!fell_behind_) << ": Got behind";
+
SetMsg(msgs_.front());
msgs_.pop_front();
return std::make_pair(true, event_loop()->monotonic_now());
@@ -251,6 +343,7 @@
// latest message from before we started.
SetMsg(msgs_.back());
msgs_.clear();
+ fell_behind_ = false;
return std::make_pair(true, event_loop()->monotonic_now());
}
@@ -275,6 +368,14 @@
// Internal method for Simulation to add a message to the buffer.
void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
msgs_.emplace_back(buffer);
+ if (fell_behind_ ||
+ msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
+ fell_behind_ = true;
+ // Might as well empty out all the intermediate messages now.
+ while (msgs_.size() > 1) {
+ msgs_.pop_front();
+ }
+ }
}
SimulatedChannel *simulated_channel_;
@@ -282,6 +383,9 @@
// Messages queued up but not in use.
::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+
+ // Whether we're "behind": FetchNext will CHECK-fail until Fetch resets it.
+ bool fell_behind_ = false;
};
class SimulatedTimerHandler : public TimerHandler {
@@ -332,8 +436,7 @@
class SimulatedEventLoop : public EventLoop {
public:
explicit SimulatedEventLoop(
- EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -532,8 +635,10 @@
if (it == channels_->end()) {
it = channels_
->emplace(SimpleChannel(channel),
- std::unique_ptr<SimulatedChannel>(
- new SimulatedChannel(channel, scheduler_)))
+ std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
+ channel, scheduler_,
+ std::chrono::nanoseconds(
+ configuration()->channel_storage_duration()))))
.first;
}
return it->second.get();
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 030054a..165f617 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -34,7 +34,9 @@
LocklessQueueMemory *const memory_;
};
-void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
+// Returns true if it succeeded. Returns false if another sender died in the
+// middle.
+bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
// Make sure we start looking at shared memory fresh right now. We'll handle
// people dying partway through by either cleaning up after them or not, but
// we want to ensure we clean up after anybody who has already died when we
@@ -62,6 +64,8 @@
// queue is active while we do this, it may take a couple of go arounds to see
// everything.
+ ::std::vector<bool> need_recovery(num_senders, false);
+
// Do the easy case. Find all senders who have died. See if they are either
// consistent already, or if they have copied over to_replace to the scratch
// index, but haven't cleared to_replace. Count them.
@@ -70,65 +74,71 @@
Sender *sender = memory->GetSender(i);
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (tid & FUTEX_OWNER_DIED) {
- VLOG(3) << "Found an easy death for sender " << i;
- // We can do a relaxed load here because we're the only person touching
- // this sender at this point.
- const Index to_replace = sender->to_replace.RelaxedLoad();
- const Index scratch_index = sender->scratch_index.Load();
-
- // I find it easiest to think about this in terms of the set of observable
- // states. The main code progresses through the following states:
-
- // 1) scratch_index = xxx
- // to_replace = invalid
- // This is unambiguous. Already good.
-
- // 2) scratch_index = xxx
- // to_replace = yyy
- // Very ambiguous. Is xxx or yyy the correct one? Need to either roll
- // this forwards or backwards.
-
- // 3) scratch_index = yyy
- // to_replace = yyy
- // We are in the act of moving to_replace to scratch_index, but didn't
- // finish. Easy.
-
- // 4) scratch_index = yyy
- // to_replace = invalid
- // Finished, but died. Looks like 1)
-
- // Any cleanup code needs to follow the same set of states to be robust to
- // death, so death can be restarted.
-
- // Could be 2) or 3).
- if (to_replace.valid()) {
- // 3)
- if (to_replace == scratch_index) {
- // Just need to invalidate to_replace to finish.
- sender->to_replace.Invalidate();
-
- // And mark that we succeeded.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
- ++valid_senders;
- }
- } else {
- // 1) or 4). Make sure we aren't corrupted and declare victory.
- CHECK(scratch_index.valid());
-
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
- ++valid_senders;
- }
- } else {
+ if (!(tid & FUTEX_OWNER_DIED)) {
// Not dead.
++valid_senders;
+ continue;
}
+ VLOG(3) << "Found an easy death for sender " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point.
+ const Index to_replace = sender->to_replace.RelaxedLoad();
+ const Index scratch_index = sender->scratch_index.Load();
+
+ // I find it easiest to think about this in terms of the set of observable
+ // states. The main code progresses through the following states:
+
+ // 1) scratch_index = xxx
+ // to_replace = invalid
+ // This is unambiguous. Already good.
+
+ // 2) scratch_index = xxx
+ // to_replace = yyy
+ // Very ambiguous. Is xxx or yyy the correct one? Need to either roll
+ // this forwards or backwards.
+
+ // 3) scratch_index = yyy
+ // to_replace = yyy
+ // We are in the act of moving to_replace to scratch_index, but didn't
+ // finish. Easy.
+
+ // 4) scratch_index = yyy
+ // to_replace = invalid
+ // Finished, but died. Looks like 1)
+
+ // Any cleanup code needs to follow the same set of states to be robust to
+ // death, so death can be restarted.
+
+ if (!to_replace.valid()) {
+ // 1) or 4). Make sure we aren't corrupted and declare victory.
+ CHECK(scratch_index.valid());
+
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ ++valid_senders;
+ continue;
+ }
+
+ // Could be 2) or 3) at this point.
+
+ if (to_replace == scratch_index) {
+ // 3) for sure.
+ // Just need to invalidate to_replace to finish.
+ sender->to_replace.Invalidate();
+
+ // And mark that we succeeded.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ ++valid_senders;
+ continue;
+ }
+
+ // Must be 2). Mark it for later.
+ need_recovery[i] = true;
}
// If all the senders are (or were made) good, there is no need to do the hard
// case.
if (valid_senders == num_senders) {
- return;
+ return true;
}
VLOG(3) << "Starting hard cleanup";
@@ -144,8 +154,12 @@
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
if (tid & FUTEX_OWNER_DIED) {
+ if (!need_recovery[i]) {
+ return false;
+ }
++num_missing;
} else {
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
// We can do a relaxed load here because we're the only person touching
// this sender at this point, if it matters. If it's not a dead sender,
// then any message it every has will already be accounted for, so this
@@ -166,6 +180,8 @@
}
accounted_for[index.message_index()] = true;
}
+
+ CHECK_LE(num_accounted_for + num_missing, num_messages);
}
while (num_missing != 0) {
@@ -174,67 +190,83 @@
Sender *sender = memory->GetSender(i);
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_ACQUIRE);
- if (tid & FUTEX_OWNER_DIED) {
- // We can do relaxed loads here because we're the only person touching
- // this sender at this point.
- const Index scratch_index = sender->scratch_index.RelaxedLoad();
- const Index to_replace = sender->to_replace.RelaxedLoad();
+ if (!(tid & FUTEX_OWNER_DIED)) {
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+ continue;
+ }
+ if (!need_recovery[i]) {
+ return false;
+ }
+ // We can do relaxed loads here because we're the only person touching
+ // this sender at this point.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ const Index to_replace = sender->to_replace.RelaxedLoad();
- // Candidate.
- if (to_replace.valid()) {
- CHECK_LE(to_replace.message_index(), accounted_for.size());
- }
- if (scratch_index.valid()) {
- CHECK_LE(scratch_index.message_index(), accounted_for.size());
- }
- if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
- CHECK(scratch_index.valid());
- VLOG(3) << "Sender " << i
- << " died, to_replace is already accounted for";
- // If both are accounted for, we are corrupt...
- CHECK(!accounted_for[scratch_index.message_index()]);
+ // Candidate.
+ if (to_replace.valid()) {
+ CHECK_LE(to_replace.message_index(), accounted_for.size());
+ }
+ if (scratch_index.valid()) {
+ CHECK_LE(scratch_index.message_index(), accounted_for.size());
+ }
+ if (!to_replace.valid() || accounted_for[to_replace.message_index()]) {
+ CHECK(scratch_index.valid());
+ VLOG(3) << "Sender " << i
+ << " died, to_replace is already accounted for";
+ // If both are accounted for, we are corrupt...
+ CHECK(!accounted_for[scratch_index.message_index()]);
- // to_replace is already accounted for. This means that we didn't
- // atomically insert scratch_index into the queue yet. So
- // invalidate to_replace.
- sender->to_replace.Invalidate();
+ // to_replace is already accounted for. This means that we didn't
+ // atomically insert scratch_index into the queue yet. So
+ // invalidate to_replace.
+ sender->to_replace.Invalidate();
- // And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ // And then mark this sender clean.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ need_recovery[i] = false;
- // And account for scratch_index.
- accounted_for[scratch_index.message_index()] = true;
- --num_missing;
- ++num_accounted_for;
- } else if (!scratch_index.valid() ||
- accounted_for[scratch_index.message_index()]) {
- VLOG(3) << "Sender " << i
- << " died, scratch_index is already accounted for";
- // scratch_index is accounted for. That means we did the insert,
- // but didn't record it.
- CHECK(to_replace.valid());
- // Finish the transaction. Copy to_replace, then clear it.
+ // And account for scratch_index.
+ accounted_for[scratch_index.message_index()] = true;
+ --num_missing;
+ ++num_accounted_for;
+ } else if (!scratch_index.valid() ||
+ accounted_for[scratch_index.message_index()]) {
+ VLOG(3) << "Sender " << i
+ << " died, scratch_index is already accounted for";
+ // scratch_index is accounted for. That means we did the insert,
+ // but didn't record it.
+ CHECK(to_replace.valid());
+ // Finish the transaction. Copy to_replace, then clear it.
- sender->scratch_index.Store(to_replace);
- sender->to_replace.Invalidate();
+ sender->scratch_index.Store(to_replace);
+ sender->to_replace.Invalidate();
- // And then mark this sender clean.
- __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ // And then mark this sender clean.
+ __atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
+ need_recovery[i] = false;
- // And account for to_replace.
- accounted_for[to_replace.message_index()] = true;
- --num_missing;
- ++num_accounted_for;
- } else {
- VLOG(3) << "Sender " << i << " died, neither is accounted for";
- // Ambiguous. There will be an unambiguous one somewhere that we
- // can do first.
- }
+ // And account for to_replace.
+ accounted_for[to_replace.message_index()] = true;
+ --num_missing;
+ ++num_accounted_for;
+ } else {
+ VLOG(3) << "Sender " << i << " died, neither is accounted for";
+ // Ambiguous. There will be an unambiguous one somewhere that we
+ // can do first.
}
}
// CHECK that we are making progress.
CHECK_NE(num_missing, starting_num_missing);
}
+ return true;
+}
+
+void Cleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &lock) {
+ // The number of iterations is bounded here because there are only a finite
+ // number of senders in existence which could die, and no new ones can be
+ // created while we're in here holding the lock.
+ while (!DoCleanup(memory, lock)) {
+ }
}
// Exposes rt_tgsigqueueinfo so we can send the signal *just* to the target