Merge "Add relative encoder zeroing strategy"
diff --git a/WORKSPACE b/WORKSPACE
index 4a1b88f..ab7a70f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -561,6 +561,27 @@
url = "http://www.frc971.org/Build-Dependencies/emscripten-llvm-e" + emscripten_version + ".tar.gz",
)
+new_http_archive(
+ name = "webrtc_x64",
+ build_file = "@//debian:webrtc.BUILD",
+ sha256 = "bd212b2a112a043d08d27f49027091788fa01c7c2ac5f072d096c17d9dbd976f",
+ url = "http://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-x64.tar.gz",
+)
+
+new_http_archive(
+ name = "webrtc_arm",
+ build_file = "@//debian:webrtc.BUILD",
+ sha256 = "c34badaf313877cd03a0dfd6b71de024d806a7652550a7f1cd7dea523a7c813d",
+ url = "http://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-arm.tar.gz",
+)
+
+new_http_archive(
+ name = "webrtc_rio",
+ build_file = "@//debian:webrtc.BUILD",
+ sha256 = "d86d3b030099b35ae5ea31c807fb4d0b0352598e79f1ea84877e5504e185faa8",
+ url = "http://www.frc971.org/Build-Dependencies/webrtc-30376-4c4735b-linux-rio.tar.gz",
+)
+
# Fetch our Bazel dependencies that aren't distributed on npm
load("@build_bazel_rules_typescript//:package.bzl", "rules_typescript_dependencies")
diff --git a/aos/config_flattener.cc b/aos/config_flattener.cc
index 6fb4af3..71cda32 100644
--- a/aos/config_flattener.cc
+++ b/aos/config_flattener.cc
@@ -27,8 +27,7 @@
&configuration::MergeConfiguration(config, schemas).message(), true);
// TODO(austin): Figure out how to squash the schemas onto 1 line so it is
- // easier to read? Or figure out how to split them into a second file which
- // gets included.
+ // easier to read?
VLOG(1) << "Flattened config is " << merged_config;
util::WriteStringToFileOrDie(argv[1], merged_config);
return 0;
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a40c47c..c4f139c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -470,6 +470,18 @@
}
}
+size_t ChannelIndex(const Configuration *configuration,
+ const Channel *channel) {
+ CHECK(configuration->channels() != nullptr) << ": No channels";
+
+ auto c = std::find(configuration->channels()->begin(),
+ configuration->channels()->end(), channel);
+ CHECK(c != configuration->channels()->end())
+ << ": Channel pointer not found in configuration()->channels()";
+
+ return std::distance(configuration->channels()->begin(), c);
+}
+
std::string CleanedChannelToString(const Channel *channel) {
FlatbufferDetachedBuffer<Channel> cleaned_channel = CopyFlatBuffer(channel);
cleaned_channel.mutable_message()->clear_schema();
@@ -591,6 +603,7 @@
}
return nullptr;
}
+
const Node *GetMyNode(const Configuration *config) {
const std::string hostname = (FLAGS_override_hostname.size() > 0)
? FLAGS_override_hostname
@@ -604,6 +617,17 @@
return nullptr;
}
+const Node *GetNode(const Configuration *config, const Node *node) {
+ if (!MultiNode(config)) {
+ CHECK(node == nullptr) << ": Provided a node in a single node world.";
+ return nullptr;
+ } else {
+ CHECK(node != nullptr);
+ CHECK(node->has_name());
+ return GetNode(config, node->name()->string_view());
+ }
+}
+
const Node *GetNode(const Configuration *config, std::string_view name) {
CHECK(config->has_nodes())
<< ": Asking for a node from a single node configuration.";
@@ -616,6 +640,46 @@
return nullptr;
}
+const Node *GetNodeOrDie(const Configuration *config, const Node *node) {
+ if (!MultiNode(config)) {
+ CHECK(node == nullptr) << ": Provided a node in a single node world.";
+ return nullptr;
+ } else {
+ const Node *config_node = GetNode(config, node);
+ if (config_node == nullptr) {
+ LOG(FATAL) << "Couldn't find node matching " << FlatbufferToJson(node);
+ }
+ return config_node;
+ }
+}
+
+int GetNodeIndex(const Configuration *config, const Node *node) {
+ CHECK(config->has_nodes())
+ << ": Asking for a node from a single node configuration.";
+ int node_index = 0;
+ for (const Node *iterated_node : *config->nodes()) {
+ if (iterated_node == node) {
+ return node_index;
+ }
+ ++node_index;
+ }
+ LOG(FATAL) << "Node not found in the configuration.";
+}
+
+std::vector<const Node *> GetNodes(const Configuration *config) {
+ std::vector<const Node *> nodes;
+ if (configuration::MultiNode(config)) {
+ for (const Node *node : *config->nodes()) {
+ nodes.emplace_back(node);
+ }
+ } else {
+ nodes.emplace_back(nullptr);
+ }
+ return nodes;
+}
+
+bool MultiNode(const Configuration *config) { return config->has_nodes(); }
+
bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
if (node == nullptr) {
return true;
diff --git a/aos/configuration.h b/aos/configuration.h
index ef30fce..a9fbfe3 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -60,15 +60,32 @@
channel->type()->string_view(), application_name, node);
}
+// Returns the channel index (or dies) of channel in the provided config.
+size_t ChannelIndex(const Configuration *config, const Channel *channel);
+
// Returns the Node out of the config with the matching name, or nullptr if it
// can't be found.
const Node *GetNode(const Configuration *config, std::string_view name);
+const Node *GetNode(const Configuration *config, const Node *node);
+// Returns a matching node, or nullptr if the provided node is nullptr and we
+// are in a single node world.
+const Node *GetNodeOrDie(const Configuration *config, const Node *node);
// Returns the Node out of the configuration which matches our hostname.
// CHECKs if it can't be found.
const Node *GetMyNode(const Configuration *config);
const Node *GetNodeFromHostname(const Configuration *config,
std::string_view name);
+// Returns a vector of the nodes in the config. (nullptr is considered the node
+// in a single node world.)
+std::vector<const Node *> GetNodes(const Configuration *config);
+
+// Returns the node index for a node. Note: node needs to exist inside config.
+int GetNodeIndex(const Configuration *config, const Node *node);
+
+// Returns true if we are running in a multinode configuration.
+bool MultiNode(const Configuration *config);
+
// Returns true if the provided channel is sendable on the provided node.
bool ChannelIsSendableOnNode(const Channel *channel, const Node *node);
// Returns true if the provided channel is able to be watched or fetched on the
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index b8fc5b6..70515fd 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -39,6 +39,16 @@
FlatbufferToJson(config, true));
}
+// Tests that we can get back a ChannelIndex.
+TEST_F(ConfigurationTest, ChannelIndex) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1.json");
+
+ EXPECT_EQ(
+ ChannelIndex(&config.message(), config.message().channels()->Get(1u)),
+ 1u);
+}
+
// Tests that we can read and merge a multinode configuration.
TEST_F(ConfigurationTest, ConfigMergeMultinode) {
FlatbufferDetachedBuffer<Configuration> config =
@@ -599,6 +609,63 @@
::testing::ElementsAreArray({"pi1"}));
}
+// Tests that we can pull out all the nodes.
+TEST_F(ConfigurationTest, GetNodes) {
+ {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/good_multinode.json");
+ const Node *pi1 = GetNode(&config.message(), "pi1");
+ const Node *pi2 = GetNode(&config.message(), "pi2");
+
+ EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(pi1, pi2));
+ }
+
+ {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1.json");
+ EXPECT_THAT(GetNodes(&config.message()), ::testing::ElementsAre(nullptr));
+ }
+}
+
+// Tests that we can extract a node index from a config.
+TEST_F(ConfigurationTest, GetNodeIndex) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/good_multinode.json");
+ const Node *pi1 = GetNode(&config.message(), "pi1");
+ const Node *pi2 = GetNode(&config.message(), "pi2");
+
+ EXPECT_EQ(GetNodeIndex(&config.message(), pi1), 0);
+ EXPECT_EQ(GetNodeIndex(&config.message(), pi2), 1);
+}
+
+// Tests that GetNodeOrDie handles both single and multi-node worlds and returns
+// valid nodes.
+TEST_F(ConfigurationDeathTest, GetNodeOrDie) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/good_multinode.json");
+ FlatbufferDetachedBuffer<Configuration> config2 =
+ ReadConfig("aos/testdata/good_multinode.json");
+ {
+ // Simple case, nullptr -> nullptr
+ FlatbufferDetachedBuffer<Configuration> single_node_config =
+ ReadConfig("aos/testdata/config1.json");
+ EXPECT_EQ(nullptr, GetNodeOrDie(&single_node_config.message(), nullptr));
+
+ // Confirm that we die when a node is passed in.
+ EXPECT_DEATH(
+ {
+ GetNodeOrDie(&single_node_config.message(),
+ config.message().nodes()->Get(0));
+ },
+ "Provided a node in a single node world.");
+ }
+
+ const Node *pi1 = GetNode(&config.message(), "pi1");
+ // Now try a lookup using a node from a different instance of the config.
+ EXPECT_EQ(pi1,
+ GetNodeOrDie(&config.message(), config2.message().nodes()->Get(0)));
+}
+
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a183cae..9cfdf10 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -41,6 +41,16 @@
],
)
+cc_test(
+ name = "epoll_test",
+ srcs = ["epoll_test.cc"],
+ deps = [
+ ":epoll",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
cc_library(
name = "event_loop",
srcs = [
@@ -58,6 +68,7 @@
"//aos:configuration",
"//aos:configuration_fbs",
"//aos:flatbuffers",
+ "//aos/ipc_lib:data_alignment",
"//aos/time",
"//aos/util:phased_loop",
"@com_github_google_flatbuffers//:flatbuffers",
@@ -214,6 +225,7 @@
"//aos/ipc_lib:signalfd",
"//aos/stl_mutex",
"//aos/util:phased_loop",
+ "@com_google_absl//absl/base",
],
)
@@ -243,8 +255,11 @@
cc_test(
name = "simulated_event_loop_test",
srcs = ["simulated_event_loop_test.cc"],
+ data = ["multinode_pingpong_config.json"],
deps = [
":event_loop_param_test",
+ ":ping_lib",
+ ":pong_lib",
":simulated_event_loop",
"//aos/testing:googletest",
],
@@ -267,10 +282,12 @@
srcs = [
"event_scheduler.cc",
"simulated_event_loop.cc",
+ "simulated_network_bridge.cc",
],
hdrs = [
"event_scheduler.h",
"simulated_event_loop.h",
+ "simulated_network_bridge.h",
],
visibility = ["//visibility:public"],
deps = [
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index e19cf59..b9e7edb 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -71,6 +71,7 @@
}
void EPoll::Run() {
+ run_ = true;
while (true) {
// Pull a single event out. Infinite timeout if we are supposed to be
// running, and 0 length timeout otherwise. This lets us flush the event
@@ -90,34 +91,53 @@
return;
}
}
- EventData *event_data = static_cast<struct EventData *>(event.data.ptr);
- if (event.events & (EPOLLIN | EPOLLPRI)) {
+ EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
+ if (event.events & kInEvents) {
+ CHECK(event_data->in_fn)
+ << ": No handler registered for input events on " << event_data->fd;
event_data->in_fn();
}
+ if (event.events & kOutEvents) {
+ CHECK(event_data->out_fn)
+ << ": No handler registered for output events on " << event_data->fd;
+ event_data->out_fn();
+ }
}
}
void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
void EPoll::OnReadable(int fd, ::std::function<void()> function) {
- ::std::unique_ptr<EventData> event_data(
- new EventData{fd, ::std::move(function)});
+ EventData *event_data = GetEventData(fd);
+ if (event_data == nullptr) {
+ fns_.emplace_back(std::make_unique<EventData>(fd));
+ event_data = fns_.back().get();
+ } else {
+ CHECK(!event_data->in_fn) << ": Duplicate in functions for " << fd;
+ }
+ event_data->in_fn = ::std::move(function);
+ DoEpollCtl(event_data, event_data->events | kInEvents);
+}
- struct epoll_event event;
- event.events = EPOLLIN | EPOLLPRI;
- event.data.ptr = event_data.get();
- PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == 0)
- << ": Failed to add fd " << fd;
- fns_.push_back(::std::move(event_data));
+void EPoll::OnWriteable(int fd, ::std::function<void()> function) {
+ EventData *event_data = GetEventData(fd);
+ if (event_data == nullptr) {
+ fns_.emplace_back(std::make_unique<EventData>(fd));
+ event_data = fns_.back().get();
+ } else {
+ CHECK(!event_data->out_fn) << ": Duplicate out functions for " << fd;
+ }
+ event_data->out_fn = ::std::move(function);
+ DoEpollCtl(event_data, event_data->events | kOutEvents);
}
// Removes fd from the event loop.
void EPoll::DeleteFd(int fd) {
auto element = fns_.begin();
- PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) == 0);
while (fns_.end() != element) {
if (element->get()->fd == fd) {
fns_.erase(element);
+ PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) == 0);
return;
}
++element;
@@ -125,5 +145,50 @@
LOG(FATAL) << "fd " << fd << " not found";
}
+void EPoll::EnableEvents(int fd, uint32_t events) {
+ EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
+ DoEpollCtl(event_data, event_data->events | events);
+}
+
+void EPoll::DisableEvents(int fd, uint32_t events) {
+ EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
+ DoEpollCtl(event_data, event_data->events & ~events);
+}
+
+EPoll::EventData *EPoll::GetEventData(int fd) {
+ const auto iterator = std::find_if(
+ fns_.begin(), fns_.end(),
+ [fd](const std::unique_ptr<EventData> &data) { return data->fd == fd; });
+ if (iterator == fns_.end()) {
+ return nullptr;
+ }
+ return iterator->get();
+}
+
+void EPoll::DoEpollCtl(EventData *event_data, const uint32_t new_events) {
+ const uint32_t old_events = event_data->events;
+ event_data->events = new_events;
+ if (new_events == 0) {
+ if (old_events == 0) {
+ // Not added, and doesn't need to be. Nothing to do here.
+ return;
+ }
+ // It was added, but should now be removed.
+ PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, event_data->fd, nullptr) == 0);
+ return;
+ }
+
+ int operation = EPOLL_CTL_MOD;
+ if (old_events == 0) {
+ // If it wasn't added before, then this is the first time it's being added.
+ operation = EPOLL_CTL_ADD;
+ }
+ struct epoll_event event;
+ event.events = event_data->events;
+ event.data.ptr = event_data;
+ PCHECK(epoll_ctl(epoll_fd_, operation, event_data->fd, &event) == 0)
+ << ": Failed to " << operation << " epoll fd: " << event_data->fd;
+}
+
} // namespace internal
} // namespace aos
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index 7bc135c..51a20d0 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -63,25 +63,57 @@
void Quit();
// Registers a function to be called if the fd becomes readable.
- // There should only be 1 function registered for each fd.
+ // Only one function may be registered for readability on each fd.
void OnReadable(int fd, ::std::function<void()> function);
+ // Registers a function to be called if the fd becomes writeable.
+ // Only one function may be registered for writability on each fd.
+ void OnWriteable(int fd, ::std::function<void()> function);
+
// Removes fd from the event loop.
// All Fds must be cleaned up before this class is destroyed.
void DeleteFd(int fd);
+ // Enables calling the existing function registered for fd when it becomes
+ // writeable.
+ void EnableWriteable(int fd) { EnableEvents(fd, kOutEvents); }
+
+ // Disables calling the existing function registered for fd when it becomes
+ // writeable.
+ void DisableWriteable(int fd) { DisableEvents(fd, kOutEvents); }
+
private:
+ // Structure whose pointer should be returned by epoll. Makes looking up the
+ // function fast and easy.
+ struct EventData {
+ EventData(int fd_in) : fd(fd_in) {}
+ // We use pointers to these objects as persistent identifiers, so they can't
+ // be moved.
+ EventData(const EventData &) = delete;
+ EventData &operator=(const EventData &) = delete;
+
+ const int fd;
+ uint32_t events = 0;
+ ::std::function<void()> in_fn, out_fn;
+ };
+
+ void EnableEvents(int fd, uint32_t events);
+ void DisableEvents(int fd, uint32_t events);
+
+ EventData *GetEventData(int fd);
+
+ void DoEpollCtl(EventData *event_data, uint32_t new_events);
+
+ // TODO(Brian): Figure out a nicer way to handle EPOLLPRI than lumping it in
+ // with input.
+ static constexpr uint32_t kInEvents = EPOLLIN | EPOLLPRI;
+ static constexpr uint32_t kOutEvents = EPOLLOUT;
+
::std::atomic<bool> run_{true};
// Main epoll fd.
int epoll_fd_;
- // Structure whose pointer should be returned by epoll. Makes looking up the
- // function fast and easy.
- struct EventData {
- int fd;
- ::std::function<void()> in_fn;
- };
::std::vector<::std::unique_ptr<EventData>> fns_;
// Pipe pair for handling quit.
diff --git a/aos/events/epoll_test.cc b/aos/events/epoll_test.cc
new file mode 100644
index 0000000..4e6bbbc
--- /dev/null
+++ b/aos/events/epoll_test.cc
@@ -0,0 +1,172 @@
+#include "aos/events/epoll.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "gtest/gtest.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace internal {
+namespace testing {
+
+// A simple wrapper around both ends of a pipe along with some helpers to easily
+// read/write data through it.
+class Pipe {
+ public:
+ Pipe() { PCHECK(pipe2(fds_, O_NONBLOCK) == 0); }
+ ~Pipe() {
+ PCHECK(close(fds_[0]) == 0);
+ PCHECK(close(fds_[1]) == 0);
+ }
+
+ int read_fd() { return fds_[0]; }
+ int write_fd() { return fds_[1]; }
+
+ void Write(const std::string &data) {
+ CHECK_EQ(write(write_fd(), data.data(), data.size()),
+ static_cast<ssize_t>(data.size()));
+ }
+
+ std::string Read(size_t size) {
+ std::string result;
+ result.resize(size);
+ CHECK_EQ(read(read_fd(), result.data(), size), static_cast<ssize_t>(size));
+ return result;
+ }
+
+ private:
+ int fds_[2];
+};
+
+class EPollTest : public ::testing::Test {
+ public:
+ void RunFor(std::chrono::nanoseconds duration) {
+ TimerFd timerfd;
+ bool did_quit = false;
+ epoll_.OnReadable(timerfd.fd(), [this, &timerfd, &did_quit]() {
+ CHECK(!did_quit);
+ epoll_.Quit();
+ did_quit = true;
+ timerfd.Read();
+ });
+ timerfd.SetTime(monotonic_clock::now() + duration,
+ monotonic_clock::duration::zero());
+ epoll_.Run();
+ CHECK(did_quit);
+ epoll_.DeleteFd(timerfd.fd());
+ }
+
+ // Tests should avoid relying on ordering for events closer in time than this,
+ // or waiting for longer than this to ensure events happen in order.
+ static constexpr std::chrono::nanoseconds tick_duration() {
+ return std::chrono::milliseconds(50);
+ }
+
+ EPoll epoll_;
+};
+
+// Test that the basics of OnReadable work.
+TEST_F(EPollTest, BasicReadable) {
+ Pipe pipe;
+ bool got_data = false;
+ epoll_.OnReadable(pipe.read_fd(), [&]() {
+ ASSERT_FALSE(got_data);
+ ASSERT_EQ("some", pipe.Read(4));
+ got_data = true;
+ });
+ RunFor(tick_duration());
+ EXPECT_FALSE(got_data);
+
+ pipe.Write("some");
+ RunFor(tick_duration());
+ EXPECT_TRUE(got_data);
+
+ epoll_.DeleteFd(pipe.read_fd());
+}
+
+// Test that the basics of OnWriteable work.
+TEST_F(EPollTest, BasicWriteable) {
+ Pipe pipe;
+ int number_writes = 0;
+ epoll_.OnWriteable(pipe.write_fd(), [&]() {
+ pipe.Write(" ");
+ ++number_writes;
+ });
+
+ // First, fill up the pipe's write buffer.
+ RunFor(tick_duration());
+ EXPECT_GT(number_writes, 0);
+
+ // Now, if we try again, we shouldn't do anything.
+ const int bytes_in_pipe = number_writes;
+ number_writes = 0;
+ RunFor(tick_duration());
+ EXPECT_EQ(number_writes, 0);
+
+ // Empty the pipe, then fill it up again.
+ for (int i = 0; i < bytes_in_pipe; ++i) {
+ ASSERT_EQ(" ", pipe.Read(1));
+ }
+ number_writes = 0;
+ RunFor(tick_duration());
+ EXPECT_EQ(number_writes, bytes_in_pipe);
+
+ epoll_.DeleteFd(pipe.write_fd());
+}
+
+TEST(EPollDeathTest, InvalidFd) {
+ EPoll epoll;
+ Pipe pipe;
+ epoll.OnReadable(pipe.read_fd(), []() {});
+ EXPECT_DEATH(epoll.OnReadable(pipe.read_fd(), []() {}),
+ "Duplicate in functions");
+ epoll.OnWriteable(pipe.read_fd(), []() {});
+ EXPECT_DEATH(epoll.OnWriteable(pipe.read_fd(), []() {}),
+ "Duplicate out functions");
+
+ epoll.DeleteFd(pipe.read_fd());
+ EXPECT_DEATH(epoll.DeleteFd(pipe.read_fd()), "fd [0-9]+ not found");
+ EXPECT_DEATH(epoll.DeleteFd(pipe.write_fd()), "fd [0-9]+ not found");
+}
+
+// Tests that enabling/disabling a writeable FD works.
+TEST_F(EPollTest, WriteableEnableDisable) {
+ Pipe pipe;
+ int number_writes = 0;
+ epoll_.OnWriteable(pipe.write_fd(), [&]() {
+ pipe.Write(" ");
+ ++number_writes;
+ });
+
+ // First, fill up the pipe's write buffer.
+ RunFor(tick_duration());
+ EXPECT_GT(number_writes, 0);
+
+ // Empty the pipe.
+ const int bytes_in_pipe = number_writes;
+ for (int i = 0; i < bytes_in_pipe; ++i) {
+ ASSERT_EQ(" ", pipe.Read(1));
+ }
+
+ // If we disable writeable checking, then nothing should happen.
+ epoll_.DisableWriteable(pipe.write_fd());
+ number_writes = 0;
+ RunFor(tick_duration());
+ EXPECT_EQ(number_writes, 0);
+
+ // Disabling it again should be a NOP.
+ epoll_.DisableWriteable(pipe.write_fd());
+
+ // And then when we re-enable, it should fill the pipe up again.
+ epoll_.EnableWriteable(pipe.write_fd());
+ number_writes = 0;
+ RunFor(tick_duration());
+ EXPECT_EQ(number_writes, bytes_in_pipe);
+
+ epoll_.DeleteFd(pipe.write_fd());
+}
+
+} // namespace testing
+} // namespace internal
+} // namespace aos
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index e368df2..76b83c9 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -71,14 +71,17 @@
}
int EventLoop::ChannelIndex(const Channel *channel) {
- CHECK(configuration_->channels() != nullptr) << ": No channels";
+ return configuration::ChannelIndex(configuration_, channel);
+}
- auto c = std::find(configuration_->channels()->begin(),
- configuration_->channels()->end(), channel);
- CHECK(c != configuration_->channels()->end())
- << ": Channel pointer not found in configuration()->channels()";
-
- return std::distance(configuration()->channels()->begin(), c);
+WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
+ const int channel_index = ChannelIndex(channel);
+ for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
+ if (watcher->channel_index() == channel_index) {
+ return watcher.get();
+ }
+ }
+ LOG(FATAL) << "No watcher found for channel";
}
void EventLoop::NewSender(RawSender *sender) {
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 4a12096..3fe46ec 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -1,4 +1,5 @@
#ifndef AOS_EVENTS_EVENT_LOOP_H_
+
#define AOS_EVENTS_EVENT_LOOP_H_
#include <atomic>
@@ -11,6 +12,7 @@
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
+#include "aos/ipc_lib/data_alignment.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
@@ -141,6 +143,13 @@
// Returns the queue index that this was sent with.
uint32_t sent_queue_index() const { return sent_queue_index_; }
+ // Returns the associated flatbuffers-style allocator. This must be
+ // deallocated before the message is sent.
+ PreallocatedAllocator *fbb_allocator() {
+ fbb_allocator_ = PreallocatedAllocator(data(), size());
+ return &fbb_allocator_;
+ }
+
protected:
EventLoop *event_loop() { return event_loop_; }
@@ -166,6 +175,8 @@
const Channel *channel_;
internal::RawSenderTiming timing_;
+
+ PreallocatedAllocator fbb_allocator_{nullptr, 0};
};
// Fetches the newest message from a channel.
@@ -177,12 +188,26 @@
// Fetches the next message. Returns true if it fetched a new message. This
// method will only return messages sent after the Fetcher was created.
- bool FetchNext() { return fetcher_->FetchNext(); }
+ bool FetchNext() {
+ const bool result = fetcher_->FetchNext();
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
// Fetches the most recent message. Returns true if it fetched a new message.
// This will return the latest message regardless of if it was sent before or
// after the fetcher was created.
- bool Fetch() { return fetcher_->Fetch(); }
+ bool Fetch() {
+ const bool result = fetcher_->Fetch();
+ if (result) {
+ CheckChannelDataAlignment(fetcher_->context().data,
+ fetcher_->context().size);
+ }
+ return result;
+ }
// Returns a pointer to the contained flatbuffer, or nullptr if there is no
// available message.
@@ -222,10 +247,17 @@
// builder.Send(t_builder.Finish());
class Builder {
public:
- Builder(RawSender *sender, void *data, size_t size)
- : alloc_(data, size), fbb_(size, &alloc_), sender_(sender) {
+ Builder(RawSender *sender, PreallocatedAllocator *allocator)
+ : fbb_(allocator->size(), allocator), sender_(sender) {
+ CheckChannelDataAlignment(allocator->data(), allocator->size());
fbb_.ForceDefaults(1);
}
+ Builder() {}
+ Builder(const Builder &) = delete;
+ Builder(Builder &&) = default;
+
+ Builder &operator=(const Builder &) = delete;
+ Builder &operator=(Builder &&) = default;
flatbuffers::FlatBufferBuilder *fbb() { return &fbb_; }
@@ -243,7 +275,6 @@
void CheckSent() { fbb_.Finished(); }
private:
- PreallocatedAllocator alloc_;
flatbuffers::FlatBufferBuilder fbb_;
RawSender *sender_;
};
@@ -482,6 +513,16 @@
// Validates that channel exists inside configuration_ and finds its index.
int ChannelIndex(const Channel *channel);
+ // Returns the state for the watcher on the corresponding channel. This
+ // watcher must exist before calling this.
+ WatcherState *GetWatcherState(const Channel *channel);
+
+ // Returns a Sender's protected RawSender
+ template <typename T>
+ static RawSender *GetRawSender(aos::Sender<T> *sender) {
+ return sender->sender_.get();
+ }
+
// Context available for watchers, timers, and phased loops.
Context context_;
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index ea5fd3d..89e80ae 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -55,7 +55,7 @@
virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
void EnableNodes(std::string_view my_node) {
- std::string json = std::string(R"config({
+ std::string json = R"config({
"channels": [
{
"name": "/aos",
@@ -80,9 +80,12 @@
],
"nodes": [
{
- "name": ")config") +
- std::string(my_node) + R"config(",
+ "name": "me",
"hostname": "myhostname"
+ },
+ {
+ "name": "them",
+ "hostname": "themhostname"
}
]
})config";
@@ -90,17 +93,17 @@
flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
- my_node_ = my_node;
+ my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
}
- std::string_view my_node() const { return my_node_; }
+ const Node *my_node() const { return my_node_; }
const Configuration *configuration() { return &flatbuffer_.message(); }
private:
FlatbufferDetachedBuffer<Configuration> flatbuffer_;
- std::string my_node_;
+ const Node *my_node_ = nullptr;
};
class AbstractEventLoopTestBase
@@ -134,7 +137,7 @@
const Configuration *configuration() { return factory_->configuration(); }
- std::string_view my_node() const { return factory_->my_node(); }
+ const Node *my_node() const { return factory_->my_node(); }
// Ends the given event loop at the given time from now.
void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index c2c9884..dfb5a6d 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -23,7 +23,7 @@
template <typename T>
typename Sender<T>::Builder Sender<T>::MakeBuilder() {
- return Builder(sender_.get(), sender_->data(), sender_->size());
+ return Builder(sender_.get(), sender_->fbb_allocator());
}
template <typename Watch>
@@ -193,6 +193,7 @@
// context.
void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
Context context) {
+ CheckChannelDataAlignment(context.data, context.size);
const monotonic_clock::time_point monotonic_start_time = get_time();
{
const float start_latency =
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index b5a530e..57f20ae 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -8,7 +8,7 @@
namespace aos {
EventScheduler::Token EventScheduler::Schedule(
- ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+ distributed_clock::time_point time, ::std::function<void()> callback) {
return events_list_.emplace(time, callback);
}
@@ -16,9 +16,9 @@
events_list_.erase(token);
}
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
- const ::aos::monotonic_clock::time_point end_time =
- monotonic_now() + duration;
+void EventScheduler::RunFor(distributed_clock::duration duration) {
+ const distributed_clock::time_point end_time =
+ distributed_now() + duration;
is_running_ = true;
for (std::function<void()> &on_run : on_run_) {
on_run();
@@ -26,7 +26,7 @@
on_run_.clear();
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
- ::aos::monotonic_clock::time_point next_time = iter->first;
+ distributed_clock::time_point next_time = iter->first;
if (next_time > end_time) {
break;
}
@@ -53,4 +53,11 @@
}
}
+std::ostream &operator<<(std::ostream &stream,
+ const aos::distributed_clock::time_point &now) {
+ // Print it the same way we print a monotonic time. Literally.
+ stream << monotonic_clock::time_point(now.time_since_epoch());
+ return stream;
+}
+
} // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 432f4ad..400a307 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -14,15 +14,43 @@
namespace aos {
+// This clock is the basis for distributed time. It is used to synchronize time
+// between multiple nodes. This is a new type so conversions to and from the
+// monotonic and realtime clocks aren't implicit.
+class distributed_clock {
+ public:
+ typedef ::std::chrono::nanoseconds::rep rep;
+ typedef ::std::chrono::nanoseconds::period period;
+ typedef ::std::chrono::nanoseconds duration;
+ typedef ::std::chrono::time_point<distributed_clock> time_point;
+
+ // This clock is the base clock for the simulation and everything is synced to
+ // it. It never jumps.
+ static constexpr bool is_steady = true;
+
+ // Returns the epoch (0).
+ static constexpr time_point epoch() { return time_point(zero()); }
+
+ static constexpr duration zero() { return duration(0); }
+
+ static constexpr time_point min_time{
+ time_point(duration(::std::numeric_limits<duration::rep>::min()))};
+ static constexpr time_point max_time{
+ time_point(duration(::std::numeric_limits<duration::rep>::max()))};
+};
+
+std::ostream &operator<<(std::ostream &stream,
+ const aos::distributed_clock::time_point &now);
+
class EventScheduler {
public:
using ChannelType =
- std::multimap<monotonic_clock::time_point, std::function<void()>>;
+ std::multimap<distributed_clock::time_point, std::function<void()>>;
using Token = ChannelType::iterator;
// Schedule an event with a callback function
// Returns an iterator to the event
- Token Schedule(monotonic_clock::time_point time,
+ Token Schedule(distributed_clock::time_point time,
std::function<void()> callback);
// Schedules a callback when the event scheduler starts.
@@ -38,30 +66,17 @@
// Runs until exited.
void Run();
// Runs for a duration.
- void RunFor(monotonic_clock::duration duration);
+ void RunFor(distributed_clock::duration duration);
void Exit() { is_running_ = false; }
bool is_running() const { return is_running_; }
- monotonic_clock::time_point monotonic_now() const { return now_; }
-
- realtime_clock::time_point realtime_now() const {
- return realtime_clock::time_point(monotonic_now().time_since_epoch() +
- realtime_offset_);
- }
-
- // Sets realtime clock to realtime_now for a given monotonic clock.
- void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
- realtime_clock::time_point realtime_now) {
- realtime_offset_ =
- realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
- }
+ distributed_clock::time_point distributed_now() const { return now_; }
private:
// Current execution time.
- monotonic_clock::time_point now_ = monotonic_clock::epoch();
- std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+ distributed_clock::time_point now_ = distributed_clock::epoch();
std::vector<std::function<void()>> on_run_;
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b2be972..efbbcc6 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -68,6 +68,7 @@
"@com_github_google_glog//:glog",
],
)
+
cc_binary(
name = "logger_main",
srcs = [
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
reader.Register();
std::unique_ptr<aos::EventLoop> printer_event_loop =
- reader.event_loop_factory()->MakeEventLoop("printer");
+ reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
printer_event_loop->SkipTimingReport();
bool found_channel = false;
@@ -66,7 +66,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
} else {
std::cout << context.realtime_event_time << " ("
<< context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
}
});
found_channel = true;
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2d9604f..c766fe6 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -65,7 +65,7 @@
// Make an eventloop for retrieving stats
std::unique_ptr<aos::EventLoop> stats_event_loop =
- log_reader_factory.MakeEventLoop("logstats");
+ log_reader_factory.MakeEventLoop("logstats", reader.node());
stats_event_loop->SkipTimingReport();
// Read channel info and store in vector
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
- std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+ std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+ node_event_loop_factory_ =
+ event_loop_factory_->GetNodeEventLoopFactory(node());
+ event_loop_unique_ptr_ =
+ event_loop_factory->MakeEventLoop("log_reader", node());
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
"this.";
// If we have access to the factory, use it to fix the realtime time.
- if (event_loop_factory_ != nullptr) {
- event_loop_factory_->SetRealtimeOffset(
+ if (node_event_loop_factory_ != nullptr) {
+ node_event_loop_factory_->SetRealtimeOffset(
monotonic_clock::time_point(chrono::nanoseconds(
channel_data.message().monotonic_sent_time())),
realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
event_loop_ = nullptr;
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
+ node_event_loop_factory_ = nullptr;
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
}
void LogReader::MakeRemappedConfig() {
+ CHECK(!event_loop_)
+ << ": Can't change the mapping after the events are scheduled.";
+
// If no remapping occurred and we are using the original config, then there
// is nothing interesting to do here.
if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
std::vector<std::unique_ptr<RawSender>> channels_;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+ NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
MultinodeLoggerTest()
: config_(aos::configuration::ReadConfig(
"aos/events/logging/multinode_pingpong_config.json")),
- event_loop_factory_(&config_.message(), "pi1"),
- ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop(
+ "ping", configuration::GetNode(event_loop_factory_.configuration(),
+ "pi1"))),
ping_(ping_event_loop_.get()) {}
// Config and factory.
@@ -233,8 +235,10 @@
LOG(INFO) << "Logging data to " << logfile;
{
+ const Node *pi1 =
+ configuration::GetNode(event_loop_factory_.configuration(), "pi1");
std::unique_ptr<EventLoop> pong_event_loop =
- event_loop_factory_.MakeEventLoop("pong");
+ event_loop_factory_.MakeEventLoop("pong", pi1);
std::unique_ptr<aos::RawSender> pong_sender(
pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
- event_loop_factory_.MakeEventLoop("logger");
+ event_loop_factory_.MakeEventLoop("logger", pi1);
event_loop_factory_.RunFor(chrono::milliseconds(95));
@@ -276,20 +280,23 @@
// TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
// messages. This won't work today yet until the log reading code gets
// significantly better.
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+ SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
// log file.
reader.Register(&log_reader_factory);
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
ASSERT_NE(reader.node(), nullptr);
EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
std::unique_ptr<EventLoop> test_event_loop =
- log_reader_factory.MakeEventLoop("test");
+ log_reader_factory.MakeEventLoop("test", pi1);
int ping_count = 10;
int pong_count = 10;
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index cc11520..b4578cb 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -18,6 +18,7 @@
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/stl_mutex/stl_mutex.h"
+#include "aos/util/file.h"
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
@@ -72,7 +73,7 @@
size_ = ipc_lib::LocklessQueueMemorySize(config_);
- MkdirP(path);
+ util::MkdirP(path, FLAGS_permissions);
// There are 2 cases. Either the file already exists, or it does not
// already exist and we need to create it. Start by trying to create it. If
@@ -123,24 +124,11 @@
const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
- private:
- void MkdirP(std::string_view path) {
- auto last_slash_pos = path.find_last_of("/");
-
- std::string folder(last_slash_pos == std::string_view::npos
- ? std::string_view("")
- : path.substr(0, last_slash_pos));
- if (folder.empty()) return;
- MkdirP(folder);
- VLOG(1) << "Creating " << folder;
- const int result = mkdir(folder.c_str(), FLAGS_permissions);
- if (result == -1 && errno == EEXIST) {
- VLOG(1) << "Already exists";
- return;
- }
- PCHECK(result == 0) << ": Error creating " << folder;
+ absl::Span<char> GetSharedMemory() const {
+ return absl::Span<char>(static_cast<char *>(data_), size_);
}
+ private:
ipc_lib::LocklessQueueConfiguration config_;
int fd_;
@@ -184,8 +172,8 @@
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
- data_storage_(static_cast<AlignedChar *>(aligned_alloc(
- alignof(AlignedChar), channel->max_size())),
+ data_storage_(static_cast<char *>(malloc(channel->max_size() +
+ kChannelDataAlignment - 1)),
&free) {
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
@@ -217,7 +205,7 @@
actual_queue_index_.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+ &context_.size, data_storage_start());
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = actual_queue_index_.index();
if (context_.remote_queue_index == 0xffffffffu) {
@@ -229,7 +217,7 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ context_.data = data_storage_start() +
lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -267,7 +255,7 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.size, reinterpret_cast<char *>(data_storage_.get()));
+ &context_.size, data_storage_start());
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = queue_index.index();
if (context_.remote_queue_index == 0xffffffffu) {
@@ -279,7 +267,7 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ context_.data = data_storage_start() +
lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = queue_index.Increment();
}
@@ -314,7 +302,15 @@
void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
+ absl::Span<char> GetSharedMemory() const {
+ return lockless_queue_memory_.GetSharedMemory();
+ }
+
private:
+ char *data_storage_start() {
+ return RoundChannelData(data_storage_.get(), channel_->max_size());
+ }
+
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
@@ -322,14 +318,7 @@
ipc_lib::QueueIndex actual_queue_index_ =
ipc_lib::LocklessQueue::empty_queue_index();
- struct AlignedChar {
- // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
- // cache lines.
- // V4L2 requires 64 byte alignment for USERPTR.
- alignas(64) char data;
- };
-
- std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+ std::unique_ptr<char, decltype(&free)> data_storage_;
Context context_;
};
@@ -402,6 +391,10 @@
return true;
}
+ absl::Span<char> GetSharedMemory() const {
+ return lockless_queue_memory_.GetSharedMemory();
+ }
+
private:
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
@@ -456,6 +449,10 @@
void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
+ absl::Span<char> GetSharedMemory() const {
+ return simple_shm_fetcher_.GetSharedMemory();
+ }
+
private:
bool has_new_data_ = false;
@@ -708,7 +705,8 @@
ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
std::unique_lock<stl_mutex> locker(mutex_);
- event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
+ event_loops_.erase(
+ std::find(event_loops_.begin(), event_loops_.end(), event_loop));
if (event_loops_.size() == 0u) {
// The last caller restores the original signal handlers.
@@ -836,6 +834,17 @@
UpdateTimingReport();
}
+absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
+ internal::WatcherState *const watcher_state =
+ static_cast<internal::WatcherState *>(GetWatcherState(channel));
+ return watcher_state->GetSharedMemory();
+}
+
+absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
+ const aos::RawSender *sender) const {
+ return static_cast<const internal::ShmSender *>(sender)->GetSharedMemory();
+}
+
pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
} // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 52bb338..fa870b8 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -3,6 +3,8 @@
#include <vector>
+#include "absl/types/span.h"
+
#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_loop_generated.h"
@@ -68,8 +70,20 @@
int priority() const override { return priority_; }
+ // Returns the epoll loop used to run the event loop.
internal::EPoll *epoll() { return &epoll_; }
+ // Returns the local mapping of the shared memory used by the watcher on the
+ // specified channel. A watcher must be created on this channel before calling
+ // this.
+ absl::Span<char> GetWatcherSharedMemory(const Channel *channel);
+
+ // Returns the local mapping of the shared memory used by the provided Sender
+ template <typename T>
+ absl::Span<char> GetSenderSharedMemory(aos::Sender<T> *sender) const {
+ return GetShmSenderSharedMemory(GetRawSender(sender));
+ }
+
private:
friend class internal::WatcherState;
friend class internal::TimerHandlerState;
@@ -82,6 +96,9 @@
// Returns the TID of the event loop.
pid_t GetTid() override;
+ // Private method to access the shared memory mapping of a ShmSender
+ absl::Span<char> GetShmSenderSharedMemory(const aos::RawSender *sender) const;
+
std::vector<std::function<void()>> on_run_;
int priority_ = 0;
std::string name_;
@@ -90,7 +107,6 @@
internal::EPoll epoll_;
};
-
} // namespace aos
#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d33f496..984f978 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -38,7 +38,8 @@
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
if (configuration()->has_nodes()) {
- FLAGS_override_hostname = "myhostname";
+ FLAGS_override_hostname =
+ std::string(my_node()->hostname()->string_view());
}
::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
loop->set_name(name);
@@ -47,7 +48,8 @@
::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
if (configuration()->has_nodes()) {
- FLAGS_override_hostname = "myhostname";
+ FLAGS_override_hostname =
+ std::string(my_node()->hostname()->string_view());
}
::std::unique_ptr<ShmEventLoop> loop =
::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
@@ -181,6 +183,34 @@
EXPECT_EQ(times.size(), 2u);
}
+// Test GetWatcherSharedMemory in a few basic scenarios.
+TEST(ShmEventLoopDeathTest, GetWatcherSharedMemory) {
+ ShmEventLoopTestFactory factory;
+ auto generic_loop1 = factory.MakePrimary("primary");
+ ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
+ const auto channel = configuration::GetChannel(
+ loop1->configuration(), "/test", TestMessage::GetFullyQualifiedName(),
+ loop1->name(), loop1->node());
+
+ // First verify it handles an invalid channel reasonably.
+ EXPECT_DEATH(loop1->GetWatcherSharedMemory(channel),
+ "No watcher found for channel");
+
+ // Then, actually create a watcher, and verify it returns something sane.
+ loop1->MakeWatcher("/test", [](const TestMessage &) {});
+ EXPECT_FALSE(loop1->GetWatcherSharedMemory(channel).empty());
+}
+
+TEST(ShmEventLoopTest, GetSenderSharedMemory) {
+ ShmEventLoopTestFactory factory;
+ auto generic_loop1 = factory.MakePrimary("primary");
+ ShmEventLoop *const loop1 = static_cast<ShmEventLoop *>(generic_loop1.get());
+
+ // check that GetSenderSharedMemory returns non-null/non-empty memory span
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+ EXPECT_FALSE(loop1->GetSenderSharedMemory(&sender).empty());
+}
+
// TODO(austin): Test that missing a deadline with a timer recovers as expected.
} // namespace testing
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..c857bb6 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,6 +5,7 @@
#include <string_view>
#include "absl/container/btree_map.h"
+#include "aos/events/simulated_network_bridge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
@@ -13,19 +14,17 @@
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
struct SimulatedMessage {
- // Struct to let us force data to be well aligned.
- struct OveralignedChar {
- char data alignas(64);
- };
-
// Context for the data.
Context context;
// The data.
- char *data() { return reinterpret_cast<char *>(&actual_data[0]); }
+ char *data(size_t buffer_size) {
+ return RoundChannelData(&actual_data[0], buffer_size);
+ }
- // Then the data.
- OveralignedChar actual_data[];
+ // Then the data, including padding on the end so we can align the buffer we
+ // actually return from data().
+ char actual_data[];
};
class SimulatedEventLoop;
@@ -36,6 +35,7 @@
public:
SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
const Channel *channel,
std::function<void(const Context &context, const void *message)> fn);
@@ -59,6 +59,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedWatcher> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -102,10 +103,6 @@
const Channel *channel() const { return channel_; }
- ::aos::monotonic_clock::time_point monotonic_now() const {
- return scheduler_->monotonic_now();
- }
-
private:
const Channel *channel_;
@@ -126,9 +123,9 @@
// This is a shared_ptr so we don't have to implement refcounting or copying.
std::shared_ptr<SimulatedMessage> MakeSimulatedMessage(size_t size) {
SimulatedMessage *message = reinterpret_cast<SimulatedMessage *>(
- malloc(sizeof(SimulatedMessage) + size));
+ malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
message->context.size = size;
- message->context.data = message->data();
+ message->context.data = message->data(size);
return std::shared_ptr<SimulatedMessage>(message, free);
}
@@ -145,7 +142,7 @@
if (!message_) {
message_ = MakeSimulatedMessage(simulated_channel_->max_size());
}
- return message_->data();
+ return message_->data(simulated_channel_->max_size());
}
size_t size() override { return simulated_channel_->max_size(); }
@@ -186,12 +183,14 @@
message_ = MakeSimulatedMessage(simulated_channel_->max_size());
// Now fill in the message. size is already populated above, and
- // queue_index will be populated in queue_. Put this at the back of the
- // data segment.
- memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
+ // queue_index will be populated in simulated_channel_. Put this at the
+ // back of the data segment.
+ memcpy(message_->data(simulated_channel_->max_size()) +
+ simulated_channel_->max_size() - size,
+ msg, size);
- return Send(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index);
+ return DoSend(size, monotonic_remote_time, realtime_remote_time,
+ remote_queue_index);
}
private:
@@ -204,9 +203,11 @@
class SimulatedFetcher : public RawFetcher {
public:
- explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
- : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
- ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+ explicit SimulatedFetcher(EventLoop *event_loop,
+ SimulatedChannel *simulated_channel)
+ : RawFetcher(event_loop, simulated_channel->channel()),
+ simulated_channel_(simulated_channel) {}
+ ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
if (msgs_.size() == 0) {
@@ -222,8 +223,8 @@
if (msgs_.size() == 0) {
// TODO(austin): Can we just do this logic unconditionally? It is a lot
// simpler. And call clear, obviously.
- if (!msg_ && queue_->latest_message()) {
- SetMsg(queue_->latest_message());
+ if (!msg_ && simulated_channel_->latest_message()) {
+ SetMsg(simulated_channel_->latest_message());
return std::make_pair(true, event_loop()->monotonic_now());
} else {
return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +261,7 @@
msgs_.emplace_back(buffer);
}
- SimulatedChannel *queue_;
+ SimulatedChannel *simulated_channel_;
std::shared_ptr<SimulatedMessage> msg_;
// Messages queued up but not in use.
@@ -270,6 +271,7 @@
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +287,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
monotonic_clock::time_point base_;
@@ -294,6 +297,7 @@
class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
public:
SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
@@ -309,6 +313,7 @@
EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
};
@@ -316,6 +321,7 @@
public:
explicit SimulatedEventLoop(
EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -324,6 +330,7 @@
const Node *node, pid_t tid)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
raw_event_loops_(raw_event_loops),
node_(node),
@@ -361,11 +368,11 @@
}
::aos::monotonic_clock::time_point monotonic_now() override {
- return scheduler_->monotonic_now();
+ return node_event_loop_factory_->monotonic_now();
}
::aos::realtime_clock::time_point realtime_now() override {
- return scheduler_->realtime_now();
+ return node_event_loop_factory_->realtime_now();
}
::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +386,17 @@
TimerHandler *AddTimer(::std::function<void()> callback) override {
CHECK(!is_running());
- return NewTimer(::std::unique_ptr<TimerHandler>(
- new SimulatedTimerHandler(scheduler_, this, callback)));
+ return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+ scheduler_, node_event_loop_factory_, this, callback)));
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
- return NewPhasedLoop(
- ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
- scheduler_, this, callback, interval, offset)));
+ return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+ new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+ this, callback, interval, offset)));
}
void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +440,7 @@
pid_t GetTid() override { return tid_; }
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops_;
@@ -459,17 +467,13 @@
}
}
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
- return send_delay_;
-}
-
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(
- new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+ this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -511,12 +515,13 @@
SimulatedWatcher::SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
- const Channel *channel,
+ NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: WatcherState(simulated_event_loop, channel, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +579,10 @@
}
void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
- token_ =
- scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ node_event_loop_factory_->ToDistributedClock(
+ event_time + simulated_event_loop_->send_delay()),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -600,8 +606,8 @@
uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
const uint32_t queue_index = next_queue_index_.index();
message->context.queue_index = queue_index;
- message->context.data =
- message->data() + channel()->max_size() - message->context.size;
+ message->context.data = message->data(channel()->max_size()) +
+ channel()->max_size() - message->context.size;
next_queue_index_ = next_queue_index_.Increment();
latest_message_ = message;
@@ -622,12 +628,13 @@
}
SimulatedTimerHandler::SimulatedTimerHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void()> fn)
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +646,12 @@
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
token_ = scheduler_->Schedule(
- monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(monotonic_now),
+ [this]() { simulated_event_loop_->HandleEvent(); });
} else {
token_ = scheduler_->Schedule(
- base, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +664,8 @@
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(
- base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base_),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
} else {
@@ -674,13 +684,15 @@
}
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +714,80 @@
void SimulatedPhasedLoopHandler::Schedule(
monotonic_clock::time_point sleep_time) {
token_ = scheduler_->Schedule(
- sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(sleep_time),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(sleep_time);
simulated_event_loop_->AddEvent(&event_);
}
+NodeEventLoopFactory::NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops)
+ : scheduler_(scheduler),
+ factory_(factory),
+ node_(node),
+ raw_event_loops_(raw_event_loops) {}
+
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
- : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
- CHECK(!configuration_->has_nodes())
- << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, std::string_view node_name)
- : SimulatedEventLoopFactory(
- configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, const Node *node)
- : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
- if (node != nullptr) {
- CHECK(configuration_->has_nodes())
- << ": Got a configuration with no nodes and node \""
- << node->name()->string_view() << "\" was selected.";
- bool found = false;
- for (const Node *node : *configuration_->nodes()) {
- if (node == node_) {
- found = true;
- break;
- }
+ : configuration_(CHECK_NOTNULL(configuration)) {
+ if (configuration::MultiNode(configuration_)) {
+ for (const Node *node : *configuration->nodes()) {
+ nodes_.emplace_back(node);
}
- CHECK(found) << ": node must be a pointer in the configuration.";
+ } else {
+ nodes_.emplace_back(nullptr);
+ }
+
+ for (const Node *node : nodes_) {
+ node_factories_.emplace_back(
+ new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
+ }
+
+ if (configuration::MultiNode(configuration)) {
+ bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
}
}
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+ const Node *node) {
+ auto result = std::find_if(
+ node_factories_.begin(), node_factories_.end(),
+ [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+ return node_factory->node() == node;
+ });
+
+ CHECK(result != node_factories_.end())
+ << ": Failed to find node " << FlatbufferToJson(node);
+
+ return result->get();
+}
+
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+ std::string_view name, const Node *node) {
+ if (node == nullptr) {
+ CHECK(!configuration::MultiNode(configuration()))
+ << ": Can't make a single node event loop in a multi-node world.";
+ } else {
+ CHECK(configuration::MultiNode(configuration()))
+ << ": Can't make a multi-node event loop in a single-node world.";
+ }
+ return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
std::string_view name) {
pid_t tid = tid_;
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+ scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+ node_, tid));
result->set_name(name);
- result->set_send_delay(send_delay_);
+ result->set_send_delay(factory_->send_delay());
return std::move(result);
}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..8cff0d7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,175 @@
// Class for simulated fetchers.
class SimulatedChannel;
+class NodeEventLoopFactory;
+namespace message_bridge {
+class SimulatedMessageBridge;
+}
+
+// There are 2 concepts needed to support multi-node simulations.
+// 1) The node. This is implemented with NodeEventLoopFactory.
+// 2) The "robot" which runs multiple nodes. This is implemented with
+// SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory takes an optional Node
+// argument if you want to make event loops without interacting with the
+// NodeEventLoopFactory object.
+//
+// The basic flow goes something like as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory.MakeEventLoop("ping");
+//
+// The distributed_clock is used to be the base time. NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
class SimulatedEventLoopFactory {
public:
// Constructs a SimulatedEventLoopFactory with the provided configuration.
// This configuration must remain in scope for the lifetime of the factory and
// all sub-objects.
SimulatedEventLoopFactory(const Configuration *configuration);
- SimulatedEventLoopFactory(const Configuration *configuration,
- std::string_view node_name);
- SimulatedEventLoopFactory(const Configuration *configuration,
- const Node *node);
~SimulatedEventLoopFactory();
- ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ // Creates an event loop. If running in a multi-node environment, node needs
+ // to point to the node to create this event loop on.
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+ const Node *node = nullptr);
+
+ // Returns the NodeEventLoopFactory for the provided node. The returned
+ // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+ // lifetime identical to the factory.
+ NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
// Starts executing the event loops unconditionally.
void Run();
// Executes the event loops for a duration.
- void RunFor(monotonic_clock::duration duration);
+ void RunFor(distributed_clock::duration duration);
// Stops executing all event loops. Meant to be called from within an event
// loop handler.
void Exit() { scheduler_.Exit(); }
- // Sets the simulated send delay for the factory.
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ // Sets the simulated send delay for all messages sent within a single node.
void set_send_delay(std::chrono::nanoseconds send_delay);
- std::chrono::nanoseconds send_delay() const;
+ std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+ // Sets the simulated network delay for messages forwarded between nodes.
+ void set_network_delay(std::chrono::nanoseconds network_delay);
+ std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+ // Returns the clock used to synchronize the nodes.
+ distributed_clock::time_point distributed_now() const {
+ return scheduler_.distributed_now();
+ }
+
+ // Returns the configuration used for everything.
+ const Configuration *configuration() const { return configuration_; }
+
+ private:
+ const Configuration *const configuration_;
+ EventScheduler scheduler_;
+ // List of event loops to manage running and not running for.
+ // The function is a callback used to set and clear the running bool on each
+ // event loop.
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ raw_event_loops_;
+
+ std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+ std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+ std::vector<const Node *> nodes_;
+
+ std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
const Node *node() const { return node_; }
- monotonic_clock::time_point monotonic_now() const {
- return scheduler_.monotonic_now();
- }
- realtime_clock::time_point realtime_now() const {
- return scheduler_.realtime_now();
- }
-
// Sets realtime clock to realtime_now for a given monotonic clock.
void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
realtime_clock::time_point realtime_now) {
- scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+ realtime_offset_ =
+ realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
}
- private:
- const Configuration *const configuration_;
- EventScheduler scheduler_;
- // Map from name, type to queue.
- absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
- // List of event loops to manage running and not running for.
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- raw_event_loops_;
+ // Returns the current time on both clocks.
+ inline monotonic_clock::time_point monotonic_now() const;
+ inline realtime_clock::time_point realtime_now() const;
- std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ // Returns the simulated network delay for messages forwarded between nodes.
+ std::chrono::nanoseconds network_delay() const {
+ return factory_->network_delay();
+ }
+ // Returns the simulated send delay for all messages sent within a single
+ // node.
+ std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+
+ // Converts a time to the distributed clock for scheduling and cross-node time
+ // measurement.
+ inline distributed_clock::time_point ToDistributedClock(
+ monotonic_clock::time_point time) const;
+
+ private:
+ friend class SimulatedEventLoopFactory;
+ NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops);
+
+ EventScheduler *const scheduler_;
+ SimulatedEventLoopFactory *const factory_;
const Node *const node_;
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *const raw_event_loops_;
+
+ std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+ std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+ // Map from name, type to queue.
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+ // pid so we get unique timing reports.
pid_t tid_ = 0;
};
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+ return monotonic_clock::time_point(
+ factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+ return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+ realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+ monotonic_clock::time_point time) const {
+ return distributed_clock::time_point(time.time_since_epoch() -
+ monotonic_offset_);
+}
+
} // namespace aos
#endif // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 4328d6f..de5cbae 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,8 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
#include "gtest/gtest.h"
@@ -15,11 +17,11 @@
public:
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
MaybeMake();
- return event_loop_factory_->MakeEventLoop(name);
+ return event_loop_factory_->MakeEventLoop(name, my_node());
}
::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
MaybeMake();
- return event_loop_factory_->MakeEventLoop(name);
+ return event_loop_factory_->MakeEventLoop(name, my_node());
}
void Run() override { event_loop_factory_->Run(); }
@@ -38,8 +40,8 @@
void MaybeMake() {
if (!event_loop_factory_) {
if (configuration()->has_nodes()) {
- event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
- configuration(), my_node());
+ event_loop_factory_ =
+ std::make_unique<SimulatedEventLoopFactory>(configuration());
} else {
event_loop_factory_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -64,12 +66,13 @@
int counter = 0;
EventScheduler scheduler;
- scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+ [&counter]() { counter += 1; });
scheduler.Run();
EXPECT_EQ(counter, 1);
- auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ auto token =
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+ [&counter]() { counter += 1; });
scheduler.Deschedule(token);
scheduler.Run();
EXPECT_EQ(counter, 1);
@@ -80,8 +83,9 @@
int counter = 0;
EventScheduler scheduler;
- auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ auto token =
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+ [&counter]() { counter += 1; });
scheduler.Deschedule(token);
scheduler.Run();
EXPECT_EQ(counter, 0);
@@ -100,8 +104,6 @@
simulated_event_loop_factory.RunFor(chrono::seconds(1));
EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
- simulated_event_loop_factory.monotonic_now());
- EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
event_loop->monotonic_now());
}
@@ -125,8 +127,6 @@
simulated_event_loop_factory.RunFor(chrono::seconds(1));
EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
- simulated_event_loop_factory.monotonic_now());
- EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
event_loop->monotonic_now());
EXPECT_EQ(counter, 10);
}
@@ -232,5 +232,45 @@
0.0);
}
+// Tests that ping and pong work when on 2 different nodes.
+TEST(SimulatedEventLoopTest, MultinodePingPong) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ "aos/events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+ int pi2_pong_count = 0;
+ pi2_pong_counter_event_loop->MakeWatcher(
+ "/test",
+ [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+
+ std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+ int pi1_pong_count = 0;
+ pi1_pong_counter_event_loop->MakeWatcher(
+ "/test",
+ [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(10) +
+ chrono::milliseconds(5));
+
+ EXPECT_EQ(pi1_pong_count, 1001);
+ EXPECT_EQ(pi2_pong_count, 1001);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node. We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+ RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+ aos::NodeEventLoopFactory *send_node_factory,
+ aos::EventLoop *send_event_loop,
+ std::unique_ptr<aos::RawFetcher> fetcher,
+ std::unique_ptr<aos::RawSender> sender)
+ : fetch_node_factory_(fetch_node_factory),
+ send_node_factory_(send_node_factory),
+ send_event_loop_(send_event_loop),
+ fetcher_(std::move(fetcher)),
+ sender_(std::move(sender)) {
+ timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+ Schedule();
+ }
+
+ // Kicks us to re-fetch and schedule the timer.
+ void Schedule() {
+ if (fetcher_->context().data == nullptr || sent_) {
+ sent_ = !fetcher_->FetchNext();
+ }
+
+ if (fetcher_->context().data == nullptr) {
+ return;
+ }
+
+ if (sent_) {
+ return;
+ }
+
+ // Compute the time to publish this message.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ DeliveredTime(fetcher_->context());
+
+ CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Trying to deliver message in the past...";
+
+ timer_->Setup(monotonic_delivered_time);
+ }
+
+ private:
+ // Acutally sends the message, and reschedules.
+ void Send() {
+ // Compute the time to publish this message.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ DeliveredTime(fetcher_->context());
+
+ CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Message to be sent at the wrong time.";
+
+ // And also fill out the send times as well.
+ sender_->Send(fetcher_->context().data, fetcher_->context().size,
+ fetcher_->context().monotonic_event_time,
+ fetcher_->context().realtime_event_time,
+ fetcher_->context().queue_index);
+
+ sent_ = true;
+ Schedule();
+ }
+
+ // Converts from time on the sending node to time on the receiving node.
+ monotonic_clock::time_point DeliveredTime(const Context &context) const {
+ const distributed_clock::time_point distributed_sent_time =
+ fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+
+ return aos::monotonic_clock::epoch() +
+ (distributed_sent_time - send_node_factory_->ToDistributedClock(
+ aos::monotonic_clock::epoch())) +
+ send_node_factory_->network_delay() +
+ send_node_factory_->send_delay();
+ }
+
+ // Factories used for time conversion.
+ aos::NodeEventLoopFactory *fetch_node_factory_;
+ aos::NodeEventLoopFactory *send_node_factory_;
+
+ // Event loop which sending is scheduled on.
+ aos::EventLoop *send_event_loop_;
+ // Timer used to send.
+ aos::TimerHandler *timer_;
+ // Fetcher used to receive messages.
+ std::unique_ptr<aos::RawFetcher> fetcher_;
+ // Sender to send them back out.
+ std::unique_ptr<aos::RawSender> sender_;
+ // True if we have sent the message in the fetcher.
+ bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+ SimulatedEventLoopFactory *simulated_event_loop_factory) {
+ CHECK(
+ configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+ // Pre-build up event loops for every node. They are pretty cheap anyways.
+ for (const Node *node : simulated_event_loop_factory->nodes()) {
+ CHECK(event_loop_map_
+ .insert({node, simulated_event_loop_factory->MakeEventLoop(
+ "message_bridge", node)})
+ .second);
+ }
+
+ for (const Channel *channel :
+ *simulated_event_loop_factory->configuration()->channels()) {
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+
+ // Find the sending node.
+ const Node *node =
+ configuration::GetNode(simulated_event_loop_factory->configuration(),
+ channel->source_node()->string_view());
+ auto source_event_loop = event_loop_map_.find(node);
+ CHECK(source_event_loop != event_loop_map_.end());
+
+ std::unique_ptr<DelayersVector> delayers =
+ std::make_unique<DelayersVector>();
+
+ // And then build up a RawMessageDelayer for each destination.
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *destination_node =
+ configuration::GetNode(simulated_event_loop_factory->configuration(),
+ connection->name()->string_view());
+ auto destination_event_loop = event_loop_map_.find(destination_node);
+ CHECK(destination_event_loop != event_loop_map_.end());
+
+ delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+ simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+ simulated_event_loop_factory->GetNodeEventLoopFactory(
+ destination_node),
+ destination_event_loop->second.get(),
+ source_event_loop->second->MakeRawFetcher(channel),
+ destination_event_loop->second->MakeRawSender(channel)));
+ }
+
+ // And register every delayer to be poked when a new message shows up.
+ source_event_loop->second->MakeRawWatcher(
+ channel,
+ [captured_delayers = delayers.get()](const Context &, const void *) {
+ for (std::unique_ptr<RawMessageDelayer> &delayer :
+ *captured_delayers) {
+ delayer->Schedule();
+ }
+ });
+ delayers_list_.emplace_back(std::move(delayers));
+ }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() {}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
new file mode 100644
index 0000000..5d613ab
--- /dev/null
+++ b/aos/events/simulated_network_bridge.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+class RawMessageDelayer;
+
+// This class moves messages between nodes. It is implemented as a separate
+// class because it would have been even harder to manage forwarding in the
+// SimulatedEventLoopFactory.
+class SimulatedMessageBridge {
+ public:
+ // Constructs the bridge.
+ SimulatedMessageBridge(
+ SimulatedEventLoopFactory *simulated_event_loop_factory);
+ ~SimulatedMessageBridge();
+
+ private:
+ // Map of nodes to event loops. This is a member variable so that the
+ // lifetime of the event loops matches the lifetime of the bridge.
+ std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+
+
+ // List of delayers used to resend the messages.
+ using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
+ std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index c1c0db0..d9fcab6 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -12,6 +12,8 @@
// This class is a base class for all sizes of array backed allocators.
class FixedAllocatorBase : public flatbuffers::Allocator {
public:
+ ~FixedAllocatorBase() override { CHECK(!is_allocated_); }
+
// TODO(austin): Read the contract for these.
uint8_t *allocate(size_t) override;
@@ -25,6 +27,7 @@
virtual size_t size() const = 0;
void Reset() { is_allocated_ = false; }
+ bool is_allocated() const { return is_allocated_; }
private:
bool is_allocated_ = false;
@@ -51,14 +54,32 @@
class PreallocatedAllocator : public FixedAllocatorBase {
public:
PreallocatedAllocator(void *data, size_t size) : data_(data), size_(size) {}
- uint8_t *data() override { return reinterpret_cast<uint8_t *>(data_); }
- const uint8_t *data() const override {
- return reinterpret_cast<const uint8_t *>(data_);
+ PreallocatedAllocator(const PreallocatedAllocator&) = delete;
+ PreallocatedAllocator(PreallocatedAllocator &&other)
+ : data_(other.data_), size_(other.size_) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
}
- size_t size() const override { return size_; }
+
+ PreallocatedAllocator &operator=(const PreallocatedAllocator &) = delete;
+ PreallocatedAllocator &operator=(PreallocatedAllocator &&other) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
+ data_ = other.data_;
+ size_ = other.size_;
+ return *this;
+ }
+
+ uint8_t *data() final {
+ return reinterpret_cast<uint8_t *>(CHECK_NOTNULL(data_));
+ }
+ const uint8_t *data() const final {
+ return reinterpret_cast<const uint8_t *>(CHECK_NOTNULL(data_));
+ }
+ size_t size() const final { return size_; }
private:
- void* data_;
+ void *data_;
size_t size_;
};
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 57adc6d..91b050c 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -204,6 +204,7 @@
visibility = ["//visibility:public"],
deps = [
":aos_sync",
+ ":data_alignment",
":index",
"//aos:realtime",
"//aos/time",
@@ -259,3 +260,14 @@
"//aos/testing:test_logging",
],
)
+
+cc_library(
+ name = "data_alignment",
+ hdrs = [
+ "data_alignment.h",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/aos/ipc_lib/data_alignment.h b/aos/ipc_lib/data_alignment.h
new file mode 100644
index 0000000..2f59b78
--- /dev/null
+++ b/aos/ipc_lib/data_alignment.h
@@ -0,0 +1,41 @@
+#ifndef AOS_IPC_LIB_DATA_ALIGNMENT_H_
+#define AOS_IPC_LIB_DATA_ALIGNMENT_H_
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// All data buffers sent over or received from a channel will guarantee this
+// alignment for their end. Flatbuffers aligns from the end, so this is what
+// matters.
+//
+// 64 is a reasonable choice for now:
+// Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
+// cache lines.
+// V4L2 requires 64 byte alignment for USERPTR buffers.
+static constexpr size_t kChannelDataAlignment = 64;
+
+template <typename T>
+inline void CheckChannelDataAlignment(T *data, size_t size) {
+ CHECK_EQ((reinterpret_cast<uintptr_t>(data) + size) % kChannelDataAlignment,
+ 0u)
+ << ": data pointer is not end aligned as it should be: " << data << " + "
+ << size;
+}
+
+// Aligns the beginning of a channel data buffer. There must be
+// kChannelDataAlignment-1 extra bytes beyond the end to potentially use after
+// aligning it.
+inline char *RoundChannelData(char *data, size_t size) {
+ const uintptr_t data_value = reinterpret_cast<uintptr_t>(data);
+ const uintptr_t data_end = data_value + size;
+ const uintptr_t data_end_max = data_end + (kChannelDataAlignment - 1);
+ const uintptr_t rounded_data_end =
+ data_end_max - (data_end_max % kChannelDataAlignment);
+ const uintptr_t rounded_data = rounded_data_end - size;
+ return reinterpret_cast<char *>(rounded_data);
+}
+
+} // namespace aos
+
+#endif // AOS_IPC_LIB_DATA_ALIGNMENT_H_
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 903150b..c323b8b 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -241,7 +241,8 @@
size_t LocklessQueueConfiguration::message_size() const {
// Round up the message size so following data is aligned appropriately.
- return LocklessQueueMemory::AlignmentRoundUp(message_data_size) +
+ return LocklessQueueMemory::AlignmentRoundUp(message_data_size +
+ (kChannelDataAlignment - 1)) +
sizeof(Message);
}
@@ -549,7 +550,7 @@
Message *message = memory_->GetMessage(scratch_index);
message->header.queue_index.Invalidate();
- return &message->data[0];
+ return message->data(memory_->message_data_size());
}
void LocklessQueue::Sender::Send(
@@ -788,7 +789,7 @@
}
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
- memcpy(data, &m->data[0], message_data_size());
+ memcpy(data, m->data(memory_->message_data_size()), message_data_size());
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
@@ -891,8 +892,9 @@
::std::cout << " }" << ::std::endl;
::std::cout << " data: {";
+ const char *const m_data = m->data(memory->message_data_size());
for (size_t j = 0; j < m->header.length; ++j) {
- char data = m->data[j];
+ char data = m_data[j];
if (j != 0) {
::std::cout << " ";
}
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 976f758..0384aa8 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -7,6 +7,7 @@
#include <vector>
#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"
@@ -76,7 +77,19 @@
size_t length;
} header;
- char data[];
+ char *data(size_t message_size) { return RoundedData(message_size); }
+ const char *data(size_t message_size) const {
+ return RoundedData(message_size);
+ }
+
+ private:
+ // This returns a non-const pointer into a const object. Be very careful about
+ // const correctness in publicly accessible APIs using it.
+ char *RoundedData(size_t message_size) const {
+ return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
+ }
+
+ char data_pointer[];
};
struct LocklessQueueConfiguration {
diff --git a/aos/ipc_lib/lockless_queue_memory.h b/aos/ipc_lib/lockless_queue_memory.h
index 0c0973c..cbe76a7 100644
--- a/aos/ipc_lib/lockless_queue_memory.h
+++ b/aos/ipc_lib/lockless_queue_memory.h
@@ -89,18 +89,24 @@
// Getters for each of the 4 lists.
Sender *GetSender(size_t sender_index) {
+ static_assert(alignof(Sender) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Sender *>(&data[0] + SizeOfQueue() +
SizeOfMessages() + SizeOfWatchers() +
sender_index * sizeof(Sender));
}
Watcher *GetWatcher(size_t watcher_index) {
+ static_assert(alignof(Watcher) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Watcher *>(&data[0] + SizeOfQueue() +
SizeOfMessages() +
watcher_index * sizeof(Watcher));
}
AtomicIndex *GetQueue(uint32_t index) {
+ static_assert(alignof(AtomicIndex) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<AtomicIndex *>(&data[0] +
sizeof(AtomicIndex) * index);
}
@@ -109,6 +115,8 @@
// sender list, since those are messages available to be filled in and sent.
// This removes the need to find lost messages when a sender dies.
Message *GetMessage(Index index) {
+ static_assert(alignof(Message) <= kDataAlignment,
+ "kDataAlignment is too small");
return reinterpret_cast<Message *>(&data[0] + SizeOfQueue() +
index.message_index() * message_size());
}
diff --git a/aos/scoped/scoped_ptr.h b/aos/scoped/scoped_ptr.h
deleted file mode 100644
index 336e73b..0000000
--- a/aos/scoped/scoped_ptr.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#ifndef AOS_SCOPED_PTR_H_
-#define AOS_SCOPED_PTR_H_
-
-#include "aos/macros.h"
-
-namespace aos {
-
-// A simple scoped_ptr implementation that works under both linux and vxworks.
-template<typename T>
-class scoped_ptr {
- public:
- typedef T element_type;
-
- explicit scoped_ptr(T *p = NULL) : p_(p) {}
- ~scoped_ptr() {
- delete p_;
- }
-
- T &operator*() const { return *p_; }
- T *operator->() const { return p_; }
- T *get() const { return p_; }
-
- operator bool() const { return p_ != NULL; }
-
- void swap(scoped_ptr<T> &other) {
- T *temp = other.p_;
- other.p_ = p_;
- p_ = other.p_;
- }
- void reset(T *p = NULL) {
- if (p_ != NULL) delete p_;
- p_ = p;
- }
-
- private:
- T *p_;
-
- DISALLOW_COPY_AND_ASSIGN(scoped_ptr<T>);
-};
-
-} // namespace aos
-
-#endif // AOS_SCOPED_PTR_H_
diff --git a/aos/util/file.cc b/aos/util/file.cc
index af1f85b..b334ded 100644
--- a/aos/util/file.cc
+++ b/aos/util/file.cc
@@ -1,6 +1,8 @@
#include "aos/util/file.h"
#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
#include <unistd.h>
#include <string_view>
@@ -46,5 +48,23 @@
}
}
+void MkdirP(std::string_view path, mode_t mode) {
+ auto last_slash_pos = path.find_last_of("/");
+
+ std::string folder(last_slash_pos == std::string_view::npos
+ ? std::string_view("")
+ : path.substr(0, last_slash_pos));
+ if (folder.empty()) return;
+ MkdirP(folder, mode);
+ const int result = mkdir(folder.c_str(), mode);
+ if (result == -1 && errno == EEXIST) {
+ VLOG(2) << folder << " already exists";
+ return;
+ } else {
+ VLOG(1) << "Created " << folder;
+ }
+ PCHECK(result == 0) << ": Error creating " << folder;
+}
+
} // namespace util
} // namespace aos
diff --git a/aos/util/file.h b/aos/util/file.h
index 3aebd87..d6724af 100644
--- a/aos/util/file.h
+++ b/aos/util/file.h
@@ -15,6 +15,8 @@
void WriteStringToFileOrDie(const std::string_view filename,
const std::string_view contents);
+void MkdirP(std::string_view path, mode_t mode);
+
} // namespace util
} // namespace aos
diff --git a/debian/matplotlib.bzl b/debian/matplotlib.bzl
index be520f3..7032745 100644
--- a/debian/matplotlib.bzl
+++ b/debian/matplotlib.bzl
@@ -100,146 +100,146 @@
}
def build_matplotlib(version, tkinter_py_version = None, copy_shared_files = True):
- """Creates a py_library rule for matplotlib for the given python version.
+ """Creates a py_library rule for matplotlib for the given python version.
- See debian/matplotlib.BUILD for the usage.
+ See debian/matplotlib.BUILD for the usage.
- All the rules generated by this will be suffixed by version. Only one
- instance of this macro should set copy_shared_files, which generate the
- files that are shared between python versions.
+ All the rules generated by this will be suffixed by version. Only one
+ instance of this macro should set copy_shared_files, which generate the
+ files that are shared between python versions.
- tkinter_py_version is used because for the Python3 instance, some files
- are in folders named python3 and some are in folders named python3.5...
+ tkinter_py_version is used because for the Python3 instance, some files
+ are in folders named python3 and some are in folders named python3.5...
- version numbers should both be strings.
- """
- if tkinter_py_version == None:
- tkinter_py_version = version
+ version numbers should both be strings.
+ """
+ if tkinter_py_version == None:
+ tkinter_py_version = version
- native.genrule(
- name = "patch_init" + version,
- srcs = [
- "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
- "@//debian:matplotlib_patches",
- ],
- outs = [version + "/matplotlib/__init__.py"],
- cmd = " && ".join([
- "cp $(location usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py) $@",
- "readonly PATCH=\"$$(readlink -f $(location @patch))\"",
- "readonly FILE=\"$$(readlink -f $(location @//debian:matplotlib_patches))\"",
- "(cd $(@D) && \"$${PATCH}\" -p1 < \"$${FILE}\") > /dev/null",
- ]),
- tools = [
- "@patch",
- ],
- )
-
- _src_files = native.glob(
- include = ["usr/lib/python" + version + "/dist-packages/**/*.py"],
- exclude = [
- "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
- ],
- )
-
- _data_files = native.glob([
- "usr/share/matplotlib/mpl-data/**",
- "usr/share/tcltk/**",
- ])
-
- _src_copied = ["/".join([version] + f.split("/")[4:]) for f in _src_files]
-
- _builtin_so_files = native.glob([
- "usr/lib/python" + version + "/dist-packages/**/*x86_64-linux-gnu.so",
- "usr/lib/python" + tkinter_py_version + "/lib-dynload/*.so",
- ])
-
- _system_so_files = native.glob([
- "usr/lib/**/*.so*",
- "lib/x86_64-linux-gnu/**/*.so*",
- ])
-
- _builtin_so_copied = ["/".join([version] + f.split("/")[4:]) for f in _builtin_so_files]
-
- rpath_prefix = "rpathed" + version + "/"
-
- _system_so_copied = [rpath_prefix + f for f in _system_so_files]
-
- _builtin_rpaths = [":".join([
- "\\$$ORIGIN/%s" % rel,
- "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
- "\\$$ORIGIN/%s/%s/usr/lib" % (rel, rpath_prefix),
- "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
- ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _builtin_so_copied]]
-
- _system_rpaths = [":".join([
- "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
- "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
- ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _system_so_copied]]
-
- native.genrule(
- name = "run_patchelf_builtin" + version,
- srcs = _builtin_so_files,
- outs = _builtin_so_copied,
- cmd = "\n".join(
- [
- "cp $(location %s) $(location %s)" % (src, dest)
- for src, dest in zip(_builtin_so_files, _builtin_so_copied)
- ] +
- ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_builtin_rpaths, _builtin_so_copied)],
- ),
- tools = [
- "@patchelf",
- ],
- )
-
- native.genrule(
- name = "run_patchelf_system" + version,
- srcs = _system_so_files,
- outs = _system_so_copied,
- cmd = "\n".join(
- [
- "cp $(location %s) $(location %s)" % (src, dest)
- for src, dest in zip(_system_so_files, _system_so_copied)
- ] +
- ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_system_rpaths, _system_so_copied)],
- ),
- tools = [
- "@patchelf",
- ],
- )
-
- native.genrule(
- name = "copy_files" + version,
- srcs = _src_files,
- outs = _src_copied,
- cmd = " && ".join(["cp $(location %s) $(location %s)" % (src, dest) for src, dest in zip(
- _src_files,
- _src_copied,
- )]),
- )
-
- if copy_shared_files:
native.genrule(
- name = "create_rc" + version,
- outs = ["usr/share/matplotlib/mpl-data/matplotlibrc"],
- cmd = "\n".join([
- "cat > $@ << END",
- # This is necessary to make matplotlib actually plot things to the
- # screen by default.
- "backend : TkAgg",
- "END",
+ name = "patch_init" + version,
+ srcs = [
+ "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
+ "@//debian:matplotlib_patches",
+ ],
+ outs = [version + "/matplotlib/__init__.py"],
+ cmd = " && ".join([
+ "cp $(location usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py) $@",
+ "readonly PATCH=\"$$(readlink -f $(location @patch))\"",
+ "readonly FILE=\"$$(readlink -f $(location @//debian:matplotlib_patches))\"",
+ "(cd $(@D) && \"$${PATCH}\" -p1 < \"$${FILE}\") > /dev/null",
]),
+ tools = [
+ "@patch",
+ ],
)
- native.py_library(
- name = "matplotlib" + version,
- srcs = _src_copied + [
- version + "/matplotlib/__init__.py",
- ],
- data = _data_files + _builtin_so_copied + _system_so_copied + [
- ":usr/share/matplotlib/mpl-data/matplotlibrc",
- ] + native.glob(["etc/**"]),
- imports = ["usr/lib/python" + version + "/dist-packages", version, "."],
- restricted_to = ["@//tools:k8"],
- visibility = ["//visibility:public"],
- )
+ _src_files = native.glob(
+ include = ["usr/lib/python" + version + "/dist-packages/**/*.py"],
+ exclude = [
+ "usr/lib/python" + version + "/dist-packages/matplotlib/__init__.py",
+ ],
+ )
+
+ _data_files = native.glob([
+ "usr/share/matplotlib/mpl-data/**",
+ "usr/share/tcltk/**",
+ ])
+
+ _src_copied = ["/".join([version] + f.split("/")[4:]) for f in _src_files]
+
+ _builtin_so_files = native.glob([
+ "usr/lib/python" + version + "/dist-packages/**/*x86_64-linux-gnu.so",
+ "usr/lib/python" + tkinter_py_version + "/lib-dynload/*.so",
+ ])
+
+ _system_so_files = native.glob([
+ "usr/lib/**/*.so*",
+ "lib/x86_64-linux-gnu/**/*.so*",
+ ])
+
+ _builtin_so_copied = ["/".join([version] + f.split("/")[4:]) for f in _builtin_so_files]
+
+ rpath_prefix = "rpathed" + version + "/"
+
+ _system_so_copied = [rpath_prefix + f for f in _system_so_files]
+
+ _builtin_rpaths = [":".join([
+ "\\$$ORIGIN/%s" % rel,
+ "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+ "\\$$ORIGIN/%s/%s/usr/lib" % (rel, rpath_prefix),
+ "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+ ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _builtin_so_copied]]
+
+ _system_rpaths = [":".join([
+ "\\$$ORIGIN/%s/%s/usr/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+ "\\$$ORIGIN/%s/%s/lib/x86_64-linux-gnu" % (rel, rpath_prefix),
+ ]) for rel in ["/".join([".." for _ in so.split("/")[1:]]) for so in _system_so_copied]]
+
+ native.genrule(
+ name = "run_patchelf_builtin" + version,
+ srcs = _builtin_so_files,
+ outs = _builtin_so_copied,
+ cmd = "\n".join(
+ [
+ "cp $(location %s) $(location %s)" % (src, dest)
+ for src, dest in zip(_builtin_so_files, _builtin_so_copied)
+ ] +
+ ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_builtin_rpaths, _builtin_so_copied)],
+ ),
+ tools = [
+ "@patchelf",
+ ],
+ )
+
+ native.genrule(
+ name = "run_patchelf_system" + version,
+ srcs = _system_so_files,
+ outs = _system_so_copied,
+ cmd = "\n".join(
+ [
+ "cp $(location %s) $(location %s)" % (src, dest)
+ for src, dest in zip(_system_so_files, _system_so_copied)
+ ] +
+ ["$(location @patchelf) --set-rpath %s $(location %s)" % (rpath, so) for rpath, so in zip(_system_rpaths, _system_so_copied)],
+ ),
+ tools = [
+ "@patchelf",
+ ],
+ )
+
+ native.genrule(
+ name = "copy_files" + version,
+ srcs = _src_files,
+ outs = _src_copied,
+ cmd = " && ".join(["cp $(location %s) $(location %s)" % (src, dest) for src, dest in zip(
+ _src_files,
+ _src_copied,
+ )]),
+ )
+
+ if copy_shared_files:
+ native.genrule(
+ name = "create_rc" + version,
+ outs = ["usr/share/matplotlib/mpl-data/matplotlibrc"],
+ cmd = "\n".join([
+ "cat > $@ << END",
+ # This is necessary to make matplotlib actually plot things to the
+ # screen by default.
+ "backend : TkAgg",
+ "END",
+ ]),
+ )
+
+ native.py_library(
+ name = "matplotlib" + version,
+ srcs = _src_copied + [
+ version + "/matplotlib/__init__.py",
+ ],
+ data = _data_files + _builtin_so_copied + _system_so_copied + [
+ ":usr/share/matplotlib/mpl-data/matplotlibrc",
+ ] + native.glob(["etc/**"]),
+ imports = ["usr/lib/python" + version + "/dist-packages", version, "."],
+ restricted_to = ["@//tools:k8"],
+ visibility = ["//visibility:public"],
+ )
diff --git a/debian/packages.bzl b/debian/packages.bzl
index ba1f840..3043452 100644
--- a/debian/packages.bzl
+++ b/debian/packages.bzl
@@ -55,14 +55,14 @@
)
def _convert_deb_to_target(deb):
- """Converts a debian package filename to a valid bazel target name."""
- target = deb
- target = target.replace('-', '_')
- target = target.replace('.', '_')
- target = target.replace(':', '_')
- target = target.replace('+', 'x')
- target = target.replace('~', '_')
- return "deb_%s_repo" % target
+ """Converts a debian package filename to a valid bazel target name."""
+ target = deb
+ target = target.replace("-", "_")
+ target = target.replace(".", "_")
+ target = target.replace(":", "_")
+ target = target.replace("+", "x")
+ target = target.replace("~", "_")
+ return "deb_%s_repo" % target
def generate_repositories_for_debs(files, base_url = "http://www.frc971.org/Build-Dependencies"):
"""A WORKSPACE helper to add all the deb packages in the dictionary as a repo.
diff --git a/debian/webrtc.BUILD b/debian/webrtc.BUILD
new file mode 100644
index 0000000..e90e268
--- /dev/null
+++ b/debian/webrtc.BUILD
@@ -0,0 +1,18 @@
+load("@//tools/build_rules:select.bzl", "cpu_select")
+
+cc_library(
+ name = "webrtc",
+ visibility = ["//visibility:public"],
+ hdrs = glob(["include/**/*.h"]),
+ srcs = cpu_select({
+ "arm": ["lib/arm/Release/libwebrtc_full.a"],
+ "else": ["lib/x64/Release/libwebrtc_full.a"],
+ }),
+ includes = ["include"],
+ deps = [
+ "@com_google_absl//absl/strings",
+ "@com_google_absl//absl/types:optional",
+ "@com_google_absl//absl/types:variant",
+ "@com_google_absl//absl/algorithm:container",
+ ],
+)
diff --git a/third_party/BUILD b/third_party/BUILD
index a4e236f..13f015e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -1,3 +1,5 @@
+load("@//tools/build_rules:select.bzl", "cpu_select")
+
cc_library(
name = "wpilib",
linkstatic = True,
@@ -86,3 +88,15 @@
"//conditions:default": [],
}),
)
+
+cc_library(
+ name = "webrtc",
+ visibility = ["//visibility:public"],
+ deps = cpu_select({
+ "amd64": ["@webrtc_x64//:webrtc"],
+ "armhf": ["@webrtc_arm//:webrtc"],
+ "cortex-m": ["@webrtc_arm//:webrtc"],
+ "roborio": ["@webrtc_rio//:webrtc"],
+ }),
+)
+