Merge changes Ia95acd32,I714c3f06,I5b30c3b0
* changes:
Update prints to include which node they are from
Move code to timestamp_filter.cc
Make gmp build, and run all the tests
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 927e9d3..834ab5b 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -299,6 +299,11 @@
continue;
}
+ CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
+ << ": num_readers may be set if and only if read_method is PIN,"
+ " if you want 0 readers do not set PIN: "
+ << CleanedChannelToString(c);
+
// Attempt to insert the channel.
auto result = channels.insert(CopyFlatBuffer(c));
if (!result.second) {
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index 31d89e7..9a24c8a 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -42,6 +42,13 @@
time_to_live:uint = 0;
}
+enum ReadMethod : ubyte {
+ // Copy all the data out of shared memory into a local buffer for each reader.
+ COPY,
+ // Pin the data in shared memory and read directly from there.
+ PIN,
+}
+
// Table representing a channel. Channels are where data is published and
// subscribed from. The tuple of name, type is the identifying information.
table Channel {
@@ -78,6 +85,14 @@
// node responsible for logging it. Empty implies the node this connection
// is connecting to (i.e. name).
logger_nodes:[string];
+
+ // The way messages are read from shared memory for this channel.
+ read_method:ReadMethod = COPY;
+
+ // Sets the maximum number of readers on a channel.
+ //
+ // Currently, this must be set if and only if read_method is PIN.
+ num_readers:int;
}
// Table to support renaming channel names.
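For reference, a channel opts into the new zero-copy path by setting both new fields together. A minimal sketch, assuming a hypothetical "/camera" channel and mirroring the JsonToFlatbuffer pattern the tests below use:

    // Sketch: parse a configuration with one PIN channel. The "/camera" name
    // and reader count are made up for illustration.
    #include "aos/configuration.h"
    #include "aos/json_to_flatbuffer.h"

    aos::FlatbufferDetachedBuffer<aos::Configuration> MakePinConfig() {
      return aos::JsonToFlatbuffer<aos::Configuration>(R"config({
      "channels": [
        {
          "name": "/camera",
          "type": "aos.TestMessage",
          "read_method": "PIN",
          "num_readers": 10
        }
      ]
    })config");
    }

Leaving read_method at its COPY default and omitting num_readers keeps the existing behavior; setting only one of the two trips the new CHECK in configuration.cc above.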
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 39a6a54..0212eaa 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,7 +243,7 @@
cc_test(
name = "shm_event_loop_test",
srcs = ["shm_event_loop_test.cc"],
- shard_count = 5,
+ shard_count = 16,
deps = [
":event_loop_param_test",
":shm_event_loop",
@@ -267,6 +267,7 @@
name = "simulated_event_loop_test",
srcs = ["simulated_event_loop_test.cc"],
data = ["multinode_pingpong_config.json"],
+ shard_count = 4,
deps = [
":event_loop_param_test",
":ping_lib",
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e69fd0d..5c7e49d 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -33,6 +33,7 @@
context_.queue_index = 0xffffffff;
context_.size = 0;
context_.data = nullptr;
+ context_.buffer_index = -1;
event_loop_->NewFetcher(this);
}
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index fa78953..c922f2e 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -48,6 +48,7 @@
realtime_clock::time_point realtime_remote_time;
// The rest are only valid for Watchers and Fetchers.
+
// Index in the queue.
uint32_t queue_index;
// Index into the remote queue. Useful to determine if data was lost. In a
@@ -59,6 +60,20 @@
// Pointer to the data.
const void *data;
+ // Index of the message buffer. This will be in [0, NumberBuffers) on
+ // read_method=PIN channels, and -1 for other channels.
+ //
+ // This only tells you about the underlying storage for this message, not
+ // anything about its position in the queue. This is only useful for advanced
+ // zero-copy use cases, on read_method=PIN channels.
+ //
+ // This will uniquely identify a message on this channel at a point in time.
+ // For senders, this point in time is while the sender has the message. With
+ // read_method==PIN, this point in time includes while the caller has access
+ // to this context. For other read_methods, this point in time may be before
+ // the caller has access to this context, which makes this pretty useless.
+ int buffer_index;
+
// Efficiently copies the flatbuffer into a FlatbufferVector, allocating
// memory in the process. It is vital that T matches the type of the
// underlying flatbuffer.
@@ -166,6 +181,10 @@
return &fbb_allocator_;
}
+ // Index of the buffer which is currently exposed by data() and the various
+ // other accessors. This is the message the caller should be filling out.
+ virtual int buffer_index() = 0;
+
protected:
EventLoop *event_loop() { return event_loop_; }
@@ -351,13 +370,17 @@
// Returns the queue index that this was sent with.
uint32_t sent_queue_index() const { return sender_->sent_queue_index(); }
+ // Returns the buffer index which MakeBuilder() will expose access to. This is
+ // the buffer the caller can fill out.
+ int buffer_index() const { return sender_->buffer_index(); }
+
private:
friend class EventLoop;
Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
std::unique_ptr<RawSender> sender_;
};
-// Interface for timers
+// Interface for timers.
class TimerHandler {
public:
virtual ~TimerHandler();
@@ -604,6 +627,7 @@
MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
Context new_context = context;
new_context.data = nullptr;
+ new_context.buffer_index = -1;
watcher(new_context);
});
}
@@ -622,9 +646,13 @@
// Prevents the event loop from sending a timing report.
void SkipTimingReport() { skip_timing_report_ = true; }
- // Prevents AOS_LOG being sent to message on /aos
+ // Prevents AOS_LOG from being sent as a message on /aos.
void SkipAosLog() { skip_logger_ = true; }
+ // Returns the number of buffers for this channel. This corresponds with the
+ // range of Context::buffer_index values for this channel.
+ virtual int NumberBuffers(const Channel *channel) = 0;
+
protected:
// Sets the name of the event loop. This is the application name.
virtual void set_name(const std::string_view name) = 0;
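Taken together, Context::buffer_index and EventLoop::NumberBuffers() are the hooks for the advanced zero-copy use cases mentioned above. A minimal sketch of a consumer keyed on them, assuming a read_method=PIN channel "/test" carrying aos.TestMessage (the helper and its bookkeeping are hypothetical):

    #include <vector>

    #include "aos/events/event_loop.h"
    #include "aos/events/test_message_generated.h"

    // Hypothetical helper: counts how many fetched messages were backed by
    // each shared-memory buffer of a read_method=PIN channel.
    std::vector<int> CountPerBufferFetches(aos::EventLoop *loop) {
      aos::Fetcher<aos::TestMessage> fetcher =
          loop->MakeFetcher<aos::TestMessage>("/test");
      // buffer_index is in [0, NumberBuffers) on PIN channels, so it can
      // index per-buffer state directly.
      std::vector<int> counts(loop->NumberBuffers(fetcher.channel()), 0);
      while (fetcher.FetchNext()) {
        ++counts[fetcher.context().buffer_index];
      }
      return counts;
    }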
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8d0d0e2..993011f 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,8 +1,9 @@
#include "aos/events/event_loop_param_test.h"
#include <chrono>
+#include <unordered_map>
+#include <unordered_set>
-#include "aos/events/test_message_generated.h"
#include "aos/flatbuffer_merge.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
@@ -14,6 +15,54 @@
namespace chrono = ::std::chrono;
} // namespace
+::std::unique_ptr<EventLoop> AbstractEventLoopTest::Make(
+ std::string_view name) {
+ std::string name_copy(name);
+ if (name == "") {
+ name_copy = "loop";
+ name_copy += std::to_string(event_loop_count_);
+ }
+ ++event_loop_count_;
+ return factory_->Make(name_copy);
+}
+
+void AbstractEventLoopTest::VerifyBuffers(
+ int number_buffers,
+ std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+ std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders) {
+ // The buffers which are in a sender.
+ std::unordered_set<int> in_sender;
+ for (const Sender<TestMessage> &sender : senders) {
+ const int this_buffer = sender.buffer_index();
+ CHECK_GE(this_buffer, 0);
+ CHECK_LT(this_buffer, number_buffers);
+ CHECK(in_sender.insert(this_buffer).second) << ": " << this_buffer;
+ }
+
+ if (read_method() != ReadMethod::PIN) {
+ // If we're not using PIN, we can't really verify anything about what
+ // buffers the fetchers have.
+ return;
+ }
+
+ // Mapping from TestMessage::value to buffer index.
+ std::unordered_map<int, int> fetcher_values;
+ for (const Fetcher<TestMessage> &fetcher : fetchers) {
+ if (!fetcher.get()) {
+ continue;
+ }
+ const int this_buffer = fetcher.context().buffer_index;
+ CHECK_GE(this_buffer, 0);
+ CHECK_LT(this_buffer, number_buffers);
+ CHECK(in_sender.count(this_buffer) == 0) << ": " << this_buffer;
+ const auto insert_result = fetcher_values.insert(
+ std::make_pair(fetcher.get()->value(), this_buffer));
+ if (!insert_result.second) {
+ CHECK_EQ(this_buffer, insert_result.first->second);
+ }
+ }
+}
+
// Tests that watcher can receive messages from a sender.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, Basic) {
@@ -92,6 +141,7 @@
loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
EXPECT_GT(loop2->context().size, 0u);
EXPECT_EQ(nullptr, loop2->context().data);
+ EXPECT_EQ(-1, loop2->context().buffer_index);
this->Exit();
});
@@ -222,6 +272,7 @@
EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
EXPECT_EQ(fetcher.context().size, 0u);
EXPECT_EQ(fetcher.context().data, nullptr);
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -248,6 +299,13 @@
EXPECT_EQ(fetcher.context().queue_index, 0x0u);
EXPECT_EQ(fetcher.context().size, 20u);
EXPECT_NE(fetcher.context().data, nullptr);
+ if (read_method() == ReadMethod::PIN) {
+ EXPECT_GE(fetcher.context().buffer_index, 0);
+ EXPECT_LT(fetcher.context().buffer_index,
+ loop2->NumberBuffers(fetcher.channel()));
+ } else {
+ EXPECT_EQ(fetcher.context().buffer_index, -1);
+ }
}
// Tests that watcher will receive all messages sent if they are sent after
@@ -560,6 +618,105 @@
EXPECT_EQ(200, fetcher.get()->value());
}
+// Verify that a fetcher still holds its data, even after falling behind.
+TEST_P(AbstractEventLoopTest, FetcherBehindData) {
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
+ {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(1);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(1, fetcher.get()->value());
+ for (int i = 0; i < 300; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i + 2);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ EXPECT_EQ(1, fetcher.get()->value());
+}
+
+// Try a bunch of orderings of operations with fetchers and senders. Verify that
+// all the fetchers have the correct data at each step.
+TEST_P(AbstractEventLoopTest, FetcherPermutations) {
+ for (int max_save = 0; max_save < 5; ++max_save) {
+ SCOPED_TRACE("max_save=" + std::to_string(max_save));
+
+ auto send_loop = Make();
+ auto fetch_loop = Make();
+ auto sender = send_loop->MakeSender<TestMessage>("/test");
+ const auto send_message = [&sender](int i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ };
+ std::vector<Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 10; ++i) {
+ fetchers.emplace_back(fetch_loop->MakeFetcher<TestMessage>("/test"));
+ }
+ send_message(1);
+ const auto verify_buffers = [&]() {
+ std::vector<std::reference_wrapper<const Fetcher<TestMessage>>>
+ fetchers_copy;
+ for (const auto &fetcher : fetchers) {
+ fetchers_copy.emplace_back(fetcher);
+ }
+ std::vector<std::reference_wrapper<const Sender<TestMessage>>>
+ senders_copy;
+ senders_copy.emplace_back(sender);
+ VerifyBuffers(send_loop->NumberBuffers(sender.channel()), fetchers_copy,
+ senders_copy);
+ };
+ for (auto &fetcher : fetchers) {
+ ASSERT_TRUE(fetcher.Fetch());
+ verify_buffers();
+ EXPECT_EQ(1, fetcher.get()->value());
+ }
+
+ for (int save = 1; save <= max_save; ++save) {
+ SCOPED_TRACE("save=" + std::to_string(save));
+ send_message(100 + save);
+ verify_buffers();
+ for (size_t i = 0; i < fetchers.size() - save; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ ASSERT_TRUE(fetchers[i].Fetch());
+ verify_buffers();
+ EXPECT_EQ(100 + save, fetchers[i].get()->value());
+ }
+ for (size_t i = fetchers.size() - save; i < fetchers.size() - 1; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());
+ }
+
+ for (int i = 0; i < 300; ++i) {
+ send_message(200 + i);
+ verify_buffers();
+ }
+
+ for (size_t i = 0; i < fetchers.size() - max_save; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ if (max_save > 0) {
+ EXPECT_EQ(100 + max_save, fetchers[i].get()->value());
+ } else {
+ EXPECT_EQ(1, fetchers[i].get()->value());
+ }
+ }
+ for (size_t i = fetchers.size() - max_save; i < fetchers.size() - 1; ++i) {
+ SCOPED_TRACE("fetcher=" + std::to_string(i));
+ EXPECT_EQ(100 + (fetchers.size() - 1 - i), fetchers[i].get()->value());
+ }
+ EXPECT_EQ(1, fetchers.back().get()->value());
+ }
+}
+
// Verify that making a fetcher and watcher for "/test" succeeds.
TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
auto loop = Make();
@@ -642,7 +799,80 @@
}
EXPECT_DEATH({ loop->MakeSender<TestMessage>("/test"); },
"Failed to create sender on \\{ \"name\": \"/test\", \"type\": "
- "\"aos.TestMessage\" \\}, too many senders.");
+ "\"aos.TestMessage\"[^}]*\\ }, too many senders.");
+}
+
+// Verify that creating too many fetchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 10; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many fetchers, split between two event loops, fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyFetchersTwoLoops) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ auto loop2 = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ for (int i = 0; i < 5; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ fetchers.emplace_back(loop2->MakeFetcher<TestMessage>("/test"));
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ std::vector<std::unique_ptr<EventLoop>> loops;
+ for (int i = 0; i < 10; ++i) {
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ Make()->MakeWatcher("/test", [](const TestMessage &) {}); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
+}
+
+// Verify that creating too many watchers and fetchers combined fails.
+TEST_P(AbstractEventLoopDeathTest, TooManyWatchersAndFetchers) {
+ if (read_method() != ReadMethod::PIN) {
+ // Other read methods don't limit the number of readers, so just skip this.
+ return;
+ }
+
+ auto loop = Make();
+ std::vector<aos::Fetcher<TestMessage>> fetchers;
+ std::vector<std::unique_ptr<EventLoop>> loops;
+ for (int i = 0; i < 5; ++i) {
+ fetchers.emplace_back(loop->MakeFetcher<TestMessage>("/test"));
+ loops.emplace_back(Make());
+ loops.back()->MakeWatcher("/test", [](const TestMessage &) {});
+ }
+ EXPECT_DEATH({ loop->MakeFetcher<TestMessage>("/test"); },
+ "Failed to create reader on \\{ \"name\": \"/test\", \"type\": "
+ "\"aos.TestMessage\"[^}]*\\ }, too many readers.");
}
// Verify that we can't create a sender inside OnRun.
@@ -723,6 +953,7 @@
EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
EXPECT_EQ(loop->context().size, 0u);
EXPECT_EQ(loop->context().data, nullptr);
+ EXPECT_EQ(loop->context().buffer_index, -1);
expected_times.push_back(loop->context().monotonic_event_time);
if (times.size() == kCount) {
@@ -965,6 +1196,15 @@
EXPECT_LT(&msg, reinterpret_cast<const void *>(
reinterpret_cast<const char *>(loop1->context().data) +
loop1->context().size));
+ if (read_method() == ReadMethod::PIN) {
+ EXPECT_GE(loop1->context().buffer_index, 0);
+ EXPECT_LT(loop1->context().buffer_index,
+ loop1->NumberBuffers(
+ configuration::GetChannel(loop1->configuration(), "/test",
+ "aos.TestMessage", "", nullptr)));
+ } else {
+ EXPECT_EQ(-1, loop1->context().buffer_index);
+ }
triggered = true;
});
@@ -1137,6 +1377,7 @@
EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
EXPECT_EQ(loop1->context().size, 0u);
EXPECT_EQ(loop1->context().data, nullptr);
+ EXPECT_EQ(loop1->context().buffer_index, -1);
if (times.size() == kCount) {
LOG(INFO) << "Exiting";
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index cbd5cd1..faf6361 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -1,10 +1,12 @@
#ifndef _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
#define _AOS_EVENTS_EVENT_LOOP_PARAM_TEST_H_
+#include <initializer_list>
#include <string_view>
#include <vector>
#include "aos/events/event_loop.h"
+#include "aos/events/test_message_generated.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "gtest/gtest.h"
@@ -15,31 +17,30 @@
class EventLoopTestFactory {
public:
EventLoopTestFactory()
- : flatbuffer_(JsonToFlatbuffer("{\n"
- " \"channels\": [ \n"
- " {\n"
- " \"name\": \"/aos\",\n"
- " \"type\": \"aos.logging.LogMessageFbs\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/aos\",\n"
- " \"type\": \"aos.timing.Report\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test1\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " },\n"
- " {\n"
- " \"name\": \"/test2\",\n"
- " \"type\": \"aos.TestMessage\"\n"
- " }\n"
- " ]\n"
- "}\n",
- Configuration::MiniReflectTypeTable())) {}
+ : flatbuffer_(JsonToFlatbuffer<Configuration>(R"config({
+ "channels": [
+ {
+ "name": "/aos",
+ "type": "aos.logging.LogMessageFbs"
+ },
+ {
+ "name": "/aos",
+ "type": "aos.timing.Report"
+ },
+ {
+ "name": "/test",
+ "type": "aos.TestMessage"
+ },
+ {
+ "name": "/test1",
+ "type": "aos.TestMessage"
+ },
+ {
+ "name": "/test2",
+ "type": "aos.TestMessage"
+ }
+ ]
+})config")) {}
virtual ~EventLoopTestFactory() {}
@@ -58,8 +59,48 @@
// Advances time by sleeping. Can't be called from inside a loop.
virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
+ void PinReads() {
+ static const std::string kJson = R"config({
+ "channels": [
+ {
+ "name": "/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test1",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ },
+ {
+ "name": "/test2",
+ "type": "aos.TestMessage",
+ "read_method": "PIN",
+ "num_readers": 10
+ }
+ ]
+})config";
+
+ flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
+ JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+ }
+
void EnableNodes(std::string_view my_node) {
- std::string json = R"config({
+ static const std::string kJson = R"config({
"channels": [
{
"name": "/aos/me",
@@ -127,7 +168,7 @@
})config";
flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
- JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
+ JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
}
@@ -142,20 +183,20 @@
const Node *my_node_ = nullptr;
};
-class AbstractEventLoopTestBase
- : public ::testing::TestWithParam<std::function<EventLoopTestFactory *()>> {
+class AbstractEventLoopTest
+ : public ::testing::TestWithParam<
+ std::tuple<std::function<EventLoopTestFactory *()>, ReadMethod>> {
public:
- AbstractEventLoopTestBase() { factory_.reset(GetParam()()); }
-
- ::std::unique_ptr<EventLoop> Make(std::string_view name = "") {
- std::string name_copy(name);
- if (name == "") {
- name_copy = "loop";
- name_copy += std::to_string(event_loop_count_);
+ AbstractEventLoopTest() : factory_(std::get<0>(GetParam())()) {
+ if (read_method() == ReadMethod::PIN) {
+ factory_->PinReads();
}
- ++event_loop_count_;
- return factory_->Make(name_copy);
}
+
+ ReadMethod read_method() const { return std::get<1>(GetParam()); }
+
+ ::std::unique_ptr<EventLoop> Make(std::string_view name = "");
+
::std::unique_ptr<EventLoop> MakePrimary(std::string_view name = "primary") {
++event_loop_count_;
return factory_->MakePrimary(name);
@@ -182,17 +223,20 @@
end_timer->set_name("end");
}
- // You can implement all the usual fixture class members here.
- // To access the test parameter, call GetParam() from class
- // TestWithParam<T>.
+ // Verifies that the buffer_index values for all of the given objects are
+ // consistent.
+ void VerifyBuffers(
+ int number_buffers,
+ std::vector<std::reference_wrapper<const Fetcher<TestMessage>>> fetchers,
+ std::vector<std::reference_wrapper<const Sender<TestMessage>>> senders);
+
private:
- ::std::unique_ptr<EventLoopTestFactory> factory_;
+ const ::std::unique_ptr<EventLoopTestFactory> factory_;
int event_loop_count_ = 0;
};
-typedef AbstractEventLoopTestBase AbstractEventLoopDeathTest;
-typedef AbstractEventLoopTestBase AbstractEventLoopTest;
+using AbstractEventLoopDeathTest = AbstractEventLoopTest;
} // namespace testing
} // namespace aos
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index a39a338..5ca1067 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -177,6 +177,7 @@
event_loop_->context_.queue_index = 0xffffffffu;
event_loop_->context_.size = 0;
event_loop_->context_.data = nullptr;
+ event_loop_->context_.buffer_index = -1;
ftrace_.FormatMessage(
"timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -221,6 +222,7 @@
event_loop_->context_.queue_index = 0xffffffffu;
event_loop_->context_.size = 0;
event_loop_->context_.data = nullptr;
+ event_loop_->context_.buffer_index = -1;
// Compute how many cycles elapsed and schedule the next wakeup.
Reschedule(schedule, monotonic_start_time);
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index d2c9112..6013709 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -53,6 +53,8 @@
FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
}
+namespace {
+
std::string ShmFolder(const Channel *channel) {
CHECK(channel->has_name());
CHECK_EQ(channel->name()->string_view()[0], '/');
@@ -60,10 +62,10 @@
}
std::string ShmPath(const Channel *channel) {
CHECK(channel->has_type());
- return ShmFolder(channel) + channel->type()->str() + ".v2";
+ return ShmFolder(channel) + channel->type()->str() + ".v3";
}
-void PageFaultData(char *data, size_t size) {
+void PageFaultDataWrite(char *data, size_t size) {
// This just has to divide the actual page size. Being smaller will make this
// a bit slower than necessary, but not much. 1024 is a pretty conservative
// choice (most pages are probably 4096).
@@ -88,18 +90,40 @@
}
}
+void PageFaultDataRead(const char *data, size_t size) {
+ // This just has to divide the actual page size. Being smaller will make this
+ // a bit slower than necessary, but not much. 1024 is a pretty conservative
+ // choice (most pages are probably 4096).
+ static constexpr size_t kPageSize = 1024;
+ const size_t pages = (size + kPageSize - 1) / kPageSize;
+ for (size_t i = 0; i < pages; ++i) {
+ // We need to ensure there's a readable pagetable entry.
+ __atomic_load_n(&data[i * kPageSize], __ATOMIC_RELAXED);
+ }
+}
+
+ipc_lib::LocklessQueueConfiguration MakeQueueConfiguration(
+ const Channel *channel, std::chrono::seconds channel_storage_duration) {
+ ipc_lib::LocklessQueueConfiguration config;
+
+ config.num_watchers = channel->num_watchers();
+ config.num_senders = channel->num_senders();
+ // The value in the channel will default to 0 if readers are configured to
+ // copy.
+ config.num_pinners = channel->num_readers();
+ config.queue_size = channel_storage_duration.count() * channel->frequency();
+ config.message_data_size = channel->max_size();
+
+ return config;
+}
+
class MMapedQueue {
public:
MMapedQueue(const Channel *channel,
- const std::chrono::seconds channel_storage_duration) {
+ std::chrono::seconds channel_storage_duration)
+ : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
std::string path = ShmPath(channel);
- config_.num_watchers = channel->num_watchers();
- config_.num_senders = channel->num_senders();
- config_.queue_size =
- channel_storage_duration.count() * channel->frequency();
- config_.message_data_size = channel->max_size();
-
size_ = ipc_lib::LocklessQueueMemorySize(config_);
util::MkdirP(path, FLAGS_permissions);
@@ -107,7 +131,7 @@
// There are 2 cases. Either the file already exists, or it does not
// already exist and we need to create it. Start by trying to create it. If
// that fails, the file has already been created and we can open it
- // normally.. Once the file has been created it wil never be deleted.
+ // normally. Once the file has been created it will never be deleted.
int fd = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
O_CLOEXEC | FLAGS_permissions);
if (fd == -1 && errno == EEXIST) {
@@ -138,33 +162,51 @@
data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
PCHECK(data_ != MAP_FAILED);
+ const_data_ = mmap(NULL, size_, PROT_READ, MAP_SHARED, fd, 0);
+ PCHECK(const_data_ != MAP_FAILED);
PCHECK(close(fd) == 0);
- PageFaultData(static_cast<char *>(data_), size_);
+ PageFaultDataWrite(static_cast<char *>(data_), size_);
+ PageFaultDataRead(static_cast<const char *>(const_data_), size_);
ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
}
- ~MMapedQueue() { PCHECK(munmap(data_, size_) == 0); }
+ ~MMapedQueue() {
+ PCHECK(munmap(data_, size_) == 0);
+ PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
+ }
ipc_lib::LocklessQueueMemory *memory() const {
return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
}
+ const ipc_lib::LocklessQueueMemory *const_memory() const {
+ return reinterpret_cast<const ipc_lib::LocklessQueueMemory *>(const_data_);
+ }
+
const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
- absl::Span<char> GetSharedMemory() const {
+ ipc_lib::LocklessQueue queue() const {
+ return ipc_lib::LocklessQueue(const_memory(), memory(), config());
+ }
+
+ absl::Span<char> GetMutableSharedMemory() const {
return absl::Span<char>(static_cast<char *>(data_), size_);
}
+ absl::Span<const char> GetConstSharedMemory() const {
+ return absl::Span<const char>(static_cast<const char *>(const_data_),
+ size_);
+ }
+
private:
- ipc_lib::LocklessQueueConfiguration config_;
+ const ipc_lib::LocklessQueueConfiguration config_;
size_t size_;
void *data_;
+ const void *const_data_;
};
-namespace {
-
const Node *MaybeMyNode(const Configuration *configuration) {
if (!configuration->has_nodes()) {
return nullptr;
@@ -197,8 +239,7 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()) {
+ reader_(lockless_queue_memory_.queue()) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -208,36 +249,59 @@
~SimpleShmFetcher() {}
+ // Sets this object to pin or copy data, as configured in the channel.
+ void RetrieveData() {
+ if (channel_->read_method() == ReadMethod::PIN) {
+ PinDataOnFetch();
+ } else {
+ CopyDataOnFetch();
+ }
+ }
+
// Sets this object to copy data out of the shared memory into a private
// buffer when fetching.
void CopyDataOnFetch() {
+ CHECK(!pin_data());
data_storage_.reset(static_cast<char *>(
malloc(channel_->max_size() + kChannelDataAlignment - 1)));
}
+ // Sets this object to pin data in shared memory when fetching.
+ void PinDataOnFetch() {
+ CHECK(!copy_data());
+ auto maybe_pinner =
+ ipc_lib::LocklessQueuePinner::Make(lockless_queue_memory_.queue());
+ if (!maybe_pinner) {
+ LOG(FATAL) << "Failed to create reader on "
+ << configuration::CleanedChannelToString(channel_)
+ << ", too many readers.";
+ }
+ pinner_ = std::move(maybe_pinner.value());
+ }
+
// Points the next message to fetch at the queue index which will be
// populated next.
void PointAtNextQueueIndex() {
- actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+ actual_queue_index_ = reader_.LatestIndex();
if (!actual_queue_index_.valid()) {
// Nothing in the queue. The next element will show up at the 0th
// index in the queue.
- actual_queue_index_ =
- ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+ actual_queue_index_ = ipc_lib::QueueIndex::Zero(
+ LocklessQueueSize(lockless_queue_memory_.memory()));
} else {
actual_queue_index_ = actual_queue_index_.Increment();
}
}
bool FetchNext() {
- const ipc_lib::LocklessQueue::ReadResult read_result =
+ const ipc_lib::LocklessQueueReader::Result read_result =
DoFetch(actual_queue_index_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
bool Fetch() {
- const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+ const ipc_lib::QueueIndex queue_index = reader_.LatestIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
// skip checking if valid_data_ is true.
@@ -250,50 +314,74 @@
return false;
}
- const ipc_lib::LocklessQueue::ReadResult read_result = DoFetch(queue_index);
+ const ipc_lib::LocklessQueueReader::Result read_result =
+ DoFetch(queue_index);
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::NOTHING_NEW)
<< ": Queue index went backwards. This should never happen. "
<< configuration::CleanedChannelToString(channel_);
- return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ return read_result == ipc_lib::LocklessQueueReader::Result::GOOD;
}
Context context() const { return context_; }
bool RegisterWakeup(int priority) {
- return lockless_queue_.RegisterWakeup(priority);
+ CHECK(!watcher_);
+ watcher_ = ipc_lib::LocklessQueueWatcher::Make(
+ lockless_queue_memory_.queue(), priority);
+ return static_cast<bool>(watcher_);
}
- void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
-
- absl::Span<char> GetSharedMemory() const {
- return lockless_queue_memory_.GetSharedMemory();
+ void UnregisterWakeup() {
+ CHECK(watcher_);
+ watcher_ = std::nullopt;
}
- absl::Span<char> GetPrivateMemory() const {
- // Can't usefully expose this for pinning, because the buffer changes
- // address for each message. Callers who want to work with that should just
- // grab the whole shared memory buffer instead.
+ absl::Span<char> GetMutableSharedMemory() {
+ return lockless_queue_memory_.GetMutableSharedMemory();
+ }
+
+ absl::Span<const char> GetConstSharedMemory() const {
+ return lockless_queue_memory_.GetConstSharedMemory();
+ }
+
+ absl::Span<const char> GetPrivateMemory() const {
+ if (pin_data()) {
+ return lockless_queue_memory_.GetConstSharedMemory();
+ }
return absl::Span<char>(
const_cast<SimpleShmFetcher *>(this)->data_storage_start(),
- lockless_queue_.message_data_size());
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()));
}
private:
- ipc_lib::LocklessQueue::ReadResult DoFetch(ipc_lib::QueueIndex queue_index) {
+ ipc_lib::LocklessQueueReader::Result DoFetch(
+ ipc_lib::QueueIndex queue_index) {
// TODO(austin): Get behind and make sure it dies.
char *copy_buffer = nullptr;
if (copy_data()) {
copy_buffer = data_storage_start();
}
- ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ ipc_lib::LocklessQueueReader::Result read_result = reader_.Read(
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
&context_.size, copy_buffer);
- if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
+ if (pin_data()) {
+ const int pin_result = pinner_->PinIndex(queue_index.index());
+ CHECK(pin_result >= 0)
+ << ": Got behind while reading and the last message was modified "
+ "out from under us while we tried to pin it. Don't get so far "
+ "behind on: "
+ << configuration::CleanedChannelToString(channel_);
+ context_.buffer_index = pin_result;
+ } else {
+ context_.buffer_index = -1;
+ }
+
context_.queue_index = queue_index.index();
if (context_.remote_queue_index == 0xffffffffu) {
context_.remote_queue_index = context_.queue_index;
@@ -307,7 +395,9 @@
const char *const data = DataBuffer();
if (data) {
context_.data =
- data + lockless_queue_.message_data_size() - context_.size;
+ data +
+ LocklessQueueMessageDataSize(lockless_queue_memory_.memory()) -
+ context_.size;
} else {
context_.data = nullptr;
}
@@ -317,7 +407,7 @@
// Make sure the data wasn't modified while we were reading it. This
// can only happen if you are reading the last message *while* it is
// being written to, which means you are pretty far behind.
- CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ CHECK(read_result != ipc_lib::LocklessQueueReader::Result::OVERWROTE)
<< ": Got behind while reading and the last message was modified "
"out from under us while we were reading it. Don't get so far "
"behind on: "
@@ -326,7 +416,7 @@
// We fell behind between when we read the index and read the value.
// This isn't worth recovering from since this means we went to sleep
// for a long time in the middle of this function.
- if (read_result == ipc_lib::LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == ipc_lib::LocklessQueueReader::Result::TOO_OLD) {
event_loop_->SendTimingReport();
LOG(FATAL) << "The next message is no longer available. "
<< configuration::CleanedChannelToString(channel_);
@@ -346,22 +436,30 @@
if (copy_data()) {
return data_storage_start();
}
+ if (pin_data()) {
+ return static_cast<const char *>(pinner_->Data());
+ }
return nullptr;
}
bool copy_data() const { return static_cast<bool>(data_storage_); }
+ bool pin_data() const { return static_cast<bool>(pinner_); }
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
+ ipc_lib::LocklessQueueReader reader_;
+ // This being nullopt indicates we're not looking for wakeups right now.
+ std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
- ipc_lib::QueueIndex actual_queue_index_ =
- ipc_lib::LocklessQueue::empty_queue_index();
+ ipc_lib::QueueIndex actual_queue_index_ = ipc_lib::QueueIndex::Invalid();
// This being empty indicates we're not going to copy data.
std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
+ // This being nullopt indicates we're not going to pin messages.
+ std::optional<ipc_lib::LocklessQueuePinner> pinner_;
+
Context context_;
};
@@ -370,7 +468,7 @@
explicit ShmFetcher(ShmEventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
simple_shm_fetcher_(event_loop, channel) {
- simple_shm_fetcher_.CopyDataOnFetch();
+ simple_shm_fetcher_.RetrieveData();
}
~ShmFetcher() { context_.data = nullptr; }
@@ -391,7 +489,7 @@
return std::make_pair(false, monotonic_clock::min_time);
}
- absl::Span<char> GetPrivateMemory() const {
+ absl::Span<const char> GetPrivateMemory() const {
return simple_shm_fetcher_.GetPrivateMemory();
}
@@ -407,15 +505,15 @@
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
- lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- lockless_queue_sender_(
- VerifySender(lockless_queue_.MakeSender(), channel)) {}
+ lockless_queue_sender_(VerifySender(
+ ipc_lib::LocklessQueueSender::Make(lockless_queue_memory_.queue()),
+ channel)),
+ wake_upper_(lockless_queue_memory_.queue()) {}
~ShmSender() override {}
- static ipc_lib::LocklessQueue::Sender VerifySender(
- std::optional<ipc_lib::LocklessQueue::Sender> &&sender,
+ static ipc_lib::LocklessQueueSender VerifySender(
+ std::optional<ipc_lib::LocklessQueueSender> sender,
const Channel *channel) {
if (sender) {
return std::move(sender.value());
@@ -437,7 +535,7 @@
lockless_queue_sender_.Send(
length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
&monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
return true;
}
@@ -452,19 +550,21 @@
monotonic_remote_time, realtime_remote_time,
remote_queue_index, &monotonic_sent_time_,
&realtime_sent_time_, &sent_queue_index_);
- lockless_queue_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->priority());
// TODO(austin): Return an error if we send too fast.
return true;
}
absl::Span<char> GetSharedMemory() const {
- return lockless_queue_memory_.GetSharedMemory();
+ return lockless_queue_memory_.GetMutableSharedMemory();
}
+ int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+
private:
MMapedQueue lockless_queue_memory_;
- ipc_lib::LocklessQueue lockless_queue_;
- ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+ ipc_lib::LocklessQueueSender lockless_queue_sender_;
+ ipc_lib::LocklessQueueWakeUpper wake_upper_;
};
// Class to manage the state for a Watcher.
@@ -479,7 +579,7 @@
event_(this),
simple_shm_fetcher_(event_loop, channel) {
if (copy_data) {
- simple_shm_fetcher_.CopyDataOnFetch();
+ simple_shm_fetcher_.RetrieveData();
}
}
@@ -520,8 +620,8 @@
void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
- absl::Span<char> GetSharedMemory() const {
- return simple_shm_fetcher_.GetSharedMemory();
+ absl::Span<const char> GetSharedMemory() const {
+ return simple_shm_fetcher_.GetConstSharedMemory();
}
private:
@@ -948,18 +1048,26 @@
UpdateTimingReport();
}
-absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
+absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
+ const Channel *channel) {
ShmWatcherState *const watcher_state =
static_cast<ShmWatcherState *>(GetWatcherState(channel));
return watcher_state->GetSharedMemory();
}
+int ShmEventLoop::NumberBuffers(const Channel *channel) {
+ return MakeQueueConfiguration(
+ channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
+ configuration()->channel_storage_duration())))
+ .num_messages();
+}
+
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
const aos::RawSender *sender) const {
return static_cast<const ShmSender *>(sender)->GetSharedMemory();
}
-absl::Span<char> ShmEventLoop::GetShmFetcherPrivateMemory(
+absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
const aos::RawFetcher *fetcher) const {
return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
}
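A worked example of the buffer accounting MakeQueueConfiguration() sets up, under the assumption that LocklessQueueConfiguration::num_messages() sums the sender, pinner, and queue slots (the channel parameters are hypothetical):

    #include <cstddef>

    // Hypothetical 100 Hz channel with 2 seconds of storage, 10 senders, and
    // 10 readers (pinners), assuming
    //   num_messages() == num_senders + num_pinners + queue_size.
    constexpr size_t kQueueSize = 2 /* seconds */ * 100 /* Hz */;  // 200
    constexpr size_t kNumMessages = 10 + 10 + kQueueSize;          // 220
    // NumberBuffers() would then return 220, and Context::buffer_index on
    // this channel falls in [0, 220).
    static_assert(kNumMessages == 220, "see the arithmetic above");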
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 57d3b98..8dabcb5 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -24,8 +24,8 @@
} // namespace shm_event_loop_internal
-// Specialization of EventLoop that is built from queues running out of shared
-// memory.
+// Concrete implementation of EventLoop that is built from queues running out of
+// shared memory.
//
// TODO(austin): Timing reports break multiple threads. Need to add back in a
// mutex.
@@ -83,7 +83,7 @@
// Returns the local mapping of the shared memory used by the watcher on the
// specified channel. A watcher must be created on this channel before calling
// this.
- absl::Span<char> GetWatcherSharedMemory(const Channel *channel);
+ absl::Span<const char> GetWatcherSharedMemory(const Channel *channel);
// Returns the local mapping of the shared memory used by the provided Sender.
template <typename T>
@@ -93,11 +93,17 @@
// Returns the local mapping of the private memory used by the provided
// Fetcher to hold messages.
+ //
+ // Note that this may be the entire shared memory region held by this fetcher,
+ // depending on its channel's read_method.
template <typename T>
- absl::Span<char> GetFetcherPrivateMemory(aos::Fetcher<T> *fetcher) const {
+ absl::Span<const char> GetFetcherPrivateMemory(
+ aos::Fetcher<T> *fetcher) const {
return GetShmFetcherPrivateMemory(GetRawFetcher(fetcher));
}
+ int NumberBuffers(const Channel *channel) override;
+
private:
friend class shm_event_loop_internal::ShmWatcherState;
friend class shm_event_loop_internal::ShmTimerHandler;
@@ -125,7 +131,7 @@
absl::Span<char> GetShmSenderSharedMemory(const aos::RawSender *sender) const;
// Private method to access the private memory mapping of a ShmFetcher.
- absl::Span<char> GetShmFetcherPrivateMemory(
+ absl::Span<const char> GetShmFetcherPrivateMemory(
const aos::RawFetcher *fetcher) const;
std::vector<std::function<void()>> on_run_;
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d25e2f8..d9a8872 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -25,12 +25,12 @@
}
// Clean up anything left there before.
- unlink((FLAGS_shm_base + "/test/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v2").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v2").c_str());
- unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v2").c_str());
+ unlink((FLAGS_shm_base + "/test/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v3").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v3").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.logging.LogMessageFbs.v3").c_str());
}
~ShmEventLoopTestFactory() { FLAGS_override_hostname = ""; }
@@ -69,15 +69,25 @@
::aos::ShmEventLoop *primary_event_loop_;
};
-INSTANTIATE_TEST_CASE_P(ShmEventLoopTest, AbstractEventLoopTest,
- ::testing::Values([]() {
- return new ShmEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::COPY)));
-INSTANTIATE_TEST_CASE_P(ShmEventLoopDeathTest, AbstractEventLoopDeathTest,
- ::testing::Values([]() {
- return new ShmEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(std::make_pair(
+ []() { return new ShmEventLoopTestFactory(); },
+ ReadMethod::PIN)));
} // namespace
@@ -89,12 +99,27 @@
return scheduler == SCHED_FIFO || scheduler == SCHED_RR;
}
+class ShmEventLoopTest : public ::testing::TestWithParam<ReadMethod> {
+ public:
+ ShmEventLoopTest() {
+ if (GetParam() == ReadMethod::PIN) {
+ factory_.PinReads();
+ }
+ }
+
+ ShmEventLoopTestFactory *factory() { return &factory_; }
+
+ private:
+ ShmEventLoopTestFactory factory_;
+};
+
+using ShmEventLoopDeathTest = ShmEventLoopTest;
+
// Tests that every handler type is realtime and runs. There are threads
// involved and it's easy to miss one.
-TEST(ShmEventLoopTest, AllHandlersAreRealtime) {
- ShmEventLoopTestFactory factory;
- auto loop = factory.MakePrimary("primary");
- auto loop2 = factory.Make("loop2");
+TEST_P(ShmEventLoopTest, AllHandlersAreRealtime) {
+ auto loop = factory()->MakePrimary("primary");
+ auto loop2 = factory()->Make("loop2");
loop->SetRuntimeRealtimePriority(1);
@@ -104,10 +129,10 @@
bool did_timer = false;
bool did_watcher = false;
- auto timer = loop->AddTimer([&did_timer, &factory]() {
+ auto timer = loop->AddTimer([this, &did_timer]() {
EXPECT_TRUE(IsRealtime());
did_timer = true;
- factory.Exit();
+ factory()->Exit();
});
loop->MakeWatcher("/test", [&did_watcher](const TestMessage &) {
@@ -126,7 +151,7 @@
msg.Send(builder.Finish());
});
- factory.Run();
+ factory()->Run();
EXPECT_TRUE(did_onrun);
EXPECT_TRUE(did_timer);
@@ -135,16 +160,15 @@
// Tests that missing a deadline inside the function still results in PhasedLoop
// running at the right offset.
-TEST(ShmEventLoopTest, DelayedPhasedLoop) {
- ShmEventLoopTestFactory factory;
- auto loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, DelayedPhasedLoop) {
+ auto loop1 = factory()->MakePrimary("primary");
::std::vector<::aos::monotonic_clock::time_point> times;
constexpr chrono::milliseconds kOffset = chrono::milliseconds(400);
loop1->AddPhasedLoop(
- [&times, &loop1, &kOffset, &factory](int count) {
+ [this, &times, &loop1, &kOffset](int count) {
const ::aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -169,7 +193,7 @@
times.push_back(loop1->monotonic_now());
if (times.size() == 2) {
- factory.Exit();
+ factory()->Exit();
}
// Now, add a large delay. This should push us up to 3 cycles.
@@ -177,15 +201,14 @@
},
chrono::seconds(1), kOffset);
- factory.Run();
+ factory()->Run();
EXPECT_EQ(times.size(), 2u);
}
// Test GetWatcherSharedMemory in a few basic scenarios.
-TEST(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
- ShmEventLoopTestFactory factory;
- auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
+ auto generic_loop1 = factory()->MakePrimary("primary");
ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
const auto channel = configuration::GetChannel(
loop1->configuration(), "/test", TestMessage::GetFullyQualifiedName(),
@@ -196,31 +219,85 @@
"No watcher found for channel");
// Then, actually create a watcher, and verify it returns something sane.
- loop1->MakeWatcher("/test", [](const TestMessage &) {});
- EXPECT_FALSE(loop1->GetWatcherSharedMemory(channel).empty());
+ absl::Span<const char> shared_memory;
+ bool ran = false;
+ loop1->MakeWatcher("/test", [this, &shared_memory,
+ &ran](const TestMessage &message) {
+ EXPECT_FALSE(ran);
+ ran = true;
+ // If we're using pinning, then we can verify that the message is actually
+ // in the specified region.
+ if (GetParam() == ReadMethod::PIN) {
+ EXPECT_GE(reinterpret_cast<const char *>(&message),
+ shared_memory.begin());
+ EXPECT_LT(reinterpret_cast<const char *>(&message), shared_memory.end());
+ }
+ factory()->Exit();
+ });
+ shared_memory = loop1->GetWatcherSharedMemory(channel);
+ EXPECT_FALSE(shared_memory.empty());
+
+ auto loop2 = factory()->Make("sender");
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ generic_loop1->OnRun([&sender]() {
+ auto builder = sender.MakeBuilder();
+ TestMessage::Builder test_builder(*builder.fbb());
+ test_builder.add_value(1);
+ CHECK(builder.Send(test_builder.Finish()));
+ });
+ factory()->Run();
+ EXPECT_TRUE(ran);
}
-TEST(ShmEventLoopTest, GetSenderSharedMemory) {
- ShmEventLoopTestFactory factory;
- auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, GetSenderSharedMemory) {
+ auto generic_loop1 = factory()->MakePrimary("primary");
ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
- // check that GetSenderSharedMemory returns non-null/non-empty memory span.
+ // Check that GetSenderSharedMemory returns non-null/non-empty memory span.
auto sender = loop1->MakeSender<TestMessage>("/test");
- EXPECT_FALSE(loop1->GetSenderSharedMemory(&sender).empty());
+ const absl::Span<char> shared_memory = loop1->GetSenderSharedMemory(&sender);
+ EXPECT_FALSE(shared_memory.empty());
+
+ auto builder = sender.MakeBuilder();
+ uint8_t *buffer;
+ builder.fbb()->CreateUninitializedVector(5, 1, &buffer);
+ EXPECT_GE(reinterpret_cast<char *>(buffer), shared_memory.begin());
+ EXPECT_LT(reinterpret_cast<char *>(buffer), shared_memory.end());
}
-TEST(ShmEventLoopTest, GetFetcherPrivateMemory) {
- ShmEventLoopTestFactory factory;
- auto generic_loop1 = factory.MakePrimary("primary");
+TEST_P(ShmEventLoopTest, GetFetcherPrivateMemory) {
+ auto generic_loop1 = factory()->MakePrimary("primary");
ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
- // check that GetFetcherPrivateMemory returns non-null/non-empty memory span.
+ // Check that GetFetcherPrivateMemory returns non-null/non-empty memory span.
auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
- EXPECT_FALSE(loop1->GetFetcherPrivateMemory(&fetcher).empty());
+ const auto private_memory = loop1->GetFetcherPrivateMemory(&fetcher);
+ EXPECT_FALSE(private_memory.empty());
+
+ auto loop2 = factory()->Make("sender");
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ {
+ auto builder = sender.MakeBuilder();
+ TestMessage::Builder test_builder(*builder.fbb());
+ test_builder.add_value(1);
+ CHECK(builder.Send(test_builder.Finish()));
+ }
+
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_GE(fetcher.context().data, private_memory.begin());
+ EXPECT_LT(fetcher.context().data, private_memory.end());
}
// TODO(austin): Test that missing a deadline with a timer recovers as expected.
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyTest, ShmEventLoopTest,
+ ::testing::Values(ReadMethod::COPY));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinTest, ShmEventLoopTest,
+ ::testing::Values(ReadMethod::PIN));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopCopyDeathTest, ShmEventLoopDeathTest,
+ ::testing::Values(ReadMethod::COPY));
+INSTANTIATE_TEST_CASE_P(ShmEventLoopPinDeathTest, ShmEventLoopDeathTest,
+ ::testing::Values(ReadMethod::PIN));
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index bff04b9..c339ce0 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -33,7 +33,6 @@
Context context;
SimulatedChannel *const channel = nullptr;
- int buffer_index;
// The data.
char *data(size_t buffer_size) {
@@ -82,9 +81,10 @@
::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
- SimulatedEventLoop *simulated_event_loop_;
+ SimulatedEventLoop *const simulated_event_loop_;
+ const Channel *const channel_;
+ EventScheduler *const scheduler_;
EventHandler<SimulatedWatcher> event_;
- EventScheduler *scheduler_;
EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -184,13 +184,33 @@
}
++sender_count_;
}
+
void CountSenderDestroyed() {
--sender_count_;
CHECK_GE(sender_count_, 0);
}
private:
- void CheckBufferCount() { CHECK_LT(sender_count_, number_scratch_buffers()); }
+ void CheckBufferCount() {
+ int reader_count = 0;
+ if (channel()->read_method() == ReadMethod::PIN) {
+ reader_count = watchers_.size() + fetchers_.size();
+ }
+ CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
+ }
+
+ void CheckReaderCount() {
+ if (channel()->read_method() != ReadMethod::PIN) {
+ return;
+ }
+ CheckBufferCount();
+ const int reader_count = watchers_.size() + fetchers_.size();
+ if (reader_count >= channel()->num_readers()) {
+ LOG(FATAL) << "Failed to create reader on "
+ << configuration::CleanedChannelToString(channel())
+ << ", too many readers.";
+ }
+ }
const Channel *const channel_;
EventScheduler *const scheduler_;
@@ -227,11 +247,11 @@
SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
: channel(channel_in) {
- buffer_index = channel->GetBufferIndex();
+ context.buffer_index = channel->GetBufferIndex();
}
SimulatedMessage::~SimulatedMessage() {
- channel->FreeBufferIndex(buffer_index);
+ channel->FreeBufferIndex(context.buffer_index);
}
class SimulatedSender : public RawSender {
@@ -299,6 +319,12 @@
remote_queue_index);
}
+ int buffer_index() override {
+ // First, ensure message_ is allocated.
+ data();
+ return message_->context.buffer_index;
+ }
+
private:
SimulatedChannel *simulated_channel_;
EventLoop *event_loop_;
@@ -354,6 +380,9 @@
void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
msg_ = msg;
context_ = msg_->context;
+ if (channel()->read_method() != ReadMethod::PIN) {
+ context_.buffer_index = -1;
+ }
if (context_.remote_queue_index == 0xffffffffu) {
context_.remote_queue_index = context_.queue_index;
}
@@ -547,6 +576,8 @@
}
}
+ int NumberBuffers(const Channel *channel) override;
+
private:
friend class SimulatedTimerHandler;
friend class SimulatedPhasedLoopHandler;
@@ -644,14 +675,19 @@
return it->second.get();
}
+int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
+ return GetSimulatedChannel(channel)->number_buffers();
+}
+
SimulatedWatcher::SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: WatcherState(simulated_event_loop, channel, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
- event_(this),
+ channel_(channel),
scheduler_(scheduler),
+ event_(this),
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
@@ -659,7 +695,7 @@
if (token_ != scheduler_->InvalidToken()) {
scheduler_->Deschedule(token_);
}
- simulated_channel_->RemoveWatcher(this);
+ CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
}
void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
@@ -689,6 +725,9 @@
}
Context context = msgs_.front()->context;
+ if (channel_->read_method() != ReadMethod::PIN) {
+ context.buffer_index = -1;
+ }
if (context.remote_queue_index == 0xffffffffu) {
context.remote_queue_index = context.queue_index;
}
@@ -719,6 +758,7 @@
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
+ CheckReaderCount();
watcher->SetSimulatedChannel(this);
watchers_.emplace_back(watcher);
}
@@ -730,6 +770,7 @@
::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
EventLoop *event_loop) {
+ CheckReaderCount();
::std::unique_ptr<SimulatedFetcher> fetcher(
new SimulatedFetcher(event_loop, this));
fetchers_.push_back(fetcher.get());
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 513dc1b..78c0d44 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -51,15 +51,31 @@
std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_;
};
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopDeathTest, AbstractEventLoopDeathTest,
- ::testing::Values([]() {
- return new SimulatedEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopCopyTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_tuple(
+ []() {
+ return new SimulatedEventLoopTestFactory();
+ },
+ ReadMethod::COPY)));
-INSTANTIATE_TEST_CASE_P(SimulatedEventLoopTest, AbstractEventLoopTest,
- ::testing::Values([]() {
- return new SimulatedEventLoopTestFactory();
- }));
+INSTANTIATE_TEST_CASE_P(
+ SimulatedEventLoopCopyDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(
+ std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+ ReadMethod::COPY)));
+
+INSTANTIATE_TEST_CASE_P(SimulatedEventLoopPinTest, AbstractEventLoopTest,
+ ::testing::Values(std::make_tuple(
+ []() {
+ return new SimulatedEventLoopTestFactory();
+ },
+ ReadMethod::PIN)));
+
+INSTANTIATE_TEST_CASE_P(
+ SimulatedEventLoopPinDeathTest, AbstractEventLoopDeathTest,
+ ::testing::Values(
+ std::make_tuple([]() { return new SimulatedEventLoopTestFactory(); },
+ ReadMethod::PIN)));
// Test that creating an event and running the scheduler runs the event.
TEST(EventSchedulerTest, ScheduleEvent) {
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 5e14abc..6c14200 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -134,6 +134,9 @@
srcs = ["index.cc"],
hdrs = ["index.h"],
visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
)
cc_test(
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index 7d979ea..a47121e 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -5,6 +5,8 @@
#include <atomic>
#include <string>
+#include "glog/logging.h"
+
namespace aos {
namespace ipc_lib {
@@ -52,9 +54,7 @@
}
// Gets the next index.
- QueueIndex Increment() const {
- return IncrementBy(1u);
- }
+ QueueIndex Increment() const { return IncrementBy(1u); }
// Gets the nth next element.
QueueIndex IncrementBy(uint32_t amount) const {
@@ -133,12 +133,12 @@
struct AtomicQueueIndex {
public:
// Atomically reads the index without any ordering constraints.
- QueueIndex RelaxedLoad(uint32_t count) {
+ QueueIndex RelaxedLoad(uint32_t count) const {
return QueueIndex(index_.load(::std::memory_order_relaxed), count);
}
// Full bidirectional barriers here.
- QueueIndex Load(uint32_t count) {
+ QueueIndex Load(uint32_t count) const {
return QueueIndex(index_.load(::std::memory_order_acquire), count);
}
inline void Store(QueueIndex value) {
@@ -148,6 +148,10 @@
// Invalidates the element unconditionally.
inline void Invalidate() { Store(QueueIndex::Invalid()); }
+ inline void RelaxedInvalidate() {
+ index_.store(QueueIndex::Invalid().index_, ::std::memory_order_relaxed);
+ }
+
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
@@ -168,7 +172,9 @@
: Index(queue_index.index_, message_index) {}
Index(uint32_t queue_index, uint16_t message_index)
: index_((queue_index & 0xffff) |
- (static_cast<uint32_t>(message_index) << 16)) {}
+ (static_cast<uint32_t>(message_index) << 16)) {
+ CHECK_LE(message_index, MaxMessages());
+ }
// Index of this message in the message array.
uint16_t message_index() const { return (index_ >> 16) & 0xffff; }
@@ -193,13 +199,13 @@
static constexpr uint16_t MaxMessages() { return 0xfffe; }
bool operator==(const Index other) const { return other.index_ == index_; }
+ bool operator!=(const Index other) const { return other.index_ != index_; }
// Returns a string representing the index.
::std::string DebugString() const;
private:
- Index(uint32_t index)
- : index_(index) {}
+ Index(uint32_t index) : index_(index) {}
friend class AtomicIndex;
@@ -216,7 +222,7 @@
class AtomicIndex {
public:
// Stores and loads atomically without ordering constraints.
- Index RelaxedLoad() {
+ Index RelaxedLoad() const {
return Index(index_.load(::std::memory_order_relaxed));
}
void RelaxedStore(Index index) {
@@ -231,15 +237,20 @@
void Store(Index index) {
index_.store(index.index_, ::std::memory_order_release);
}
- Index Load() { return Index(index_.load(::std::memory_order_acquire)); }
+ Index Load() const { return Index(index_.load(::std::memory_order_acquire)); }
// Swaps expected for index atomically. Returns true on success, false
// otherwise.
- inline bool CompareAndExchangeStrong(Index expected, Index index) {
+ bool CompareAndExchangeStrong(Index expected, Index index) {
return index_.compare_exchange_strong(expected.index_, index.index_,
::std::memory_order_acq_rel);
}
+ bool CompareAndExchangeWeak(Index *expected, Index index) {
+ return index_.compare_exchange_weak(expected->index_, index.index_,
+ ::std::memory_order_acq_rel);
+ }
+
private:
::std::atomic<uint32_t> index_;
};
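
A note on the new CompareAndExchangeWeak: it takes `expected` by pointer because, like std::atomic's compare_exchange_weak, it rewrites `expected` with the observed value on failure. A minimal retry-loop sketch (names are illustrative, not from this patch):

void StoreWithRetry(aos::ipc_lib::AtomicIndex *slot,
                    aos::ipc_lib::Index replacement) {
  aos::ipc_lib::Index expected = slot->RelaxedLoad();
  // On failure (including spurious ones), expected is updated to the value
  // actually observed, so no separate reload is needed before retrying.
  while (!slot->CompareAndExchangeWeak(&expected, replacement)) {
  }
}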
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index d9a1a71..4115769 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -38,6 +38,97 @@
LocklessQueueMemory *const memory_;
};
+bool IsPinned(LocklessQueueMemory *memory, Index index) {
+ DCHECK(index.valid());
+ const size_t queue_size = memory->queue_size();
+ const QueueIndex message_index =
+ memory->GetMessage(index)->header.queue_index.Load(queue_size);
+ if (!message_index.valid()) {
+ return false;
+ }
+ DCHECK(memory->GetQueue(message_index.Wrapped())->Load() != index)
+ << ": Message is in the queue";
+ for (int pinner_index = 0;
+ pinner_index < static_cast<int>(memory->config.num_pinners);
+ ++pinner_index) {
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ if (pinner->pinned.RelaxedLoad(queue_size) == message_index) {
+ return true;
+ }
+ }
+ return false;
+}
+
+// Ensures sender->scratch_index (which must contain to_replace) is not pinned.
+//
+// Returns the new scratch_index value.
+Index SwapPinnedSenderScratch(LocklessQueueMemory *const memory,
+ ipc_lib::Sender *const sender,
+ const Index to_replace) {
+ // If anybody's trying to pin this message, then grab a message from a pinner
+ // to write into instead, and leave the message we pulled out of the queue
+ // (currently in our scratch_index) with a pinner.
+ //
+ // This loop will terminate in at most one iteration through the pinners in
+ // any steady-state configuration of the memory. There are only as many
+ // Pinner::pinned values to worry about as there are Pinner::scratch_index
+ // values to check against, plus to_replace, which means there will always be
+ // a free one. We might have to make multiple passes if things are being
+ // changed concurrently, but nobody dying can make this loop fail to
+ // terminate (the number of processes that can die is bounded, because no new
+ // ones can start while we hold the lock).
+ for (int pinner_index = 0; true;
+ pinner_index = (pinner_index + 1) % memory->config.num_pinners) {
+ if (!IsPinned(memory, to_replace)) {
+ // No pinners on our current scratch_index, so we're fine now.
+ VLOG(3) << "No pinners: " << to_replace.DebugString();
+ return to_replace;
+ }
+
+ ipc_lib::Pinner *const pinner = memory->GetPinner(pinner_index);
+
+ const Index pinner_scratch = pinner->scratch_index.RelaxedLoad();
+ CHECK(pinner_scratch.valid())
+ << ": Pinner scratch_index should always be valid";
+ if (IsPinned(memory, pinner_scratch)) {
+ // Wouldn't do us any good to swap with this one, so don't bother, and
+ // move onto the next one.
+ VLOG(3) << "Also pinned: " << pinner_scratch.DebugString();
+ continue;
+ }
+
+ sender->to_replace.RelaxedStore(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // Give the pinner the message (which is currently in
+ // sender->scratch_index).
+ if (!pinner->scratch_index.CompareAndExchangeStrong(pinner_scratch,
+ to_replace)) {
+ // Somebody swapped into this pinner before us. The new value is probably
+ // pinned, so we don't want to look at it again immediately.
+ VLOG(3) << "Pinner " << pinner_index
+ << " scratch_index changed: " << pinner_scratch.DebugString()
+ << ", " << to_replace.DebugString();
+ sender->to_replace.RelaxedInvalidate();
+ continue;
+ }
+ aos_compiler_memory_barrier();
+ // Now update the sender's scratch space and record that we succeeded.
+ sender->scratch_index.Store(pinner_scratch);
+ aos_compiler_memory_barrier();
+ // And then record that we succeeded, but definitely after the above
+ // store.
+ sender->to_replace.RelaxedInvalidate();
+ VLOG(3) << "Got new scratch message: " << pinner_scratch.DebugString();
+
+ // If it's in a pinner's scratch_index, it should not be in the queue, which
+ // means nobody new can pin it for real. However, they can still attempt to
+ // pin it, which means we can't verify !IsPinned down here.
+
+ return pinner_scratch;
+ }
+}
+
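To make the termination argument concrete, a toy pigeonhole model (not part of the patch): there are num_pinners + 1 candidate messages (to_replace plus each pinner's scratch_index), but each pinner pins at most one message, so in a steady state at least one candidate is unpinned.

#include <bitset>
#include <cassert>

// Toy model only; candidate 0 stands for to_replace, candidates
// 1..num_pinners stand for each pinner's scratch_index.
int FindUnpinnedCandidate(const std::bitset<64> &pinned, int num_pinners) {
  // Each pinner pins at most one candidate.
  assert(static_cast<int>(pinned.count()) <= num_pinners);
  for (int i = 0; i <= num_pinners; ++i) {
    if (!pinned.test(i)) return i;  // Guaranteed to exist by counting.
  }
  return -1;  // Unreachable when the invariant above holds.
}
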
// Returns true if it succeeded. Returns false if another sender died in the
// middle.
bool DoCleanup(LocklessQueueMemory *memory, const GrabQueueSetupLockOrDie &) {
@@ -48,6 +139,7 @@
aos_compiler_memory_barrier();
const size_t num_senders = memory->num_senders();
+ const size_t num_pinners = memory->num_pinners();
const size_t queue_size = memory->queue_size();
const size_t num_messages = memory->num_messages();
@@ -105,11 +197,17 @@
// to_replace = yyy
// We are in the act of moving to_replace to scratch_index, but didn't
// finish. Easy.
+ //
+ // If doing a pinner swap, we've definitely done it.
// 4) scratch_index = yyy
// to_replace = invalid
// Finished, but died. Looks like 1)
+ // Swapping with a pinner's scratch_index passes through the same states.
+ // We just need to ensure the message that ends up in the sender's
+ // scratch_index isn't pinned, using the same code as sending does.
+
// Any cleanup code needs to follow the same set of states to be robust to
// death, so death can be restarted.
@@ -117,6 +215,14 @@
// 1) or 4). Make sure we aren't corrupted and declare victory.
CHECK(scratch_index.valid());
+ // If it's in 1) with a pinner, the sender might have a pinned message,
+ // so fix that.
+ SwapPinnedSenderScratch(memory, sender, scratch_index);
+
+ // If it's in 4), it may not have completed this step yet. This will
+ // always be a NOP if it's in 1), verified by a DCHECK.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
continue;
@@ -129,6 +235,11 @@
// Just need to invalidate to_replace to finish.
sender->to_replace.Invalidate();
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(scratch_index)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// And mark that we succeeded.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
++valid_senders;
@@ -139,6 +250,20 @@
need_recovery[i] = true;
}
+ // Cleaning up pinners is easy. We don't actually have to do anything, but
+ // invalidating its pinned field might help catch bugs elsewhere trying to
+ // read it before it's set.
+ for (size_t i = 0; i < num_pinners; ++i) {
+ Pinner *const pinner = memory->GetPinner(i);
+ const uint32_t tid =
+ __atomic_load_n(&(pinner->tid.futex), __ATOMIC_ACQUIRE);
+ if (!(tid & FUTEX_OWNER_DIED)) {
+ continue;
+ }
+ pinner->pinned.Invalidate();
+ __atomic_store_n(&(pinner->tid.futex), 0, __ATOMIC_RELEASE);
+ }
+
// If all the senders are (or were made) good, there is no need to do the hard
// case.
if (valid_senders == num_senders) {
@@ -162,18 +287,18 @@
return false;
}
++num_missing;
- } else {
- CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
- // We can do a relaxed load here because we're the only person touching
- // this sender at this point, if it matters. If it's not a dead sender,
- // then any message it every has will already be accounted for, so this
- // will always be a NOP.
- const Index scratch_index = sender->scratch_index.RelaxedLoad();
- if (!accounted_for[scratch_index.message_index()]) {
- ++num_accounted_for;
- }
- accounted_for[scratch_index.message_index()] = true;
+ continue;
}
+ CHECK(!need_recovery[i]) << ": Somebody else recovered a sender: " << i;
+ // We can do a relaxed load here because we're the only person touching
+ // this sender at this point, if it matters. If it's not a dead sender,
+ // then any message it ever has will eventually be accounted for if we
+ // make enough tries through the outer loop.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ if (!accounted_for[scratch_index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[scratch_index.message_index()] = true;
}
for (size_t i = 0; i < queue_size; ++i) {
@@ -185,6 +310,16 @@
accounted_for[index.message_index()] = true;
}
+ for (size_t pinner_index = 0; pinner_index < num_pinners; ++pinner_index) {
+ // Same logic as above for scratch_index applies here too.
+ const Index index =
+ memory->GetPinner(pinner_index)->scratch_index.RelaxedLoad();
+ if (!accounted_for[index.message_index()]) {
+ ++num_accounted_for;
+ }
+ accounted_for[index.message_index()] = true;
+ }
+
CHECK_LE(num_accounted_for + num_missing, num_messages);
}
@@ -224,6 +359,9 @@
// atomically insert scratch_index into the queue yet. So
// invalidate to_replace.
sender->to_replace.Invalidate();
+ // Sender definitely will not have gotten here, so finish for it.
+ memory->GetMessage(scratch_index)
+ ->header.queue_index.RelaxedInvalidate();
// And then mark this sender clean.
__atomic_store_n(&(sender->tid.futex), 0, __ATOMIC_RELEASE);
@@ -240,6 +378,12 @@
// scratch_index is accounted for. That means we did the insert,
// but didn't record it.
CHECK(to_replace.valid());
+
+ // Make sure to indicate it's an unused message before a sender gets its
+ // hands on it.
+ memory->GetMessage(to_replace)->header.queue_index.RelaxedInvalidate();
+ aos_compiler_memory_barrier();
+
// Finish the transaction. Copy to_replace, then clear it.
sender->scratch_index.Store(to_replace);
@@ -280,6 +424,13 @@
return syscall(SYS_rt_tgsigqueueinfo, tgid, tid, sig, si);
}
+QueueIndex ZeroOrValid(QueueIndex index) {
+ if (!index.valid()) {
+ return index.Clear();
+ }
+ return index;
+}
+
} // namespace
size_t LocklessQueueConfiguration::message_size() const {
@@ -311,6 +462,9 @@
CHECK_EQ(size % alignof(Sender), 0u);
size += LocklessQueueMemory::SizeOfSenders(config);
+ CHECK_EQ(size % alignof(Pinner), 0u);
+ size += LocklessQueueMemory::SizeOfPinners(config);
+
return size;
}
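
As a setup sketch (not part of the patch), the size now includes the pinner list; a caller might allocate and initialize like this, with calloc standing in for zeroed shared memory and all configuration values illustrative:

#include <cstdlib>

#include "aos/ipc_lib/lockless_queue.h"

aos::ipc_lib::LocklessQueue MakeExampleQueue() {
  aos::ipc_lib::LocklessQueueConfiguration config;
  config.num_watchers = 2;
  config.num_senders = 2;
  config.num_pinners = 1;
  config.queue_size = 8;
  config.message_data_size = 128;
  // Zeroed memory is a valid starting state for Initialize().
  auto *memory = reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(
      calloc(1, aos::ipc_lib::LocklessQueueMemorySize(config)));
  aos::ipc_lib::LocklessQueue queue(memory, memory, config);
  queue.Initialize();
  return queue;
}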
@@ -371,6 +525,7 @@
// TODO(austin): Check these for out of bounds.
memory->config.num_watchers = config.num_watchers;
memory->config.num_senders = config.num_senders;
+ memory->config.num_pinners = config.num_pinners;
memory->config.queue_size = config.queue_size;
memory->config.message_data_size = config.message_data_size;
@@ -403,6 +558,15 @@
s->to_replace.RelaxedInvalidate();
}
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ ::aos::ipc_lib::Pinner *pinner = memory->GetPinner(i);
+ // Nobody else can possibly be touching these because we haven't set
+ // initialized to true yet.
+ pinner->scratch_index.RelaxedStore(
+ Index(0xffff, i + memory->num_senders() + memory->queue_size()));
+ pinner->pinned.Invalidate();
+ }
+
aos_compiler_memory_barrier();
// Signal everything is done. This needs to be done last, so if we die, we
// redo initialization.
@@ -414,30 +578,57 @@
return memory;
}
-LocklessQueue::LocklessQueue(LocklessQueueMemory *memory,
- LocklessQueueConfiguration config)
- : memory_(InitializeLocklessQueueMemory(memory, config)),
- watcher_copy_(memory_->num_watchers()),
- pid_(getpid()),
- uid_(getuid()) {}
+void LocklessQueue::Initialize() {
+ InitializeLocklessQueueMemory(memory_, config_);
+}
-LocklessQueue::~LocklessQueue() {
- CHECK_EQ(watcher_index_, -1);
+LocklessQueueWatcher::~LocklessQueueWatcher() {
+ if (watcher_index_ == -1) {
+ return;
+ }
+ // Since everything is self consistent, all we need to do is make sure nobody
+ // else is running. Someone dying will get caught in the generic consistency
+ // check.
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
- const int num_watchers = memory_->num_watchers();
+
+ // Make sure we are registered.
+ CHECK_NE(watcher_index_, -1);
+
+ // Make sure we still own the slot we are supposed to.
+ CHECK(
+ death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
+
+ // The act of unlocking invalidates the entry. Invalidate it.
+ death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
+ // And internally forget the slot.
+ watcher_index_ = -1;
+
// Cleanup is cheap. The next user will do it anyways, so no need for us to do
// anything right now.
// And confirm that nothing is owned by us.
+ const int num_watchers = memory_->num_watchers();
for (int i = 0; i < num_watchers; ++i) {
- CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)));
+ CHECK(!death_notification_is_held(&(memory_->GetWatcher(i)->tid)))
+ << ": " << i;
}
}
-size_t LocklessQueue::QueueSize() const { return memory_->queue_size(); }
+std::optional<LocklessQueueWatcher> LocklessQueueWatcher::Make(
+ LocklessQueue queue, int priority) {
+ queue.Initialize();
+ LocklessQueueWatcher result(queue.memory(), priority);
+ if (result.watcher_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
-bool LocklessQueue::RegisterWakeup(int priority) {
+LocklessQueueWatcher::LocklessQueueWatcher(LocklessQueueMemory *memory,
+ int priority)
+ : memory_(memory) {
// TODO(austin): Make sure signal coalescing is turned on. We don't need
// duplicates. That will improve performance under high load.
@@ -466,10 +657,10 @@
// Bail if we failed to find an open slot.
if (watcher_index_ == -1) {
- return false;
+ return;
}
- Watcher *w = memory_->GetWatcher(watcher_index_);
+ Watcher *const w = memory_->GetWatcher(watcher_index_);
w->pid = getpid();
w->priority = priority;
@@ -477,29 +668,15 @@
// Grabbing a mutex is a compiler and memory barrier, so nothing before will
// get rearranged afterwords.
death_notification_init(&(w->tid));
- return true;
}
-void LocklessQueue::UnregisterWakeup() {
- // Since everything is self consistent, all we need to do is make sure nobody
- // else is running. Someone dying will get caught in the generic consistency
- // check.
- GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
-
- // Make sure we are registered.
- CHECK_NE(watcher_index_, -1);
-
- // Make sure we still own the slot we are supposed to.
- CHECK(
- death_notification_is_held(&(memory_->GetWatcher(watcher_index_)->tid)));
-
- // The act of unlocking invalidates the entry. Invalidate it.
- death_notification_release(&(memory_->GetWatcher(watcher_index_)->tid));
- // And internally forget the slot.
- watcher_index_ = -1;
+LocklessQueueWakeUpper::LocklessQueueWakeUpper(LocklessQueue queue)
+ : memory_(queue.const_memory()), pid_(getpid()), uid_(getuid()) {
+ queue.Initialize();
+ watcher_copy_.resize(memory_->num_watchers());
}
-int LocklessQueue::Wakeup(const int current_priority) {
+int LocklessQueueWakeUpper::Wakeup(const int current_priority) {
const size_t num_watchers = memory_->num_watchers();
CHECK_EQ(watcher_copy_.size(), num_watchers);
@@ -511,7 +688,7 @@
// question. There is no way without pidfd's to close this window, and
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
- Watcher *w = memory_->GetWatcher(i);
+ const Watcher *w = memory_->GetWatcher(i);
watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_RELAXED);
// Force the load of the TID to come first.
aos_compiler_memory_barrier();
@@ -584,7 +761,8 @@
return count;
}
-LocklessQueue::Sender::Sender(LocklessQueueMemory *memory) : memory_(memory) {
+LocklessQueueSender::LocklessQueueSender(LocklessQueueMemory *memory)
+ : memory_(memory) {
GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
// Since we already have the lock, go ahead and try cleaning up.
@@ -609,47 +787,54 @@
return;
}
- ::aos::ipc_lib::Sender *s = memory_->GetSender(sender_index_);
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
// Indicate that we are now alive by taking over the slot. If the previous
// owner died, we still want to do this.
- death_notification_init(&(s->tid));
+ death_notification_init(&(sender->tid));
+
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ CHECK(!message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
}
-LocklessQueue::Sender::~Sender() {
- if (valid()) {
+LocklessQueueSender::~LocklessQueueSender() {
+ if (sender_index_ != -1) {
+ CHECK(memory_ != nullptr);
death_notification_release(&(memory_->GetSender(sender_index_)->tid));
}
}
-std::optional<LocklessQueue::Sender> LocklessQueue::MakeSender() {
- LocklessQueue::Sender result = LocklessQueue::Sender(memory_);
- if (result.valid()) {
+std::optional<LocklessQueueSender> LocklessQueueSender::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueueSender result(queue.memory());
+ if (result.sender_index_ != -1) {
return std::move(result);
} else {
return std::nullopt;
}
}
-QueueIndex ZeroOrValid(QueueIndex index) {
- if (!index.valid()) {
- return index.Clear();
- }
- return index;
+size_t LocklessQueueSender::size() const {
+ return memory_->message_data_size();
}
-size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
-
-void *LocklessQueue::Sender::Data() {
+void *LocklessQueueSender::Data() {
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
- Index scratch_index = sender->scratch_index.RelaxedLoad();
- Message *message = memory_->GetMessage(scratch_index);
- message->header.queue_index.Invalidate();
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
return message->data(memory_->message_data_size());
}
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
const char *data, size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
@@ -666,7 +851,7 @@
monotonic_sent_time, realtime_sent_time, queue_index);
}
-void LocklessQueue::Sender::Send(
+void LocklessQueueSender::Send(
size_t length, aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
@@ -682,6 +867,12 @@
const Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *const message = memory_->GetMessage(scratch_index);
+ // We should have invalidated this when we first got the buffer. Verify that
+ // in debug mode.
+ DCHECK(
+ !message->header.queue_index.RelaxedLoad(memory_->queue_size()).valid())
+ << ": " << std::hex << scratch_index.get();
+
message->header.length = length;
// Pass these through. Any alternative behavior can be implemented out a
// layer.
@@ -689,6 +880,7 @@
message->header.monotonic_remote_time = monotonic_remote_time;
message->header.realtime_remote_time = realtime_remote_time;
+ Index to_replace = Index::Invalid();
while (true) {
const QueueIndex actual_next_queue_index =
memory_->next_queue_index.Load(queue_size);
@@ -698,7 +890,7 @@
// This needs to synchronize with whoever the previous writer at this
// location was.
- const Index to_replace = memory_->LoadIndex(next_queue_index);
+ to_replace = memory_->LoadIndex(next_queue_index);
const QueueIndex decremented_queue_index =
next_queue_index.DecrementBy(queue_size);
@@ -726,9 +918,14 @@
}
// Confirm that the message is what it should be.
+ //
+ // This is just a best-effort check to skip reading the clocks if possible.
+ // If this fails, then the compare-exchange below definitely would, so we
+ // can bail out now.
{
const QueueIndex previous_index =
- memory_->GetMessage(to_replace)->header.queue_index.Load(queue_size);
+ memory_->GetMessage(to_replace)
+ ->header.queue_index.RelaxedLoad(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
VLOG(3) << "Something fishy happened, queue index doesn't match. "
@@ -794,17 +991,145 @@
aos_compiler_memory_barrier();
// And then record that we succeeded, but definitely after the above store.
sender->to_replace.RelaxedInvalidate();
+
break;
}
+
+ // to_replace is our current scratch_index. It isn't in the queue, which means
+ // nobody new can pin it. They can set their `pinned` to it, but they will
+ // back it out, so they don't count. This means that we just need to find a
+ // message which no pinner has in `pinned`, and then we know this
+ // message will never be pinned. We'll start with to_replace, and if that is
+ // pinned then we'll look for a new one to use instead.
+ const Index new_scratch =
+ SwapPinnedSenderScratch(memory_, sender, to_replace);
+
+ // If anybody is looking at this message (they shouldn't be), then try telling
+ // them about it (best-effort).
+ memory_->GetMessage(new_scratch)->header.queue_index.RelaxedInvalidate();
+}
+
+int LocklessQueueSender::buffer_index() const {
+ ::aos::ipc_lib::Sender *const sender = memory_->GetSender(sender_index_);
+ // We can do a relaxed load on our sender because we're the only person
+ // modifying it right now.
+ const Index scratch_index = sender->scratch_index.RelaxedLoad();
+ return scratch_index.message_index();
+}
+
+LocklessQueuePinner::LocklessQueuePinner(
+ LocklessQueueMemory *memory, const LocklessQueueMemory *const_memory)
+ : memory_(memory), const_memory_(const_memory) {
+ GrabQueueSetupLockOrDie grab_queue_setup_lock(memory_);
+
+ // Since we already have the lock, go ahead and try cleaning up.
+ Cleanup(memory_, grab_queue_setup_lock);
+
+ const int num_pinners = memory_->num_pinners();
+
+ for (int i = 0; i < num_pinners; ++i) {
+ ::aos::ipc_lib::Pinner *p = memory->GetPinner(i);
+ // This doesn't need synchronization because we're the only process doing
+ // initialization right now, and nobody else will be touching pinners which
+ // we're interested in.
+ const uint32_t tid = __atomic_load_n(&(p->tid.futex), __ATOMIC_RELAXED);
+ if (tid == 0) {
+ pinner_index_ = i;
+ break;
+ }
+ }
+
+ if (pinner_index_ == -1) {
+ VLOG(1) << "Too many pinners, starting to bail.";
+ return;
+ }
+
+ ::aos::ipc_lib::Pinner *p = memory_->GetPinner(pinner_index_);
+ p->pinned.Invalidate();
+
+ // Indicate that we are now alive by taking over the slot. If the previous
+ // owner died, we still want to do this.
+ death_notification_init(&(p->tid));
+}
+
+LocklessQueuePinner::~LocklessQueuePinner() {
+ if (pinner_index_ != -1) {
+ CHECK(memory_ != nullptr);
+ memory_->GetPinner(pinner_index_)->pinned.Invalidate();
+ aos_compiler_memory_barrier();
+ death_notification_release(&(memory_->GetPinner(pinner_index_)->tid));
+ }
}
-LocklessQueue::ReadResult LocklessQueue::Read(
+std::optional<LocklessQueuePinner> LocklessQueuePinner::Make(
+ LocklessQueue queue) {
+ queue.Initialize();
+ LocklessQueuePinner result(queue.memory(), queue.const_memory());
+ if (result.pinner_index_ != -1) {
+ return std::move(result);
+ } else {
+ return std::nullopt;
+ }
+}
+
+// This method doesn't mess with any scratch_index, so it doesn't have to worry
+// about message ownership.
+int LocklessQueuePinner::PinIndex(uint32_t uint32_queue_index) {
+ const size_t queue_size = memory_->queue_size();
+ const QueueIndex queue_index =
+ QueueIndex::Zero(queue_size).IncrementBy(uint32_queue_index);
+ ipc_lib::Pinner *const pinner = memory_->GetPinner(pinner_index_);
+
+ AtomicIndex *const queue_slot = memory_->GetQueue(queue_index.Wrapped());
+
+ // Indicate that we want to pin this message.
+ pinner->pinned.Store(queue_index);
+ aos_compiler_memory_barrier();
+
+ {
+ const Index message_index = queue_slot->Load();
+ Message *const message = memory_->GetMessage(message_index);
+
+ const QueueIndex message_queue_index =
+ message->header.queue_index.Load(queue_size);
+ if (message_queue_index == queue_index) {
+ VLOG(3) << "Eq: " << std::hex << message_queue_index.index();
+ aos_compiler_memory_barrier();
+ return message_index.message_index();
+ }
+ VLOG(3) << "Message reused: " << std::hex << message_queue_index.index()
+ << ", " << queue_index.index();
+ }
+
+ // Being down here means we asked to pin a message before realizing it's no
+ // longer in the queue, so back that out now.
+ pinner->pinned.Invalidate();
+ VLOG(3) << "Unpinned: " << std::hex << queue_index.index();
+ return -1;
+}
+
+size_t LocklessQueuePinner::size() const {
+ return const_memory_->message_data_size();
+}
+
+const void *LocklessQueuePinner::Data() const {
+ const size_t queue_size = const_memory_->queue_size();
+ const ::aos::ipc_lib::Pinner *const pinner =
+ const_memory_->GetPinner(pinner_index_);
+ QueueIndex pinned = pinner->pinned.RelaxedLoad(queue_size);
+ CHECK(pinned.valid());
+ const Message *message = const_memory_->GetMessage(pinned);
+
+ return message->data(const_memory_->message_data_size());
+}
+
+LocklessQueueReader::Result LocklessQueueReader::Read(
uint32_t uint32_queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
::aos::realtime_clock::time_point *realtime_sent_time,
::aos::monotonic_clock::time_point *monotonic_remote_time,
::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data) {
+ uint32_t *remote_queue_index, size_t *length, char *data) const {
const size_t queue_size = memory_->queue_size();
// Build up the QueueIndex.
@@ -813,7 +1138,7 @@
// Read the message stored at the requested location.
Index mi = memory_->LoadIndex(queue_index);
- Message *m = memory_->GetMessage(mi);
+ const Message *m = memory_->GetMessage(mi);
while (true) {
// We need to confirm that the data doesn't change while we are reading it.
@@ -826,47 +1151,47 @@
if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
<< ", " << queue_index.DecrementBy(queue_size).index();
- return ReadResult::NOTHING_NEW;
+ return Result::NOTHING_NEW;
+ }
+
+ // Someone has re-used this message between when we pulled it out of the
+ // queue and when we grabbed its index. It is pretty hard to deduce
+ // what happened. Just try again.
+ const Message *const new_m = memory_->GetMessage(queue_index);
+ if (m != new_m) {
+ m = new_m;
+ VLOG(3) << "Retrying, m doesn't match";
+ continue;
+ }
+
+ // We have confirmed that message still points to the same message. This
+ // means that the message didn't get swapped out from under us, so
+ // starting_queue_index is correct.
+ //
+ // Either we got too far behind (signaled by this being a valid
+ // message), or this is one of the initial messages which are invalid.
+ if (starting_queue_index.valid()) {
+ VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
+ << ", got " << starting_queue_index.index() << ", behind by "
+ << std::dec
+ << (starting_queue_index.index() - queue_index.index());
+ return Result::TOO_OLD;
+ }
+
+ VLOG(3) << "Initial";
+
+ // There isn't a valid message at this location.
+ //
+ // If someone asks for one of the messages within the first go around,
+ // then they need to wait. They got ahead. Otherwise, they are
+ // asking for something crazy, like something before the beginning of
+ // the queue. Tell them that they are behind.
+ if (uint32_queue_index < memory_->queue_size()) {
+ VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
+ return Result::NOTHING_NEW;
} else {
- // Someone has re-used this message between when we pulled it out of the
- // queue and when we grabbed its index. It is pretty hard to deduce
- // what happened. Just try again.
- Message *const new_m = memory_->GetMessage(queue_index);
- if (m != new_m) {
- m = new_m;
- VLOG(3) << "Retrying, m doesn't match";
- continue;
- }
-
- // We have confirmed that message still points to the same message. This
- // means that the message didn't get swapped out from under us, so
- // starting_queue_index is correct.
- //
- // Either we got too far behind (signaled by this being a valid
- // message), or this is one of the initial messages which are invalid.
- if (starting_queue_index.valid()) {
- VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
- << ", got " << starting_queue_index.index() << ", behind by "
- << std::dec
- << (starting_queue_index.index() - queue_index.index());
- return ReadResult::TOO_OLD;
- }
-
- VLOG(3) << "Initial";
-
- // There isn't a valid message at this location.
- //
- // If someone asks for one of the messages within the first go around,
- // then they need to wait. They got ahead. Otherwise, they are
- // asking for something crazy, like something before the beginning of
- // the queue. Tell them that they are behind.
- if (uint32_queue_index < memory_->queue_size()) {
- VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
- return ReadResult::NOTHING_NEW;
- } else {
- VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
- return ReadResult::TOO_OLD;
- }
+ VLOG(3) << "Not near zero, " << std::hex << uint32_queue_index;
+ return Result::TOO_OLD;
}
}
VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
@@ -886,7 +1211,8 @@
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
if (data) {
- memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ memcpy(data, m->data(memory_->message_data_size()),
+ memory_->message_data_size());
}
*length = m->header.length;
@@ -901,18 +1227,13 @@
<< queue_index.index() << ", finished with "
<< final_queue_index.index() << ", delta: " << std::dec
<< (final_queue_index.index() - queue_index.index());
- return ReadResult::OVERWROTE;
+ return Result::OVERWROTE;
}
- return ReadResult::GOOD;
+ return Result::GOOD;
}
-size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
-size_t LocklessQueue::message_data_size() const {
- return memory_->message_data_size();
-}
-
-QueueIndex LocklessQueue::LatestQueueIndex() {
+QueueIndex LocklessQueueReader::LatestIndex() const {
const size_t queue_size = memory_->queue_size();
// There is only one interesting case. We need to know if the queue is empty.
@@ -922,9 +1243,16 @@
if (next_queue_index.valid()) {
const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
return current_queue_index;
- } else {
- return empty_queue_index();
}
+ return QueueIndex::Invalid();
+}
+
+size_t LocklessQueueSize(const LocklessQueueMemory *memory) {
+ return memory->queue_size();
+}
+
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory) {
+ return memory->message_data_size();
}
namespace {
@@ -960,6 +1288,8 @@
<< ::std::endl;
::std::cout << " size_t num_senders = " << memory->config.num_senders
<< ::std::endl;
+ ::std::cout << " size_t num_pinners = " << memory->config.num_pinners
+ << ::std::endl;
::std::cout << " size_t queue_size = " << memory->config.queue_size
<< ::std::endl;
::std::cout << " size_t message_data_size = "
@@ -1049,6 +1379,22 @@
}
::std::cout << " }" << ::std::endl;
+ ::std::cout << " Pinner pinners[" << memory->num_pinners() << "] {"
+ << ::std::endl;
+ for (size_t i = 0; i < memory->num_pinners(); ++i) {
+ Pinner *p = memory->GetPinner(i);
+ ::std::cout << " [" << i << "] -> Pinner {" << ::std::endl;
+ ::std::cout << " aos_mutex tid = " << PrintMutex(&p->tid)
+ << ::std::endl;
+ ::std::cout << " AtomicIndex scratch_index = "
+ << p->scratch_index.Load().DebugString() << ::std::endl;
+ ::std::cout << " AtomicIndex pinned = "
+ << p->pinned.Load(memory->queue_size()).DebugString()
+ << ::std::endl;
+ ::std::cout << " }" << ::std::endl;
+ }
+ ::std::cout << " }" << ::std::endl;
+
::std::cout << " Watcher watchers[" << memory->num_watchers() << "] {"
<< ::std::endl;
for (size_t i = 0; i < memory->num_watchers(); ++i) {
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index de80f3d..3cd3726 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -4,8 +4,8 @@
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>
-#include <vector>
#include <optional>
+#include <vector>
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
@@ -51,6 +51,21 @@
AtomicIndex to_replace;
};
+// Structure to hold the state required to pin messages.
+struct Pinner {
+ // The same as Sender::tid. See there for docs.
+ aos_mutex tid;
+
+ // Queue index of the message we have pinned, or Invalid if there isn't one.
+ AtomicQueueIndex pinned;
+
+ // This should always be valid.
+ //
+ // Note that this is fully independent of pinned. It's just a place to stash
+ // a message, to ensure there's always an unpinned one for a writer to grab.
+ AtomicIndex scratch_index;
+};
+
// Structure representing a message.
struct Message {
struct Header {
@@ -98,6 +113,8 @@
size_t num_watchers;
// Size of the sender list.
size_t num_senders;
+ // Size of the pinner list.
+ size_t num_pinners;
// Size of the list of pointers into the messages list.
size_t queue_size;
@@ -106,7 +123,7 @@
size_t message_size() const;
- size_t num_messages() const { return num_senders + queue_size; }
+ size_t num_messages() const { return num_senders + num_pinners + queue_size; }
};
// Structure to hold the state of the queue.
@@ -117,40 +134,72 @@
// is done before the watcher goes RT), but needs to be RT for the sender.
struct LocklessQueueMemory;
+// Returns the size of the LocklessQueueMemory.
+size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
+
// Initializes the queue memory. memory must be either a valid pointer to the
// queue datastructure, or must be zero initialized.
LocklessQueueMemory *InitializeLocklessQueueMemory(
LocklessQueueMemory *memory, LocklessQueueConfiguration config);
-// Returns the size of the LocklessQueueMemory.
-size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
-
-// Prints to stdout the data inside the queue for debugging.
-void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-
const static unsigned int kWakeupSignal = SIGRTMIN + 2;
-// Class to manage sending and receiving data in the lockless queue. This is
-// separate from the actual memory backing the queue so that memory can be
-// managed with mmap to share across the process boundary.
+// A convenient wrapper for accessing a lockless queue.
class LocklessQueue {
public:
- LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
- LocklessQueue(const LocklessQueue &) = delete;
- LocklessQueue &operator=(const LocklessQueue &) = delete;
+ LocklessQueue(const LocklessQueueMemory *const_memory,
+ LocklessQueueMemory *memory, LocklessQueueConfiguration config)
+ : const_memory_(const_memory), memory_(memory), config_(config) {}
- ~LocklessQueue();
+ void Initialize();
- // Returns the number of messages in the queue.
- size_t QueueSize() const;
+ LocklessQueueConfiguration config() const { return config_; }
- size_t message_data_size() const;
+ const LocklessQueueMemory *const_memory() { return const_memory_; }
+ LocklessQueueMemory *memory() { return memory_; }
- // Registers this thread to receive the kWakeupSignal signal when Wakeup is
- // called. Returns false if there was an error in registration.
- bool RegisterWakeup(int priority);
- // Unregisters the wakeup.
- void UnregisterWakeup();
+ private:
+ const LocklessQueueMemory *const_memory_;
+ LocklessQueueMemory *memory_;
+ LocklessQueueConfiguration config_;
+};
+
+class LocklessQueueWatcher {
+ public:
+ LocklessQueueWatcher(const LocklessQueueWatcher &) = delete;
+ LocklessQueueWatcher &operator=(const LocklessQueueWatcher &) = delete;
+ LocklessQueueWatcher(LocklessQueueWatcher &&other)
+ : memory_(other.memory_), watcher_index_(other.watcher_index_) {
+ other.watcher_index_ = -1;
+ }
+ LocklessQueueWatcher &operator=(LocklessQueueWatcher &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(watcher_index_, other.watcher_index_);
+ return *this;
+ }
+
+ ~LocklessQueueWatcher();
+
+ // Registers this thread to receive kWakeupSignal when
+ // LocklessQueueWakeUpper::Wakeup is called. Returns nullopt if there was an
+ // error in registration.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueueWatcher> Make(LocklessQueue queue,
+ int priority);
+
+ private:
+ LocklessQueueWatcher(LocklessQueueMemory *memory, int priority);
+
+ LocklessQueueMemory *memory_ = nullptr;
+
+ // Index in the watcher list that our entry is, or -1 if no watcher is
+ // registered.
+ int watcher_index_ = -1;
+};
+
+class LocklessQueueWakeUpper {
+ public:
+ LocklessQueueWakeUpper(LocklessQueue queue);
// Sends the kWakeupSignal to all threads which have called RegisterWakeup.
//
@@ -158,113 +207,7 @@
// if nonrt.
int Wakeup(int current_priority);
- // If you ask for a queue index 2 past the newest, you will still get
- // NOTHING_NEW until that gets overwritten with new data. If you ask for an
- // element newer than QueueSize() from the current message, we consider it
- // behind by a large amount and return TOO_OLD. If the message is modified
- // out from underneath us as we read it, return OVERWROTE.
- //
- // data may be nullptr to indicate the data should not be copied.
- enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
- ReadResult Read(uint32_t queue_index,
- ::aos::monotonic_clock::time_point *monotonic_sent_time,
- ::aos::realtime_clock::time_point *realtime_sent_time,
- ::aos::monotonic_clock::time_point *monotonic_remote_time,
- ::aos::realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, size_t *length, char *data);
-
- // Returns the index to the latest queue message. Returns empty_queue_index()
- // if there are no messages in the queue. Do note that this index wraps if
- // more than 2^32 messages are sent.
- QueueIndex LatestQueueIndex();
- static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
-
- // Returns the size of the queue. This is mostly useful for manipulating
- // QueueIndex.
- size_t queue_size() const;
-
- // TODO(austin): Return the oldest queue index. This lets us catch up nicely
- // if we got behind.
- // The easiest way to implement this is likely going to be to reserve the
- // first modulo of values for the initial time around, and never reuse them.
- // That lets us do a simple atomic read of the next index and deduce what has
- // happened. It will involve the simplest atomic operations.
-
- // TODO(austin): Make it so we can find the indices which were sent just
- // before and after a time with a binary search.
-
- // Sender for blocks of data. The resources associated with a sender are
- // scoped to this object's lifetime.
- class Sender {
- public:
- Sender(const Sender &) = delete;
- Sender &operator=(const Sender &) = delete;
- Sender(Sender &&other)
- : memory_(other.memory_), sender_index_(other.sender_index_) {
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- }
- Sender &operator=(Sender &&other) {
- memory_ = other.memory_;
- sender_index_ = other.sender_index_;
- other.memory_ = nullptr;
- other.sender_index_ = -1;
- return *this;
- }
-
- ~Sender();
-
- // Sends a message without copying the data.
- // Copy at most size() bytes of data into the memory pointed to by Data(),
- // and then call Send().
- // Note: calls to Data() are expensive enough that you should cache it.
- size_t size();
- void *Data();
- void Send(size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- // Sends up to length data. Does not wakeup the target.
- void Send(const char *data, size_t length,
- aos::monotonic_clock::time_point monotonic_remote_time =
- aos::monotonic_clock::min_time,
- aos::realtime_clock::time_point realtime_remote_time =
- aos::realtime_clock::min_time,
- uint32_t remote_queue_index = 0xffffffff,
- aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
- aos::realtime_clock::time_point *realtime_sent_time = nullptr,
- uint32_t *queue_index = nullptr);
-
- private:
- friend class LocklessQueue;
-
- Sender(LocklessQueueMemory *memory);
-
- // Returns true if this sender is valid. If it isn't valid, any of the
- // other methods won't work. This is here to allow the lockless queue to
- // only build a sender if there was one available.
- bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
-
- // Pointer to the backing memory.
- LocklessQueueMemory *memory_ = nullptr;
-
- // Index into the sender list.
- int sender_index_ = -1;
- };
-
- // Creates a sender. If we couldn't allocate a sender, returns nullopt.
- // TODO(austin): Change the API if we find ourselves with more errors.
- std::optional<Sender> MakeSender();
-
private:
- LocklessQueueMemory *memory_ = nullptr;
-
// Memory and datastructure used to sort a list of watchers to wake
// up. This isn't a copy of Watcher since tid is simpler to work with here
// than the futex above.
@@ -273,17 +216,176 @@
pid_t pid;
int priority;
};
- // TODO(austin): Don't allocate this memory if we aren't going to send.
- ::std::vector<WatcherCopy> watcher_copy_;
- // Index in the watcher list that our entry is, or -1 if no watcher is
- // registered.
- int watcher_index_ = -1;
-
+ const LocklessQueueMemory *const memory_;
const int pid_;
const uid_t uid_;
+
+ ::std::vector<WatcherCopy> watcher_copy_;
};
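
A minimal wakeup-path sketch against these two classes (not part of the patch; `queue` is assumed, e.g. from a setup helper like MakeExampleQueue above):

void RegisterAndWake(aos::ipc_lib::LocklessQueue queue) {
  // Watcher side: claim a slot and register this thread for kWakeupSignal.
  std::optional<aos::ipc_lib::LocklessQueueWatcher> watcher =
      aos::ipc_lib::LocklessQueueWatcher::Make(queue, /*priority=*/20);
  CHECK(watcher.has_value()) << ": all watcher slots are taken";

  // Sender side: signal every registered watcher after publishing.
  aos::ipc_lib::LocklessQueueWakeUpper wake_upper(queue);
  wake_upper.Wakeup(/*current_priority=*/20);
}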
+// Sender for blocks of data. The resources associated with a sender are
+// scoped to this object's lifetime.
+class LocklessQueueSender {
+ public:
+ LocklessQueueSender(const LocklessQueueSender &) = delete;
+ LocklessQueueSender &operator=(const LocklessQueueSender &) = delete;
+ LocklessQueueSender(LocklessQueueSender &&other)
+ : memory_(other.memory_), sender_index_(other.sender_index_) {
+ other.memory_ = nullptr;
+ other.sender_index_ = -1;
+ }
+ LocklessQueueSender &operator=(LocklessQueueSender &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(sender_index_, other.sender_index_);
+ return *this;
+ }
+
+ ~LocklessQueueSender();
+
+ // Creates a sender. If we couldn't allocate a sender, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueueSender> Make(LocklessQueue queue);
+
+ // Sends a message without copying the data.
+ // Copy at most size() bytes of data into the memory pointed to by Data(),
+ // and then call Send().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ size_t size() const;
+ void *Data();
+ void Send(size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
+
+ // Sends up to length data. Does not wakeup the target.
+ void Send(const char *data, size_t length,
+ aos::monotonic_clock::time_point monotonic_remote_time =
+ aos::monotonic_clock::min_time,
+ aos::realtime_clock::time_point realtime_remote_time =
+ aos::realtime_clock::min_time,
+ uint32_t remote_queue_index = 0xffffffff,
+ aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
+ aos::realtime_clock::time_point *realtime_sent_time = nullptr,
+ uint32_t *queue_index = nullptr);
+
+ int buffer_index() const;
+
+ private:
+ LocklessQueueSender(LocklessQueueMemory *memory);
+
+ // Pointer to the backing memory.
+ LocklessQueueMemory *memory_ = nullptr;
+
+ // Index into the sender list.
+ int sender_index_ = -1;
+};
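
A zero-copy publish sketch against this API (not part of the patch; `queue` is assumed):

#include <cstring>

void SendGreeting(aos::ipc_lib::LocklessQueue queue) {
  std::optional<aos::ipc_lib::LocklessQueueSender> sender =
      aos::ipc_lib::LocklessQueueSender::Make(queue);
  CHECK(sender.has_value()) << ": no free sender slots";
  // Cache Data(); per the comment above, calls to it are expensive.
  void *const buffer = sender->Data();
  static const char kData[] = "hello";
  CHECK_LE(sizeof(kData), sender->size());
  memcpy(buffer, kData, sizeof(kData));
  sender->Send(sizeof(kData));  // Does not wake any watchers by itself.
}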
+
+// Pinner for blocks of data. The resources associated with a pinner are
+// scoped to this object's lifetime.
+class LocklessQueuePinner {
+ public:
+ LocklessQueuePinner(const LocklessQueuePinner &) = delete;
+ LocklessQueuePinner &operator=(const LocklessQueuePinner &) = delete;
+ LocklessQueuePinner(LocklessQueuePinner &&other)
+ : memory_(other.memory_),
+ const_memory_(other.const_memory_),
+ pinner_index_(other.pinner_index_) {
+ other.pinner_index_ = -1;
+ }
+ LocklessQueuePinner &operator=(LocklessQueuePinner &&other) {
+ std::swap(memory_, other.memory_);
+ std::swap(const_memory_, other.const_memory_);
+ std::swap(pinner_index_, other.pinner_index_);
+ return *this;
+ }
+
+ ~LocklessQueuePinner();
+
+ // Creates a pinner. If we couldn't allocate a pinner, returns nullopt.
+ // TODO(austin): Change the API if we find ourselves with more errors.
+ static std::optional<LocklessQueuePinner> Make(LocklessQueue queue);
+
+ // Attempts to pin the message at queue_index.
+ // Un-pins the previous message.
+ // Returns the buffer index (non-negative) if it succeeds.
+ // Returns -1 if that message is no longer in the queue.
+ int PinIndex(uint32_t queue_index);
+
+ // Read at most size() bytes of data into the memory pointed to by Data().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ // Don't call Data() before a successful PinIndex call.
+ size_t size() const;
+ const void *Data() const;
+
+ private:
+ LocklessQueuePinner(LocklessQueueMemory *memory,
+ const LocklessQueueMemory *const_memory);
+
+ // Pointer to the backing memory.
+ LocklessQueueMemory *memory_ = nullptr;
+ const LocklessQueueMemory *const_memory_ = nullptr;
+
+ // Index into the pinner list.
+ int pinner_index_ = -1;
+};
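
And a matching pin-and-read sketch (not part of the patch; `queue` is assumed, and `index` would typically come from LocklessQueueReader::LatestIndex below):

void PinAndInspect(aos::ipc_lib::LocklessQueue queue, uint32_t index) {
  std::optional<aos::ipc_lib::LocklessQueuePinner> pinner =
      aos::ipc_lib::LocklessQueuePinner::Make(queue);
  CHECK(pinner.has_value()) << ": no free pinner slots";
  if (pinner->PinIndex(index) < 0) {
    return;  // That message has already been overwritten.
  }
  // The data stays valid until the next PinIndex call or until the pinner is
  // destroyed; read at most pinner->size() bytes starting here.
  const void *data = pinner->Data();
  (void)data;
}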
+
+class LocklessQueueReader {
+ public:
+ enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+
+ LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
+ queue.Initialize();
+ }
+
+ // If you ask for a queue index 2 past the newest, you will still get
+ // NOTHING_NEW until that gets overwritten with new data. If you ask for an
+ // element more than LocklessQueueSize() newer than the current message, we
+ // consider it behind by a large amount and return TOO_OLD. If the message is
+ // modified out from underneath us as we read it, return OVERWROTE.
+ //
+ // data may be nullptr to indicate the data should not be copied.
+ Result Read(uint32_t queue_index,
+ ::aos::monotonic_clock::time_point *monotonic_sent_time,
+ ::aos::realtime_clock::time_point *realtime_sent_time,
+ ::aos::monotonic_clock::time_point *monotonic_remote_time,
+ ::aos::realtime_clock::time_point *realtime_remote_time,
+ uint32_t *remote_queue_index, size_t *length, char *data) const;
+
+ // Returns the index of the latest queue message. Returns
+ // QueueIndex::Invalid() if there are no messages in the queue. Do note that
+ // this index wraps if more than 2^32 messages are sent.
+ QueueIndex LatestIndex() const;
+
+ private:
+ const LocklessQueueMemory *const memory_;
+};
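
Finally, a read sketch over the four Result values (not part of the patch; `queue` is assumed):

void ReadOne(aos::ipc_lib::LocklessQueue queue, uint32_t index) {
  const aos::ipc_lib::LocklessQueueReader reader(queue);
  aos::monotonic_clock::time_point monotonic_sent, monotonic_remote;
  aos::realtime_clock::time_point realtime_sent, realtime_remote;
  uint32_t remote_queue_index;
  size_t length;
  char data[1024];  // Must hold at least LocklessQueueMessageDataSize() bytes.
  using Result = aos::ipc_lib::LocklessQueueReader::Result;
  switch (reader.Read(index, &monotonic_sent, &realtime_sent,
                      &monotonic_remote, &realtime_remote,
                      &remote_queue_index, &length, data)) {
    case Result::GOOD:        break;  // data[0, length) is consistent.
    case Result::NOTHING_NEW: break;  // Nothing at this index yet; wait.
    case Result::TOO_OLD:     break;  // Fell behind; skip forward.
    case Result::OVERWROTE:   break;  // Raced a writer; retry this index.
  }
}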
+
+// Returns the number of messages which are logically in the queue at a time.
+size_t LocklessQueueSize(const LocklessQueueMemory *memory);
+
+// Returns the number of bytes queue users are allowed to read/write within each
+// message.
+size_t LocklessQueueMessageDataSize(const LocklessQueueMemory *memory);
+
+// TODO(austin): Return the oldest queue index. This lets us catch up nicely
+// if we got behind.
+// The easiest way to implement this is likely going to be to reserve the
+// first modulo of values for the initial time around, and never reuse them.
+// That lets us do a simple atomic read of the next index and deduce what has
+// happened. It will involve the simplest atomic operations.
+
+// TODO(austin): Make it so we can find the indices which were sent just
+// before and after a time with a binary search.
+
+// Prints to stdout the data inside the queue for debugging.
+void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
+
} // namespace ipc_lib
} // namespace aos
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index b7cdfee..b4bb66a 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -487,6 +487,9 @@
}
return false;
}
+
+static int kPinnedMessageIndex = 0;
+
} // namespace
// Tests that death during sends is recovered from correctly.
@@ -503,36 +506,47 @@
LocklessQueueConfiguration config;
config.num_watchers = 2;
config.num_senders = 2;
- config.queue_size = 4;
+ config.num_pinners = 1;
+ config.queue_size = 2;
config.message_data_size = 32;
TestShmRobustness(
config,
[config, tid](void *memory) {
// Initialize the queue and grab the tid.
- LocklessQueue queue(
+ LocklessQueue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
- config);
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ config)
+ .Initialize();
*tid = gettid();
},
[config](void *memory) {
- // Now try to write 2 messages. We will get killed a bunch as this
- // tries to happen.
LocklessQueue queue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
+ reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
config);
- LocklessQueue::Sender sender = queue.MakeSender().value();
- for (int i = 0; i < 2; ++i) {
+ // Now try to write some messages. We will get killed a bunch as this
+ // tries to happen.
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
+ LocklessQueuePinner pinner = LocklessQueuePinner::Make(queue).value();
+ for (int i = 0; i < 5; ++i) {
char data[100];
size_t s = snprintf(data, sizeof(data), "foobar%d", i + 1);
sender.Send(data, s + 1);
+ // Pin a message, so when we keep writing we will exercise the pinning
+ // logic.
+ if (i == 1) {
+ CHECK_EQ(pinner.PinIndex(1), kPinnedMessageIndex);
+ }
}
},
[config, tid](void *raw_memory) {
+ ::aos::ipc_lib::LocklessQueueMemory *const memory =
+ reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
// Confirm that we can create 2 senders (the number in the queue), and
// send a message. And that all the messages in the queue are valid.
- ::aos::ipc_lib::LocklessQueueMemory *memory =
- reinterpret_cast<::aos::ipc_lib::LocklessQueueMemory *>(raw_memory);
+ LocklessQueue queue(memory, memory, config);
bool print = false;
@@ -550,25 +564,42 @@
}
if (print) {
+ printf("Bad version:\n");
PrintLocklessQueueMemory(memory);
}
- LocklessQueue queue(memory, config);
// Building and destroying a sender will clean up the queue.
- { LocklessQueue::Sender sender = queue.MakeSender().value(); }
+ LocklessQueueSender::Make(queue).value();
if (print) {
printf("Cleaned up version:\n");
PrintLocklessQueueMemory(memory);
}
+ LocklessQueueReader reader(queue);
+
+ // Verify that the pinned message still has its contents. Note that we
+ // need to do this _before_ sending more messages, because the pinner
+ // has been cleaned up.
{
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ const Message *const message =
+ memory->GetMessage(Index(1, kPinnedMessageIndex));
+ const auto queue_index =
+ message->header.queue_index.Load(memory->queue_size());
+ if (queue_index.valid()) {
+ const char *const data = message->data(memory->message_data_size());
+ EXPECT_EQ(data[LocklessQueueMessageDataSize(memory) -
+ message->header.length + 6],
+ '2');
+ }
+ }
+
+ {
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();
{
// Make a second sender to confirm that the slot was freed.
// If the sender doesn't get cleaned up, this will fail.
- LocklessQueue queue2(memory, config);
- queue2.MakeSender().value();
+ LocklessQueueSender::Make(queue).value();
}
// Send a message to make sure that the queue still works.
@@ -590,25 +621,33 @@
char read_data[1024];
size_t length;
- LocklessQueue::ReadResult read_result =
- queue.Read(i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
- if (read_result != LocklessQueue::ReadResult::GOOD) {
+ if (read_result != LocklessQueueReader::Result::GOOD) {
+ if (read_result == LocklessQueueReader::Result::TOO_OLD) {
+ ++i;
+ continue;
+ }
+ CHECK(read_result == LocklessQueueReader::Result::NOTHING_NEW)
+ << ": " << static_cast<int>(read_result);
break;
}
- EXPECT_GT(read_data[queue.message_data_size() - length + 6],
- last_data)
+ EXPECT_GT(
+ read_data[LocklessQueueMessageDataSize(memory) - length + 6],
+ last_data)
<< ": Got " << read_data;
- last_data = read_data[queue.message_data_size() - length + 6];
+ last_data =
+ read_data[LocklessQueueMessageDataSize(memory) - length + 6];
++i;
}
// Confirm our message got through.
- EXPECT_EQ(last_data, '9');
+ EXPECT_EQ(last_data, '9') << ": Got through " << i;
},
/* prepare_in_child = true */ true);
}
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index a10609c..bb995b9 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -31,6 +31,8 @@
size_t num_watchers() const { return config.num_watchers; }
// Size of the sender list.
size_t num_senders() const { return config.num_senders; }
+ // Size of the pinner list.
+ size_t num_pinners() const { return config.num_pinners; }
// Number of messages logically in the queue at a time.
// List of pointers into the messages list.
@@ -60,9 +62,10 @@
// writing:
//
// AtomicIndex queue[config.queue_size];
- // Message messages[config.queue_size + config.num_senders];
+ // Message messages[config.num_messages()];
// Watcher watchers[config.num_watchers];
// Sender senders[config.num_senders];
+ // Pinner pinners[config.num_pinners];
static constexpr size_t kDataAlignment = alignof(std::max_align_t);
@@ -72,27 +75,49 @@
alignas(kDataAlignment) char data[];
// Memory size functions for all 4 lists.
- size_t SizeOfQueue() { return SizeOfQueue(config); }
+ size_t SizeOfQueue() const { return SizeOfQueue(config); }
static size_t SizeOfQueue(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(AtomicIndex) * config.queue_size);
}
- size_t SizeOfMessages() { return SizeOfMessages(config); }
+ size_t SizeOfMessages() const { return SizeOfMessages(config); }
static size_t SizeOfMessages(LocklessQueueConfiguration config) {
return AlignmentRoundUp(config.message_size() * config.num_messages());
}
- size_t SizeOfWatchers() { return SizeOfWatchers(config); }
+ size_t SizeOfWatchers() const { return SizeOfWatchers(config); }
static size_t SizeOfWatchers(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(Watcher) * config.num_watchers);
}
- size_t SizeOfSenders() { return SizeOfSenders(config); }
+ size_t SizeOfSenders() const { return SizeOfSenders(config); }
static size_t SizeOfSenders(LocklessQueueConfiguration config) {
return AlignmentRoundUp(sizeof(Sender) * config.num_senders);
}
- // Getters for each of the 4 lists.
+ size_t SizeOfPinners() const { return SizeOfPinners(config); }
+ static size_t SizeOfPinners(LocklessQueueConfiguration config) {
+ return AlignmentRoundUp(sizeof(Pinner) * config.num_pinners);
+ }
+
+ // Getters for each of the lists.
+
+ Pinner *GetPinner(size_t pinner_index) {
+ static_assert(alignof(Pinner) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<Pinner *>(
+ &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+ SizeOfSenders() + pinner_index * sizeof(Pinner));
+ }
+
+ const Pinner *GetPinner(size_t pinner_index) const {
+ static_assert(alignof(const Pinner) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Pinner *>(
+ &data[0] + SizeOfQueue() + SizeOfMessages() + SizeOfWatchers() +
+ SizeOfSenders() + pinner_index * sizeof(Pinner));
+ }
+
Sender *GetSender(size_t sender_index) {
static_assert(alignof(Sender) <= kDataAlignment,
"kDataAlignment is too small");
@@ -109,15 +134,29 @@
watcher_index * sizeof(Watcher));
}
+ const Watcher *GetWatcher(size_t watcher_index) const {
+ static_assert(alignof(const Watcher) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Watcher *>(&data[0] + SizeOfQueue() +
+ SizeOfMessages() +
+ watcher_index * sizeof(Watcher));
+ }
+
AtomicIndex *GetQueue(uint32_t index) {
static_assert(alignof(AtomicIndex) <= kDataAlignment,
"kDataAlignment is too small");
return reinterpret_cast<AtomicIndex *>(&data[0] +
sizeof(AtomicIndex) * index);
}
+ const AtomicIndex *GetQueue(uint32_t index) const {
+ static_assert(alignof(const AtomicIndex) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const AtomicIndex *>(&data[0] +
+ sizeof(AtomicIndex) * index);
+ }
- // There are num_senders + queue_size messages. The free list is really the
- // sender list, since those are messages available to be filled in and sent.
+ // There are num_messages() messages. The free list is really the
+ // sender+pinner list, since those are messages available to be filled in.
// This removes the need to find lost messages when a sender dies.
Message *GetMessage(Index index) {
static_assert(alignof(Message) <= kDataAlignment,
@@ -125,12 +164,19 @@
return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
index.message_index() * message_size());
}
+ const Message *GetMessage(Index index) const {
+ static_assert(alignof(const Message) <= kDataAlignment,
+ "kDataAlignment is too small");
+ return reinterpret_cast<const Message *>(
+ &data[0] + SizeOfQueue() + index.message_index() * message_size());
+ }
// Helpers to fetch messages from the queue.
- Index LoadIndex(QueueIndex index) {
+ Index LoadIndex(QueueIndex index) const {
return GetQueue(index.Wrapped())->Load();
}
- Message *GetMessage(QueueIndex index) {
+ Message *GetMessage(QueueIndex index) { return GetMessage(LoadIndex(index)); }
+ const Message *GetMessage(QueueIndex index) const {
return GetMessage(LoadIndex(index));
}
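
The layout comment above makes the offset arithmetic behind the new pinner accessors mechanical. As a minimal sketch, assuming the static size helpers from this diff (PinnerOffset itself is a hypothetical name, not part of the change), GetPinner() computes:

  // Each list is packed back-to-back in data[], so pinner N lives after the
  // queue, message, watcher, and sender regions.
  size_t PinnerOffset(LocklessQueueConfiguration config, size_t pinner_index) {
    return LocklessQueueMemory::SizeOfQueue(config) +
           LocklessQueueMemory::SizeOfMessages(config) +
           LocklessQueueMemory::SizeOfWatchers(config) +
           LocklessQueueMemory::SizeOfSenders(config) +
           pinner_index * sizeof(Pinner);
  }
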
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 65b2a15..e1e2516 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -44,6 +44,7 @@
LocklessQueueTest() {
config_.num_watchers = 10;
config_.num_senders = 100;
+ config_.num_pinners = 5;
config_.queue_size = 10000;
// Exercise the alignment code: an unaligned size like this would throw off
// alignment without the rounding logic.
config_.message_data_size = 101;
@@ -55,15 +56,16 @@
Reset();
}
- LocklessQueueMemory *get_memory() {
- return reinterpret_cast<LocklessQueueMemory *>(&(memory_[0]));
+ LocklessQueue queue() {
+ return LocklessQueue(reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ reinterpret_cast<LocklessQueueMemory *>(&(memory_[0])),
+ config_);
}
- void Reset() { memset(get_memory(), 0, LocklessQueueMemorySize(config_)); }
+ void Reset() { memset(&memory_[0], 0, LocklessQueueMemorySize(config_)); }
// Runs until the signal is received.
void RunUntilWakeup(Event *ready, int priority) {
- LocklessQueue queue(get_memory(), config_);
internal::EPoll epoll;
SignalFd signalfd({kWakeupSignal});
@@ -74,16 +76,18 @@
epoll.Quit();
});
- // Register to be woken up *after* the signalfd is catching the signals.
- queue.RegisterWakeup(priority);
+ {
+ // Register to be woken up *after* the signalfd is catching the signals.
+ LocklessQueueWatcher watcher =
+ LocklessQueueWatcher::Make(queue(), priority).value();
- // And signal we are now ready.
- ready->Set();
+ // And signal we are now ready.
+ ready->Set();
- epoll.Run();
+ epoll.Run();
- // Cleanup.
- queue.UnregisterWakeup();
+ // Cleanup, ensuring the watcher is destroyed before the signalfd.
+ }
epoll.DeleteFd(signalfd.fd());
}
@@ -98,36 +102,35 @@
// Tests that wakeup doesn't do anything if nothing was registered.
TEST_F(LocklessQueueTest, NoWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if a wakeup was registered and then
// unregistered.
TEST_F(LocklessQueueTest, UnregisteredWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
- queue.RegisterWakeup(5);
- queue.UnregisterWakeup();
+ { LocklessQueueWatcher::Make(queue(), 5).value(); }
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
// Tests that wakeup doesn't do anything if the thread dies.
TEST_F(LocklessQueueTest, DiedWatcherWakeup) {
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
::std::thread([this]() {
// Use placement new so the destructor doesn't get run.
- ::std::aligned_storage<sizeof(LocklessQueue), alignof(LocklessQueue)>::type
- data;
- LocklessQueue *q = new (&data) LocklessQueue(get_memory(), config_);
- // Register a wakeup.
- q->RegisterWakeup(5);
- }).join();
+ ::std::aligned_storage<sizeof(LocklessQueueWatcher),
+ alignof(LocklessQueueWatcher)>::type data;
+ new (&data)
+ LocklessQueueWatcher(LocklessQueueWatcher::Make(queue(), 5).value());
+ })
+ .join();
- EXPECT_EQ(queue.Wakeup(7), 0);
+ EXPECT_EQ(wake_upper.Wakeup(7), 0);
}
struct WatcherState {
@@ -154,16 +157,13 @@
WatcherState *s = &queues.back();
queues.back().t = ::std::thread([this, &cleanup, s]() {
- LocklessQueue q(get_memory(), config_);
- EXPECT_TRUE(q.RegisterWakeup(0));
+ LocklessQueueWatcher q = LocklessQueueWatcher::Make(queue(), 0).value();
// Signal that this thread is ready.
s->ready.Set();
// And wait until we are asked to shut down.
cleanup.Wait();
-
- q.UnregisterWakeup();
});
}
@@ -173,12 +173,9 @@
}
// Now try to allocate another one. This will fail.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_FALSE(queue.RegisterWakeup(0));
- }
+ EXPECT_FALSE(LocklessQueueWatcher::Make(queue(), 0));
- // Trigger the threads to cleanup their resources, and wait unti they are
+ // Trigger the threads to cleanup their resources, and wait until they are
// done.
cleanup.Set();
for (WatcherState &w : queues) {
@@ -186,23 +183,16 @@
}
// We should now be able to allocate a wakeup.
- {
- LocklessQueue queue(get_memory(), config_);
- EXPECT_TRUE(queue.RegisterWakeup(0));
- queue.UnregisterWakeup();
- }
+ EXPECT_TRUE(LocklessQueueWatcher::Make(queue(), 0));
}
// Tests that allocating too many senders fails as expected.
TEST_F(LocklessQueueTest, TooManySenders) {
- ::std::vector<::std::unique_ptr<LocklessQueue>> queues;
- ::std::vector<LocklessQueue::Sender> senders;
+ ::std::vector<LocklessQueueSender> senders;
for (size_t i = 0; i < config_.num_senders; ++i) {
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- senders.emplace_back(queues.back()->MakeSender().value());
+ senders.emplace_back(LocklessQueueSender::Make(queue()).value());
}
- queues.emplace_back(new LocklessQueue(get_memory(), config_));
- EXPECT_FALSE(queues.back()->MakeSender());
+ EXPECT_FALSE(LocklessQueueSender::Make(queue()));
}
// Now, start 2 threads and have them receive the signals.
@@ -211,7 +201,7 @@
EXPECT_LE(kWakeupSignal, SIGRTMAX);
EXPECT_GE(kWakeupSignal, SIGRTMIN);
- LocklessQueue queue(get_memory(), config_);
+ LocklessQueueWakeUpper wake_upper(queue());
// Event used to make sure the thread is ready before the test starts.
Event ready1;
@@ -224,7 +214,7 @@
ready1.Wait();
ready2.Wait();
- EXPECT_EQ(queue.Wakeup(3), 2);
+ EXPECT_EQ(wake_upper.Wakeup(3), 2);
t1.join();
t2.join();
@@ -236,15 +226,14 @@
// Do a simple send test.
TEST_F(LocklessQueueTest, Send) {
- LocklessQueue queue(get_memory(), config_);
-
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue()).value();
+ LocklessQueueReader reader(queue());
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
// Confirm that the queue index makes sense given the number of sends.
- EXPECT_EQ(queue.LatestQueueIndex().index(),
- i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
+ EXPECT_EQ(reader.LatestIndex().index(),
+ i == 0 ? QueueIndex::Invalid().index() : i - 1);
// Send a trivial piece of data.
char data[100];
@@ -253,7 +242,7 @@
// Confirm that the queue index still makes sense. This is easier since the
// empty case has been handled.
- EXPECT_EQ(queue.LatestQueueIndex().index(), i);
+ EXPECT_EQ(reader.LatestIndex().index(), i);
// Read a result from 5 in the past.
::aos::monotonic_clock::time_point monotonic_sent_time;
@@ -270,15 +259,15 @@
} else {
index = index.IncrementBy(i - 5);
}
- LocklessQueue::ReadResult read_result =
- queue.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ LocklessQueueReader::Result read_result =
+ reader.Read(index.index(), &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
- if (read_result != LocklessQueue::ReadResult::GOOD) {
- EXPECT_EQ(read_result, LocklessQueue::ReadResult::TOO_OLD);
+ if (read_result != LocklessQueueReader::Result::GOOD) {
+ EXPECT_EQ(read_result, LocklessQueueReader::Result::TOO_OLD);
}
}
}
@@ -294,9 +283,8 @@
const chrono::seconds print_frequency(FLAGS_print_rate);
- QueueRacer racer(get_memory(), FLAGS_thread_count, kNumMessages, config_);
- const monotonic_clock::time_point start_time =
- monotonic_clock::now();
+ QueueRacer racer(queue(), FLAGS_thread_count, kNumMessages);
+ const monotonic_clock::time_point start_time = monotonic_clock::now();
const monotonic_clock::time_point end_time =
start_time + chrono::seconds(FLAGS_duration);
@@ -332,7 +320,7 @@
// Send enough messages to wrap the 32 bit send counter.
TEST_F(LocklessQueueTest, WrappedSend) {
uint64_t kNumMessages = 0x100010000ul;
- QueueRacer racer(get_memory(), 1, kNumMessages, config_);
+ QueueRacer racer(queue(), 1, kNumMessages);
const monotonic_clock::time_point start_time = monotonic_clock::now();
EXPECT_NO_FATAL_FAILURE(racer.RunIteration(false, 0));
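
The test rewrites above follow from the API split: LocklessQueue is now a cheap, copyable descriptor, and each role (sender, reader, watcher, wake-upper) is a separate RAII handle whose destructor replaces the old explicit cleanup calls. A hedged usage sketch, assuming the Make() factories return an optional-like wrapper as the .value() calls and EXPECT_FALSE checks above suggest:

  // One descriptor; the fixture above passes the same shared-memory block
  // for both pointer arguments.
  LocklessQueue q(shm, shm, config);

  // Senders come from a fixed-size slot list; Make() fails once it is full.
  LocklessQueueSender sender = LocklessQueueSender::Make(q).value();

  {
    // Registration lasts exactly as long as the watcher object, so there is
    // no UnregisterWakeup() call to forget.
    LocklessQueueWatcher watcher = LocklessQueueWatcher::Make(q, 5).value();
    // ... run an event loop and service wakeup signals ...
  }  // Unregistered here.

  // Waking registered watchers no longer needs a full queue object.
  LocklessQueueWakeUpper wake_upper(q);
  wake_upper.Wakeup(3);  // Returns the number of watchers woken.
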
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index f5b3d7e..fcc8668 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -24,19 +24,16 @@
uint64_t event_count = ::std::numeric_limits<uint64_t>::max();
};
-QueueRacer::QueueRacer(LocklessQueueMemory *memory, int num_threads,
- uint64_t num_messages, LocklessQueueConfiguration config)
- : memory_(memory),
- num_threads_(num_threads),
- num_messages_(num_messages),
- config_(config) {
+QueueRacer::QueueRacer(LocklessQueue queue, int num_threads,
+ uint64_t num_messages)
+ : queue_(queue), num_threads_(num_threads), num_messages_(num_messages) {
Reset();
}
void QueueRacer::RunIteration(bool race_reads, int write_wrap_count) {
const bool will_wrap = num_messages_ * num_threads_ *
static_cast<uint64_t>(1 + write_wrap_count) >
- config_.queue_size;
+ queue_.config().queue_size;
// Clear out shmem.
Reset();
@@ -52,13 +49,13 @@
::std::vector<ThreadState> threads(num_threads_);
::std::thread queue_index_racer([this, &poll_index]() {
- LocklessQueue queue(memory_, config_);
+ LocklessQueueReader reader(queue_);
// Track the number of times we wrap, and cache the modulo.
uint64_t wrap_count = 0;
uint32_t last_queue_index = 0;
const uint32_t max_queue_index =
- QueueIndex::MaxIndex(0xffffffffu, queue.QueueSize());
+ QueueIndex::MaxIndex(0xffffffffu, queue_.config().queue_size);
while (poll_index) {
// We want to read everything backwards. This will give us conservative
// bounds. And with enough time and randomness, we will see all the cases
@@ -81,16 +78,14 @@
//
// So, grab them in order.
const uint64_t finished_writes = finished_writes_.load();
- const QueueIndex latest_queue_index_queue_index =
- queue.LatestQueueIndex();
+ const QueueIndex latest_queue_index_queue_index = reader.LatestIndex();
const uint64_t started_writes = started_writes_.load();
const uint32_t latest_queue_index_uint32_t =
latest_queue_index_queue_index.index();
uint64_t latest_queue_index = latest_queue_index_uint32_t;
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// If we got smaller, we wrapped.
if (latest_queue_index_uint32_t < last_queue_index) {
++wrap_count;
@@ -107,22 +102,19 @@
// If we are at the beginning, the queue needs to always return empty.
if (started_writes == 0) {
- EXPECT_EQ(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_EQ(latest_queue_index_queue_index, QueueIndex::Invalid());
EXPECT_EQ(finished_writes, 0);
} else {
if (finished_writes == 0) {
// Plausible to be at the beginning, in which case we don't have
// anything to check.
- if (latest_queue_index_queue_index !=
- LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != QueueIndex::Invalid()) {
// Otherwise, we have started. The queue can't have any more
// entries than this.
EXPECT_GE(started_writes, latest_queue_index + 1);
}
} else {
- EXPECT_NE(latest_queue_index_queue_index,
- LocklessQueue::empty_queue_index());
+ EXPECT_NE(latest_queue_index_queue_index, QueueIndex::Invalid());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
EXPECT_GE(latest_queue_index + 1, finished_writes);
@@ -139,42 +131,62 @@
} else {
t.event_count = 0;
}
- t.thread =
- ::std::thread([this, &t, thread_index, &run, write_wrap_count]() {
- // Build up a sender.
- LocklessQueue queue(memory_, config_);
- LocklessQueue::Sender sender = queue.MakeSender().value();
+ t.thread = ::std::thread([this, &t, thread_index, &run,
+ write_wrap_count]() {
+ // Build up a sender.
+ LocklessQueueSender sender = LocklessQueueSender::Make(queue_).value();
+ CHECK_GE(sender.size(), sizeof(ThreadPlusCount));
- // Signal that we are ready to start sending.
- t.ready.Set();
+ // Signal that we are ready to start sending.
+ t.ready.Set();
- // Wait until signaled to start running.
- run.Wait();
+ // Wait until signaled to start running.
+ run.Wait();
- // Gogogo!
- for (uint64_t i = 0;
- i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
- ++i) {
- char data[sizeof(ThreadPlusCount)];
- ThreadPlusCount tpc;
- tpc.thread = thread_index;
- tpc.count = i;
-
- memcpy(data, &tpc, sizeof(ThreadPlusCount));
-
- if (i % 0x800000 == 0x100000) {
- fprintf(stderr, "Sent %" PRIu64 ", %f %%\n", i,
- static_cast<double>(i) /
- static_cast<double>(num_messages_ *
- (1 + write_wrap_count)) *
- 100.0);
+ // Gogogo!
+ for (uint64_t i = 0;
+ i < num_messages_ * static_cast<uint64_t>(1 + write_wrap_count);
+ ++i) {
+ char *const data = static_cast<char *>(sender.Data()) + sender.size() -
+ sizeof(ThreadPlusCount);
+ const char fill = (i + 55) & 0xFF;
+ memset(data, fill, sizeof(ThreadPlusCount));
+ {
+ bool found_nonzero = false;
+ for (size_t i = 0; i < sizeof(ThreadPlusCount); ++i) {
+ if (data[i] != fill) {
+ found_nonzero = true;
}
-
- ++started_writes_;
- sender.Send(data, sizeof(ThreadPlusCount));
- ++finished_writes_;
}
- });
+ CHECK(!found_nonzero) << ": Somebody else is writing to our buffer";
+ }
+
+ ThreadPlusCount tpc;
+ tpc.thread = thread_index;
+ tpc.count = i;
+
+ memcpy(data, &tpc, sizeof(ThreadPlusCount));
+
+ if (i % 0x800000 == 0x100000) {
+ fprintf(
+ stderr, "Sent %" PRIu64 ", %f %%\n", i,
+ static_cast<double>(i) /
+ static_cast<double>(num_messages_ * (1 + write_wrap_count)) *
+ 100.0);
+ }
+
+ ++started_writes_;
+ sender.Send(sizeof(ThreadPlusCount));
+ // Blank out the new scratch buffer, to catch other people using it.
+ {
+ char *const new_data = static_cast<char *>(sender.Data()) +
+ sender.size() - sizeof(ThreadPlusCount);
+ const char new_fill = ~fill;
+ memset(new_data, new_fill, sizeof(ThreadPlusCount));
+ }
+ ++finished_writes_;
+ }
+ });
++thread_index;
}
@@ -234,17 +246,16 @@
void QueueRacer::CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads) {
// Now read back the results to double check.
- LocklessQueue queue(memory_, config_);
-
- const bool will_wrap =
- num_messages_ * num_threads_ * (1 + write_wrap_count) > queue.QueueSize();
+ LocklessQueueReader reader(queue_);
+ const bool will_wrap = num_messages_ * num_threads_ * (1 + write_wrap_count) >
+ LocklessQueueSize(queue_.memory());
monotonic_clock::time_point last_monotonic_sent_time =
monotonic_clock::epoch();
uint64_t initial_i = 0;
if (will_wrap) {
initial_i = (1 + write_wrap_count) * num_messages_ * num_threads_ -
- queue.QueueSize();
+ LocklessQueueSize(queue_.memory());
}
for (uint64_t i = initial_i;
@@ -258,27 +269,28 @@
char read_data[1024];
// Handle overflowing the message count for the wrap test.
- const uint32_t wrapped_i = i % static_cast<size_t>(QueueIndex::MaxIndex(
- 0xffffffffu, queue.QueueSize()));
- LocklessQueue::ReadResult read_result =
- queue.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
- &monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &length, &(read_data[0]));
+ const uint32_t wrapped_i =
+ i % static_cast<size_t>(QueueIndex::MaxIndex(
+ 0xffffffffu, LocklessQueueSize(queue_.memory())));
+ LocklessQueueReader::Result read_result =
+ reader.Read(wrapped_i, &monotonic_sent_time, &realtime_sent_time,
+ &monotonic_remote_time, &realtime_remote_time,
+ &remote_queue_index, &length, &(read_data[0]));
if (race_reads) {
- if (read_result == LocklessQueue::ReadResult::NOTHING_NEW) {
+ if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
--i;
continue;
}
}
if (race_reads && will_wrap) {
- if (read_result == LocklessQueue::ReadResult::TOO_OLD) {
+ if (read_result == LocklessQueueReader::Result::TOO_OLD) {
continue;
}
}
// Every message should be good.
- ASSERT_EQ(read_result, LocklessQueue::ReadResult::GOOD) << ": i is " << i;
+ ASSERT_EQ(read_result, LocklessQueueReader::Result::GOOD) << ": i is " << i;
// And, confirm that time never went backwards.
ASSERT_GT(monotonic_sent_time, last_monotonic_sent_time);
@@ -289,7 +301,8 @@
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
- memcpy(&tpc, read_data + queue.message_data_size() - length,
+ memcpy(&tpc,
+ read_data + LocklessQueueMessageDataSize(queue_.memory()) - length,
sizeof(ThreadPlusCount));
if (will_wrap) {
@@ -303,18 +316,18 @@
if (race_reads) {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_LE((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
(*threads)[tpc.thread].event_count = tpc.count;
} else {
// Make sure nothing goes backwards. Really not much we can do here.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
} else {
// Confirm that we see every message counter from every thread.
- ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count) << ": Thread "
- << tpc.thread;
+ ASSERT_EQ((*threads)[tpc.thread].event_count, tpc.count)
+ << ": Thread " << tpc.thread;
}
++(*threads)[tpc.thread].event_count;
}
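
The send path above switches from the copying Send(data, length) overload to writing directly into the sender's scratch message. A minimal sketch of that zero-copy pattern, assuming the Data()/size()/Send(length) API used in this diff:

  LocklessQueueSender sender = LocklessQueueSender::Make(queue).value();

  ThreadPlusCount tpc;
  tpc.thread = 0;
  tpc.count = 0;

  // Payloads are packed at the *end* of the scratch buffer, matching how
  // the readers above index back from LocklessQueueMessageDataSize().
  char *const data =
      static_cast<char *>(sender.Data()) + sender.size() - sizeof(tpc);
  memcpy(data, &tpc, sizeof(tpc));

  // Send() hands the scratch buffer to the queue and swaps a fresh one in,
  // which is why the racer re-reads Data() on every iteration.
  sender.Send(sizeof(tpc));
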
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index eaeedd4..7e92693 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -14,8 +14,7 @@
// them together to all write at once.
class QueueRacer {
public:
- QueueRacer(LocklessQueueMemory *memory, int num_threads,
- uint64_t num_messages, LocklessQueueConfiguration config);
+ QueueRacer(LocklessQueue queue, int num_threads, uint64_t num_messages);
// Runs an iteration of the race.
//
@@ -35,13 +34,14 @@
void RunIteration(bool race_reads, int write_wrap_count);
size_t CurrentIndex() {
- LocklessQueue queue(memory_, config_);
- return queue.LatestQueueIndex().index();
+ return LocklessQueueReader(queue_).LatestIndex().index();
}
private:
// Wipes the queue memory out so we get a clean start.
- void Reset() { memset(memory_, 0, LocklessQueueMemorySize(config_)); }
+ void Reset() {
+ memset(queue_.memory(), 0, LocklessQueueMemorySize(queue_.config()));
+ }
// This is a separate method so that when any of the ASSERT_* macros fail, we
// still clean up all the threads. Otherwise we get an assert on the way out of
@@ -49,7 +49,7 @@
void CheckReads(bool race_reads, int write_wrap_count,
::std::vector<ThreadState> *threads);
- LocklessQueueMemory *memory_;
+ LocklessQueue queue_;
const uint64_t num_threads_;
const uint64_t num_messages_;
@@ -60,8 +60,6 @@
::std::atomic<uint64_t> started_writes_;
// Number of writes completed.
::std::atomic<uint64_t> finished_writes_;
-
- const LocklessQueueConfiguration config_;
};
} // namespace ipc_lib
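
With memory_ and config_ folded into the LocklessQueue descriptor, driving the racer is now a two-liner. A hedged sketch using the queue() helper the test fixture above defines:

  // The racer pulls queue_size and the memory block from the descriptor.
  QueueRacer racer(queue(), /*num_threads=*/8, /*num_messages=*/10000);
  EXPECT_NO_FATAL_FAILURE(
      racer.RunIteration(/*race_reads=*/true, /*write_wrap_count=*/0));
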