Expose a unique index for each event loop buffer
This can be helpful for indexing into other data structures based on the
messages, by giving each buffer an identifier which stays unique for as
long as the message is pinned.
Change-Id: I49ce18fba25a796005e64b40e5d1d5c55ca15543
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index b33fe98..e45c065 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -53,6 +53,8 @@
FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
}
+namespace {
+
std::string ShmFolder(const Channel *channel) {
CHECK(channel->has_name());
CHECK_EQ(channel->name()->string_view()[0], '/');
@@ -88,21 +90,28 @@
}
}
+ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
+ const Channel *channel, std::chrono::seconds channel_storage_duration) {
+ ipc_lib::LocklessQueueConfiguration config;
+
+ config.num_watchers = channel->num_watchers();
+ config.num_senders = channel->num_senders();
+ // The value in the channel will default to 0 if readers are configured to
+ // copy.
+ config.num_pinners = channel->num_readers();
+ config.queue_size = channel_storage_duration.count() * channel->frequency();
+ config.message_data_size = channel->max_size();
+
+ return config;
+}
+
class MMapedQueue {
public:
MMapedQueue(const Channel *channel,
- const std::chrono::seconds channel_storage_duration) {
+ std::chrono::seconds channel_storage_duration)
+ : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
std::string path = ShmPath(channel);
- config_.num_watchers = channel->num_watchers();
- config_.num_senders = channel->num_senders();
- // The value in the channel will default to 0 if readers are configured to
- // copy.
- config_.num_pinners = channel->num_readers();
- config_.queue_size =
- channel_storage_duration.count() * channel->frequency();
- config_.message_data_size = channel->max_size();
-
size_ = ipc_lib::LocklessQueueMemorySize(config_);
util::MkdirP(path, FLAGS_permissions);
@@ -110,7 +119,7 @@
// There are 2 cases. Either the file already exists, or it does not
// already exist and we need to create it. Start by trying to create it. If
// that fails, the file has already been created and we can open it
- // normally.. Once the file has been created it wil never be deleted.
+ // normally. Once the file has been created it will never be deleted.
int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
O_CLOEXEC | FLAGS_permissions);
if (fd == -1 && errno == EEXIST) {
@@ -160,14 +169,12 @@
}
private:
- ipc_lib::LocklessQueueConfiguration config_;
+ const ipc_lib::LocklessQueueConfiguration config_;
size_t size_;
void *data_;
};
-namespace {
-
const Node *MaybeMyNode(const Configuration *configuration) {
if (!configuration->has_nodes()) {
return nullptr;
@@ -320,11 +327,15 @@
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
if (pin_data()) {
- CHECK(pinner_->PinIndex(queue_index.index()))
+ const int pin_result = pinner_->PinIndex(queue_index.index());
+ CHECK(pin_result >= 0)
<< ": Got behind while reading and the last message was modified "
"out from under us while we tried to pin it. Don't get so far "
"behind on: "
<< configuration::CleanedChannelToString(channel_);
+ context_.buffer_index = pin_result;
+ } else {
+ context_.buffer_index = -1;
}
context_.queue_index = queue_index.index();
@@ -501,6 +512,8 @@
return lockless_queue_memory_.GetSharedMemory();
}
+ int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+
private:
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
@@ -994,6 +1007,13 @@
return watcher_state->GetSharedMemory();
}
+int ShmEventLoop::NumberBuffers(const Channel *channel) {
+ return MakeQueueConfiguration(
+ channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
+ configuration()->channel_storage_duration())))
+ .num_messages();
+}
+
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
const aos::RawSender *sender) const {
return static_cast<const ShmSender *>(sender)->GetSharedMemory();