Convert aos over to flatbuffers
Everything builds, and all the tests pass. I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.
There is no logging or live introspection of queue messages.
Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/events/BUILD b/aos/events/BUILD
index b69fc66..17ed688 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,3 +1,19 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+
+package(default_visibility = ["//visibility:public"])
+
+flatbuffer_cc_library(
+ name = "test_message_fbs",
+ srcs = ["test_message.fbs"],
+ gen_reflections = 1,
+)
+
+flatbuffer_cc_library(
+ name = "pingpong_fbs",
+ srcs = ["pingpong.fbs"],
+ gen_reflections = 1,
+)
+
cc_library(
name = "epoll",
srcs = ["epoll.cc"],
@@ -10,72 +26,97 @@
)
cc_library(
- name = "raw-event-loop",
- hdrs = ["raw-event-loop.h"],
- deps = [
- "//aos:queues",
- "//aos/time",
- ],
-)
-
-cc_library(
- name = "event-loop",
- srcs = ["event-loop-tmpl.h"],
+ name = "event_loop",
+ srcs = ["event_loop_tmpl.h"],
hdrs = [
- "event-loop.h",
- "raw-event-loop.h",
+ "event_loop.h",
],
visibility = ["//visibility:public"],
deps = [
- ":raw-event-loop",
- "//aos:queues",
+ "//aos:configuration",
+ "//aos:configuration_fbs",
+ "//aos:flatbuffers",
"//aos/time",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ ],
+)
+
+cc_binary(
+ name = "ping",
+ srcs = [
+ "ping.cc",
+ ],
+ data = ["config.fb.json"],
+ deps = [
+ ":pingpong_fbs",
+ ":shm_event_loop",
+ "//aos:configuration",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_binary(
+ name = "pong",
+ srcs = [
+ "pong.cc",
+ ],
+ data = ["config.fb.json"],
+ deps = [
+ ":pingpong_fbs",
+ ":shm_event_loop",
+ "//aos:configuration",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "@com_github_google_glog//:glog",
],
)
cc_library(
- name = "shm-event-loop",
- srcs = ["shm-event-loop.cc"],
- hdrs = ["shm-event-loop.h"],
+ name = "shm_event_loop",
+ srcs = ["shm_event_loop.cc"],
+ hdrs = ["shm_event_loop.h"],
visibility = ["//visibility:public"],
deps = [
":epoll",
- ":event-loop",
- "//aos:init",
- "//aos:queues",
- "//aos/logging",
+ ":event_loop",
+ ":test_message_fbs",
+ "//aos:realtime",
+ "//aos/ipc_lib:lockless_queue",
+ "//aos/ipc_lib:signalfd",
"//aos/util:phased_loop",
],
)
cc_test(
- name = "shm-event-loop_test",
- srcs = ["shm-event-loop_test.cc"],
+ name = "shm_event_loop_test",
+ srcs = ["shm_event_loop_test.cc"],
shard_count = 5,
deps = [
- ":event-loop_param_test",
- ":shm-event-loop",
- "//aos/testing:test_shm",
+ ":event_loop_param_test",
+ ":shm_event_loop",
+ ":test_message_fbs",
],
)
cc_library(
- name = "event-loop_param_test",
+ name = "event_loop_param_test",
testonly = True,
- srcs = ["event-loop_param_test.cc"],
- hdrs = ["event-loop_param_test.h"],
+ srcs = ["event_loop_param_test.cc"],
+ hdrs = ["event_loop_param_test.h"],
deps = [
- ":event-loop",
- "//aos/logging:queue_logging",
+ ":event_loop",
+ ":test_message_fbs",
"//aos/testing:googletest",
],
)
cc_test(
name = "simulated_event_loop_test",
- srcs = ["simulated-event-loop_test.cc"],
+ srcs = ["simulated_event_loop_test.cc"],
deps = [
- ":event-loop_param_test",
+ ":event_loop_param_test",
":simulated_event_loop",
"//aos/testing:googletest",
],
@@ -84,14 +125,19 @@
cc_library(
name = "simulated_event_loop",
testonly = True,
- srcs = ["simulated-event-loop.cc"],
- hdrs = ["simulated-event-loop.h"],
+ srcs = [
+ "event_scheduler.cc",
+ "simulated_event_loop.cc",
+ ],
+ hdrs = [
+ "event_scheduler.h",
+ "simulated_event_loop.h",
+ ],
visibility = ["//visibility:public"],
deps = [
- ":event-loop",
- "//aos:queues",
- "//aos/logging",
- "//aos/testing:test_logging",
+ ":event_loop",
+ "//aos/ipc_lib:index",
"//aos/util:phased_loop",
+ "@com_google_absl//absl/container:btree",
],
)
diff --git a/aos/events/config.fb.json b/aos/events/config.fb.json
new file mode 100644
index 0000000..4068243
--- /dev/null
+++ b/aos/events/config.fb.json
@@ -0,0 +1,12 @@
+{
+ "channels": [
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping"
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong"
+ }
+ ]
+}
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index f6ebe95..0f335e5 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -6,6 +6,7 @@
#include <sys/timerfd.h>
#include <unistd.h>
#include <atomic>
+#include <functional>
#include <vector>
#include "aos/time/time.h"
diff --git a/aos/events/event-loop-tmpl.h b/aos/events/event-loop-tmpl.h
deleted file mode 100644
index 4c53be3..0000000
--- a/aos/events/event-loop-tmpl.h
+++ /dev/null
@@ -1,40 +0,0 @@
-#ifndef _AOS_EVENTS_EVENT_LOOP_TMPL_H_
-#define _AOS_EVENTS_EVENT_LOOP_TMPL_H_
-
-#include <type_traits>
-#include "aos/events/event-loop.h"
-
-namespace aos {
-
-// From a watch functor, this will extract the message type of the argument.
-// This is the template forward declaration, and it extracts the call operator
-// as a PTMF to be used by the following specialization.
-template <class T>
-struct watch_message_type_trait
- : watch_message_type_trait<decltype(&T::operator())> {};
-
-// From a watch functor, this will extract the message type of the argument.
-// This is the template specialization.
-template <class ClassType, class ReturnType, class A1>
-struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> {
- using message_type = typename std::decay<A1>::type;
-};
-
-template <typename T>
-typename Sender<T>::Message Sender<T>::MakeMessage() {
- return Message(sender_.get());
-}
-
-template <typename Watch>
-void EventLoop::MakeWatcher(const std::string &path, Watch &&w) {
- using T = typename watch_message_type_trait<Watch>::message_type;
-
- return MakeRawWatcher(path, QueueTypeInfo::Get<T>(),
- [w](const Message *message) {
- w(*reinterpret_cast<const T *>(message));
- });
-}
-
-} // namespace aos
-
-#endif // _AOS_EVENTS_EVENT_LOOP_TMPL_H
diff --git a/aos/events/event-loop.h b/aos/events/event-loop.h
deleted file mode 100644
index 4d6fda5..0000000
--- a/aos/events/event-loop.h
+++ /dev/null
@@ -1,143 +0,0 @@
-#ifndef _AOS_EVENTS_EVENT_LOOP_H_
-#define _AOS_EVENTS_EVENT_LOOP_H_
-
-#include <string>
-#include "aos/queue.h"
-#include "aos/time/time.h"
-#include "aos/events/raw-event-loop.h"
-
-namespace aos {
-
-// Fetches the newest message from a queue.
-template <typename T>
-class Fetcher {
- public:
- Fetcher() {}
- // Fetches the next message. Returns true if it fetched a new message. This
- // method will only return messages sent after the Fetcher was created.
- bool FetchNext() { return fetcher_->FetchNext(); }
- // Fetches the most recent message. Returns true if it fetched a new message.
- // This will return the latest message regardless of if it was sent before or
- // after the fetcher was created.
- bool Fetch() { return fetcher_->Fetch(); }
-
- const T *get() const {
- return reinterpret_cast<const T *>(fetcher_->most_recent());
- }
- const T &operator*() const { return *get(); }
- const T *operator->() const { return get(); }
-
- private:
- friend class EventLoop;
- Fetcher(::std::unique_ptr<RawFetcher> fetcher) : fetcher_(::std::move(fetcher)) {}
- ::std::unique_ptr<RawFetcher> fetcher_;
-};
-
-// Sends messages to a queue.
-template <typename T>
-class Sender {
- public:
- typedef T Type;
-
- Sender() {}
-
- // Represents a single message about to be sent to the queue.
- // The lifecycle goes:
- //
- // Message msg = sender.MakeMessage();
- // Populate(msg.get());
- // msg.Send();
- //
- // Or:
- //
- // Message msg = sender.MakeMessage();
- // PopulateOrNot(msg.get());
- class Message {
- public:
- Message(RawSender *sender)
- : msg_(reinterpret_cast<T *>(sender->GetMessage()), *sender) {
- msg_->Zero();
- }
-
- T *get() { return msg_.get(); }
- const T *get() const { return msg_.get(); }
- T &operator*() { return *get(); }
- T *operator->() { return get(); }
- const T &operator*() const { return *get(); }
- const T *operator->() const { return get(); }
-
- // Sends the message to the queue. Should only be called once. Returns true
- // if the message was successfully sent, and false otherwise.
- bool Send() {
- RawSender *sender = &msg_.get_deleter();
- return sender->Send(msg_.release());
- }
-
- private:
- std::unique_ptr<T, RawSender &> msg_;
- };
-
- // Constructs an above message.
- Message MakeMessage();
-
- // Returns the name of the underlying queue.
- const char *name() const { return sender_->name(); }
-
- private:
- friend class EventLoop;
- Sender(::std::unique_ptr<RawSender> sender) : sender_(::std::move(sender)) {}
- ::std::unique_ptr<RawSender> sender_;
-};
-
-// TODO(parker): Consider making EventLoop wrap a RawEventLoop rather than
-// inheriting.
-class EventLoop : public RawEventLoop {
- public:
- virtual ~EventLoop() {}
-
- // Current time.
- virtual monotonic_clock::time_point monotonic_now() = 0;
-
- // Note, it is supported to create:
- // multiple fetchers, and (one sender or one watcher) per <path, type>
- // tuple.
-
- // Makes a class that will always fetch the most recent value
- // sent to path.
- template <typename T>
- Fetcher<T> MakeFetcher(const ::std::string &path) {
- return Fetcher<T>(MakeRawFetcher(path, QueueTypeInfo::Get<T>()));
- }
-
- // Makes class that allows constructing and sending messages to
- // address path.
- template <typename T>
- Sender<T> MakeSender(const ::std::string &path) {
- return Sender<T>(MakeRawSender(path, QueueTypeInfo::Get<T>()));
- }
-
- // Watch is a functor that have a call signature like so:
- // void Event(const MessageType& type);
- //
- // This will watch messages sent to path.
- // Note that T needs to match both send and recv side.
- // TODO(parker): Need to support ::std::bind. For now, use lambdas.
- template <typename Watch>
- void MakeWatcher(const ::std::string &path, Watch &&w);
-
- // The passed in function will be called when the event loop starts.
- // Use this to run code once the thread goes into "real-time-mode",
- virtual void OnRun(::std::function<void()>) = 0;
-
- // TODO(austin): OnExit
-
- // Sets the scheduler priority to run the event loop at. This may not be
- // called after we go into "real-time-mode".
- virtual void SetRuntimeRealtimePriority(int priority) = 0;
-};
-
-} // namespace aos
-
-#include "aos/events/event-loop-tmpl.h"
-
-#endif // _AOS_EVENTS_EVENT_LOOP_H
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
new file mode 100644
index 0000000..f614517
--- /dev/null
+++ b/aos/events/event_loop.h
@@ -0,0 +1,328 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_H_
+#define AOS_EVENTS_EVENT_LOOP_H_
+
+#include <atomic>
+#include <string>
+
+#include "absl/strings/string_view.h"
+#include "aos/configuration.h"
+#include "aos/configuration_generated.h"
+#include "aos/flatbuffers.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Struct available on Watchers and Fetchers with context about the current
+// message.
+struct Context {
+ // Time that the message was sent.
+ monotonic_clock::time_point monotonic_sent_time;
+ realtime_clock::time_point realtime_sent_time;
+ // Index in the queue.
+ uint32_t queue_index;
+ // Size of the data sent.
+ size_t size;
+ // Pointer to the data.
+ void *data;
+};
+
+// Raw version of fetcher. Contains a local variable that the fetcher will
+// update. This is used for reflection and as an interface to implement typed
+// fetchers.
+class RawFetcher {
+ public:
+ RawFetcher() {}
+ virtual ~RawFetcher() {}
+
+ // Non-blocking fetch of the next message in the queue. Returns true if there
+ // was a new message and we got it.
+ virtual bool FetchNext() = 0;
+
+ // Non-blocking fetch of the latest message:
+ virtual bool Fetch() = 0;
+
+ // Returns a pointer to data in the most recent message, or nullptr if there
+ // is no message.
+ const void *most_recent_data() const { return data_; }
+
+ const Context &context() const { return context_; }
+
+ protected:
+ RawFetcher(const RawFetcher &) = delete;
+ RawFetcher &operator=(const RawFetcher &) = delete;
+
+ void *data_ = nullptr;
+ Context context_;
+};
+
+// Raw version of sender. Sends a block of data. This is used for reflection
+// and as a building block to implement typed senders.
+class RawSender {
+ public:
+ RawSender() {}
+ virtual ~RawSender() {}
+
+ // Sends a message without copying it. The users starts by copying up to
+ // size() bytes into the data backed by data(). They then call Send to send.
+ // Returns true on a successful send.
+ virtual void *data() = 0;
+ virtual size_t size() = 0;
+ virtual bool Send(size_t size) = 0;
+
+ // Sends a single block of data by copying it.
+ virtual bool Send(void *data, size_t size) = 0;
+
+ // Returns the name of this sender.
+ virtual const absl::string_view name() const = 0;
+
+ protected:
+ RawSender(const RawSender &) = delete;
+ RawSender &operator=(const RawSender &) = delete;
+};
+
+
+// Fetches the newest message from a channel.
+// This provides a polling based interface for channels.
+template <typename T>
+class Fetcher {
+ public:
+ Fetcher() {}
+
+ // Fetches the next message. Returns true if it fetched a new message. This
+ // method will only return messages sent after the Fetcher was created.
+ bool FetchNext() { return fetcher_->FetchNext(); }
+
+ // Fetches the most recent message. Returns true if it fetched a new message.
+ // This will return the latest message regardless of if it was sent before or
+ // after the fetcher was created.
+ bool Fetch() { return fetcher_->Fetch(); }
+
+ // Returns a pointer to the contained flatbuffer, or nullptr if there is no
+ // available message.
+ const T *get() const {
+ return fetcher_->most_recent_data() != nullptr
+ ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
+ fetcher_->most_recent_data()))
+ : nullptr;
+ }
+
+ // Returns the context holding timestamps and other metadata about the
+ // message.
+ const Context &context() const { return fetcher_->context(); }
+
+ const T &operator*() const { return *get(); }
+ const T *operator->() const { return get(); }
+
+ private:
+ friend class EventLoop;
+ Fetcher(::std::unique_ptr<RawFetcher> fetcher)
+ : fetcher_(::std::move(fetcher)) {}
+ ::std::unique_ptr<RawFetcher> fetcher_;
+};
+
+// Sends messages to a channel.
+template <typename T>
+class Sender {
+ public:
+ Sender() {}
+
+ // Represents a single message about to be sent to the queue.
+ // The lifecycle goes:
+ //
+ // Builder builder = sender.MakeBuilder();
+ // T::Builder t_builder = builder.MakeBuilder<T>();
+ // Populate(&t_builder);
+ // builder.Send(t_builder.Finish());
+ class Builder {
+ public:
+ Builder(RawSender *sender, void *data, size_t size)
+ : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+ fbb_.ForceDefaults(1);
+ }
+
+ flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
+
+ template <typename T2>
+ typename T2::Builder MakeBuilder() {
+ return typename T2::Builder(fbb_);
+ }
+
+ bool Send(flatbuffers::Offset<T> offset) {
+ fbb_.Finish(offset);
+ return sender_->Send(fbb_.GetSize());
+ }
+
+ // CHECKs that this message was sent.
+ void CheckSent() { fbb_.Finished(); }
+
+ private:
+ PreallocatedAllocator alloc_;
+ flatbuffers::FlatBufferBuilder fbb_;
+ RawSender *sender_;
+ };
+
+ // Constructs an above builder.
+ Builder MakeBuilder();
+
+ // Returns the name of the underlying queue.
+ const absl::string_view name() const { return sender_->name(); }
+
+ private:
+ friend class EventLoop;
+ Sender(std::unique_ptr<RawSender> sender) : sender_(std::move(sender)) {}
+ std::unique_ptr<RawSender> sender_;
+};
+
+// Interface for timers
+class TimerHandler {
+ public:
+ virtual ~TimerHandler() {}
+
+ // Timer should sleep until base, base + offset, base + offset * 2, ...
+ // If repeat_offset isn't set, the timer only expires once.
+ virtual void Setup(monotonic_clock::time_point base,
+ monotonic_clock::duration repeat_offset =
+ ::aos::monotonic_clock::zero()) = 0;
+
+ // Stop future calls to callback().
+ virtual void Disable() = 0;
+};
+
+// Interface for phased loops. They are built on timers.
+class PhasedLoopHandler {
+ public:
+ virtual ~PhasedLoopHandler() {}
+
+ // Sets the interval and offset. Any changes to interval and offset only take
+ // effect when the handler finishes running.
+ virtual void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) = 0;
+};
+
+// TODO(austin): Ping pong example apps, and then start doing introspection.
+// TODO(austin): Timing reporter. Publish statistics on latencies of
+// handlers.
+class EventLoop {
+ public:
+ EventLoop(const Configuration *configuration)
+ : configuration_(configuration) {}
+
+ virtual ~EventLoop() {}
+
+ // Current time.
+ virtual monotonic_clock::time_point monotonic_now() = 0;
+ virtual realtime_clock::time_point realtime_now() = 0;
+
+ // Note, it is supported to create:
+ // multiple fetchers, and (one sender or one watcher) per <name, type>
+ // tuple.
+
+ // Makes a class that will always fetch the most recent value
+ // sent to the provided channel.
+ template <typename T>
+ Fetcher<T> MakeFetcher(const absl::string_view channel_name) {
+ const Channel *channel = configuration::GetChannel(
+ configuration_, channel_name, T::GetFullyQualifiedName(), name());
+ CHECK(channel != nullptr)
+ << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+ << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+ return Fetcher<T>(MakeRawFetcher(channel));
+ }
+
+ // Makes class that allows constructing and sending messages to
+ // the provided channel.
+ template <typename T>
+ Sender<T> MakeSender(const absl::string_view channel_name) {
+ const Channel *channel = configuration::GetChannel(
+ configuration_, channel_name, T::GetFullyQualifiedName(), name());
+ CHECK(channel != nullptr)
+ << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+ << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+ return Sender<T>(MakeRawSender(channel));
+ }
+
+ // This will watch messages sent to the provided channel.
+ //
+ // Watch is a functor that have a call signature like so:
+ // void Event(const MessageType& type);
+ //
+ // TODO(parker): Need to support ::std::bind. For now, use lambdas.
+ // TODO(austin): Do we need a functor? Or is a std::function good enough?
+ template <typename Watch>
+ void MakeWatcher(const absl::string_view name, Watch &&w);
+
+ // The passed in function will be called when the event loop starts.
+ // Use this to run code once the thread goes into "real-time-mode",
+ virtual void OnRun(::std::function<void()> on_run) = 0;
+
+ // Sets the name of the event loop. This is the application name.
+ virtual void set_name(const absl::string_view name) = 0;
+ // Gets the name of the event loop.
+ virtual const absl::string_view name() const = 0;
+
+ // Creates a timer that executes callback when the timer expires
+ // Returns a TimerHandle for configuration of the timer
+ virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
+
+ // Creates a timer that executes callback periodically at the specified
+ // interval and offset. Returns a PhasedLoopHandler for interacting with the
+ // timer.
+ virtual PhasedLoopHandler *AddPhasedLoop(
+ ::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
+
+ // TODO(austin): OnExit
+
+ // Threadsafe.
+ bool is_running() const { return is_running_.load(); }
+
+ // Sets the scheduler priority to run the event loop at. This may not be
+ // called after we go into "real-time-mode".
+ virtual void SetRuntimeRealtimePriority(int priority) = 0;
+
+ // Fetches new messages from the provided channel (path, type). Note: this
+ // channel must be a member of the exact configuration object this was built
+ // with.
+ virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
+ const Channel *channel) = 0;
+
+ // Will watch channel (name, type) for new messages
+ virtual void MakeRawWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)>
+ watcher) = 0;
+
+ // Returns the context for the current message.
+ // TODO(austin): Fill out whatever is useful for timers.
+ const Context &context() const { return context_; }
+
+ // Returns the configuration that this event loop was built with.
+ const Configuration *configuration() const { return configuration_; }
+
+ protected:
+ void set_is_running(bool value) { is_running_.store(value); }
+
+ // Will send new messages from channel (path, type).
+ virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
+
+ private:
+ ::std::atomic<bool> is_running_{false};
+
+ // Context available for watchers.
+ Context context_;
+
+ const Configuration *configuration_;
+};
+
+} // namespace aos
+
+#include "aos/events/event_loop_tmpl.h"
+
+#endif // AOS_EVENTS_EVENT_LOOP_H
diff --git a/aos/events/event-loop_param_test.cc b/aos/events/event_loop_param_test.cc
similarity index 64%
rename from aos/events/event-loop_param_test.cc
rename to aos/events/event_loop_param_test.cc
index 04343cc..8a0f9b6 100644
--- a/aos/events/event-loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -1,11 +1,13 @@
-#include "aos/events/event-loop_param_test.h"
+#include "aos/events/event_loop_param_test.h"
#include <chrono>
+#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "glog/logging.h"
+#include "aos/events/test_message_generated.h"
namespace aos {
namespace testing {
@@ -13,39 +15,27 @@
namespace chrono = ::std::chrono;
} // namespace
-struct TestMessage : public ::aos::Message {
- enum { kQueueLength = 100, kHash = 0x696c0cdc };
- int msg_value;
-
- void Zero() {
- ::aos::Message::Zero();
- msg_value = 0;
- }
- static size_t Size() { return 1 + ::aos::Message::Size(); }
- size_t Print(char *buffer, size_t length) const;
- TestMessage() { Zero(); }
-};
-
// Tests that watcher can receive messages from a sender.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, Basic) {
auto loop1 = Make();
auto loop2 = MakePrimary();
- auto sender = loop1->MakeSender<TestMessage>("/test");
+ aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
bool happened = false;
loop2->OnRun([&]() {
happened = true;
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
});
loop2->MakeWatcher("/test", [&](const TestMessage &message) {
- EXPECT_EQ(message.msg_value, 200);
+ EXPECT_EQ(message.value(), 200);
this->Exit();
});
@@ -67,13 +57,14 @@
EXPECT_FALSE(fetcher.Fetch());
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
EXPECT_TRUE(fetcher.Fetch());
ASSERT_FALSE(fetcher.get() == nullptr);
- EXPECT_EQ(fetcher->msg_value, 200);
+ EXPECT_EQ(fetcher.get()->value(), 200);
}
// Tests that watcher will receive all messages sent if they are sent after
@@ -87,7 +78,7 @@
::std::vector<int> values;
loop2->MakeWatcher("/test", [&](const TestMessage &message) {
- values.push_back(message.msg_value);
+ values.push_back(message.value());
if (values.size() == 2) {
this->Exit();
}
@@ -95,21 +86,24 @@
// Before Run, should be ignored.
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 199;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(199);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
loop2->OnRun([&]() {
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
});
@@ -129,18 +123,20 @@
::std::vector<int> values;
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
loop2->MakeWatcher("/test", [&](const TestMessage &message) {
- values.push_back(message.msg_value);
+ values.push_back(message.value());
});
// Add a timer to actually quit.
@@ -164,20 +160,22 @@
::std::vector<int> values;
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
// Add a timer to actually quit.
auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
while (fetcher.FetchNext()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
this->Exit();
});
@@ -200,14 +198,16 @@
::std::vector<int> values;
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -215,7 +215,7 @@
// Add a timer to actually quit.
auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
while (fetcher.FetchNext()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
this->Exit();
});
@@ -239,14 +239,16 @@
::std::vector<int> values;
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -254,11 +256,11 @@
// Add a timer to actually quit.
auto test_timer = loop2->AddTimer([&fetcher, &values, this]() {
if (fetcher.Fetch()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
// Do it again to make sure we don't double fetch.
if (fetcher.Fetch()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
this->Exit();
});
@@ -281,14 +283,16 @@
::std::vector<int> values;
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 201;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(201);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
@@ -296,31 +300,34 @@
// Add a timer to actually quit.
auto test_timer = loop2->AddTimer([&fetcher, &values, &sender, this]() {
if (fetcher.Fetch()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 202;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(202);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 203;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(203);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 204;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(204);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
if (fetcher.FetchNext()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
if (fetcher.Fetch()) {
- values.push_back(fetcher->msg_value);
+ values.push_back(fetcher.get()->value());
}
this->Exit();
@@ -345,29 +352,31 @@
Fetcher<TestMessage> fetcher = fetch_loop->MakeFetcher<TestMessage>("/test");
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 100;
- ASSERT_TRUE(msg.Send());
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(100);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
{
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- ASSERT_TRUE(msg.Send());
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
}
ASSERT_TRUE(fetcher.FetchNext());
ASSERT_NE(nullptr, fetcher.get());
- EXPECT_EQ(100, fetcher->msg_value);
+ EXPECT_EQ(100, fetcher.get()->value());
ASSERT_TRUE(fetcher.FetchNext());
ASSERT_NE(nullptr, fetcher.get());
- EXPECT_EQ(200, fetcher->msg_value);
+ EXPECT_EQ(200, fetcher.get()->value());
// When we run off the end of the queue, expect to still have the old message:
ASSERT_FALSE(fetcher.FetchNext());
ASSERT_NE(nullptr, fetcher.get());
- EXPECT_EQ(200, fetcher->msg_value);
+ EXPECT_EQ(200, fetcher.get()->value());
}
// Verify that making a fetcher and watcher for "/test" succeeds.
@@ -384,6 +393,14 @@
auto fetcher2 = loop->MakeFetcher<TestMessage>("/test");
}
+// Verify that registering a watcher for an invalid channel name dies.
+TEST_P(AbstractEventLoopDeathTest, InvalidChannelName) {
+ auto loop = Make();
+ EXPECT_DEATH(
+ { loop->MakeWatcher("/test/invalid", [&](const TestMessage &) {}); },
+ "/test/invalid");
+}
+
// Verify that registering a watcher twice for "/test" fails.
TEST_P(AbstractEventLoopDeathTest, TwoWatcher) {
auto loop = Make();
@@ -438,16 +455,17 @@
loop2->MakeWatcher("/test1", [&](const TestMessage &) {});
loop2->MakeWatcher("/test2", [&](const TestMessage &message) {
- EXPECT_EQ(message.msg_value, 200);
+ EXPECT_EQ(message.value(), 200);
this->Exit();
});
auto sender = loop1->MakeSender<TestMessage>("/test2");
loop2->OnRun([&]() {
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
});
Run();
@@ -525,9 +543,18 @@
auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
auto test_timer = loop1->AddTimer([&sender]() {
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ loop2->MakeWatcher("/test", [&loop2](const TestMessage &msg) {
+ // Confirm that the data pointer makes sense from a watcher.
+ EXPECT_GT(&msg, loop2->context().data);
+ EXPECT_LT(&msg, reinterpret_cast<void *>(
+ reinterpret_cast<char *>(loop2->context().data) +
+ loop2->context().size));
});
test_timer->Setup(loop1->monotonic_now() + ::std::chrono::seconds(1));
@@ -537,15 +564,36 @@
EXPECT_TRUE(fetcher.Fetch());
- monotonic_clock::duration time_offset =
- fetcher->sent_time - (loop1->monotonic_now() - ::std::chrono::seconds(1));
+ monotonic_clock::duration monotonic_time_offset =
+ fetcher.context().monotonic_sent_time -
+ (loop1->monotonic_now() - ::std::chrono::seconds(1));
+ realtime_clock::duration realtime_time_offset =
+ fetcher.context().realtime_sent_time -
+ (loop1->realtime_now() - ::std::chrono::seconds(1));
- EXPECT_TRUE(time_offset > ::std::chrono::milliseconds(-500))
- << ": Got " << fetcher->sent_time.time_since_epoch().count()
+ EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
+ << ": Got "
+ << fetcher.context().monotonic_sent_time.time_since_epoch().count()
<< " expected " << loop1->monotonic_now().time_since_epoch().count();
- EXPECT_TRUE(time_offset < ::std::chrono::milliseconds(500))
- << ": Got " << fetcher->sent_time.time_since_epoch().count()
+ // Confirm that the data pointer makes sense.
+ EXPECT_GT(fetcher.get(), fetcher.context().data);
+ EXPECT_LT(fetcher.get(),
+ reinterpret_cast<void *>(
+ reinterpret_cast<char *>(fetcher.context().data) +
+ fetcher.context().size));
+ EXPECT_TRUE(monotonic_time_offset < ::std::chrono::milliseconds(500))
+ << ": Got "
+ << fetcher.context().monotonic_sent_time.time_since_epoch().count()
<< " expected " << loop1->monotonic_now().time_since_epoch().count();
+
+ EXPECT_TRUE(realtime_time_offset > ::std::chrono::milliseconds(-500))
+ << ": Got "
+ << fetcher.context().realtime_sent_time.time_since_epoch().count()
+ << " expected " << loop1->realtime_now().time_since_epoch().count();
+ EXPECT_TRUE(realtime_time_offset < ::std::chrono::milliseconds(500))
+ << ": Got "
+ << fetcher.context().realtime_sent_time.time_since_epoch().count()
+ << " expected " << loop1->realtime_now().time_since_epoch().count();
}
// Tests that a couple phased loops run in a row result in the correct offset
@@ -612,41 +660,5 @@
1.0, 0.1);
}
-// Verify that sending lots and lots of messages and using FetchNext gets a
-// contiguous block of messages and doesn't crash.
-// TODO(austin): We should store the same number of messages in simulation and
-// reality.
-TEST_P(AbstractEventLoopTest, LotsOfSends) {
- auto loop1 = MakePrimary();
- auto loop2 = Make();
- auto sender = loop1->MakeSender<TestMessage>("/test");
- auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
-
- auto test_timer = loop1->AddTimer([&sender, &fetcher, this]() {
- for (int i = 0; i < 100000; ++i) {
- auto msg = sender.MakeMessage();
- msg->msg_value = i;
- msg.Send();
- }
-
- int last = 0;
- if (fetcher.FetchNext()) {
- last = fetcher->msg_value;
- }
- while (fetcher.FetchNext()) {
- EXPECT_EQ(last + 1, fetcher->msg_value);
- ++last;
- }
-
- this->Exit();
- });
-
- loop1->OnRun([&test_timer, &loop1]() {
- test_timer->Setup(loop1->monotonic_now() + ::std::chrono::milliseconds(10));
- });
-
- Run();
-}
-
} // namespace testing
} // namespace aos
diff --git a/aos/events/event-loop_param_test.h b/aos/events/event_loop_param_test.h
similarity index 61%
rename from aos/events/event-loop_param_test.h
rename to aos/events/event_loop_param_test.h
index 83f0b37..fb08ac3 100644
--- a/aos/events/event-loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -3,7 +3,9 @@
#include <vector>
-#include "aos/events/event-loop.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffers.h"
+#include "aos/json_to_flatbuffer.h"
#include "gtest/gtest.h"
namespace aos {
@@ -11,6 +13,25 @@
class EventLoopTestFactory {
public:
+ EventLoopTestFactory()
+ : flatbuffer_(JsonToFlatbuffer("{\n"
+ " \"channels\": [ \n"
+ " {\n"
+ " \"name\": \"/test\",\n"
+ " \"type\": \"aos.TestMessage\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"/test1\",\n"
+ " \"type\": \"aos.TestMessage\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"/test2\",\n"
+ " \"type\": \"aos.TestMessage\"\n"
+ " }\n"
+ " ]\n"
+ "}\n",
+ Configuration::MiniReflectTypeTable())) {}
+
virtual ~EventLoopTestFactory() {}
// Makes a connected event loop.
@@ -27,6 +48,11 @@
// Advances time by sleeping. Can't be called from inside a loop.
virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
+
+ const Configuration *configuration() { return &flatbuffer_.message(); }
+
+ private:
+ FlatbufferDetachedBuffer<Configuration> flatbuffer_;
};
class AbstractEventLoopTestBase
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
new file mode 100644
index 0000000..9910d4d
--- /dev/null
+++ b/aos/events/event_loop_tmpl.h
@@ -0,0 +1,48 @@
+#ifndef AOS_EVENTS_EVENT_LOOP_TMPL_H_
+#define AOS_EVENTS_EVENT_LOOP_TMPL_H_
+
+#include <type_traits>
+#include "aos/events/event_loop.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// From a watch functor, this will extract the message type of the argument.
+// This is the template forward declaration, and it extracts the call operator
+// as a PTMF to be used by the following specialization.
+template <class T>
+struct watch_message_type_trait
+ : watch_message_type_trait<decltype(&T::operator())> {};
+
+// From a watch functor, this will extract the message type of the argument.
+// This is the template specialization.
+template <class ClassType, class ReturnType, class A1>
+struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> {
+ using message_type = typename std::decay<A1>::type;
+};
+
+template <typename T>
+typename Sender<T>::Builder Sender<T>::MakeBuilder() {
+ return Builder(sender_.get(), sender_->data(), sender_->size());
+}
+
+template <typename Watch>
+void EventLoop::MakeWatcher(const absl::string_view channel_name, Watch &&w) {
+ using T = typename watch_message_type_trait<Watch>::message_type;
+ const Channel *channel = configuration::GetChannel(
+ configuration_, channel_name, T::GetFullyQualifiedName(), name());
+
+ CHECK(channel != nullptr)
+ << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+ << T::GetFullyQualifiedName() << "\" } not found in config.";
+
+ return MakeRawWatcher(
+ channel, [this, w](const Context &context, const void *message) {
+ context_ = context;
+ w(*flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(message)));
+ });
+}
+
+} // namespace aos
+
+#endif // AOS_EVENTS_EVENT_LOOP_TMPL_H
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
new file mode 100644
index 0000000..a4cfa72
--- /dev/null
+++ b/aos/events/event_scheduler.cc
@@ -0,0 +1,48 @@
+#include "aos/events/event_scheduler.h"
+
+#include <algorithm>
+#include <deque>
+
+#include "aos/events/event_loop.h"
+
+namespace aos {
+
+EventScheduler::Token EventScheduler::Schedule(
+ ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+ return events_list_.emplace(time, callback);
+}
+
+void EventScheduler::Deschedule(EventScheduler::Token token) {
+ events_list_.erase(token);
+}
+
+void EventScheduler::RunFor(monotonic_clock::duration duration) {
+ const ::aos::monotonic_clock::time_point end_time =
+ monotonic_now() + duration;
+ is_running_ = true;
+ while (!events_list_.empty() && is_running_) {
+ auto iter = events_list_.begin();
+ ::aos::monotonic_clock::time_point next_time = iter->first;
+ if (next_time > end_time) {
+ break;
+ }
+ now_ = iter->first;
+ ::std::function<void()> callback = ::std::move(iter->second);
+ events_list_.erase(iter);
+ callback();
+ }
+ now_ = end_time;
+}
+
+void EventScheduler::Run() {
+ is_running_ = true;
+ while (!events_list_.empty() && is_running_) {
+ auto iter = events_list_.begin();
+ now_ = iter->first;
+ ::std::function<void()> callback = ::std::move(iter->second);
+ events_list_.erase(iter);
+ callback();
+ }
+}
+
+} // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
new file mode 100644
index 0000000..66d34b3
--- /dev/null
+++ b/aos/events/event_scheduler.h
@@ -0,0 +1,61 @@
+#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
+#define AOS_EVENTS_EVENT_SCHEDULER_H_
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+class EventScheduler {
+ public:
+ using ChannelType =
+ std::multimap<monotonic_clock::time_point, std::function<void()>>;
+ using Token = ChannelType::iterator;
+
+ // Schedule an event with a callback function
+ // Returns an iterator to the event
+ Token Schedule(monotonic_clock::time_point time,
+ std::function<void()> callback);
+
+ Token InvalidToken() { return events_list_.end(); }
+
+ // Deschedule an event by its iterator
+ void Deschedule(Token token);
+
+ // Runs until exited.
+ void Run();
+ // Runs for a duration.
+ void RunFor(monotonic_clock::duration duration);
+
+ void Exit() { is_running_ = false; }
+
+ bool is_running() const { return is_running_; }
+
+ monotonic_clock::time_point monotonic_now() const { return now_; }
+ realtime_clock::time_point realtime_now() const {
+ // TODO(austin): Make this all configurable...
+ return realtime_clock::epoch() + now_.time_since_epoch() +
+ std::chrono::seconds(1000000);
+ }
+
+ private:
+ // Current execution time.
+ monotonic_clock::time_point now_ = monotonic_clock::epoch();
+
+ // Multimap holding times to run functions. These are stored in order, and
+ // the order is the callback tree.
+ ChannelType events_list_;
+ bool is_running_ = false;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_EVENT_SCHEDULER_H_
diff --git a/aos/events/ping.cc b/aos/events/ping.cc
new file mode 100644
index 0000000..b904cb2
--- /dev/null
+++ b/aos/events/ping.cc
@@ -0,0 +1,88 @@
+#include <chrono>
+
+#include "aos/configuration.h"
+#include "aos/events/pingpong_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_int32(sleep_ms, 10, "Time to sleep between pings");
+
+namespace aos {
+
+namespace chrono = std::chrono;
+
+class Ping {
+ public:
+ Ping(EventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<examples::Ping>("/test")) {
+ timer_handle_ = event_loop_->AddTimer([this]() { SendPing(); });
+
+ event_loop_->MakeWatcher(
+ "/test", [this](const examples::Pong &pong) { HandlePong(pong); });
+
+ event_loop_->OnRun([this]() {
+ timer_handle_->Setup(event_loop_->monotonic_now(),
+ chrono::milliseconds(FLAGS_sleep_ms));
+ });
+
+ event_loop_->SetRuntimeRealtimePriority(5);
+ }
+
+ void SendPing() {
+ ++count_;
+ aos::Sender<examples::Ping>::Builder msg = sender_.MakeBuilder();
+ examples::Ping::Builder builder = msg.MakeBuilder<examples::Ping>();
+ builder.add_value(count_);
+ builder.add_send_time(
+ event_loop_->monotonic_now().time_since_epoch().count());
+ CHECK(msg.Send(builder.Finish()));
+ VLOG(2) << "Sending ping";
+ }
+
+ void HandlePong(const examples::Pong &pong) {
+ const aos::monotonic_clock::time_point monotonic_send_time(
+ chrono::nanoseconds(pong.initial_send_time()));
+ const aos::monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+
+ const chrono::nanoseconds round_trip_time =
+ monotonic_now - monotonic_send_time;
+
+ if (pong.value() == count_) {
+ VLOG(1) << "Elapsed time " << round_trip_time.count() << " ns "
+ << FlatbufferToJson(&pong);
+ } else {
+ VLOG(1) << "Missmatched pong message";
+ }
+ }
+
+ private:
+ EventLoop *event_loop_;
+ aos::Sender<examples::Ping> sender_;
+ TimerHandler *timer_handle_;
+ int count_ = 0;
+};
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ FLAGS_logtostderr = true;
+ google::InitGoogleLogging(argv[0]);
+ ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig("aos/events/config.fb.json");
+
+ ::aos::ShmEventLoop event_loop(&config.message());
+
+ aos::Ping ping(&event_loop);
+
+ event_loop.Run();
+
+ ::aos::Cleanup();
+ return 0;
+}
diff --git a/aos/events/pingpong.fbs b/aos/events/pingpong.fbs
new file mode 100644
index 0000000..67e5015
--- /dev/null
+++ b/aos/events/pingpong.fbs
@@ -0,0 +1,14 @@
+namespace aos.examples;
+
+table Ping {
+ value:int;
+ send_time:long;
+}
+
+table Pong {
+ value:int;
+ initial_send_time:long;
+}
+
+root_type Ping;
+root_type Pong;
diff --git a/aos/events/pong.cc b/aos/events/pong.cc
new file mode 100644
index 0000000..1504bfb
--- /dev/null
+++ b/aos/events/pong.cc
@@ -0,0 +1,54 @@
+#include <chrono>
+
+#include "aos/configuration.h"
+#include "aos/events/pingpong_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+namespace chrono = std::chrono;
+
+class Pong {
+ public:
+ Pong(EventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<examples::Pong>("/test")) {
+ event_loop_->MakeWatcher("/test", [this](const examples::Ping &ping) {
+ aos::Sender<examples::Pong>::Builder msg = sender_.MakeBuilder();
+ examples::Pong::Builder builder = msg.MakeBuilder<examples::Pong>();
+ builder.add_value(ping.value());
+ builder.add_initial_send_time(ping.send_time());
+ CHECK(msg.Send(builder.Finish()));
+ });
+
+ event_loop_->SetRuntimeRealtimePriority(5);
+ }
+
+ private:
+ EventLoop *event_loop_;
+ aos::Sender<examples::Pong> sender_;
+};
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ FLAGS_logtostderr = true;
+ google::InitGoogleLogging(argv[0]);
+ ::gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig("aos/events/config.fb.json");
+
+ ::aos::ShmEventLoop event_loop(&config.message());
+
+ aos::Pong ping(&event_loop);
+
+ event_loop.Run();
+
+ ::aos::Cleanup();
+ return 0;
+}
diff --git a/aos/events/raw-event-loop.h b/aos/events/raw-event-loop.h
deleted file mode 100644
index 887c7ab..0000000
--- a/aos/events/raw-event-loop.h
+++ /dev/null
@@ -1,179 +0,0 @@
-#ifndef _AOS_EVENTS_RAW_EVENT_LOOP_H_
-#define _AOS_EVENTS_RAW_EVENT_LOOP_H_
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include "aos/queue.h"
-#include "aos/time/time.h"
-
-// This file contains raw versions of the classes in event-loop.h.
-//
-// Users should look exclusively at event-loop.h. Only people who wish to
-// implement a new IPC layer (like a fake layer for example) may wish to use
-// these classes.
-namespace aos {
-
-// Raw version of fetcher. Contains a local variable that the fetcher will
-// update.
-// It is the job of the typed version to cast this to the appropriate type.
-class RawFetcher {
- public:
- RawFetcher() {}
- virtual ~RawFetcher() {}
-
- // Non-blocking fetch of the next message in the queue. Returns true if there
- // was a new message and we got it.
- virtual bool FetchNext() = 0;
- // Non-blocking fetch of the latest message:
- virtual bool Fetch() = 0;
-
- const void *most_recent() { return most_recent_; }
-
- protected:
- RawFetcher(const RawFetcher &) = delete;
- RawFetcher &operator=(const RawFetcher &) = delete;
- void set_most_recent(const void *most_recent) { most_recent_ = most_recent; }
-
- private:
- const void *most_recent_ = nullptr;
-};
-
-// Raw version of sender. Sending a message is a 3 part process. Fetch an opaque
-// token, cast that token to the message type, populate and then calling one of
-// Send() or Free().
-class RawSender {
- public:
- RawSender() {}
- virtual ~RawSender() {}
-
- virtual aos::Message *GetMessage() = 0;
-
- virtual void Free(aos::Message *msg) = 0;
-
- virtual bool Send(aos::Message *msg) = 0;
-
- // Call operator that calls Free().
- template <typename T>
- void operator()(T *t) {
- Free(t);
- }
-
- virtual const char *name() const = 0;
-
- protected:
- RawSender(const RawSender &) = delete;
- RawSender &operator=(const RawSender &) = delete;
-};
-
-// Opaque Information extracted from a particular type passed to the underlying
-// system so that it knows how much memory to allocate etc.
-struct QueueTypeInfo {
- // Message size:
- size_t size;
- // This should be a globally unique identifier for the type.
- int hash;
- // Config parameter for how long the queue should be.
- int queue_length;
-
- template <typename T>
- static QueueTypeInfo Get() {
- QueueTypeInfo info;
- info.size = sizeof(T);
- info.hash = T::kHash;
- info.queue_length = T::kQueueLength;
- return info;
- }
-
- // Necessary for the comparison of QueueTypeInfo objects in the
- // SimulatedEventLoop.
- bool operator<(const QueueTypeInfo &other) const {
- if (size != other.size) return size < other.size;
- if (hash != other.hash) return hash < other.hash;
- return queue_length < other.queue_length;
- }
-};
-
-// Interface for timers
-class TimerHandler {
- public:
- virtual ~TimerHandler() {}
-
- // Timer should sleep until base, base + offset, base + offset * 2, ...
- // If repeat_offset isn't set, the timer only expires once.
- virtual void Setup(monotonic_clock::time_point base,
- monotonic_clock::duration repeat_offset =
- ::aos::monotonic_clock::zero()) = 0;
-
- // Stop future calls to callback().
- virtual void Disable() = 0;
-};
-
-// Interface for phased loops. They are built on timers.
-class PhasedLoopHandler {
- public:
- virtual ~PhasedLoopHandler() {}
-
- // Sets the interval and offset. Any changes to interval and offset only take
- // effect when the handler finishes running.
- virtual void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) = 0;
-};
-
-class EventScheduler;
-
-// Virtual base class for all event queue-types.
-class RawEventLoop {
- public:
- virtual ~RawEventLoop() {}
-
- // Current time.
- virtual monotonic_clock::time_point monotonic_now() = 0;
-
- // The passed in function will be called when the event loop starts.
- // Use this to run code once the thread goes into "real-time-mode",
- virtual void OnRun(::std::function<void()> on_run) = 0;
-
- // Sets the name of the event loop.
- virtual void set_name(const char *name) = 0;
-
- // Threadsafe.
- bool is_running() const { return is_running_.load(); }
-
- // Creates a timer that executes callback when the timer expires
- // Returns a TimerHandle for configuration of the timer
- virtual TimerHandler *AddTimer(::std::function<void()> callback) = 0;
-
- // Creates a timer that executes callback periodically at the specified
- // interval and offset. Returns a PhasedLoopHandler for interacting with the
- // timer.
- virtual PhasedLoopHandler *AddPhasedLoop(
- ::std::function<void(int)> callback,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset = ::std::chrono::seconds(0)) = 0;
-
- protected:
- friend class EventScheduler;
- void set_is_running(bool value) { is_running_.store(value); }
-
- // Will send new messages from (path, type).
- virtual std::unique_ptr<RawSender> MakeRawSender(
- const std::string &path, const QueueTypeInfo &type) = 0;
-
- // Will fetch new messages from (path, type).
- virtual std::unique_ptr<RawFetcher> MakeRawFetcher(
- const std::string &path, const QueueTypeInfo &type) = 0;
-
- // Will watch (path, type) for new messages
- virtual void MakeRawWatcher(
- const std::string &path, const QueueTypeInfo &type,
- std::function<void(const Message *message)> watcher) = 0;
-
- private:
- ::std::atomic<bool> is_running_{false};
-};
-
-} // namespace aos
-
-#endif // _AOS_EVENTS_RAW_EVENT_LOOP_H_
diff --git a/aos/events/shm-event-loop.cc b/aos/events/shm-event-loop.cc
deleted file mode 100644
index 5034f0b..0000000
--- a/aos/events/shm-event-loop.cc
+++ /dev/null
@@ -1,507 +0,0 @@
-#include "aos/events/shm-event-loop.h"
-
-#include <sys/timerfd.h>
-#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <stdexcept>
-
-#include "aos/events/epoll.h"
-#include "aos/init.h"
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
-#include "aos/util/phased_loop.h"
-
-namespace aos {
-
-ShmEventLoop::ShmEventLoop() {}
-
-namespace {
-
-namespace chrono = ::std::chrono;
-
-class ShmFetcher : public RawFetcher {
- public:
- explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
- // Move index_ to point to the end of the queue as it is at construction
- // time. Also grab the oldest message but don't expose it to the user yet.
- static constexpr Options<RawQueue> kOptions =
- RawQueue::kFromEnd | RawQueue::kNonBlock;
- msg_ = queue_->ReadMessageIndex(kOptions, &index_);
- }
- ~ShmFetcher() {
- if (msg_) {
- queue_->FreeMessage(msg_);
- }
- }
-
- bool FetchNext() override {
- const void *msg = queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_);
- // Only update the internal pointer if we got a new message.
- if (msg != nullptr) {
- queue_->FreeMessage(msg_);
- msg_ = msg;
- set_most_recent(msg_);
- }
- return msg != nullptr;
- }
-
- bool Fetch() override {
- static constexpr Options<RawQueue> kOptions =
- RawQueue::kFromEnd | RawQueue::kNonBlock;
- const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
- // Only update the internal pointer if we got a new message.
- if (msg != nullptr && msg != msg_) {
- queue_->FreeMessage(msg_);
- msg_ = msg;
- set_most_recent(msg_);
- return true;
- } else {
- // The message has to get freed if we didn't use it (and
- // RawQueue::FreeMessage is ok to call on nullptr).
- queue_->FreeMessage(msg);
-
- // We have a message from construction time. Give it to the user now.
- if (msg_ != nullptr && most_recent() != msg_) {
- set_most_recent(msg_);
- return true;
- } else {
- return false;
- }
- }
- }
-
- private:
- int index_ = 0;
- RawQueue *queue_;
- const void *msg_ = nullptr;
-};
-
-class ShmSender : public RawSender {
- public:
- explicit ShmSender(RawQueue *queue) : queue_(queue) {}
-
- ::aos::Message *GetMessage() override {
- return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
- }
-
- void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
-
- bool Send(::aos::Message *msg) override {
- assert(queue_ != nullptr);
- {
- // TODO(austin): This lets multiple senders reorder messages since time
- // isn't acquired with a lock held.
- if (msg->sent_time == monotonic_clock::min_time) {
- msg->sent_time = monotonic_clock::now();
- }
- }
- return queue_->WriteMessage(msg, RawQueue::kOverride);
- }
-
- const char *name() const override { return queue_->name(); }
-
- private:
- RawQueue *queue_;
-};
-
-} // namespace
-
-namespace internal {
-
-// Class to manage the state for a Watcher.
-class WatcherThreadState {
- public:
- WatcherThreadState(
- ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
- ::std::function<void(const ::aos::Message *message)> watcher)
- : thread_state_(thread_state),
- queue_(queue),
- index_(0),
- watcher_(::std::move(watcher)) {}
-
- ~WatcherThreadState() {
- // Only kill the thread if it is running.
- if (running_) {
- // TODO(austin): CHECK that we aren't RT here.
-
- // Try joining. If we fail, we weren't asleep on the condition in the
- // queue. So hit it again and again until that's true.
- struct timespec end_time;
- AOS_PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
- while (true) {
- void *retval = nullptr;
- end_time.tv_nsec += 100000000;
- if (end_time.tv_nsec > 1000000000L) {
- end_time.tv_nsec -= 1000000000L;
- ++end_time.tv_sec;
- }
- int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
- if (ret == ETIMEDOUT) continue;
- AOS_PCHECK(ret == 0);
- break;
- }
- }
- }
-
- // Starts the thread and waits until it is running.
- void Start() {
- AOS_PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
- IPCRecursiveMutexLocker locker(&thread_started_mutex_);
- if (locker.owner_died()) ::aos::Die("Owner died");
- while (!running_) {
- AOS_CHECK(!thread_started_condition_.Wait());
- }
- }
-
- void GrabQueueIndex() {
- // Right after we are signaled to start, point index to the current index
- // so we don't read any messages received before now. Otherwise we will
- // get a significantly delayed read.
- static constexpr Options<RawQueue> kOptions =
- RawQueue::kFromEnd | RawQueue::kNonBlock;
- const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
- if (msg) {
- queue_->FreeMessage(msg);
- }
- }
-
- private:
- // Runs Run given a WatcherThreadState as the argument. This is an adapter
- // between pthreads and Run.
- static void *StaticRun(void *arg) {
- WatcherThreadState *watcher_thread_state =
- reinterpret_cast<WatcherThreadState *>(arg);
- watcher_thread_state->Run();
- return nullptr;
- }
-
- // Runs the watcher callback on new messages.
- void Run() {
- ::aos::SetCurrentThreadName(thread_state_->name() + ".watcher");
-
- // Signal the main thread that we are now ready.
- thread_state_->MaybeSetCurrentThreadRealtimePriority();
- {
- IPCRecursiveMutexLocker locker(&thread_started_mutex_);
- if (locker.owner_died()) ::aos::Die("Owner died");
- running_ = true;
- thread_started_condition_.Broadcast();
- }
-
- // Wait for the global start before handling events.
- thread_state_->WaitForStart();
-
- // Bail immediately if we are supposed to stop.
- if (!thread_state_->is_running()) {
- ::aos::UnsetCurrentThreadRealtimePriority();
- return;
- }
-
- const void *msg = nullptr;
- while (true) {
- msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
- chrono::seconds(1));
- // We hit a timeout. Confirm that we should be running and retry. Note,
- // is_running is threadsafe (it's an atomic underneath). Worst case, we
- // check again in a second.
- if (msg == nullptr) {
- if (!thread_state_->is_running()) break;
- continue;
- }
-
- {
- // Grab the lock so that only one callback can be called at a time.
- MutexLocker locker(&thread_state_->mutex_);
- if (!thread_state_->is_running()) break;
-
- watcher_(reinterpret_cast<const Message *>(msg));
- // watcher_ may have exited the event loop.
- if (!thread_state_->is_running()) break;
- }
- // Drop the reference.
- queue_->FreeMessage(msg);
- }
-
- // And drop the last reference.
- queue_->FreeMessage(msg);
- // Now that everything is cleaned up, drop RT priority before destroying the
- // thread.
- ::aos::UnsetCurrentThreadRealtimePriority();
- }
- pthread_t pthread_;
- ShmEventLoop::ThreadState *thread_state_;
- RawQueue *queue_;
- int32_t index_;
- bool running_ = false;
-
- ::std::function<void(const Message *message)> watcher_;
-
- // Mutex and condition variable used to wait until the thread is started
- // before going RT.
- ::aos::Mutex thread_started_mutex_;
- ::aos::Condition thread_started_condition_{&thread_started_mutex_};
-};
-
-// Adapter class to adapt a timerfd to a TimerHandler.
-// The part of the API which is accessed by the TimerHandler interface needs to
-// be threadsafe. This means Setup and Disable.
-class TimerHandlerState : public TimerHandler {
- public:
- TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
- : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
- timerfd_.Read();
- fn_();
- });
- }
-
- ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
- void Setup(monotonic_clock::time_point base,
- monotonic_clock::duration repeat_offset) override {
- // SetTime is threadsafe already.
- timerfd_.SetTime(base, repeat_offset);
- }
-
- void Disable() override {
- // Disable is also threadsafe already.
- timerfd_.Disable();
- }
-
- private:
- ShmEventLoop *shm_event_loop_;
-
- TimerFd timerfd_;
-
- // Function to be run on the thread
- ::std::function<void()> fn_;
-};
-
-// Adapter class to the timerfd and PhasedLoop.
-// The part of the API which is accessed by the PhasedLoopHandler interface
-// needs to be threadsafe. This means set_interval_and_offset
-class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
- public:
- PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset)
- : shm_event_loop_(shm_event_loop),
- phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
- fn_(::std::move(fn)) {
- shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
- MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
- {
- MutexLocker locker(&mutex_);
- timerfd_.Read();
- }
- // Call the function. To avoid needing a recursive mutex, drop the lock
- // before running the function.
- fn_(cycles_elapsed_);
- {
- MutexLocker locker(&mutex_);
- Reschedule();
- }
- });
- }
-
- ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
- void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) override {
- MutexLocker locker(&mutex_);
- phased_loop_.set_interval_and_offset(interval, offset);
- }
-
- void Startup() {
- MutexLocker locker(&mutex_);
- phased_loop_.Reset(shm_event_loop_->monotonic_now());
- Reschedule();
- }
-
- private:
- // Reschedules the timer. Must be called with the mutex held.
- void Reschedule() {
- cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
- timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
- }
-
- ShmEventLoop *shm_event_loop_;
-
- // Mutex to protect access to the timerfd_ (not strictly necessary), and the
- // phased_loop (necessary).
- ::aos::Mutex mutex_;
-
- TimerFd timerfd_;
- time::PhasedLoop phased_loop_;
-
- int cycles_elapsed_ = 1;
-
- // Function to be run
- const ::std::function<void(int)> fn_;
-};
-} // namespace internal
-
-::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
- const ::std::string &path, const QueueTypeInfo &type) {
- return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
- RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
-}
-
-::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
- const ::std::string &path, const QueueTypeInfo &type) {
- Take(path);
- return ::std::unique_ptr<RawSender>(new ShmSender(
- RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
-}
-
-void ShmEventLoop::MakeRawWatcher(
- const ::std::string &path, const QueueTypeInfo &type,
- ::std::function<void(const Message *message)> watcher) {
- Take(path);
- ::std::unique_ptr<internal::WatcherThreadState> state(
- new internal::WatcherThreadState(
- &thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
- type.queue_length),
- std::move(watcher)));
- watchers_.push_back(::std::move(state));
-}
-
-TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
- ::std::unique_ptr<internal::TimerHandlerState> timer(
- new internal::TimerHandlerState(this, ::std::move(callback)));
-
- timers_.push_back(::std::move(timer));
-
- return timers_.back().get();
-}
-
-PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
- ::std::function<void(int)> callback,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) {
- ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
- new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
- offset));
-
- phased_loops_.push_back(::std::move(phased_loop));
-
- return phased_loops_.back().get();
-}
-
-void ShmEventLoop::OnRun(::std::function<void()> on_run) {
- on_run_.push_back(::std::move(on_run));
-}
-
-void ShmEventLoop::set_name(const char *name) { thread_state_.name_ = name; }
-
-void ShmEventLoop::Run() {
- // Start all the watcher threads.
- for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
- watcher->Start();
- }
-
- ::aos::SetCurrentThreadName(thread_state_.name());
-
- // Now, all the threads are up. Lock everything into memory and go RT.
- if (thread_state_.priority_ != -1) {
- ::aos::InitRT();
- }
- thread_state_.MaybeSetCurrentThreadRealtimePriority();
- set_is_running(true);
-
- // Now that we are realtime (but before the OnRun handlers run), snap the
- // queue index.
- for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
- watcher->GrabQueueIndex();
- }
-
- // Now that we are RT, run all the OnRun handlers.
- for (const auto &run : on_run_) {
- run();
- }
-
- // Start up all the phased loops.
- for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
- phased_loops_) {
- phased_loop->Startup();
- }
- // TODO(austin): We don't need a separate watcher thread if there are only
- // watchers and fetchers. Could lazily create the epoll loop and pick a
- // victim watcher to run in this thread.
- // Trigger all the threads to start now.
- thread_state_.Start();
-
- // And start our main event loop which runs all the timers and handles Quit.
- epoll_.Run();
-
- // Once epoll exits, there is no useful nonrt work left to do.
- set_is_running(false);
-
- // Signal all the watcher threads to exit. After this point, no more
- // callbacks will be handled.
- thread_state_.Exit();
-
- // Nothing time or synchronization critical needs to happen after this point.
- // Drop RT priority.
- ::aos::UnsetCurrentThreadRealtimePriority();
-
- // The watcher threads get cleaned up in the destructor.
-}
-
-void ShmEventLoop::ThreadState::Start() {
- MutexLocker locker(&mutex_);
- loop_running_ = true;
- if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
- loop_running_cond_.Broadcast();
-}
-
-void ShmEventLoop::ThreadState::WaitForStart() {
- MutexLocker locker(&mutex_);
- while (!(loop_running_ || loop_finished_)) {
- Condition::WaitResult wait_result =
- loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
- if (wait_result == Condition::WaitResult::kOwnerDied) {
- ::aos::Die("ShmEventLoop mutex lock problem.\n");
- }
- }
-}
-
-void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
- if (priority_ != -1) {
- ::aos::SetCurrentThreadRealtimePriority(priority_);
- }
-}
-
-void ShmEventLoop::Exit() { epoll_.Quit(); }
-
-void ShmEventLoop::ThreadState::Exit() {
- IPCRecursiveMutexLocker locker(&mutex_);
- if (locker.owner_died()) ::aos::Die("Owner died");
- loop_running_ = false;
- loop_finished_ = true;
- loop_running_cond_.Broadcast();
-}
-
-ShmEventLoop::~ShmEventLoop() {
- if (is_running()) {
- ::aos::Die("ShmEventLoop destroyed while running\n");
- }
-}
-
-void ShmEventLoop::Take(const ::std::string &path) {
- if (is_running()) {
- ::aos::Die("Cannot add new objects while running.\n");
- }
-
- const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
- if (prior != taken_.end()) {
- ::aos::Die("%s is already being used.", path.c_str());
- } else {
- taken_.emplace_back(path);
- }
-}
-
-} // namespace aos
diff --git a/aos/events/shm-event-loop.h b/aos/events/shm-event-loop.h
deleted file mode 100644
index 5db8319..0000000
--- a/aos/events/shm-event-loop.h
+++ /dev/null
@@ -1,124 +0,0 @@
-#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
-#define AOS_EVENTS_SHM_EVENT_LOOP_H_
-
-#include <unordered_set>
-#include <vector>
-
-#include "aos/condition.h"
-#include "aos/events/epoll.h"
-#include "aos/events/event-loop.h"
-#include "aos/mutex/mutex.h"
-
-namespace aos {
-namespace internal {
-
-class WatcherThreadState;
-class TimerHandlerState;
-class PhasedLoopHandler;
-
-} // namespace internal
-
-// Specialization of EventLoop that is built from queues running out of shared
-// memory. See more details at aos/queue.h
-//
-// This object must be interacted with from one thread, but the Senders and
-// Fetchers may be used from multiple threads afterwords (as long as their
-// destructors are called back in one thread again)
-class ShmEventLoop : public EventLoop {
- public:
- ShmEventLoop();
- ~ShmEventLoop() override;
-
- ::aos::monotonic_clock::time_point monotonic_now() override {
- return ::aos::monotonic_clock::now();
- }
-
- ::std::unique_ptr<RawSender> MakeRawSender(
- const ::std::string &path, const QueueTypeInfo &type) override;
- ::std::unique_ptr<RawFetcher> MakeRawFetcher(
- const ::std::string &path, const QueueTypeInfo &type) override;
-
- void MakeRawWatcher(
- const ::std::string &path, const QueueTypeInfo &type,
- ::std::function<void(const aos::Message *message)> watcher) override;
-
- TimerHandler *AddTimer(::std::function<void()> callback) override;
- ::aos::PhasedLoopHandler *AddPhasedLoop(
- ::std::function<void(int)> callback,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset =
- ::std::chrono::seconds(0)) override;
-
- void OnRun(::std::function<void()> on_run) override;
- void Run();
- void Exit();
-
- // TODO(austin): Add a function to register control-C call.
-
- void SetRuntimeRealtimePriority(int priority) override {
- if (is_running()) {
- ::aos::Die("Cannot set realtime priority while running.");
- }
- thread_state_.priority_ = priority;
- }
-
- void set_name(const char *name) override;
-
- private:
- friend class internal::WatcherThreadState;
- friend class internal::TimerHandlerState;
- friend class internal::PhasedLoopHandler;
- // This ThreadState ensures that two watchers in the same loop cannot be
- // triggered concurrently. Because watchers block threads indefinitely, this
- // has to be shared_ptr in case the EventLoop is destroyed before the thread
- // receives any new events.
- class ThreadState {
- public:
- void WaitForStart();
-
- bool is_running() { return loop_running_; }
-
- void Start();
-
- void Exit();
-
- void MaybeSetCurrentThreadRealtimePriority();
-
- const ::std::string &name() const { return name_; }
-
- private:
- friend class internal::WatcherThreadState;
- friend class internal::TimerHandlerState;
- friend class internal::PhasedLoopHandler;
- friend class ShmEventLoop;
-
- // This mutex ensures that only one watch event happens at a time.
- ::aos::Mutex mutex_;
- // Block on this until the loop starts.
- ::aos::Condition loop_running_cond_{&mutex_};
- // Used to notify watchers that the loop is done.
- ::std::atomic<bool> loop_running_{false};
- bool loop_finished_ = false;
- int priority_ = -1;
-
- // Immutable after Start is called.
- ::std::string name_;
- };
-
- // Tracks that we can't have multiple watchers or a sender and a watcher (or
- // multiple senders) on a single queue (path).
- void Take(const ::std::string &path);
-
- ::std::vector<::std::function<void()>> on_run_;
- ThreadState thread_state_;
- ::std::vector<::std::string> taken_;
- internal::EPoll epoll_;
-
- ::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
- ::std::vector<::std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
- ::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
-};
-
-} // namespace aos
-
-#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
new file mode 100644
index 0000000..5a88717
--- /dev/null
+++ b/aos/events/shm_event_loop.cc
@@ -0,0 +1,611 @@
+#include "glog/logging.h"
+
+#include "aos/events/shm_event_loop.h"
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/timerfd.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <stdexcept>
+
+#include "aos/events/epoll.h"
+#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/realtime.h"
+#include "aos/util/phased_loop.h"
+
+DEFINE_string(shm_base, "/dev/shm/aos",
+ "Directory to place queue backing mmaped files in.");
+DEFINE_uint32(permissions, 0770,
+ "Permissions to make shared memory files and folders.");
+
+namespace aos {
+
+std::string ShmFolder(const Channel *channel) {
+ CHECK(channel->has_name());
+ CHECK_EQ(channel->name()->string_view()[0], '/');
+ return FLAGS_shm_base + channel->name()->str() + "/";
+}
+std::string ShmPath(const Channel *channel) {
+ CHECK(channel->has_type());
+ return ShmFolder(channel) + channel->type()->str() + ".v0";
+}
+
+class MMapedQueue {
+ public:
+ MMapedQueue(const Channel *channel) {
+ std::string path = ShmPath(channel);
+
+ // TODO(austin): Pull these out into the config if there is a need.
+ config_.num_watchers = 10;
+ config_.num_senders = 10;
+ config_.queue_size = 2 * channel->frequency();
+ config_.message_data_size = channel->max_size();
+
+ size_ = ipc_lib::LocklessQueueMemorySize(config_);
+
+ MkdirP(path);
+
+ // There are 2 cases. Either the file already exists, or it does not
+ // already exist and we need to create it. Start by trying to create it. If
+ // that fails, the file has already been created and we can open it
+ // normally.. Once the file has been created it wil never be deleted.
+ fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
+ O_CLOEXEC | FLAGS_permissions);
+ if (fd_ == -1 && errno == EEXIST) {
+ VLOG(1) << path << " already created.";
+ // File already exists.
+ fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
+ PCHECK(fd_ != -1) << ": Failed to open " << path;
+ while (true) {
+ struct stat st;
+ PCHECK(fstat(fd_, &st) == 0);
+ if (st.st_size != 0) {
+ CHECK_EQ(static_cast<size_t>(st.st_size), size_)
+ << ": Size of " << path
+ << " doesn't match expected size of backing queue file. Did the "
+ "queue definition change?";
+ break;
+ } else {
+ // The creating process didn't get around to it yet. Give it a bit.
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ VLOG(1) << path << " is zero size, waiting";
+ }
+ }
+ } else {
+ VLOG(1) << "Created " << path;
+ PCHECK(fd_ != -1) << ": Failed to open " << path;
+ PCHECK(ftruncate(fd_, size_) == 0);
+ }
+
+ data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
+ PCHECK(data_ != MAP_FAILED);
+
+ ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
+ }
+
+ ~MMapedQueue() {
+ PCHECK(munmap(data_, size_) == 0);
+ PCHECK(close(fd_) == 0);
+ }
+
+ ipc_lib::LocklessQueueMemory *memory() const {
+ return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
+ }
+
+ const ipc_lib::LocklessQueueConfiguration &config() const {
+ return config_;
+ }
+
+ private:
+ void MkdirP(absl::string_view path) {
+ struct stat st;
+ auto last_slash_pos = path.find_last_of("/");
+
+ std::string folder(last_slash_pos == absl::string_view::npos
+ ? absl::string_view("")
+ : path.substr(0, last_slash_pos));
+ if (stat(folder.c_str(), &st) == -1) {
+ PCHECK(errno == ENOENT);
+ CHECK_NE(folder, "") << ": Base path doesn't exist";
+ MkdirP(folder);
+ VLOG(1) << "Creating " << folder;
+ PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0);
+ }
+ }
+
+ ipc_lib::LocklessQueueConfiguration config_;
+
+ int fd_;
+
+ size_t size_;
+ void *data_;
+};
+
+// Returns the portion of the path after the last /.
+absl::string_view Filename(absl::string_view path) {
+ auto last_slash_pos = path.find_last_of("/");
+
+ return last_slash_pos == absl::string_view::npos
+ ? path
+ : path.substr(last_slash_pos + 1, path.size());
+}
+
+ShmEventLoop::ShmEventLoop(const Configuration *configuration)
+ : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
+
+namespace {
+
+namespace chrono = ::std::chrono;
+
+class ShmFetcher : public RawFetcher {
+ public:
+ explicit ShmFetcher(const Channel *channel)
+ : lockless_queue_memory_(channel),
+ lockless_queue_(lockless_queue_memory_.memory(),
+ lockless_queue_memory_.config()),
+ data_storage_(static_cast<AlignedChar *>(aligned_alloc(
+ alignof(AlignedChar), channel->max_size())),
+ &free) {
+ context_.data = nullptr;
+ // Point the queue index at the next index to read starting now. This
+ // makes it such that FetchNext will read the next message sent after
+ // the fetcher is created.
+ PointAtNextQueueIndex();
+ }
+
+ ~ShmFetcher() { data_ = nullptr; }
+
+ // Points the next message to fetch at the queue index which will be
+ // populated next.
+ void PointAtNextQueueIndex() {
+ actual_queue_index_ = lockless_queue_.LatestQueueIndex();
+ if (!actual_queue_index_.valid()) {
+ // Nothing in the queue. The next element will show up at the 0th
+ // index in the queue.
+ actual_queue_index_ =
+ ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
+ } else {
+ actual_queue_index_ = actual_queue_index_.Increment();
+ }
+ }
+
+ bool FetchNext() override {
+ // TODO(austin): Write a test which starts with nothing in the queue,
+ // and then calls FetchNext() after something is sent.
+ // TODO(austin): Get behind and make sure it dies both here and with
+ // Fetch.
+ ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
+ actual_queue_index_.index(), &context_.monotonic_sent_time,
+ &context_.realtime_sent_time, &context_.size,
+ reinterpret_cast<char *>(data_storage_.get()));
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ context_.queue_index = actual_queue_index_.index();
+ data_ = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
+ context_.data = data_;
+ actual_queue_index_ = actual_queue_index_.Increment();
+ }
+
+ // Make sure the data wasn't modified while we were reading it. This
+ // can only happen if you are reading the last message *while* it is
+ // being written to, which means you are pretty far behind.
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ << ": Got behind while reading and the last message was modified "
+ "out "
+ "from under us while we were reading it. Don't get so far "
+ "behind.";
+
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+ << ": The next message is no longer available.";
+ return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ }
+
+ bool Fetch() override {
+ const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
+ // actual_queue_index_ is only meaningful if it was set by Fetch or
+ // FetchNext. This happens when valid_data_ has been set. So, only
+ // skip checking if valid_data_ is true.
+ //
+ // Also, if the latest queue index is invalid, we are empty. So there
+ // is nothing to fetch.
+ if ((data_ != nullptr &&
+ queue_index == actual_queue_index_.DecrementBy(1u)) ||
+ !queue_index.valid()) {
+ return false;
+ }
+
+ ipc_lib::LocklessQueue::ReadResult read_result =
+ lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
+ &context_.realtime_sent_time, &context_.size,
+ reinterpret_cast<char *>(data_storage_.get()));
+ if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
+ context_.queue_index = queue_index.index();
+ data_ = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
+ context_.data = data_;
+ actual_queue_index_ = queue_index.Increment();
+ }
+
+ // Make sure the data wasn't modified while we were reading it. This
+ // can only happen if you are reading the last message *while* it is
+ // being written to, which means you are pretty far behind.
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
+ << ": Got behind while reading and the last message was modified "
+ "out "
+ "from under us while we were reading it. Don't get so far "
+ "behind.";
+
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
+ << ": Queue index went backwards. This should never happen.";
+
+ // We fell behind between when we read the index and read the value.
+ // This isn't worth recovering from since this means we went to sleep
+ // for a long time in the middle of this function.
+ CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
+ << ": The next message is no longer available.";
+ return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
+ }
+
+ bool RegisterWakeup(int priority) {
+ return lockless_queue_.RegisterWakeup(priority);
+ }
+
+ void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+
+ private:
+ MMapedQueue lockless_queue_memory_;
+ ipc_lib::LocklessQueue lockless_queue_;
+
+ ipc_lib::QueueIndex actual_queue_index_ =
+ ipc_lib::LocklessQueue::empty_queue_index();
+
+ struct AlignedChar {
+ alignas(32) char data;
+ };
+
+ std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+};
+
+class ShmSender : public RawSender {
+ public:
+ explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop)
+ : RawSender(),
+ shm_event_loop_(shm_event_loop),
+ name_(channel->name()->str()),
+ lockless_queue_memory_(channel),
+ lockless_queue_(lockless_queue_memory_.memory(),
+ lockless_queue_memory_.config()),
+ lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+
+ void *data() override { return lockless_queue_sender_.Data(); }
+ size_t size() override { return lockless_queue_sender_.size(); }
+ bool Send(size_t size) override {
+ lockless_queue_sender_.Send(size);
+ lockless_queue_.Wakeup(shm_event_loop_->priority());
+ return true;
+ }
+
+ bool Send(void *msg, size_t length) override {
+ lockless_queue_sender_.Send(reinterpret_cast<char *>(msg), length);
+ lockless_queue_.Wakeup(shm_event_loop_->priority());
+ // TODO(austin): Return an error if we send too fast.
+ return true;
+ }
+
+ const absl::string_view name() const override { return name_; }
+
+ private:
+ const ShmEventLoop *shm_event_loop_;
+ std::string name_;
+ MMapedQueue lockless_queue_memory_;
+ ipc_lib::LocklessQueue lockless_queue_;
+ ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
+};
+
+} // namespace
+
+namespace internal {
+
+// Class to manage the state for a Watcher.
+class WatcherState {
+ public:
+ WatcherState(
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> watcher)
+ : shm_fetcher_(channel), watcher_(watcher) {}
+
+ ~WatcherState() {}
+
+ // Points the next message to fetch at the queue index which will be populated
+ // next.
+ void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); }
+
+ // Returns true if there is new data available.
+ bool HasNewData() {
+ if (!has_new_data_) {
+ has_new_data_ = shm_fetcher_.FetchNext();
+ }
+
+ return has_new_data_;
+ }
+
+ // Returns the time of the current data sample.
+ aos::monotonic_clock::time_point event_time() const {
+ return shm_fetcher_.context().monotonic_sent_time;
+ }
+
+ // Consumes the data by calling the callback.
+ void CallCallback() {
+ CHECK(has_new_data_);
+ watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data());
+ has_new_data_ = false;
+ }
+
+ // Starts the thread and waits until it is running.
+ bool RegisterWakeup(int priority) {
+ return shm_fetcher_.RegisterWakeup(priority);
+ }
+
+ void UnregisterWakeup() { return shm_fetcher_.UnregisterWakeup(); }
+
+ private:
+ bool has_new_data_ = false;
+
+ ShmFetcher shm_fetcher_;
+
+ std::function<void(const Context &context, const void *message)> watcher_;
+};
+
+// Adapter class to adapt a timerfd to a TimerHandler.
+// The part of the API which is accessed by the TimerHandler interface needs to
+// be threadsafe. This means Setup and Disable.
+class TimerHandlerState : public TimerHandler {
+ public:
+ TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
+ : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+ shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+ timerfd_.Read();
+ fn_();
+ });
+ }
+
+ ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+ void Setup(monotonic_clock::time_point base,
+ monotonic_clock::duration repeat_offset) override {
+ // SetTime is threadsafe already.
+ timerfd_.SetTime(base, repeat_offset);
+ }
+
+ void Disable() override {
+ // Disable is also threadsafe already.
+ timerfd_.Disable();
+ }
+
+ private:
+ ShmEventLoop *shm_event_loop_;
+
+ TimerFd timerfd_;
+
+ // Function to be run on the thread
+ ::std::function<void()> fn_;
+};
+
+// Adapter class to the timerfd and PhasedLoop.
+// The part of the API which is accessed by the PhasedLoopHandler interface
+// needs to be threadsafe. This means set_interval_and_offset
+class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
+ public:
+ PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : shm_event_loop_(shm_event_loop),
+ phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
+ fn_(::std::move(fn)) {
+ shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
+ timerfd_.Read();
+ // Call the function. To avoid needing a recursive mutex, drop the lock
+ // before running the function.
+ fn_(cycles_elapsed_);
+ Reschedule();
+ });
+ }
+
+ ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
+
+ void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) override {
+ phased_loop_.set_interval_and_offset(interval, offset);
+ }
+
+ void Startup() {
+ phased_loop_.Reset(shm_event_loop_->monotonic_now());
+ Reschedule();
+ }
+
+ private:
+ // Reschedules the timer. Must be called with the mutex held.
+ void Reschedule() {
+ cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
+ timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+ }
+
+ ShmEventLoop *shm_event_loop_;
+
+ TimerFd timerfd_;
+ time::PhasedLoop phased_loop_;
+
+ int cycles_elapsed_ = 1;
+
+ // Function to be run
+ const ::std::function<void(int)> fn_;
+};
+} // namespace internal
+
+::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
+ const Channel *channel) {
+ return ::std::unique_ptr<RawFetcher>(new ShmFetcher(channel));
+}
+
+::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
+ const Channel *channel) {
+ Take(channel);
+ return ::std::unique_ptr<RawSender>(new ShmSender(channel, this));
+}
+
+void ShmEventLoop::MakeRawWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> watcher) {
+ Take(channel);
+
+ ::std::unique_ptr<internal::WatcherState> state(
+ new internal::WatcherState(
+ channel, std::move(watcher)));
+ watchers_.push_back(::std::move(state));
+}
+
+TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+ ::std::unique_ptr<internal::TimerHandlerState> timer(
+ new internal::TimerHandlerState(this, ::std::move(callback)));
+
+ timers_.push_back(::std::move(timer));
+
+ return timers_.back().get();
+}
+
+PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
+ ::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) {
+ ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
+ new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
+ offset));
+
+ phased_loops_.push_back(::std::move(phased_loop));
+
+ return phased_loops_.back().get();
+}
+
+void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+ on_run_.push_back(::std::move(on_run));
+}
+
+void ShmEventLoop::Run() {
+ std::unique_ptr<ipc_lib::SignalFd> signalfd;
+
+ if (watchers_.size() > 0) {
+ signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
+
+ epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
+ signalfd_siginfo result = signalfd_ptr->Read();
+ CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
+
+ // TODO(austin): We should really be checking *everything*, not just
+ // watchers, and calling the oldest thing first. That will improve
+ // determinism a lot.
+
+ while (true) {
+ // Call the handlers in time order of their messages.
+ aos::monotonic_clock::time_point min_event_time =
+ aos::monotonic_clock::max_time;
+ size_t min_watcher_index = -1;
+ size_t watcher_index = 0;
+ for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+ if (watcher->HasNewData()) {
+ if (watcher->event_time() < min_event_time) {
+ min_watcher_index = watcher_index;
+ min_event_time = watcher->event_time();
+ }
+ }
+ ++watcher_index;
+ }
+
+ if (min_event_time == aos::monotonic_clock::max_time) {
+ break;
+ }
+
+ watchers_[min_watcher_index]->CallCallback();
+ }
+ });
+ }
+
+ // Now, all the threads are up. Lock everything into memory and go RT.
+ if (priority_ != 0) {
+ ::aos::InitRT();
+
+ LOG(INFO) << "Setting priority to " << priority_;
+ ::aos::SetCurrentThreadRealtimePriority(priority_);
+ }
+
+ set_is_running(true);
+
+ // Now that we are realtime (but before the OnRun handlers run), snap the
+ // queue index.
+ for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+ watcher->PointAtNextQueueIndex();
+ CHECK(watcher->RegisterWakeup(priority_));
+ }
+
+ // Now that we are RT, run all the OnRun handlers.
+ for (const auto &run : on_run_) {
+ run();
+ }
+
+ // Start up all the phased loops.
+ for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
+ phased_loops_) {
+ phased_loop->Startup();
+ }
+
+ // And start our main event loop which runs all the timers and handles Quit.
+ epoll_.Run();
+
+ // Once epoll exits, there is no useful nonrt work left to do.
+ set_is_running(false);
+
+ // Nothing time or synchronization critical needs to happen after this point.
+ // Drop RT priority.
+ ::aos::UnsetCurrentThreadRealtimePriority();
+
+ for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+ watcher->UnregisterWakeup();
+ }
+
+ if (watchers_.size() > 0) {
+ epoll_.DeleteFd(signalfd->fd());
+ signalfd.reset();
+ }
+}
+
+void ShmEventLoop::Exit() { epoll_.Quit(); }
+
+ShmEventLoop::~ShmEventLoop() {
+ CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
+}
+
+void ShmEventLoop::Take(const Channel *channel) {
+ CHECK(!is_running()) << ": Cannot add new objects while running.";
+
+ // Cheat aggresively. Use the shared memory path as a proxy for a unique
+ // identifier for the channel.
+ const std::string path = ShmPath(channel);
+
+ const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
+ CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
+
+ taken_.emplace_back(path);
+}
+
+void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+ if (is_running()) {
+ LOG(FATAL) << "Cannot set realtime priority while running.";
+ }
+ priority_ = priority;
+}
+
+} // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
new file mode 100644
index 0000000..e859201
--- /dev/null
+++ b/aos/events/shm_event_loop.h
@@ -0,0 +1,92 @@
+#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
+#define AOS_EVENTS_SHM_EVENT_LOOP_H_
+
+#include <unordered_set>
+#include <vector>
+
+#include "aos/events/epoll.h"
+#include "aos/events/event_loop.h"
+#include "aos/ipc_lib/signalfd.h"
+#include "aos/mutex/mutex.h"
+
+namespace aos {
+namespace internal {
+
+class WatcherState;
+class TimerHandlerState;
+class PhasedLoopHandler;
+
+} // namespace internal
+
+// Specialization of EventLoop that is built from queues running out of shared
+// memory. See more details at aos/queue.h
+//
+// This object must be interacted with from one thread, but the Senders and
+// Fetchers may be used from multiple threads afterwords (as long as their
+// destructors are called back in one thread again)
+class ShmEventLoop : public EventLoop {
+ public:
+ ShmEventLoop(const Configuration *configuration);
+ ~ShmEventLoop() override;
+
+ aos::monotonic_clock::time_point monotonic_now() override {
+ return aos::monotonic_clock::now();
+ }
+ aos::realtime_clock::time_point realtime_now() override {
+ return aos::realtime_clock::now();
+ }
+
+ std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+ std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+
+ void MakeRawWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context, const void *message)> watcher)
+ override;
+
+ TimerHandler *AddTimer(std::function<void()> callback) override;
+ aos::PhasedLoopHandler *AddPhasedLoop(
+ std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset =
+ std::chrono::seconds(0)) override;
+
+ void OnRun(std::function<void()> on_run) override;
+ void Run();
+ void Exit();
+
+ // TODO(austin): Add a function to register control-C call.
+
+ void SetRuntimeRealtimePriority(int priority) override;
+
+ void set_name(const absl::string_view name) override {
+ name_ = std::string(name);
+ }
+ const absl::string_view name() const override { return name_; }
+
+ int priority() const { return priority_; }
+
+ private:
+ friend class internal::WatcherState;
+ friend class internal::TimerHandlerState;
+ friend class internal::PhasedLoopHandler;
+
+ // Tracks that we can't have multiple watchers or a sender and a watcher (or
+ // multiple senders) on a single queue (path).
+ void Take(const Channel *channel);
+
+ std::vector<std::function<void()>> on_run_;
+ int priority_ = 0;
+ std::string name_;
+ std::vector<std::string> taken_;
+
+ internal::EPoll epoll_;
+
+ std::vector<std::unique_ptr<internal::TimerHandlerState>> timers_;
+ std::vector<std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
+ std::vector<std::unique_ptr<internal::WatcherState>> watchers_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm-event-loop_test.cc b/aos/events/shm_event_loop_test.cc
similarity index 72%
rename from aos/events/shm-event-loop_test.cc
rename to aos/events/shm_event_loop_test.cc
index e0d0b49..4406e01 100644
--- a/aos/events/shm-event-loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -1,9 +1,13 @@
-#include "aos/events/shm-event-loop.h"
+#include "aos/events/shm_event_loop.h"
-#include "aos/events/event-loop_param_test.h"
-#include "aos/testing/test_shm.h"
+#include "aos/events/event_loop_param_test.h"
+#include "glog/logging.h"
#include "gtest/gtest.h"
+#include "aos/events/test_message_generated.h"
+
+DECLARE_string(shm_base);
+
namespace aos {
namespace testing {
namespace {
@@ -11,28 +15,40 @@
class ShmEventLoopTestFactory : public EventLoopTestFactory {
public:
+ ShmEventLoopTestFactory() {
+ // Put all the queue files in ${TEST_TMPDIR} if it is set, otherwise
+ // everything will be reusing /dev/shm when sharded.
+ char *test_tmpdir = getenv("TEST_TMPDIR");
+ if (test_tmpdir != nullptr) {
+ FLAGS_shm_base = std::string(test_tmpdir) + "/aos";
+ }
+
+ // Clean up anything left there before.
+ unlink((FLAGS_shm_base + "test/aos.TestMessage.v0").c_str());
+ unlink((FLAGS_shm_base + "test1/aos.TestMessage.v0").c_str());
+ unlink((FLAGS_shm_base + "test2/aos.TestMessage.v0").c_str());
+ }
+
::std::unique_ptr<EventLoop> Make() override {
- return ::std::unique_ptr<EventLoop>(new ShmEventLoop());
+ return ::std::unique_ptr<EventLoop>(new ShmEventLoop(configuration()));
}
::std::unique_ptr<EventLoop> MakePrimary() override {
::std::unique_ptr<ShmEventLoop> loop =
- ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop());
+ ::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
primary_event_loop_ = loop.get();
return ::std::move(loop);
}
- void Run() override { AOS_CHECK_NOTNULL(primary_event_loop_)->Run(); }
+ void Run() override { CHECK_NOTNULL(primary_event_loop_)->Run(); }
- void Exit() override { AOS_CHECK_NOTNULL(primary_event_loop_)->Exit(); }
+ void Exit() override { CHECK_NOTNULL(primary_event_loop_)->Exit(); }
void SleepFor(::std::chrono::nanoseconds duration) override {
::std::this_thread::sleep_for(duration);
}
private:
- ::aos::testing::TestSharedMemory my_shm_;
-
::aos::ShmEventLoop *primary_event_loop_;
};
@@ -46,24 +62,13 @@
return new ShmEventLoopTestFactory();
}));
-struct TestMessage : public ::aos::Message {
- enum { kQueueLength = 100, kHash = 0x696c0cdc };
- int msg_value;
-
- void Zero() { msg_value = 0; }
- static size_t Size() { return 1 + ::aos::Message::Size(); }
- size_t Print(char *buffer, size_t length) const;
- TestMessage() { Zero(); }
-};
-
} // namespace
bool IsRealtime() {
int scheduler;
- if ((scheduler = sched_getscheduler(0)) == -1) {
- AOS_PLOG(FATAL, "sched_getscheduler(0) failed\n");
- }
- AOS_LOG(INFO, "scheduler is %d\n", scheduler);
+ PCHECK((scheduler = sched_getscheduler(0)) != -1);
+
+ LOG(INFO) << "scheduler is " << scheduler;
return scheduler == SCHED_FIFO || scheduler == SCHED_RR;
}
@@ -82,7 +87,7 @@
bool did_timer = false;
bool did_watcher = false;
- auto timer = loop->AddTimer([&did_timer, &loop, &factory]() {
+ auto timer = loop->AddTimer([&did_timer, &factory]() {
EXPECT_TRUE(IsRealtime());
did_timer = true;
factory.Exit();
@@ -97,9 +102,11 @@
EXPECT_TRUE(IsRealtime());
did_onrun = true;
timer->Setup(loop->monotonic_now() + chrono::milliseconds(100));
- auto msg = sender.MakeMessage();
- msg->msg_value = 200;
- msg.Send();
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ msg.Send(builder.Finish());
});
factory.Run();
diff --git a/aos/events/simulated-event-loop.cc b/aos/events/simulated-event-loop.cc
deleted file mode 100644
index 85ea21b..0000000
--- a/aos/events/simulated-event-loop.cc
+++ /dev/null
@@ -1,402 +0,0 @@
-#include "aos/events/simulated-event-loop.h"
-
-#include <algorithm>
-#include <deque>
-
-#include "aos/logging/logging.h"
-#include "aos/queue.h"
-#include "aos/testing/test_logging.h"
-#include "aos/util/phased_loop.h"
-
-namespace aos {
-namespace {
-
-class SimulatedSender : public RawSender {
- public:
- SimulatedSender(SimulatedQueue *queue, EventLoop *event_loop)
- : queue_(queue), event_loop_(event_loop) {
- testing::EnableTestLogging();
- }
- ~SimulatedSender() {}
-
- aos::Message *GetMessage() override {
- return RefCountedBuffer(queue_->size()).release();
- }
-
- void Free(aos::Message *msg) override { RefCountedBuffer tmp(msg); }
-
- bool Send(aos::Message *msg) override {
- {
- if (msg->sent_time == monotonic_clock::min_time) {
- msg->sent_time = event_loop_->monotonic_now();
- }
- }
- queue_->Send(RefCountedBuffer(msg));
- return true;
- }
-
- const char *name() const override { return queue_->name(); }
-
- private:
- SimulatedQueue *queue_;
- EventLoop *event_loop_;
-};
-} // namespace
-
-class SimulatedFetcher : public RawFetcher {
- public:
- explicit SimulatedFetcher(SimulatedQueue *queue) : queue_(queue) {}
- ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
-
- bool FetchNext() override {
- if (msgs_.size() == 0) return false;
-
- msg_ = msgs_.front();
- msgs_.pop_front();
- set_most_recent(msg_.get());
- return true;
- }
-
- bool Fetch() override {
- if (msgs_.size() == 0) {
- if (!msg_ && queue_->latest_message()) {
- msg_ = queue_->latest_message();
- set_most_recent(msg_.get());
- return true;
- } else {
- return false;
- }
- }
-
- // We've had a message enqueued, so we don't need to go looking for the
- // latest message from before we started.
- msg_ = msgs_.back();
- msgs_.clear();
- set_most_recent(msg_.get());
- return true;
- }
-
- private:
- friend class SimulatedQueue;
-
- // Internal method for Simulation to add a message to the buffer.
- void Enqueue(RefCountedBuffer buffer) {
- msgs_.emplace_back(buffer);
- }
-
- SimulatedQueue *queue_;
- RefCountedBuffer msg_;
-
- // Messages queued up but not in use.
- ::std::deque<RefCountedBuffer> msgs_;
-};
-
-class SimulatedTimerHandler : public TimerHandler {
- public:
- explicit SimulatedTimerHandler(EventScheduler *scheduler,
- ::std::function<void()> fn)
- : scheduler_(scheduler), fn_(fn) {}
- ~SimulatedTimerHandler() {}
-
- void Setup(monotonic_clock::time_point base,
- monotonic_clock::duration repeat_offset) override {
- Disable();
- const ::aos::monotonic_clock::time_point monotonic_now =
- scheduler_->monotonic_now();
- base_ = base;
- repeat_offset_ = repeat_offset;
- if (base < monotonic_now) {
- token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
- } else {
- token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
- }
- }
-
- void HandleEvent() {
- const ::aos::monotonic_clock::time_point monotonic_now =
- scheduler_->monotonic_now();
- if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
- // Reschedule.
- while (base_ <= monotonic_now) base_ += repeat_offset_;
- token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
- } else {
- token_ = EventScheduler::Token();
- }
- fn_();
- }
-
- void Disable() override {
- if (token_ != EventScheduler::Token()) {
- scheduler_->Deschedule(token_);
- token_ = EventScheduler::Token();
- }
- }
-
- ::aos::monotonic_clock::time_point monotonic_now() const {
- return scheduler_->monotonic_now();
- }
-
- private:
- EventScheduler *scheduler_;
- EventScheduler::Token token_;
- // Function to be run on the thread
- ::std::function<void()> fn_;
- monotonic_clock::time_point base_;
- monotonic_clock::duration repeat_offset_;
-};
-
-class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
- public:
- SimulatedPhasedLoopHandler(EventScheduler *scheduler,
- ::std::function<void(int)> fn,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset)
- : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
- phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
- offset),
- fn_(fn) {
- // TODO(austin): This assumes time doesn't change between when the
- // constructor is called and when we start running. It's probably a safe
- // assumption.
- Reschedule();
- }
-
- void HandleTimerWakeup() {
- fn_(cycles_elapsed_);
- Reschedule();
- }
-
- void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) override {
- phased_loop_.set_interval_and_offset(interval, offset);
- }
-
- void Reschedule() {
- cycles_elapsed_ =
- phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
- simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
- ::aos::monotonic_clock::zero());
- }
-
- private:
- SimulatedTimerHandler simulated_timer_handler_;
-
- time::PhasedLoop phased_loop_;
-
- int cycles_elapsed_ = 1;
-
- ::std::function<void(int)> fn_;
-};
-
-class SimulatedEventLoop : public EventLoop {
- public:
- explicit SimulatedEventLoop(
- EventScheduler *scheduler,
- ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
- *queues)
- : scheduler_(scheduler), queues_(queues) {
- scheduler_->AddRawEventLoop(this);
- }
- ~SimulatedEventLoop() override { scheduler_->RemoveRawEventLoop(this); };
-
- ::aos::monotonic_clock::time_point monotonic_now() override {
- return scheduler_->monotonic_now();
- }
-
- ::std::unique_ptr<RawSender> MakeRawSender(
- const ::std::string &path, const QueueTypeInfo &type) override;
-
- ::std::unique_ptr<RawFetcher> MakeRawFetcher(
- const ::std::string &path, const QueueTypeInfo &type) override;
-
- void MakeRawWatcher(
- const ::std::string &path, const QueueTypeInfo &type,
- ::std::function<void(const ::aos::Message *message)> watcher) override;
-
- TimerHandler *AddTimer(::std::function<void()> callback) override {
- timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
- return timers_.back().get();
- }
-
- PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset =
- ::std::chrono::seconds(0)) override {
- phased_loops_.emplace_back(
- new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
- return phased_loops_.back().get();
- }
-
- void OnRun(::std::function<void()> on_run) override {
- scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
- }
-
- void set_name(const char *name) override { name_ = name; }
-
- SimulatedQueue *GetSimulatedQueue(
- const ::std::pair<::std::string, QueueTypeInfo> &);
-
- void Take(const ::std::string &path);
-
- void SetRuntimeRealtimePriority(int /*priority*/) override {
- if (is_running()) {
- ::aos::Die("Cannot set realtime priority while running.");
- }
- }
-
- private:
- EventScheduler *scheduler_;
- ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue>
- *queues_;
- ::std::vector<std::string> taken_;
- ::std::vector<std::unique_ptr<TimerHandler>> timers_;
- ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
-
- ::std::string name_;
-};
-
-EventScheduler::Token EventScheduler::Schedule(
- ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
- return events_list_.emplace(time, callback);
-}
-
-void EventScheduler::Deschedule(EventScheduler::Token token) {
- events_list_.erase(token);
-}
-
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
- const ::aos::monotonic_clock::time_point end_time =
- monotonic_now() + duration;
- testing::MockTime(monotonic_now());
- for (RawEventLoop *event_loop : raw_event_loops_) {
- event_loop->set_is_running(true);
- }
- is_running_ = true;
- while (!events_list_.empty() && is_running_) {
- auto iter = events_list_.begin();
- ::aos::monotonic_clock::time_point next_time = iter->first;
- if (next_time > end_time) {
- break;
- }
- now_ = iter->first;
- testing::MockTime(now_);
- ::std::function<void()> callback = ::std::move(iter->second);
- events_list_.erase(iter);
- callback();
- }
- now_ = end_time;
- if (!is_running_) {
- for (RawEventLoop *event_loop : raw_event_loops_) {
- event_loop->set_is_running(false);
- }
- }
- testing::UnMockTime();
-}
-
-void EventScheduler::Run() {
- testing::MockTime(monotonic_now());
- for (RawEventLoop *event_loop : raw_event_loops_) {
- event_loop->set_is_running(true);
- }
- is_running_ = true;
- while (!events_list_.empty() && is_running_) {
- auto iter = events_list_.begin();
- now_ = iter->first;
- testing::MockTime(now_);
- ::std::function<void()> callback = ::std::move(iter->second);
- events_list_.erase(iter);
- callback();
- }
- if (!is_running_) {
- for (RawEventLoop *event_loop : raw_event_loops_) {
- event_loop->set_is_running(false);
- }
- }
- testing::UnMockTime();
-}
-
-void SimulatedEventLoop::MakeRawWatcher(
- const std::string &path, const QueueTypeInfo &type,
- std::function<void(const aos::Message *message)> watcher) {
- Take(path);
- ::std::pair<::std::string, QueueTypeInfo> key(path, type);
- GetSimulatedQueue(key)->MakeRawWatcher(watcher);
-}
-
-std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
- const std::string &path, const QueueTypeInfo &type) {
- Take(path);
- ::std::pair<::std::string, QueueTypeInfo> key(path, type);
- return GetSimulatedQueue(key)->MakeRawSender(this);
-}
-
-std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
- const std::string &path, const QueueTypeInfo &type) {
- ::std::pair<::std::string, QueueTypeInfo> key(path, type);
- return GetSimulatedQueue(key)->MakeRawFetcher();
-}
-
-SimulatedQueue *SimulatedEventLoop::GetSimulatedQueue(
- const ::std::pair<::std::string, QueueTypeInfo> &type) {
- auto it = queues_->find(type);
- if (it == queues_->end()) {
- it =
- queues_
- ->emplace(type, SimulatedQueue(type.second, type.first, scheduler_))
- .first;
- }
- return &it->second;
-}
-
-void SimulatedQueue::MakeRawWatcher(
- ::std::function<void(const aos::Message *message)> watcher) {
- watchers_.push_back(watcher);
-}
-
-::std::unique_ptr<RawSender> SimulatedQueue::MakeRawSender(
- EventLoop *event_loop) {
- return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
-}
-
-::std::unique_ptr<RawFetcher> SimulatedQueue::MakeRawFetcher() {
- ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
- fetchers_.push_back(fetcher.get());
- return ::std::move(fetcher);
-}
-
-void SimulatedQueue::Send(RefCountedBuffer message) {
- latest_message_ = message;
- if (scheduler_->is_running()) {
- for (auto &watcher : watchers_) {
- scheduler_->Schedule(scheduler_->monotonic_now(),
- [watcher, message]() { watcher(message.get()); });
- }
- }
- for (auto &fetcher : fetchers_) {
- fetcher->Enqueue(message);
- }
-}
-
-void SimulatedQueue::UnregisterFetcher(SimulatedFetcher *fetcher) {
- fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
-}
-
-void SimulatedEventLoop::Take(const ::std::string &path) {
- if (is_running()) {
- ::aos::Die("Cannot add new objects while running.\n");
- }
- const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
- if (prior != taken_.end()) {
- ::aos::Die("%s is already being used.", path.c_str());
- } else {
- taken_.emplace_back(path);
- }
-}
-
-::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
- return ::std::unique_ptr<EventLoop>(
- new SimulatedEventLoop(&scheduler_, &queues_));
-}
-
-} // namespace aos
diff --git a/aos/events/simulated-event-loop.h b/aos/events/simulated-event-loop.h
deleted file mode 100644
index 2fcc9d1..0000000
--- a/aos/events/simulated-event-loop.h
+++ /dev/null
@@ -1,214 +0,0 @@
-#ifndef _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
-#define _AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
-
-#include <algorithm>
-#include <map>
-#include <memory>
-#include <unordered_set>
-#include <utility>
-#include <vector>
-
-#include "aos/events/event-loop.h"
-
-namespace aos {
-
-// This class manages allocation of queue messages for simulation.
-// Unfortunately, because the current interfaces all assume that we pass around
-// raw pointers to messages we can't use a std::shared_ptr or the such, and
-// because aos::Message's themselves to not have any sort of built-in support
-// for this, we need to manage memory for the Messages in some custom fashion.
-// In this case, we do so by allocating a ref-counter in the bytes immediately
-// preceding the aos::Message. We then provide a constructor that takes just a
-// pointer to an existing message and we assume that it was allocated using this
-// class, and can decrement the counter if the RefCountedBuffer we constructed
-// goes out of scope. There are currently no checks to ensure that pointers
-// passed into this class were actually allocated using this class.
-class RefCountedBuffer {
- public:
- RefCountedBuffer() {}
- ~RefCountedBuffer() { clear(); }
-
- // Create a RefCountedBuffer for some Message that was already allocated using
- // a RefCountedBuffer class. This, or some function like it, is required to
- // allow us to let users of the simulated event loops work with raw pointers
- // to messages.
- explicit RefCountedBuffer(aos::Message *data) : data_(data) {}
-
- // Allocates memory for a new message of a given size. Does not initialize the
- // memory or call any constructors.
- explicit RefCountedBuffer(size_t size) {
- data_ = reinterpret_cast<uint8_t *>(malloc(kRefCountSize + size)) +
- kRefCountSize;
- // Initialize the allocated memory with an integer
- *GetRefCount() = 1;
- }
-
- RefCountedBuffer(const RefCountedBuffer &other) {
- data_ = other.data_;
- if (data_ != nullptr) {
- ++*GetRefCount();
- }
- }
-
- RefCountedBuffer(RefCountedBuffer &&other) { std::swap(data_, other.data_); }
-
- RefCountedBuffer &operator=(const RefCountedBuffer &other) {
- if (this == &other) return *this;
- clear();
- data_ = other.data_;
- ++*GetRefCount();
- return *this;
- }
-
- RefCountedBuffer &operator=(RefCountedBuffer &&other) {
- if (this == &other) return *this;
- std::swap(data_, other.data_);
- return *this;
- }
-
- operator bool() const { return data_ != nullptr; }
-
- aos::Message *get() const { return static_cast<aos::Message *>(data_); }
-
- aos::Message *release() {
- auto tmp = get();
- data_ = nullptr;
- return tmp;
- }
-
- void clear() {
- if (data_ != nullptr) {
- if (--*GetRefCount() == 0) {
- // Free memory block from the start of the allocated block
- free(GetRefCount());
- }
- data_ = nullptr;
- }
- }
-
- private:
- void *data_ = nullptr;
- // Qty. memory to be allocated to the ref counter
- static constexpr size_t kRefCountSize = sizeof(int64_t);
-
- int64_t *GetRefCount() {
- // Need to cast the void* to an 8 bit long object (size of void* is
- // technically 0)
- return reinterpret_cast<int64_t *>(static_cast<void *>(
- reinterpret_cast<uint8_t *>(data_) - kRefCountSize));
- }
-};
-
-class EventScheduler {
- public:
- using QueueType = ::std::multimap<::aos::monotonic_clock::time_point,
- ::std::function<void()>>;
- using Token = QueueType::iterator;
-
- // Schedule an event with a callback function
- // Returns an iterator to the event
- Token Schedule(::aos::monotonic_clock::time_point time,
- ::std::function<void()> callback);
-
- // Deschedule an event by its iterator
- void Deschedule(Token token);
-
- void Run();
- void RunFor(::aos::monotonic_clock::duration duration);
-
- void Exit() {
- is_running_ = false;
- }
-
- bool is_running() const { return is_running_; }
-
- void AddRawEventLoop(RawEventLoop *event_loop) {
- raw_event_loops_.push_back(event_loop);
- }
- void RemoveRawEventLoop(RawEventLoop *event_loop) {
- raw_event_loops_.erase(::std::find(raw_event_loops_.begin(),
- raw_event_loops_.end(), event_loop));
- }
-
- ::aos::monotonic_clock::time_point monotonic_now() const { return now_; }
-
- private:
- ::aos::monotonic_clock::time_point now_ = ::aos::monotonic_clock::epoch();
- QueueType events_list_;
- bool is_running_ = false;
- ::std::vector<RawEventLoop *> raw_event_loops_;
-};
-
-// Class for simulated fetchers.
-class SimulatedFetcher;
-
-class SimulatedQueue {
- public:
- explicit SimulatedQueue(const QueueTypeInfo &type, const ::std::string &name,
- EventScheduler *scheduler)
- : type_(type), name_(name), scheduler_(scheduler){};
-
- ~SimulatedQueue() { AOS_CHECK_EQ(0u, fetchers_.size()); }
-
- // Makes a connected raw sender which calls Send below.
- ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
-
- // Makes a connected raw fetcher.
- ::std::unique_ptr<RawFetcher> MakeRawFetcher();
-
- // Registers a watcher for the queue.
- void MakeRawWatcher(
- ::std::function<void(const ::aos::Message *message)> watcher);
-
- // Sends the message to all the connected receivers and fetchers.
- void Send(RefCountedBuffer message);
-
- // Unregisters a fetcher.
- void UnregisterFetcher(SimulatedFetcher *fetcher);
-
- const RefCountedBuffer &latest_message() { return latest_message_; }
-
- size_t size() const { return type_.size; }
-
- const char *name() const { return name_.c_str(); }
-
- private:
- const QueueTypeInfo type_;
- const ::std::string name_;
-
- // List of all watchers.
- ::std::vector<std::function<void(const aos::Message *message)>> watchers_;
-
- // List of all fetchers.
- ::std::vector<SimulatedFetcher *> fetchers_;
- RefCountedBuffer latest_message_;
- EventScheduler *scheduler_;
-};
-
-class SimulatedEventLoopFactory {
- public:
- ::std::unique_ptr<EventLoop> MakeEventLoop();
-
- // Starts executing the event loops unconditionally.
- void Run() { scheduler_.Run(); }
- // Executes the event loops for a duration.
- void RunFor(monotonic_clock::duration duration) {
- scheduler_.RunFor(duration);
- }
-
- // Stops executing all event loops. Meant to be called from within an event
- // loop handler.
- void Exit() { scheduler_.Exit(); }
-
- monotonic_clock::time_point monotonic_now() const {
- return scheduler_.monotonic_now();
- }
-
- private:
- EventScheduler scheduler_;
- ::std::map<::std::pair<::std::string, QueueTypeInfo>, SimulatedQueue> queues_;
-};
-
-} // namespace aos
-
-#endif //_AOS_EVENTS_TEST_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
new file mode 100644
index 0000000..9bc74d5
--- /dev/null
+++ b/aos/events/simulated_event_loop.cc
@@ -0,0 +1,514 @@
+#include "aos/events/simulated_event_loop.h"
+
+#include <algorithm>
+#include <deque>
+
+#include "absl/container/btree_map.h"
+#include "absl/container/btree_set.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/util/phased_loop.h"
+
+namespace aos {
+
+// Container for both a message, and the context for it for simulation. This
+// makes tracking the timestamps associated with the data easy.
+struct SimulatedMessage {
+ // Struct to let us force data to be well aligned.
+ struct OveralignedChar {
+ char data alignas(32);
+ };
+
+ // Context for the data.
+ Context context;
+
+ // The data.
+ char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+
+ // Then the data.
+ OveralignedChar actual_data[];
+};
+
+class SimulatedFetcher;
+
+class SimulatedChannel {
+ public:
+ explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
+ : channel_(CopyFlatBuffer(channel)),
+ scheduler_(scheduler),
+ next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
+
+ ~SimulatedChannel() { CHECK_EQ(0u, fetchers_.size()); }
+
+ // Makes a connected raw sender which calls Send below.
+ ::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
+
+ // Makes a connected raw fetcher.
+ ::std::unique_ptr<RawFetcher> MakeRawFetcher();
+
+ // Registers a watcher for the queue.
+ void MakeRawWatcher(
+ ::std::function<void(const Context &context, const void *message)>
+ watcher);
+
+ // Sends the message to all the connected receivers and fetchers.
+ void Send(std::shared_ptr<SimulatedMessage> message);
+
+ // Unregisters a fetcher.
+ void UnregisterFetcher(SimulatedFetcher *fetcher);
+
+ std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
+
+ size_t max_size() const { return channel_.message().max_size(); }
+
+ const absl::string_view name() const {
+ return channel_.message().name()->string_view();
+ }
+
+ const Channel *channel() const { return &channel_.message(); }
+
+ private:
+ const FlatbufferDetachedBuffer<Channel> channel_;
+
+ // List of all watchers.
+ ::std::vector<
+ std::function<void(const Context &context, const void *message)>>
+ watchers_;
+
+ // List of all fetchers.
+ ::std::vector<SimulatedFetcher *> fetchers_;
+ std::shared_ptr<SimulatedMessage> latest_message_;
+ EventScheduler *scheduler_;
+
+ ipc_lib::QueueIndex next_queue_index_;
+};
+
+namespace {
+
+// Creates a SimulatedMessage with size bytes of storage.
+// This is a shared_ptr so we don't have to implement refcounting or copying.
+std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
+ SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
+ malloc(sizeof(SimulatedMessage) + size));
+ message->context.size = size;
+ message->context.data = message->data();
+
+ return std::shared_ptr<SimulatedMessage>(message, free);
+}
+
+class SimulatedSender : public RawSender {
+ public:
+ SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
+ : simulated_channel_(simulated_channel), event_loop_(event_loop) {}
+ ~SimulatedSender() {}
+
+ void *data() override {
+ if (!message_) {
+ message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+ }
+ return message_->data();
+ }
+
+ size_t size() override { return simulated_channel_->max_size(); }
+
+ bool Send(size_t length) override {
+ CHECK_LE(length, size()) << ": Attempting to send too big a message.";
+ message_->context.monotonic_sent_time = event_loop_->monotonic_now();
+ message_->context.realtime_sent_time = event_loop_->realtime_now();
+ CHECK_LE(length, message_->context.size);
+ message_->context.size = length;
+
+ // TODO(austin): Track sending too fast.
+ simulated_channel_->Send(message_);
+
+ // Drop the reference to the message so that we allocate a new message for
+ // next time. Otherwise we will continue to reuse the same memory for all
+ // messages and corrupt it.
+ message_.reset();
+ return true;
+ }
+
+ bool Send(void *msg, size_t size) override {
+ CHECK_LE(size, this->size()) << ": Attempting to send too big a message.";
+
+ // This is wasteful, but since flatbuffers fill from the back end of the
+ // queue, we need it to be full sized.
+ message_ = MakeSimulatedMessage(simulated_channel_->max_size());
+
+ // Now fill in the message. size is already populated above, and
+ // queue_index will be populated in queue_. Put this at the back of the
+ // data segment.
+ memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+
+ return Send(size);
+ }
+
+ const absl::string_view name() const override {
+ return simulated_channel_->name();
+ }
+
+ private:
+ SimulatedChannel *simulated_channel_;
+ EventLoop *event_loop_;
+
+ std::shared_ptr<SimulatedMessage> message_;
+};
+} // namespace
+
+class SimulatedFetcher : public RawFetcher {
+ public:
+ explicit SimulatedFetcher(SimulatedChannel *queue) : queue_(queue) {}
+ ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+
+ bool FetchNext() override {
+ if (msgs_.size() == 0) return false;
+
+ SetMsg(msgs_.front());
+ msgs_.pop_front();
+ return true;
+ }
+
+ bool Fetch() override {
+ if (msgs_.size() == 0) {
+ if (!msg_ && queue_->latest_message()) {
+ SetMsg(queue_->latest_message());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // We've had a message enqueued, so we don't need to go looking for the
+ // latest message from before we started.
+ SetMsg(msgs_.back());
+ msgs_.clear();
+ return true;
+ }
+
+ private:
+ friend class SimulatedChannel;
+
+ // Updates the state inside RawFetcher to point to the data in msg_.
+ void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
+ msg_ = msg;
+ data_ = msg_->context.data;
+ context_ = msg_->context;
+ }
+
+ // Internal method for Simulation to add a message to the buffer.
+ void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
+ msgs_.emplace_back(buffer);
+ }
+
+ SimulatedChannel *queue_;
+ std::shared_ptr<SimulatedMessage> msg_;
+
+ // Messages queued up but not in use.
+ ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
+};
+
+class SimulatedTimerHandler : public TimerHandler {
+ public:
+ explicit SimulatedTimerHandler(EventScheduler *scheduler,
+ ::std::function<void()> fn)
+ : scheduler_(scheduler), token_(scheduler_->InvalidToken()), fn_(fn) {}
+ ~SimulatedTimerHandler() {}
+
+ void Setup(monotonic_clock::time_point base,
+ monotonic_clock::duration repeat_offset) override {
+ Disable();
+ const ::aos::monotonic_clock::time_point monotonic_now =
+ scheduler_->monotonic_now();
+ base_ = base;
+ repeat_offset_ = repeat_offset;
+ if (base < monotonic_now) {
+ token_ = scheduler_->Schedule(monotonic_now, [this]() { HandleEvent(); });
+ } else {
+ token_ = scheduler_->Schedule(base, [this]() { HandleEvent(); });
+ }
+ }
+
+ void HandleEvent() {
+ const ::aos::monotonic_clock::time_point monotonic_now =
+ scheduler_->monotonic_now();
+ if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
+ // Reschedule.
+ while (base_ <= monotonic_now) base_ += repeat_offset_;
+ token_ = scheduler_->Schedule(base_, [this]() { HandleEvent(); });
+ } else {
+ token_ = scheduler_->InvalidToken();
+ }
+ fn_();
+ }
+
+ void Disable() override {
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+ }
+
+ ::aos::monotonic_clock::time_point monotonic_now() const {
+ return scheduler_->monotonic_now();
+ }
+
+ private:
+ EventScheduler *scheduler_;
+ EventScheduler::Token token_;
+ // Function to be run on the thread
+ ::std::function<void()> fn_;
+ monotonic_clock::time_point base_;
+ monotonic_clock::duration repeat_offset_;
+};
+
+class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
+ public:
+ SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+ ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : simulated_timer_handler_(scheduler, [this]() { HandleTimerWakeup(); }),
+ phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
+ offset),
+ fn_(fn) {
+ // TODO(austin): This assumes time doesn't change between when the
+ // constructor is called and when we start running. It's probably a safe
+ // assumption.
+ Reschedule();
+ }
+
+ void HandleTimerWakeup() {
+ fn_(cycles_elapsed_);
+ Reschedule();
+ }
+
+ void set_interval_and_offset(
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) override {
+ phased_loop_.set_interval_and_offset(interval, offset);
+ }
+
+ void Reschedule() {
+ cycles_elapsed_ =
+ phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
+ simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
+ ::aos::monotonic_clock::zero());
+ }
+
+ private:
+ SimulatedTimerHandler simulated_timer_handler_;
+
+ time::PhasedLoop phased_loop_;
+
+ int cycles_elapsed_ = 1;
+
+ ::std::function<void(int)> fn_;
+};
+
+class SimulatedEventLoop : public EventLoop {
+ public:
+ explicit SimulatedEventLoop(
+ EventScheduler *scheduler,
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
+ *channels,
+ const Configuration *configuration,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops)
+ : EventLoop(configuration),
+ scheduler_(scheduler),
+ channels_(channels),
+ raw_event_loops_(raw_event_loops) {
+ raw_event_loops_->push_back(
+ std::make_pair(this, [this](bool value) { set_is_running(value); }));
+ }
+ ~SimulatedEventLoop() override {
+ for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
+ ++it) {
+ if (it->first == this) {
+ raw_event_loops_->erase(it);
+ break;
+ }
+ }
+ }
+
+ ::aos::monotonic_clock::time_point monotonic_now() override {
+ return scheduler_->monotonic_now();
+ }
+
+ ::aos::realtime_clock::time_point realtime_now() override {
+ return scheduler_->realtime_now();
+ }
+
+ ::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
+
+ ::std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
+
+ void MakeRawWatcher(
+ const Channel *channel,
+ ::std::function<void(const Context &context, const void *message)>
+ watcher) override;
+
+ TimerHandler *AddTimer(::std::function<void()> callback) override {
+ timers_.emplace_back(new SimulatedTimerHandler(scheduler_, callback));
+ return timers_.back().get();
+ }
+
+ PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset =
+ ::std::chrono::seconds(0)) override {
+ phased_loops_.emplace_back(
+ new SimulatedPhasedLoopHandler(scheduler_, callback, interval, offset));
+ return phased_loops_.back().get();
+ }
+
+ void OnRun(::std::function<void()> on_run) override {
+ scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
+ }
+
+ void set_name(const absl::string_view name) override {
+ name_ = std::string(name);
+ }
+ const absl::string_view name() const override { return name_; }
+
+ SimulatedChannel *GetSimulatedChannel(const Channel *channel);
+
+ void Take(const Channel *channel);
+
+ void SetRuntimeRealtimePriority(int /*priority*/) override {
+ CHECK(!is_running()) << ": Cannot set realtime priority while running.";
+ }
+
+ private:
+ EventScheduler *scheduler_;
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops_;
+ absl::btree_set<SimpleChannel> taken_;
+ ::std::vector<std::unique_ptr<TimerHandler>> timers_;
+ ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
+
+ ::std::string name_;
+};
+
+void SimulatedEventLoop::MakeRawWatcher(
+ const Channel *channel,
+ std::function<void(const Context &channel, const void *message)> watcher) {
+ Take(channel);
+ GetSimulatedChannel(channel)->MakeRawWatcher(watcher);
+}
+
+std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
+ const Channel *channel) {
+ Take(channel);
+ return GetSimulatedChannel(channel)->MakeRawSender(this);
+}
+
+std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
+ const Channel *channel) {
+ return GetSimulatedChannel(channel)->MakeRawFetcher();
+}
+
+SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
+ const Channel *channel) {
+ auto it = channels_->find(SimpleChannel(channel));
+ if (it == channels_->end()) {
+ it = channels_
+ ->emplace(SimpleChannel(channel),
+ std::unique_ptr<SimulatedChannel>(
+ new SimulatedChannel(channel, scheduler_)))
+ .first;
+ }
+ return it->second.get();
+}
+
+void SimulatedChannel::MakeRawWatcher(
+ ::std::function<void(const Context &context, const void *message)>
+ watcher) {
+ watchers_.push_back(watcher);
+}
+
+::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
+ EventLoop *event_loop) {
+ return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
+}
+
+::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher() {
+ ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
+ fetchers_.push_back(fetcher.get());
+ return ::std::move(fetcher);
+}
+
+void SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
+ message->context.queue_index = next_queue_index_.index();
+ message->context.data =
+ message->data() + channel()->max_size() - message->context.size;
+ next_queue_index_ = next_queue_index_.Increment();
+
+ latest_message_ = message;
+ if (scheduler_->is_running()) {
+ for (auto &watcher : watchers_) {
+ scheduler_->Schedule(scheduler_->monotonic_now(), [watcher, message]() {
+ watcher(message->context, message->context.data);
+ });
+ }
+ }
+ for (auto &fetcher : fetchers_) {
+ fetcher->Enqueue(message);
+ }
+}
+
+void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
+ fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
+}
+
+SimpleChannel::SimpleChannel(const Channel *channel)
+ : name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
+ type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
+
+void SimulatedEventLoop::Take(const Channel *channel) {
+ CHECK(!is_running()) << ": Cannot add new objects while running.";
+
+ auto result = taken_.insert(SimpleChannel(channel));
+ CHECK(result.second) << ": " << FlatbufferToJson(channel)
+ << " is already being used.";
+}
+
+SimulatedEventLoopFactory::SimulatedEventLoopFactory(
+ const Configuration *configuration)
+ : configuration_(configuration) {}
+SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
+
+::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop() {
+ return ::std::unique_ptr<EventLoop>(new SimulatedEventLoop(
+ &scheduler_, &channels_, configuration_, &raw_event_loops_));
+}
+
+void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(true);
+ }
+ scheduler_.RunFor(duration);
+ if (!scheduler_.is_running()) {
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(false);
+ }
+ }
+}
+
+void SimulatedEventLoopFactory::Run() {
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(true);
+ }
+ scheduler_.Run();
+ if (!scheduler_.is_running()) {
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(false);
+ }
+ }
+}
+
+} // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
new file mode 100644
index 0000000..485ab68
--- /dev/null
+++ b/aos/events/simulated_event_loop.h
@@ -0,0 +1,86 @@
+#ifndef AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+#define AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "absl/container/btree_map.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/event_scheduler.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/flatbuffers.h"
+#include "aos/ipc_lib/index.h"
+#include "glog/logging.h"
+
+namespace aos {
+
+// Class for simulated fetchers.
+class SimulatedChannel;
+
+struct SimpleChannel {
+ SimpleChannel(const Channel *channel);
+ std::string name;
+ std::string type;
+
+ std::string DebugString() const {
+ return std::string("{ ") + name + ", " + type + "}";
+ }
+
+ bool operator==(const SimpleChannel &other) const {
+ return name == other.name && type == other.type;
+ }
+ bool operator<(const SimpleChannel &other) const {
+ int name_compare = other.name.compare(name);
+ if (name_compare == 0) {
+ return other.type < type;
+ } else if (name_compare < 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+class SimulatedEventLoopFactory {
+ public:
+ // Constructs a SimulatedEventLoopFactory with the provided configuration.
+ // This configuration must remain in scope for the lifetime of the factory and
+ // all sub-objects.
+ SimulatedEventLoopFactory(const Configuration *configuration);
+ ~SimulatedEventLoopFactory();
+
+ ::std::unique_ptr<EventLoop> MakeEventLoop();
+
+ // Starts executing the event loops unconditionally.
+ void Run();
+ // Executes the event loops for a duration.
+ void RunFor(monotonic_clock::duration duration);
+
+ // Stops executing all event loops. Meant to be called from within an event
+ // loop handler.
+ void Exit() { scheduler_.Exit(); }
+
+ monotonic_clock::time_point monotonic_now() const {
+ return scheduler_.monotonic_now();
+ }
+ realtime_clock::time_point realtime_now() const {
+ return scheduler_.realtime_now();
+ }
+
+ private:
+ const Configuration *configuration_;
+ EventScheduler scheduler_;
+ // Map from name, type to queue.
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+ // List of event loops to manage running and not running for.
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ raw_event_loops_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated-event-loop_test.cc b/aos/events/simulated_event_loop_test.cc
similarity index 90%
rename from aos/events/simulated-event-loop_test.cc
rename to aos/events/simulated_event_loop_test.cc
index e987a11..356c25b 100644
--- a/aos/events/simulated-event-loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1,5 +1,6 @@
-#include "aos/events/simulated-event-loop.h"
-#include "aos/events/event-loop_param_test.h"
+#include "aos/events/simulated_event_loop.h"
+
+#include "aos/events/event_loop_param_test.h"
#include "gtest/gtest.h"
namespace aos {
@@ -9,6 +10,8 @@
class SimulatedEventLoopTestFactory : public EventLoopTestFactory {
public:
+ SimulatedEventLoopTestFactory() : event_loop_factory_(configuration()) {}
+
::std::unique_ptr<EventLoop> Make() override {
return event_loop_factory_.MakeEventLoop();
}
@@ -68,7 +71,7 @@
// Test that running for a time period with no handlers causes time to progress
// correctly.
TEST(SimulatedEventLoopTest, RunForNoHandlers) {
- SimulatedEventLoopFactory simulated_event_loop_factory;
+ SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
::std::unique_ptr<EventLoop> event_loop =
simulated_event_loop_factory.MakeEventLoop();
@@ -83,12 +86,12 @@
// Test that running for a time with a periodic handler causes time to end
// correctly.
TEST(SimulatedEventLoopTest, RunForTimerHandler) {
- SimulatedEventLoopFactory simulated_event_loop_factory;
+ SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
::std::unique_ptr<EventLoop> event_loop =
simulated_event_loop_factory.MakeEventLoop();
int counter = 0;
- auto timer = event_loop->AddTimer([&counter, &event_loop]() { ++counter; });
+ auto timer = event_loop->AddTimer([&counter]() { ++counter; });
event_loop->OnRun([&event_loop, &timer] {
timer->Setup(event_loop->monotonic_now() + chrono::milliseconds(50),
chrono::milliseconds(100));
diff --git a/aos/events/test_message.fbs b/aos/events/test_message.fbs
new file mode 100644
index 0000000..1d26c2e
--- /dev/null
+++ b/aos/events/test_message.fbs
@@ -0,0 +1,7 @@
+namespace aos;
+
+table TestMessage {
+ value:int;
+}
+
+root_type TestMessage;