Allow multiple senders on the same channel
I will do this to allow giving the kernel multiple buffers for image
frames.
Also deduplicate some infrastructure between EventLoop implementations.
Change-Id: Ifd2c9a29747481ea36be1654960dcf86303a11f4
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 6f1f453..a183cae 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -61,6 +61,7 @@
"//aos/time",
"//aos/util:phased_loop",
"@com_github_google_flatbuffers//:flatbuffers",
+ "@com_google_absl//absl/container:btree",
],
)
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index a370ca8..e368df2 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -127,6 +127,35 @@
return watchers_.back().get();
}
+void EventLoop::TakeWatcher(const Channel *channel) {
+ CHECK(!is_running()) << ": Cannot add new objects while running.";
+ ChannelIndex(channel);
+
+ CHECK(taken_senders_.find(channel) == taken_senders_.end())
+ << ": " << FlatbufferToJson(channel) << " is already being used.";
+
+ auto result = taken_watchers_.insert(channel);
+ CHECK(result.second) << ": " << FlatbufferToJson(channel)
+ << " is already being used.";
+
+ if (!configuration::ChannelIsReadableOnNode(channel, node())) {
+ LOG(FATAL) << ": " << FlatbufferToJson(channel)
+ << " is not able to be watched on this node. Check your "
+ "configuration.";
+ }
+}
+
+void EventLoop::TakeSender(const Channel *channel) {
+ CHECK(!is_running()) << ": Cannot add new objects while running.";
+ ChannelIndex(channel);
+
+ CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
+ << ": Channel " << FlatbufferToJson(channel) << " is already being used.";
+
+ // We don't care if this is a duplicate.
+ taken_senders_.insert(channel);
+}
+
void EventLoop::SendTimingReport() {
// We need to do a fancy dance here to get all the accounting to work right.
// We want to copy the memory here, but then send after resetting. Otherwise
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index ea2bf6e..4a12096 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -14,6 +14,8 @@
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
+
+#include "absl/container/btree_set.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
@@ -499,6 +501,11 @@
void DeleteFetcher(RawFetcher *fetcher);
WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
+ // Tracks that we have a (single) watcher on the given channel.
+ void TakeWatcher(const Channel *channel);
+ // Tracks that we have at least one sender on the given channel.
+ void TakeSender(const Channel *channel);
+
std::vector<RawSender *> senders_;
std::vector<RawFetcher *> fetchers_;
@@ -536,6 +543,8 @@
// If true, don't send out timing reports.
bool skip_timing_report_ = false;
+
+ absl::btree_set<const Channel *> taken_watchers_, taken_senders_;
};
} // namespace aos
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index e1e05a2..b72a8e2 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -45,6 +45,47 @@
EXPECT_TRUE(happened);
}
+// Tests that watcher can receive messages from two senders.
+// Also tests that OnRun() works.
+TEST_P(AbstractEventLoopTest, BasicTwoSenders) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ aos::Sender<TestMessage> sender1 = loop1->MakeSender<TestMessage>("/test");
+ aos::Sender<TestMessage> sender2 = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ {
+ aos::Sender<TestMessage>::Builder msg = sender1.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ {
+ aos::Sender<TestMessage>::Builder msg = sender2.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ int messages_received = 0;
+ loop2->MakeWatcher("/test", [&](const TestMessage &message) {
+ EXPECT_EQ(message.value(), 200);
+ this->Exit();
+ ++messages_received;
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+ EXPECT_EQ(messages_received, 2);
+}
+
// Tests that a fetcher can fetch from a sender.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, FetchWithoutRun) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 60e8216..c2c9884 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -36,13 +36,6 @@
<< ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
<< T::GetFullyQualifiedName() << "\" } not found in config.";
- if (!configuration::ChannelIsReadableOnNode(channel, node())) {
- LOG(FATAL) << "Channel { \"name\": \"" << channel_name << "\", \"type\": \""
- << T::GetFullyQualifiedName()
- << "\" } is not able to be watched on this node. Check your "
- "configuration.";
- }
-
return MakeRawWatcher(
channel, [this, w](const Context &context, const void *message) {
context_ = context;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 8824181..cc11520 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -323,7 +323,10 @@
ipc_lib::LocklessQueue::empty_queue_index();
struct AlignedChar {
- alignas(32) char data;
+ // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+ // cache lines.
+ // V4L2 requires 64 byte alignment for USERPTR.
+ alignas(64) char data;
};
std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
@@ -591,7 +594,7 @@
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
- Take(channel);
+ TakeSender(channel);
return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
}
@@ -599,14 +602,7 @@
void ShmEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher) {
- Take(channel);
-
- if (!configuration::ChannelIsReadableOnNode(channel, node())) {
- LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
- << "\", \"type\": \"" << channel->type()->string_view()
- << "\" } is not able to be watched on this node. Check your "
- "configuration.";
- }
+ TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
new internal::WatcherState(this, channel, std::move(watcher))));
@@ -828,19 +824,6 @@
CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
}
-void ShmEventLoop::Take(const Channel *channel) {
- CHECK(!is_running()) << ": Cannot add new objects while running.";
-
- // Cheat aggresively. Use the shared memory path as a proxy for a unique
- // identifier for the channel.
- const std::string path = ShmPath(channel);
-
- const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
- CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
-
- taken_.emplace_back(path);
-}
-
void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
if (is_running()) {
LOG(FATAL) << "Cannot set realtime priority while running.";
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index c888e57..52bb338 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -79,10 +79,6 @@
void HandleEvent();
- // Tracks that we can't have multiple watchers or a sender and a watcher (or
- // multiple senders) on a single queue (path).
- void Take(const Channel *channel);
-
// Returns the TID of the event loop.
pid_t GetTid() override;
@@ -90,7 +86,6 @@
int priority_ = 0;
std::string name_;
const Node *const node_;
- std::vector<std::string> taken_;
internal::EPoll epoll_;
};
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index d783a3a..e34a23f 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,7 +5,6 @@
#include <string_view>
#include "absl/container/btree_map.h"
-#include "absl/container/btree_set.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
@@ -16,7 +15,7 @@
struct SimulatedMessage {
// Struct to let us force data to be well aligned.
struct OveralignedChar {
- char data alignas(32);
+ char data alignas(64);
};
// Context for the data.
@@ -406,8 +405,6 @@
SimulatedChannel *GetSimulatedChannel(const Channel *channel);
- void Take(const Channel *channel);
-
void SetRuntimeRealtimePriority(int priority) override {
CHECK(!is_running()) << ": Cannot set realtime priority while running.";
priority_ = priority;
@@ -439,7 +436,6 @@
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops_;
- absl::btree_set<SimpleChannel> taken_;
::std::string name_;
@@ -470,15 +466,7 @@
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
- ChannelIndex(channel);
- Take(channel);
-
- if (!configuration::ChannelIsReadableOnNode(channel, node())) {
- LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
- << "\", \"type\": \"" << channel->type()->string_view()
- << "\" } is not able to be watched on this node. Check your "
- "configuration.";
- }
+ TakeWatcher(channel);
std::unique_ptr<SimulatedWatcher> shm_watcher(
new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
@@ -489,8 +477,8 @@
std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
const Channel *channel) {
- ChannelIndex(channel);
- Take(channel);
+ TakeSender(channel);
+
return GetSimulatedChannel(channel)->MakeRawSender(this);
}
@@ -719,14 +707,6 @@
simulated_event_loop_->AddEvent(&event_);
}
-void SimulatedEventLoop::Take(const Channel *channel) {
- CHECK(!is_running()) << ": Cannot add new objects while running.";
-
- auto result = taken_.insert(SimpleChannel(channel));
- CHECK(result.second) << ": " << FlatbufferToJson(channel)
- << " is already being used.";
-}
-
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
: configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {