Change the event-loop restriction
Creating multiple fetchers for the same queue on one event loop is unusual
but acceptable: RobotState, for example, is a common thing that multiple
people want. We don't want to support a sender and a watcher for the same
queue on the same loop (do you get your own messages?) until there's a valid
use case.
Change-Id: I215f452311d3f47ffd487d78885ae2848c6ffe9b
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
index a187981..0590d7b 100644
--- a/aos/events/event-loop.h
+++ b/aos/events/event-loop.h
@@ -59,11 +59,13 @@
const T &operator*() const { return *get(); }
const T *operator->() const { return get(); }
- // Sends the message to the queue. Should only be called once.
- void Send() {
+ // Sends the message to the queue. Should only be called once. Returns true
+ // if the message was successfully sent, and false otherwise.
+ bool Send() {
RawSender *sender = &msg_.get_deleter();
msg_->SetTimeToNow();
- sender->Send(reinterpret_cast<RawSender::SendContext *>(msg_.release()));
+ return sender->Send(
+ reinterpret_cast<RawSender::SendContext *>(msg_.release()));
}
private:
@@ -91,6 +93,10 @@
// Current time.
virtual monotonic_clock::time_point monotonic_now() = 0;
+ // Note: for each <path, type> tuple, an event loop supports creating
+ // multiple fetchers, plus either one sender or one watcher (but not both
+ // a sender and a watcher).
+
// Makes a class that will always fetch the most recent value
// sent to path.
template <typename T>
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event-loop_param_test.cc
index 78e744a..efac8f9 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event-loop_param_test.cc
@@ -55,44 +55,54 @@
EXPECT_TRUE(happened);
}
-// Verify that making a fetcher and handler for "/test" dies.
-TEST_P(AbstractEventLoopTest, FetcherAndHandler) {
+// Verify that making a fetcher and watcher for "/test" succeeds.
+TEST_P(AbstractEventLoopTest, FetcherAndWatcher) {
auto loop = Make();
auto fetcher = loop->MakeFetcher<TestMessage>("/test");
- EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+ loop->MakeWatcher("/test", [&](const TestMessage &) {});
}
-// Verify that making 2 fetchers for "/test" fails.
+// Verify that making 2 fetchers for "/test" succeeds.
TEST_P(AbstractEventLoopTest, TwoFetcher) {
auto loop = Make();
auto fetcher = loop->MakeFetcher<TestMessage>("/test");
- EXPECT_DEATH(loop->MakeFetcher<TestMessage>("/test"), "/test");
+ auto fetcher2 = loop->MakeFetcher<TestMessage>("/test");
}
-// Verify that registering a handler twice for "/test" fails.
-TEST_P(AbstractEventLoopTest, TwoHandler) {
+// Verify that registering a watcher twice for "/test" fails.
+TEST_P(AbstractEventLoopTest, TwoWatcher) {
auto loop = Make();
loop->MakeWatcher("/test", [&](const TestMessage &) {});
- EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}), "/test");
+ EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
+ "/test");
+}
+
+// Verify that registering a watcher and a sender for "/test" fails.
+TEST_P(AbstractEventLoopTest, WatcherAndSender) {
+ auto loop = Make();
+ auto sender = loop->MakeSender<TestMessage>("/test");
+ EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
+ "/test");
}
// Verify that Quit() works when there are multiple watchers.
-TEST_P(AbstractEventLoopTest, MultipleFetcherQuit) {
- auto loop = Make();
+TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) {
+ auto loop1 = Make();
+ auto loop2 = Make();
- auto sender = loop->MakeSender<TestMessage>("/test2");
+ auto sender = loop1->MakeSender<TestMessage>("/test2");
{
auto msg = sender.MakeMessage();
msg->msg_value = 200;
msg.Send();
}
- loop->MakeWatcher("/test1", [&](const TestMessage &) {});
- loop->MakeWatcher("/test2", [&](const TestMessage &message) {
+ loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
+ loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
EXPECT_EQ(message.msg_value, 200);
- loop->Exit();
+ loop2->Exit();
});
- loop->Run();
+ loop2->Run();
}
// Verify that timer intervals and duration function properly.
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
index 945ebc0..781c963 100644
--- a/aos/events/shm-event-loop.cc
+++ b/aos/events/shm-event-loop.cc
@@ -1,12 +1,14 @@
#include "aos/events/shm-event-loop.h"
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
#include <sys/timerfd.h>
+#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>
+#include "aos/logging/logging.h"
+#include "aos/queue.h"
+
namespace aos {
ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
@@ -173,13 +175,13 @@
std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const std::string &path, const QueueTypeInfo &type) {
- Take(path);
return std::unique_ptr<RawFetcher>(new ShmFetcher(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const std::string &path, const QueueTypeInfo &type) {
+ Take(path);
return std::unique_ptr<RawSender>(new ShmSender(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
@@ -269,8 +271,12 @@
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
- if (!taken_.emplace(path).second) {
- ::aos::Die("%s already has a listener / watcher.", path.c_str());
+
+ const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+ if (prior != taken_.end()) {
+ ::aos::Die("%s is already being used.", path.c_str());
+ } else {
+ taken_.emplace_back(path);
}
}
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
index 49d65f7..e8ff267 100644
--- a/aos/events/shm-event-loop.h
+++ b/aos/events/shm-event-loop.h
@@ -17,6 +17,10 @@
// Specialization of EventLoop that is built from queues running out of shared
// memory. See more details at aos/queue.h
+//
+// This object must be interacted with from one thread, but the Senders and
+// Fetchers may be used from multiple threads afterwards (as long as their
+// destructors are called back in one thread again).
class ShmEventLoop : public EventLoop {
public:
ShmEventLoop();
@@ -73,12 +77,13 @@
};
// Exclude multiple of the same type for path.
- void Take(const std::string &path);
std::vector<std::function<void()>> on_run_;
std::shared_ptr<ThreadState> thread_state_;
- std::unordered_set<std::string> taken_;
+ void Take(const std::string &path);
+
+ std::vector<::std::string> taken_;
};
} // namespace aos
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
index e82c381..a23b18c 100644
--- a/aos/events/simulated-event-loop.cc
+++ b/aos/events/simulated-event-loop.cc
@@ -1,4 +1,7 @@
#include "aos/events/simulated-event-loop.h"
+
+#include <algorithm>
+
#include "aos/queue.h"
namespace aos {
@@ -80,13 +83,13 @@
std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
const std::string &path, const QueueTypeInfo &type) {
+ Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawSender();
}
std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
const std::string &path, const QueueTypeInfo &type) {
- Take(path);
::std::pair<::std::string, QueueTypeInfo> key(path, type);
return GetSimulatedQueue(key)->MakeRawFetcher();
}
@@ -116,12 +119,15 @@
return std::unique_ptr<RawFetcher>(new SimulatedFetcher(this));
}
-void SimulatedEventLoop::Take(const std::string &path) {
+void SimulatedEventLoop::Take(const ::std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
- if (!taken_.emplace(path).second) {
- ::aos::Die("%s already has a listener / watcher.", path.c_str());
+ const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+ if (prior != taken_.end()) {
+ ::aos::Die("%s is already being used.", path.c_str());
+ } else {
+ taken_.emplace_back(path);
}
}
} // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
index 79b48b8..b5c94bf 100644
--- a/aos/events/simulated-event-loop.h
+++ b/aos/events/simulated-event-loop.h
@@ -212,8 +212,7 @@
std::function<void(const aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override {
- timers_.emplace_back(
- new SimulatedTimerHandler(scheduler_, callback));
+ timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
return timers_.back().get();
}
@@ -232,19 +231,20 @@
SimulatedQueue *GetSimulatedQueue(
const ::std::pair<::std::string, QueueTypeInfo> &);
- void Take(const std::string &path);
+ void Take(const ::std::string &path);
private:
EventScheduler *scheduler_;
- std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> *queues_;
- std::unordered_set<std::string> taken_;
- std::vector<std::unique_ptr<TimerHandler>> timers_;
+ ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
+ *queues_;
+ ::std::vector<std::string> taken_;
+ ::std::vector<std::unique_ptr<TimerHandler>> timers_;
};
class SimulatedEventLoopFactory {
public:
- std::unique_ptr<EventLoop> CreateEventLoop() {
- return std::unique_ptr<EventLoop>(
+ ::std::unique_ptr<EventLoop> CreateEventLoop() {
+ return ::std::unique_ptr<EventLoop>(
new SimulatedEventLoop(&scheduler_, &queues_));
}