Merge "Add node_boots parsing from log file headers"
diff --git a/README.md b/README.md
index bcfe5c6..839f9c8 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@
1. Step 1: Add Bazel distribution URI as a package source
```
sudo apt install curl
- curl -fsSL https://bazel.build/bazel-release.pub.gpg | apt-key add -
+ curl -fsSL https://bazel.build/bazel-release.pub.gpg | sudo apt-key add -
echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.list
```
2. Step 2: Install Bazel
diff --git a/aos/configuration.cc b/aos/configuration.cc
index d794a37..76704a4 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -10,10 +10,14 @@
#include <map>
#include <set>
+#include <string>
#include <string_view>
+#include <vector>
#include "absl/container/btree_set.h"
#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
#include "aos/configuration_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/json_to_flatbuffer.h"
@@ -131,10 +135,33 @@
return buffer;
}
+// Normalizes a '/'-separated import path.  Empty components (from doubled,
+// leading or trailing slashes) and "." components are dropped, and every ".."
+// cancels the most recently kept component.  A ".." with nothing left to
+// cancel is fatal.  The surviving components are re-joined with '/'.
+std::string RemoveDotDots(const std::string_view filename) {
+  const std::vector<std::string> pieces = absl::StrSplit(filename, '/');
+  std::vector<std::string> kept;
+  for (const std::string &piece : pieces) {
+    if (piece.empty() || piece == ".") {
+      continue;
+    }
+    if (piece != "..") {
+      kept.push_back(piece);
+      continue;
+    }
+    // ".." steps back over the previous surviving component.
+    CHECK(!kept.empty())
+        << ": Import path may not start with ..: " << filename;
+    kept.pop_back();
+  }
+  return absl::StrJoin(kept, "/");
+}
+
FlatbufferDetachedBuffer<Configuration> ReadConfig(
const std::string_view path, absl::btree_set<std::string> *visited_paths,
const std::vector<std::string_view> &extra_import_paths) {
std::string binary_path = MaybeReplaceExtension(path, ".json", ".bfbs");
+ VLOG(1) << "Looking up: " << path << ", starting with: " << binary_path;
bool binary_path_exists = util::PathExists(binary_path);
std::string raw_path(path);
// For each .json file, look and see if we can find a .bfbs file next to it
@@ -151,13 +178,15 @@
bool found_path = false;
for (const auto &import_path : extra_import_paths) {
- raw_path = std::string(import_path) + "/" + std::string(path);
+ raw_path = std::string(import_path) + "/" + RemoveDotDots(path);
binary_path = MaybeReplaceExtension(raw_path, ".json", ".bfbs");
+ VLOG(1) << "Checking: " << binary_path;
binary_path_exists = util::PathExists(binary_path);
if (binary_path_exists) {
found_path = true;
break;
}
+ VLOG(1) << "Checking: " << raw_path;
if (util::PathExists(raw_path)) {
found_path = true;
break;
@@ -560,11 +589,11 @@
FlatbufferDetachedBuffer<Configuration> ReadConfig(
const std::string_view path,
- const std::vector<std::string_view> &import_paths) {
+ const std::vector<std::string_view> &extra_import_paths) {
// We only want to read a file once. So track the visited files in a set.
absl::btree_set<std::string> visited_paths;
FlatbufferDetachedBuffer<Configuration> read_config =
- ReadConfig(path, &visited_paths, import_paths);
+ ReadConfig(path, &visited_paths, extra_import_paths);
// If we only read one file, and it had a .bfbs extension, it has to be a
// fully formatted config. Do a quick verification and return it.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 4194514..79d09e4 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -225,7 +225,10 @@
cc_test(
name = "pingpong_test",
srcs = ["pingpong_test.cc"],
- data = [":pingpong_config"],
+ data = [
+ ":multinode_pingpong_test_split_config",
+ ":pingpong_config",
+ ],
target_compatible_with = ["@platforms//os:linux"],
deps = [
":ping_lib",
diff --git a/aos/events/channel_preallocated_allocator.h b/aos/events/channel_preallocated_allocator.h
index c4f0eca..76fb694 100644
--- a/aos/events/channel_preallocated_allocator.h
+++ b/aos/events/channel_preallocated_allocator.h
@@ -62,11 +62,12 @@
+ // Growing is not supported by this allocator: any request to reallocate is
+ // fatal.  The log reports the requested new_size (which already includes
+ // headroom for further growth) against channel_->max_size().
uint8_t *reallocate_downward(uint8_t * /*old_p*/, size_t /*old_size*/,
size_t new_size, size_t /*in_use_back*/,
size_t /*in_use_front*/) override {
- LOG(FATAL) << "Requested " << new_size << " bytes, max size "
- << channel_->max_size() << " for channel "
- << configuration::CleanedChannelToString(channel_)
- << ". Increase the memory reserved to at least " << new_size
- << ".";
+ LOG(FATAL)
+ << "Requested " << new_size
+ << " bytes (includes extra for room to grow even more), max size "
+ << channel_->max_size() << " for channel "
+ << configuration::CleanedChannelToString(channel_)
+ << ". Increase the memory reserved to at least " << new_size << ".";
+ // Unreachable because LOG(FATAL) aborts; kept so every path returns a value.
return nullptr;
}
diff --git a/aos/events/epoll.cc b/aos/events/epoll.cc
index 1c4427b..8cb553b 100644
--- a/aos/events/epoll.cc
+++ b/aos/events/epoll.cc
@@ -4,6 +4,7 @@
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
+
#include <atomic>
#include <vector>
@@ -109,62 +110,66 @@
}
EventData *const event_data = static_cast<struct EventData *>(event.data.ptr);
- if (event.events & kInEvents) {
- CHECK(event_data->in_fn)
- << ": No handler registered for input events on " << event_data->fd;
- event_data->in_fn();
- }
- if (event.events & kOutEvents) {
- CHECK(event_data->out_fn)
- << ": No handler registered for output events on " << event_data->fd;
- event_data->out_fn();
- }
- if (event.events & kErrorEvents) {
- CHECK(event_data->err_fn)
- << ": No handler registered for error events on " << event_data->fd;
- event_data->err_fn();
- }
+ event_data->DoCallbacks(event.events);
return true;
}
-void EPoll::Quit() { PCHECK(write(quit_signal_fd_, "q", 1) == 1); }
+// Asks the running loop to exit by writing to the quit pipe.  Async safe.
+void EPoll::Quit() {
+  // Shortcut to break us out of infinite loops: once run_ is false we are
+  // already finishing, so skip the write.  More than one write can still
+  // reach the pipe, but the loop stops once the first is read.
+  if (run_) {
+    PCHECK(write(quit_signal_fd_, "q", 1) == 1);
+  }
+}
+// Registers function as fd's read handler and adds kInEvents to fd's epoll
+// interest set.  Creates an InOutEventData entry for fd on first use.
void EPoll::OnReadable(int fd, ::std::function<void()> function) {
EventData *event_data = GetEventData(fd);
if (event_data == nullptr) {
- fns_.emplace_back(std::make_unique<EventData>(fd));
+ fns_.emplace_back(std::make_unique<InOutEventData>(fd));
event_data = fns_.back().get();
} else {
- CHECK(!event_data->in_fn) << ": Duplicate in functions for " << fd;
+ // NOTE(review): this static_cast is unchecked.  It is only valid if fd was
+ // registered through OnReadable/OnWriteable/OnError.  An fd registered with
+ // OnEvents holds a SingleEventData, and casting it here is undefined
+ // behavior rather than a clean crash.
+ CHECK(!static_cast<InOutEventData *>(event_data)->in_fn)
+ << ": Duplicate in functions for " << fd;
}
- event_data->in_fn = ::std::move(function);
+ static_cast<InOutEventData *>(event_data)->in_fn = ::std::move(function);
DoEpollCtl(event_data, event_data->events | kInEvents);
}
+// Registers function as fd's error handler and adds kErrorEvents to fd's
+// epoll interest set.  Creates an InOutEventData entry for fd on first use.
void EPoll::OnError(int fd, ::std::function<void()> function) {
EventData *event_data = GetEventData(fd);
if (event_data == nullptr) {
- fns_.emplace_back(std::make_unique<EventData>(fd));
+ fns_.emplace_back(std::make_unique<InOutEventData>(fd));
event_data = fns_.back().get();
} else {
- CHECK(!event_data->err_fn) << ": Duplicate in functions for " << fd;
+ // NOTE(review): this static_cast is unchecked.  It is only valid if fd was
+ // registered through OnReadable/OnWriteable/OnError.  An fd registered with
+ // OnEvents holds a SingleEventData, and casting it here is undefined
+ // behavior rather than a clean crash.
+ CHECK(!static_cast<InOutEventData *>(event_data)->err_fn)
+ << ": Duplicate error functions for " << fd;
}
- event_data->err_fn = ::std::move(function);
+ static_cast<InOutEventData *>(event_data)->err_fn = ::std::move(function);
DoEpollCtl(event_data, event_data->events | kErrorEvents);
}
+// Registers function as fd's write handler and adds kOutEvents to fd's epoll
+// interest set.  Creates an InOutEventData entry for fd on first use.
void EPoll::OnWriteable(int fd, ::std::function<void()> function) {
EventData *event_data = GetEventData(fd);
if (event_data == nullptr) {
- fns_.emplace_back(std::make_unique<EventData>(fd));
+ fns_.emplace_back(std::make_unique<InOutEventData>(fd));
event_data = fns_.back().get();
} else {
- CHECK(!event_data->out_fn) << ": Duplicate out functions for " << fd;
+ // NOTE(review): this static_cast is unchecked.  It is only valid if fd was
+ // registered through OnReadable/OnWriteable/OnError.  An fd registered with
+ // OnEvents holds a SingleEventData, and casting it here is undefined
+ // behavior rather than a clean crash.
+ CHECK(!static_cast<InOutEventData *>(event_data)->out_fn)
+ << ": Duplicate out functions for " << fd;
}
- event_data->out_fn = ::std::move(function);
+ static_cast<InOutEventData *>(event_data)->out_fn = ::std::move(function);
DoEpollCtl(event_data, event_data->events | kOutEvents);
}
+// Registers a single handler that receives the raw epoll event mask for fd.
+// The fd must not already be registered in any way.  No epoll interest is
+// added here: the entry's events stay 0 until they are set explicitly
+// (e.g. via SetEvents).
+void EPoll::OnEvents(int fd, ::std::function<void(uint32_t)> function) {
+  const bool already_registered = GetEventData(fd) != nullptr;
+  if (already_registered) {
+    LOG(FATAL) << "May not replace OnEvents handlers";
+  }
+  auto single = std::make_unique<SingleEventData>(fd);
+  single->fn = std::move(function);
+  fns_.emplace_back(std::move(single));
+}
+
void EPoll::ForgetClosedFd(int fd) {
auto element = fns_.begin();
while (fns_.end() != element) {
@@ -177,6 +182,10 @@
LOG(FATAL) << "fd " << fd << " not found";
}
+// Replaces (rather than ORs into) the epoll interest mask of an already
+// registered fd.  Crashes if fd is unknown.
+void EPoll::SetEvents(int fd, uint32_t events) {
+  EventData *const registered = CHECK_NOTNULL(GetEventData(fd));
+  DoEpollCtl(registered, events);
+}
+
// Removes fd from the event loop.
void EPoll::DeleteFd(int fd) {
auto element = fns_.begin();
@@ -192,6 +201,21 @@
LOG(FATAL) << "fd " << fd << " not found";
}
+// Dispatches the kernel-reported event mask to the per-class handlers, in the
+// fixed order input, output, then error.  A reported class with no handler is
+// a fatal error.
+void EPoll::InOutEventData::DoCallbacks(uint32_t events) {
+  const bool wants_in = (events & kInEvents) != 0;
+  const bool wants_out = (events & kOutEvents) != 0;
+  const bool wants_err = (events & kErrorEvents) != 0;
+
+  if (wants_in) {
+    CHECK(in_fn) << ": No handler registered for input events on " << fd;
+    in_fn();
+  }
+  if (wants_out) {
+    CHECK(out_fn) << ": No handler registered for output events on " << fd;
+    out_fn();
+  }
+  if (wants_err) {
+    CHECK(err_fn) << ": No handler registered for error events on " << fd;
+    err_fn();
+  }
+}
+
void EPoll::EnableEvents(int fd, uint32_t events) {
EventData *const event_data = CHECK_NOTNULL(GetEventData(fd));
DoEpollCtl(event_data, event_data->events | events);
@@ -214,12 +238,14 @@
void EPoll::DoEpollCtl(EventData *event_data, const uint32_t new_events) {
const uint32_t old_events = event_data->events;
+ if (old_events == new_events) {
+ // Shortcut without calling into the kernel. This happens often with
+ // external event loop integrations that are emulating poll, so make it
+ // fast.
+ return;
+ }
event_data->events = new_events;
if (new_events == 0) {
- if (old_events == 0) {
- // Not added, and doesn't need to be. Nothing to do here.
- return;
- }
// It was added, but should now be removed.
PCHECK(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, event_data->fd, nullptr) == 0);
return;
diff --git a/aos/events/epoll.h b/aos/events/epoll.h
index 4bcedf1..2b7eb76 100644
--- a/aos/events/epoll.h
+++ b/aos/events/epoll.h
@@ -5,6 +5,7 @@
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
+
#include <atomic>
#include <functional>
#include <vector>
@@ -68,21 +69,34 @@
// Quits. Async safe.
void Quit();
- // Called before waiting on the epoll file descriptor.
+ // Adds a function which will be called before waiting on the epoll file
+ // descriptor.
void BeforeWait(std::function<void()> function);
// Registers a function to be called if the fd becomes readable.
// Only one function may be registered for readability on each fd.
+ // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+ // OnEvents.
void OnReadable(int fd, ::std::function<void()> function);
// Registers a function to be called if the fd reports an error.
// Only one function may be registered for errors on each fd.
+ // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+ // OnEvents.
void OnError(int fd, ::std::function<void()> function);
// Registers a function to be called if the fd becomes writeable.
// Only one function may be registered for writability on each fd.
+ // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+ // OnEvents.
void OnWriteable(int fd, ::std::function<void()> function);
+ // Registers a function to be called when the configured events occur on fd.
+ // Which events occur will be passed to the function.
+ // A fd may be registered exclusively with OnReadable/OnWriteable/OnError OR
+ // OnEvents.
+ void OnEvents(int fd, ::std::function<void(uint32_t)> function);
+
// Removes fd from the event loop.
// All Fds must be cleaned up before this class is destroyed.
void DeleteFd(int fd);
@@ -100,19 +114,50 @@
// writeable.
void DisableWriteable(int fd) { DisableEvents(fd, kOutEvents); }
+ // Sets the epoll events for the given fd. Be careful using this with
+ // OnReadable/OnWriteable/OnError: enabled events which fire with no handler
+ // registered will result in a crash.
+ void SetEvents(int fd, uint32_t events);
+
+ // Returns whether we're currently running. This changes to false when we
+ // start draining events to finish.
+ bool should_run() const { return run_; }
+
private:
// Structure whose pointer should be returned by epoll. Makes looking up the
// function fast and easy.
struct EventData {
EventData(int fd_in) : fd(fd_in) {}
+ virtual ~EventData() = default;
+
// We use pointers to these objects as persistent identifiers, so they can't
// be moved.
EventData(const EventData &) = delete;
EventData &operator=(const EventData &) = delete;
+ // Calls the appropriate callbacks when events are returned from the kernel.
+ virtual void DoCallbacks(uint32_t events) = 0;
+
+ // The file descriptor this entry tracks.
const int fd;
+ // Epoll interest mask last passed to DoEpollCtl for fd.  0 means fd is not
+ // currently added to the epoll set.
uint32_t events = 0;
+ };
+
+ // Handler record for fds registered with OnReadable/OnWriteable/OnError.
+ // DoCallbacks() routes each reported event class to its own callback and
+ // CHECK-fails if a reported class has no callback.
+ struct InOutEventData : public EventData {
+ InOutEventData(int fd) : EventData(fd) {}
+ ~InOutEventData() override = default;
+
+ // Callbacks for readable, writeable and error events.  Each stays empty
+ // until the matching On* method registers it.
std::function<void()> in_fn, out_fn, err_fn;
+
+ void DoCallbacks(uint32_t events) override;
+ };
+
+ // Handler record for fds registered with OnEvents.  The single callback
+ // receives the epoll event mask reported for the fd.
+ struct SingleEventData : public EventData {
+ SingleEventData(int fd) : EventData(fd) {}
+ ~SingleEventData() override = default;
+
+ // Assigned by OnEvents() right after construction.
+ std::function<void(uint32_t)> fn;
+
+ void DoCallbacks(uint32_t events) override { fn(events); }
};
void EnableEvents(int fd, uint32_t events);
diff --git a/aos/events/epoll_test.cc b/aos/events/epoll_test.cc
index 66460c2..053a09b 100644
--- a/aos/events/epoll_test.cc
+++ b/aos/events/epoll_test.cc
@@ -3,8 +3,8 @@
#include <fcntl.h>
#include <unistd.h>
-#include "gtest/gtest.h"
#include "glog/logging.h"
+#include "gtest/gtest.h"
namespace aos {
namespace internal {
@@ -48,22 +48,22 @@
};
class EPollTest : public ::testing::Test {
- public:
- void RunFor(std::chrono::nanoseconds duration) {
- TimerFd timerfd;
- bool did_quit = false;
- epoll_.OnReadable(timerfd.fd(), [this, &timerfd, &did_quit]() {
- CHECK(!did_quit);
- epoll_.Quit();
- did_quit = true;
- timerfd.Read();
- });
- timerfd.SetTime(monotonic_clock::now() + duration,
- monotonic_clock::duration::zero());
- epoll_.Run();
- CHECK(did_quit);
- epoll_.DeleteFd(timerfd.fd());
- }
+ public:
+  // Runs the event loop until `duration` has elapsed.  A one-shot timerfd
+  // fires once and its handler asks the loop to quit.  Crashes if the handler
+  // runs twice or never runs before Run() returns.
+  void RunFor(std::chrono::nanoseconds duration) {
+    TimerFd timer;
+    bool did_quit = false;
+    const auto on_timer = [this, &timer, &did_quit]() {
+      CHECK(!did_quit);
+      epoll_.Quit();
+      did_quit = true;
+      timer.Read();
+    };
+    epoll_.OnReadable(timer.fd(), on_timer);
+
+    const auto deadline = monotonic_clock::now() + duration;
+    timer.SetTime(deadline, monotonic_clock::duration::zero());
+
+    epoll_.Run();
+    CHECK(did_quit);
+    epoll_.DeleteFd(timer.fd());
+  }
// Tests should avoid relying on ordering for events closer in time than this,
// or waiting for longer than this to ensure events happen in order.
@@ -71,7 +71,7 @@
return std::chrono::milliseconds(50);
}
- EPoll epoll_;
+ EPoll epoll_;
};
// Test that the basics of OnReadable work.
@@ -201,6 +201,11 @@
epoll_.DeleteFd(pipe.write_fd());
}
+// Quitting from inside a BeforeWait callback must still let Run() return;
+// otherwise this test hangs.
+TEST_F(EPollTest, QuitInBeforeWait) {
+  const auto quit_now = [this] { epoll_.Quit(); };
+  epoll_.BeforeWait(quit_now);
+  epoll_.Run();
+}
+
} // namespace testing
} // namespace internal
} // namespace aos
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 29e4c22..b0487d5 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -493,6 +493,19 @@
return result;
}
+// Resets context_ to describe a local, timer-driven event at
+// monotonic_event_time.  There are no remote or realtime timestamps, no queue
+// index and no message payload, and the source boot is this node's own
+// boot_uuid().
+void EventLoop::SetTimerContext(
+    monotonic_clock::time_point monotonic_event_time) {
+  auto &ctx = context_;
+
+  // Timestamps: only the local monotonic event time is meaningful.
+  ctx.monotonic_event_time = monotonic_event_time;
+  ctx.monotonic_remote_time = monotonic_clock::min_time;
+  ctx.realtime_event_time = realtime_clock::min_time;
+  ctx.realtime_remote_time = realtime_clock::min_time;
+
+  // No message is associated with a timer event.
+  ctx.data = nullptr;
+  ctx.size = 0u;
+  ctx.buffer_index = -1;
+  ctx.queue_index = 0xffffffffu;
+
+  ctx.source_boot_uuid = boot_uuid();
+}
+
void WatcherState::set_timing_report(timing::Watcher *watcher) {
CHECK_NOTNULL(watcher);
watcher_ = watcher;
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index d23314e..8ef8d57 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,6 +7,7 @@
#include <string>
#include <string_view>
+#include "absl/container/btree_set.h"
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
#include "aos/events/channel_preallocated_allocator.h"
@@ -20,8 +21,6 @@
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
#include "aos/uuid.h"
-
-#include "absl/container/btree_set.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
@@ -77,7 +76,7 @@
// UUID of the remote node which sent this message, or this node in the case
// of events which are local to this node.
- UUID remote_boot_uuid = UUID::Zero();
+ UUID source_boot_uuid = UUID::Zero();
// Efficiently copies the flatbuffer into a FlatbufferVector, allocating
// memory in the process. It is vital that T matches the type of the
@@ -115,6 +114,7 @@
protected:
EventLoop *event_loop() { return event_loop_; }
+ const EventLoop *event_loop() const { return event_loop_; }
Context context_;
@@ -154,7 +154,7 @@
bool Send(size_t size);
bool Send(size_t size, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+ uint32_t remote_queue_index, const UUID &source_boot_uuid);
// Sends a single block of data by copying it.
// The remote arguments have the same meaning as in Send above.
@@ -162,7 +162,7 @@
bool Send(const void *data, size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+ uint32_t remote_queue_index, const UUID &source_boot_uuid);
const Channel *channel() const { return channel_; }
@@ -190,6 +190,7 @@
protected:
EventLoop *event_loop() { return event_loop_; }
+ const EventLoop *event_loop() const { return event_loop_; }
monotonic_clock::time_point monotonic_sent_time_ = monotonic_clock::min_time;
realtime_clock::time_point realtime_sent_time_ = realtime_clock::min_time;
@@ -202,12 +203,12 @@
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) = 0;
+ const UUID &source_boot_uuid) = 0;
virtual bool DoSend(size_t size,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) = 0;
+ const UUID &source_boot_uuid) = 0;
EventLoop *const event_loop_;
const Channel *const channel_;
@@ -473,8 +474,13 @@
Ftrace ftrace_;
};
+// Note: for each <name, type> tuple, an EventLoop supports creating any number
+// of fetchers, plus either one sender or one watcher.
class EventLoop {
public:
+ // Holds configuration by reference for the lifetime of this object. It may
+ // never be mutated externally in any way.
EventLoop(const Configuration *configuration);
virtual ~EventLoop();
@@ -495,10 +501,6 @@
return GetChannel<T>(channel_name) != nullptr;
}
- // Note, it is supported to create:
- // multiple fetchers, and (one sender or one watcher) per <name, type>
- // tuple.
-
// Makes a class that will always fetch the most recent value
// sent to the provided channel.
template <typename T>
@@ -596,7 +598,7 @@
// TODO(austin): OnExit for cleanup.
- // Threadsafe.
+ // May be safely called from any thread.
bool is_running() const { return is_running_.load(); }
// Sets the scheduler priority to run the event loop at. This may not be
@@ -737,6 +739,10 @@
// If true, don't send AOS_LOG to /aos
bool skip_logger_ = false;
+ // Sets context_ for a timed event which is supposed to happen at the provided
+ // time.
+ void SetTimerContext(monotonic_clock::time_point monotonic_event_time);
+
private:
virtual pid_t GetTid() = 0;
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 2ffcb58..1460663 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -274,7 +274,7 @@
EXPECT_EQ(fetcher.context().monotonic_remote_time, monotonic_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(fetcher.context().realtime_remote_time, realtime_clock::min_time);
- EXPECT_EQ(fetcher.context().remote_boot_uuid, UUID::Zero());
+ EXPECT_EQ(fetcher.context().source_boot_uuid, UUID::Zero());
EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
EXPECT_EQ(fetcher.context().size, 0u);
EXPECT_EQ(fetcher.context().data, nullptr);
@@ -302,7 +302,7 @@
EXPECT_LE(fetcher.context().monotonic_event_time, monotonic_now + kEpsilon);
EXPECT_GE(fetcher.context().realtime_event_time, realtime_now - kEpsilon);
EXPECT_LE(fetcher.context().realtime_event_time, realtime_now + kEpsilon);
- EXPECT_EQ(fetcher.context().remote_boot_uuid, loop2->boot_uuid());
+ EXPECT_EQ(fetcher.context().source_boot_uuid, loop2->boot_uuid());
EXPECT_EQ(fetcher.context().queue_index, 0x0u);
EXPECT_EQ(fetcher.context().size, 20u);
EXPECT_NE(fetcher.context().data, nullptr);
@@ -957,7 +957,7 @@
EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
- EXPECT_EQ(loop->context().remote_boot_uuid, loop->boot_uuid());
+ EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
EXPECT_EQ(loop->context().size, 0u);
EXPECT_EQ(loop->context().data, nullptr);
@@ -1252,7 +1252,7 @@
loop1->context().monotonic_event_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
- EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
const aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -1300,7 +1300,7 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
- EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
<< ": Got "
@@ -1353,7 +1353,7 @@
loop1->context().monotonic_event_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
loop1->context().realtime_event_time);
- EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
const aos::monotonic_clock::time_point monotonic_now =
loop1->monotonic_now();
@@ -1387,7 +1387,7 @@
fetcher.context().realtime_remote_time);
EXPECT_EQ(fetcher.context().monotonic_event_time,
fetcher.context().monotonic_remote_time);
- EXPECT_EQ(fetcher.context().remote_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(fetcher.context().source_boot_uuid, loop1->boot_uuid());
EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
<< ": Got "
@@ -1445,7 +1445,7 @@
EXPECT_EQ(loop1->context().monotonic_remote_time,
monotonic_clock::min_time);
- EXPECT_EQ(loop1->context().remote_boot_uuid, loop1->boot_uuid());
+ EXPECT_EQ(loop1->context().source_boot_uuid, loop1->boot_uuid());
EXPECT_EQ(loop1->context().realtime_event_time,
realtime_clock::min_time);
EXPECT_EQ(loop1->context().realtime_remote_time,
@@ -1956,7 +1956,7 @@
const aos::realtime_clock::time_point realtime_remote_time =
aos::realtime_clock::time_point(chrono::seconds(3132));
const uint32_t remote_queue_index = 0x254971;
- const UUID remote_boot_uuid = UUID::Random();
+ const UUID source_boot_uuid = UUID::Random();
std::unique_ptr<aos::RawSender> sender =
loop1->MakeRawSender(configuration::GetChannel(
@@ -1969,20 +1969,20 @@
loop2->OnRun([&]() {
EXPECT_TRUE(sender->Send(kMessage.span().data(), kMessage.span().size(),
monotonic_remote_time, realtime_remote_time,
- remote_queue_index, remote_boot_uuid));
+ remote_queue_index, source_boot_uuid));
});
bool happened = false;
loop2->MakeRawWatcher(
configuration::GetChannel(loop2->configuration(), "/test",
"aos.TestMessage", "", nullptr),
- [this, monotonic_remote_time, realtime_remote_time, remote_boot_uuid,
+ [this, monotonic_remote_time, realtime_remote_time, source_boot_uuid,
remote_queue_index, &fetcher,
&happened](const Context &context, const void * /*message*/) {
happened = true;
EXPECT_EQ(monotonic_remote_time, context.monotonic_remote_time);
EXPECT_EQ(realtime_remote_time, context.realtime_remote_time);
- EXPECT_EQ(remote_boot_uuid, context.remote_boot_uuid);
+ EXPECT_EQ(source_boot_uuid, context.source_boot_uuid);
EXPECT_EQ(remote_queue_index, context.remote_queue_index);
ASSERT_TRUE(fetcher->Fetch());
@@ -2212,6 +2212,35 @@
Run();
}
+// Tests that the event loop's context is populated like a timer event when
+// OnRun callbacks execute.
+TEST_P(AbstractEventLoopTest, SetContextOnRun) {
+  auto loop = MakePrimary();
+
+  // The monotonic event time seen inside OnRun must be no later than
+  // monotonic_now() queried inside the callback.  It must also be no earlier
+  // than the time sampled just before Run() is called.
+  // Start from min_time and track whether OnRun fired, so a callback that
+  // never runs fails loudly instead of comparing against a default value.
+  aos::monotonic_clock::time_point monotonic_event_time_on_run =
+      aos::monotonic_clock::min_time;
+  bool on_run_called = false;
+
+  loop->OnRun([&]() {
+    on_run_called = true;
+    monotonic_event_time_on_run = loop->context().monotonic_event_time;
+    EXPECT_LE(monotonic_event_time_on_run, loop->monotonic_now());
+    EXPECT_EQ(loop->context().monotonic_remote_time, monotonic_clock::min_time);
+    EXPECT_EQ(loop->context().realtime_event_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().realtime_remote_time, realtime_clock::min_time);
+    EXPECT_EQ(loop->context().source_boot_uuid, loop->boot_uuid());
+    EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
+    EXPECT_EQ(loop->context().size, 0u);
+    EXPECT_EQ(loop->context().data, nullptr);
+    EXPECT_EQ(loop->context().buffer_index, -1);
+  });
+
+  EndEventLoop(loop.get(), ::std::chrono::milliseconds(200));
+
+  const aos::monotonic_clock::time_point before_run_time =
+      loop->monotonic_now();
+  Run();
+  ASSERT_TRUE(on_run_called) << ": OnRun callback never executed";
+  EXPECT_GE(monotonic_event_time_on_run, before_run_time);
+}
+
// Tests that watchers fail when created on the wrong node.
TEST_P(AbstractEventLoopDeathTest, NodeWatcher) {
EnableNodes("them");
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index e4e879f..c509170 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -182,15 +182,7 @@
CHECK_NOTNULL(timing_.timer);
const monotonic_clock::time_point monotonic_start_time = get_time();
- event_loop_->context_.monotonic_event_time = event_time;
- event_loop_->context_.monotonic_remote_time = monotonic_clock::min_time;
- event_loop_->context_.realtime_remote_time =
- event_loop_->context_.realtime_event_time = realtime_clock::min_time;
- event_loop_->context_.queue_index = 0xffffffffu;
- event_loop_->context_.size = 0;
- event_loop_->context_.data = nullptr;
- event_loop_->context_.buffer_index = -1;
- event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
+ event_loop_->SetTimerContext(event_time);
ftrace_.FormatMessage(
"timer: %.*s: start now=%" PRId64 " event=%" PRId64,
@@ -228,15 +220,7 @@
const monotonic_clock::time_point monotonic_start_time = get_time();
// Update the context to hold the desired wakeup time.
- event_loop_->context_.monotonic_event_time = phased_loop_.sleep_time();
- event_loop_->context_.monotonic_remote_time = monotonic_clock::min_time;
- event_loop_->context_.realtime_remote_time =
- event_loop_->context_.realtime_event_time = realtime_clock::min_time;
- event_loop_->context_.queue_index = 0xffffffffu;
- event_loop_->context_.size = 0;
- event_loop_->context_.data = nullptr;
- event_loop_->context_.buffer_index = -1;
- event_loop_->context_.remote_boot_uuid = event_loop_->boot_uuid();
+ event_loop_->SetTimerContext(phased_loop_.sleep_time());
// Compute how many cycles elapsed
cycles_elapsed_ += phased_loop_.Iterate(monotonic_start_time);
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 49b170d..3739f49 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -50,6 +50,8 @@
::std::function<void()> callback = ::std::move(iter->second);
events_list_.erase(iter);
callback();
+
+ converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
}
void EventScheduler::RunOnRun() {
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 397b5f0..f981ef2 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -58,6 +58,9 @@
// node.
virtual monotonic_clock::time_point FromDistributedClock(
size_t node_index, distributed_clock::time_point time) = 0;
+
+ // Called once time has advanced past `time`, so any state kept for earlier
+ // times can be forgotten.
+ virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
};
class EventSchedulerScheduler;
@@ -151,6 +154,8 @@
size_t /*node_index*/, distributed_clock::time_point time) override {
return monotonic_clock::epoch() + time.time_since_epoch();
}
+
+ void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
};
UnityConverter unity_converter_;
diff --git a/aos/events/event_scheduler_test.cc b/aos/events/event_scheduler_test.cc
index 1ad0a84..3bcf6ca 100644
--- a/aos/events/event_scheduler_test.cc
+++ b/aos/events/event_scheduler_test.cc
@@ -41,6 +41,8 @@
distributed_offset_[node_index];
}
+ void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
+
private:
// Offset to the distributed clock.
// distributed = monotonic + offset;
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 5c69f07..342a491 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -39,6 +39,8 @@
"confirming they can be parsed.");
DEFINE_bool(print_parts_only, false,
"If true, only print out the results of logfile sorting.");
+DEFINE_bool(channels, false,
+ "If true, print out all the configured channels for this log.");
// Print the flatbuffer out to stdout, both to remove the unnecessary cruft from
// glog and to allow the user to readily redirect just the logged output
@@ -227,6 +229,15 @@
aos::logger::LogReader reader(logfiles);
+ if (FLAGS_channels) {
+ const aos::Configuration *config = reader.configuration();
+ for (const aos::Channel *channel : *config->channels()) {
+ std::cout << channel->name()->c_str() << " " << channel->type()->c_str()
+ << '\n';
+ }
+ return 0;
+ }
+
aos::FastStringBuilder builder;
aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 6630441..abae9a1 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -713,7 +713,7 @@
if (our_node_index != f.data_node_index) {
// And update our boot UUID if the UUID has changed.
if (node_state_[f.data_node_index].SetBootUUID(
- f.fetcher->context().remote_boot_uuid)) {
+ f.fetcher->context().source_boot_uuid)) {
MaybeWriteHeader(f.data_node_index);
}
}
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 89be828..cd2ceb8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -320,7 +320,7 @@
static const std::string_view kXz = ".xz";
if (filename.substr(filename.size() - kXz.size()) == kXz) {
#if ENABLE_LZMA
- decoder_ = std::make_unique<LzmaDecoder>(filename);
+ decoder_ = std::make_unique<ThreadedLzmaDecoder>(filename);
#else
LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
#endif
@@ -972,6 +972,8 @@
CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
timestamp_mapper->nodes_data_[node()].peer = this;
+
+ node_data->save_for_peer = true;
}
}
@@ -1271,6 +1273,7 @@
}
for (NodeData &node_data : nodes_data_) {
if (!node_data.any_delivered) continue;
+ if (!node_data.save_for_peer) continue;
if (node_data.channels[m->channel_index].delivered) {
// TODO(austin): This copies the data... Probably not worth stressing
// about yet.
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f31f4df..56b582f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -654,6 +654,9 @@
// bools in delivered below are true.
bool any_delivered = false;
+ // True if we have a peer and therefore should be saving data for it.
+ bool save_for_peer = false;
+
// Peer pointer. This node is only to be considered if a peer is set.
TimestampMapper *peer = nullptr;
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 54dc6f7..60a203e 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -205,4 +205,102 @@
return end - begin;
}
+// Starts a background thread which decompresses `filename` through decoder_
+// into decoded_queue_, staying at most kQueueSize buffers ahead of Read().
+ThreadedLzmaDecoder::ThreadedLzmaDecoder(std::string_view filename)
+    : decoder_(filename), decode_thread_([this] {
+        // decoded_queue_ and finished_ are only touched while holding
+        // decode_mutex_; the lock is dropped while decompressing so Read() can
+        // keep draining the queue concurrently.
+        std::unique_lock lock(decode_mutex_);
+        while (true) {
+          // Wake if the queue is too small or we are finished.
+          continue_decoding_.wait(lock, [this] {
+            return decoded_queue_.size() < kQueueSize || finished_;
+          });
+
+          if (finished_) {
+            return;
+          }
+
+          while (true) {
+            CHECK(!finished_);
+            // Release our lock on the queue before doing decompression work.
+            lock.unlock();
+
+            ResizeableBuffer buffer;
+            buffer.resize(kBufSize);
+
+            const size_t bytes_read =
+                decoder_.Read(buffer.begin(), buffer.end());
+            buffer.resize(bytes_read);
+
+            // Relock the queue and move the new buffer to the end. This should
+            // be fast. We also need to stay locked when we wait().
+            lock.lock();
+            if (bytes_read > 0) {
+              decoded_queue_.emplace_back(std::move(buffer));
+            } else {
+              // The underlying decoder returned no data; treat that as the
+              // end of the stream.
+              finished_ = true;
+            }
+
+            // Wake a Read() blocked on an empty queue as soon as each buffer
+            // (or the end of the stream) is available. Notifying only once the
+            // queue is full would make such a reader wait for kQueueSize
+            // buffers to be decompressed instead of one.
+            queue_filled_.notify_one();
+
+            // If we've filled the queue or are out of data, go back to sleep.
+            if (decoded_queue_.size() >= kQueueSize || finished_) {
+              break;
+            }
+          }
+        }
+      }) {}
+
+// Stops the decode thread and waits for it to exit.
+ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
+  // Flag shutdown under the lock. The decode thread returns once it wakes on
+  // continue_decoding_ and sees finished_ (after completing any decompression
+  // it already had in progress).
+  std::unique_lock lock(decode_mutex_);
+  finished_ = true;
+  lock.unlock();
+  continue_decoding_.notify_one();
+
+  // Block until the thread has actually returned.
+  decode_thread_.join();
+}
+
+// Copies decoded data from the front of the queue into [begin, end), blocking
+// while the queue is empty and decoding is still in progress. Returns the
+// number of bytes copied, which may be fewer than requested; returns 0 at the
+// end of the stream.
+size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  std::unique_lock lock(decode_mutex_);
+
+  // If the queue is empty, sleep until the decoder thread has produced another
+  // buffer or reached the end of the stream.
+  if (decoded_queue_.empty()) {
+    continue_decoding_.notify_one();
+    queue_filled_.wait(lock,
+                       [this] { return finished_ || !decoded_queue_.empty(); });
+    if (finished_ && decoded_queue_.empty()) {
+      return 0;
+    }
+  }
+  // Sanity check if the queue is empty and we're not finished.
+  CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
+
+  // The decode thread only queues buffers holding more than 0 bytes, and fully
+  // consumed buffers are dropped below, so the front buffer always has data.
+  ResizeableBuffer &front_buffer = decoded_queue_.front();
+
+  // Copy some data from our working buffer to the requested destination.
+  const std::size_t bytes_requested = end - begin;
+  const std::size_t bytes_to_copy =
+      std::min(bytes_requested, front_buffer.size());
+  memcpy(begin, front_buffer.data(), bytes_to_copy);
+  front_buffer.erase_front(bytes_to_copy);
+
+  // Drop the buffer as soon as it is fully consumed so decoded_queue_.size()
+  // counts only pending data and the decode thread can refill the slot now
+  // rather than on the next Read(). front_buffer is not used after this.
+  if (front_buffer.size() == 0) {
+    decoded_queue_.erase(decoded_queue_.begin());
+  }
+
+  // Ensure the decoding thread wakes up if the queue isn't full.
+  if (!finished_ && decoded_queue_.size() < kQueueSize) {
+    continue_decoding_.notify_one();
+  }
+
+  return bytes_to_copy;
+}
+
} // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
namespace aos::logger {
@@ -78,6 +81,38 @@
std::string filename_;
};
+// Decompresses data with liblzma in a new thread, up to a maximum queue
+// size. Calls to Read() will return data from the queue if available,
+// or block until more data is queued or the stream finishes.
+//
+// The decode thread is started by the constructor and joined by the
+// destructor.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+  explicit ThreadedLzmaDecoder(std::string_view filename);
+  ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+  ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+  // Signals the decode thread to stop and joins it.
+  ~ThreadedLzmaDecoder();
+
+  // Copies up to (end - begin) bytes of decompressed data into [begin, end)
+  // and returns the number of bytes copied; may return fewer bytes than
+  // requested. Returns 0 once the stream is finished and no queued data
+  // remains.
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // Size in bytes of each buffer the decode thread asks decoder_ to fill.
+  static constexpr size_t kBufSize{256 * 1024};
+  // The decode thread sleeps once this many buffers are queued.
+  static constexpr size_t kQueueSize{8};
+
+  // Underlying single-threaded decoder; only read from the decode thread.
+  LzmaDecoder decoder_;
+
+  // Queue of decompressed data to return on calls to Read
+  std::vector<ResizeableBuffer> decoded_queue_;
+
+  // Mutex guarding decoded_queue_ and finished_.
+  std::mutex decode_mutex_;
+  // Notified to wake the decode thread: by Read() when the queue has room or
+  // is empty, and by the destructor on shutdown.
+  std::condition_variable continue_decoding_;
+  // Notified by the decode thread to wake a Read() waiting on an empty queue.
+  std::condition_variable queue_filled_;
+
+  // Set when decoder_ returns no more data, or by the destructor to stop the
+  // decode thread.
+  bool finished_ = false;
+
+  // Declared last so it is constructed (and starts running) only after every
+  // other member above has been initialized.
+  std::thread decode_thread_;
+};
+
} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 63ed6c2..bbd0c60 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -17,6 +17,16 @@
}),
::testing::Range(0, 100)));
+// Runs the BufferEncoderTest suite with LzmaEncoder(2) as the encoder and
+// ThreadedLzmaDecoder as the decoder, for each parameter value in [0, 100).
+INSTANTIATE_TEST_SUITE_P(
+    LzmaThreaded, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<LzmaEncoder>(2);
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<ThreadedLzmaDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
// Tests that we return as much of the file as we can read if the end is
// corrupted.
TEST_F(BufferEncoderBaseTest, CorruptedBuffer) {
diff --git a/aos/events/pingpong_test.cc b/aos/events/pingpong_test.cc
index 07340f7..9c874ef 100644
--- a/aos/events/pingpong_test.cc
+++ b/aos/events/pingpong_test.cc
@@ -79,5 +79,93 @@
EXPECT_EQ(ping_count, 1001);
}
+// Multi-node ping pong test. This test carefully mirrors the structure of the
+// single node test above to help highlight the similarities and differences.
+//
+// Ping runs on node "pi1" and Pong runs on node "pi2", each in its own event
+// loop created from a single SimulatedEventLoopFactory.
+class MultiNodePingPongTest : public ::testing::Test {
+ public:
+  // NOTE: each initializer below reads a member declared before it (the
+  // factory reads config_, the node pointers read the factory, and ping_ /
+  // pong_ read their event loops), so the member declaration order below must
+  // not be changed.
+  MultiNodePingPongTest()
+      : config_(aos::configuration::ReadConfig(ArtifactPath(
+            "aos/events/multinode_pingpong_test_split_config.json"))),
+        event_loop_factory_(&config_.message()),
+        pi1_(
+            configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi2_(
+            configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+        ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
+        ping_(ping_event_loop_.get()),
+        pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
+        pong_(pong_event_loop_.get()) {}
+
+  // Config and factory.
+  // The factory is a factory for connected event loops. Each application needs
+  // a separate event loop (because you can't send a message to yourself in a
+  // single event loop). The created event loops can then send messages to each
+  // other and trigger callbacks to be called, or fetchers to receive data.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  SimulatedEventLoopFactory event_loop_factory_;
+
+  // Convenience pointers for each Node.
+  const Node *pi1_;
+  const Node *pi2_;
+
+  // Event loop and app for Ping
+  std::unique_ptr<EventLoop> ping_event_loop_;
+  Ping ping_;
+
+  // Event loop and app for Pong
+  std::unique_ptr<EventLoop> pong_event_loop_;
+  Pong pong_;
+};
+
+// Tests that the number of pong messages matches the number of ping messages
+// (on both nodes this time)
+TEST_F(MultiNodePingPongTest, AlwaysReplies) {
+  // For grins, test that ping and pong appear on both nodes and match.
+  std::unique_ptr<EventLoop> pi1_test_event_loop =
+      event_loop_factory_.MakeEventLoop("test", pi1_);
+  std::unique_ptr<EventLoop> pi2_test_event_loop =
+      event_loop_factory_.MakeEventLoop("test", pi2_);
+
+  int pi1_ping_count = 0;
+  int pi2_ping_count = 0;
+  int pi1_pong_count = 0;
+  int pi2_pong_count = 0;
+
+  // Confirm that the ping value matches on both nodes.
+  pi1_test_event_loop->MakeWatcher(
+      "/test", [&pi1_ping_count](const examples::Ping &ping) {
+        EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+        ++pi1_ping_count;
+      });
+  pi2_test_event_loop->MakeWatcher(
+      "/test", [&pi2_ping_count](const examples::Ping &ping) {
+        EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+        ++pi2_ping_count;
+      });
+
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  pi2_test_event_loop->MakeWatcher(
+      "/test", [&pi2_pong_count, &pi2_ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+        ++pi2_pong_count;
+        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+      });
+  pi1_test_event_loop->MakeWatcher(
+      "/test", [&pi1_pong_count, &pi1_ping_count](const examples::Pong &pong) {
+        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+        ++pi1_pong_count;
+        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+      });
+
+  // Since forwarding takes "time", we need to run a bit longer to let pong
+  // finish the last cycle.
+  event_loop_factory_.RunFor(chrono::seconds(10) + chrono::milliseconds(5));
+
+  // We run at t=0 and t=10 seconds, which means we run 1 extra time.
+  EXPECT_EQ(pi1_ping_count, 1001);
+  EXPECT_EQ(pi2_ping_count, 1001);
+  // Every ping must also have been answered on both nodes. Without these
+  // checks the test would still pass if no pong were ever delivered, because
+  // the pong watchers above would simply never run.
+  EXPECT_EQ(pi1_pong_count, 1001);
+  EXPECT_EQ(pi2_pong_count, 1001);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 63e1cb9..c70fef6 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -372,7 +372,7 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.remote_boot_uuid, &context_.size, copy_buffer);
+ &context_.source_boot_uuid, &context_.size, copy_buffer);
if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
@@ -477,9 +477,13 @@
simple_shm_fetcher_.RetrieveData();
}
- ~ShmFetcher() { context_.data = nullptr; }
+ ~ShmFetcher() override {
+ shm_event_loop()->CheckCurrentThread();
+ context_.data = nullptr;
+ }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+ shm_event_loop()->CheckCurrentThread();
if (simple_shm_fetcher_.FetchNext()) {
context_ = simple_shm_fetcher_.context();
return std::make_pair(true, monotonic_clock::now());
@@ -488,6 +492,7 @@
}
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+ shm_event_loop()->CheckCurrentThread();
if (simple_shm_fetcher_.Fetch()) {
context_ = simple_shm_fetcher_.context();
return std::make_pair(true, monotonic_clock::now());
@@ -500,6 +505,10 @@
}
private:
+ const ShmEventLoop *shm_event_loop() const {
+ return static_cast<const ShmEventLoop *>(event_loop());
+ }
+
SimpleShmFetcher simple_shm_fetcher_;
};
@@ -517,7 +526,7 @@
channel)),
wake_upper_(lockless_queue_memory_.queue()) {}
- ~ShmSender() override {}
+ ~ShmSender() override { shm_event_loop()->CheckCurrentThread(); }
static ipc_lib::LocklessQueueSender VerifySender(
std::optional<ipc_lib::LocklessQueueSender> sender,
@@ -530,24 +539,32 @@
<< ", too many senders.";
}
- void *data() override { return lockless_queue_sender_.Data(); }
- size_t size() override { return lockless_queue_sender_.size(); }
+ void *data() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.Data();
+ }
+ size_t size() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.size();
+ }
bool DoSend(size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) override {
+ const UUID &source_boot_uuid) override {
+ shm_event_loop()->CheckCurrentThread();
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
CHECK(lockless_queue_sender_.Send(length, monotonic_remote_time,
realtime_remote_time, remote_queue_index,
- remote_boot_uuid, &monotonic_sent_time_,
+ source_boot_uuid, &monotonic_sent_time_,
&realtime_sent_time_, &sent_queue_index_))
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
- wake_upper_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
+ : 0);
return true;
}
@@ -555,17 +572,19 @@
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) override {
+ const UUID &source_boot_uuid) override {
+ shm_event_loop()->CheckCurrentThread();
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
CHECK(lockless_queue_sender_.Send(
reinterpret_cast<const char *>(msg), length, monotonic_remote_time,
- realtime_remote_time, remote_queue_index, remote_boot_uuid,
+ realtime_remote_time, remote_queue_index, source_boot_uuid,
&monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_))
<< ": Somebody wrote outside the buffer of their message on channel "
<< configuration::CleanedChannelToString(channel());
- wake_upper_.Wakeup(event_loop()->priority());
+ wake_upper_.Wakeup(event_loop()->is_running() ? event_loop()->priority()
+ : 0);
// TODO(austin): Return an error if we send too fast.
return true;
}
@@ -574,9 +593,16 @@
return lockless_queue_memory_.GetMutableSharedMemory();
}
- int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+ int buffer_index() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.buffer_index();
+ }
private:
+ const ShmEventLoop *shm_event_loop() const {
+ return static_cast<const ShmEventLoop *>(event_loop());
+ }
+
MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
@@ -599,9 +625,13 @@
}
}
- ~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
+ ~ShmWatcherState() override {
+ event_loop_->CheckCurrentThread();
+ event_loop_->RemoveEvent(&event_);
+ }
void Startup(EventLoop *event_loop) override {
+ event_loop_->CheckCurrentThread();
simple_shm_fetcher_.PointAtNextQueueIndex();
CHECK(RegisterWakeup(event_loop->priority()));
}
@@ -666,6 +696,7 @@
}
~ShmTimerHandler() {
+ shm_event_loop_->CheckCurrentThread();
Disable();
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
}
@@ -705,6 +736,7 @@
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
+ shm_event_loop_->CheckCurrentThread();
if (event_.valid()) {
shm_event_loop_->RemoveEvent(&event_);
}
@@ -717,6 +749,7 @@
}
void Disable() override {
+ shm_event_loop_->CheckCurrentThread();
shm_event_loop_->RemoveEvent(&event_);
timerfd_.Disable();
disabled_ = true;
@@ -766,6 +799,7 @@
}
~ShmPhasedLoopHandler() override {
+ shm_event_loop_->CheckCurrentThread();
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
shm_event_loop_->RemoveEvent(&event_);
}
@@ -773,6 +807,7 @@
private:
// Reschedules the timer.
void Schedule(monotonic_clock::time_point sleep_time) override {
+ shm_event_loop_->CheckCurrentThread();
if (event_.valid()) {
shm_event_loop_->RemoveEvent(&event_);
}
@@ -792,6 +827,7 @@
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const Channel *channel) {
+ CheckCurrentThread();
if (!configuration::ChannelIsReadableOnNode(channel, node())) {
LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
<< "\", \"type\": \"" << channel->type()->string_view()
@@ -805,6 +841,7 @@
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
+ CheckCurrentThread();
TakeSender(channel);
return ::std::unique_ptr<RawSender>(new ShmSender(shm_base_, this, channel));
@@ -813,6 +850,7 @@
void ShmEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher) {
+ CheckCurrentThread();
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
@@ -822,6 +860,7 @@
void ShmEventLoop::MakeRawNoArgWatcher(
const Channel *channel,
std::function<void(const Context &context)> watcher) {
+ CheckCurrentThread();
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(new ShmWatcherState(
@@ -831,6 +870,7 @@
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+ CheckCurrentThread();
return NewTimer(::std::unique_ptr<TimerHandler>(
new ShmTimerHandler(this, ::std::move(callback))));
}
@@ -839,14 +879,28 @@
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) {
+ CheckCurrentThread();
return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
new ShmPhasedLoopHandler(this, ::std::move(callback), interval, offset)));
}
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+ CheckCurrentThread();
on_run_.push_back(::std::move(on_run));
}
+// Enforces the optional threading contracts configured by CheckForMutex() and
+// LockToThread(). Both are off by default (check_mutex_ is null and check_tid_
+// is empty), hence the "unlikely" branch hints.
+void ShmEventLoop::CheckCurrentThread() const {
+  const bool mutex_configured =
+      __builtin_expect(check_mutex_ != nullptr, false);
+  if (mutex_configured) {
+    CHECK(check_mutex_->is_locked())
+        << ": The configured mutex is not locked while calling a "
+           "ShmEventLoop function";
+  }
+
+  const bool thread_configured =
+      __builtin_expect(check_tid_.has_value(), false);
+  if (thread_configured) {
+    CHECK_EQ(syscall(SYS_gettid), *check_tid_)
+        << ": Being called from the wrong thread";
+  }
+}
+
// This is a bit tricky because watchers can generate new events at any time (as
// long as it's in the past). We want to check the watchers at least once before
// declaring there are no events to handle, and we want to check them again if
@@ -1021,6 +1075,7 @@
};
void ShmEventLoop::Run() {
+ CheckCurrentThread();
SignalHandler::global()->Register(this);
if (watchers_.size() > 0) {
@@ -1063,6 +1118,7 @@
}
// Now that we are RT, run all the OnRun handlers.
+ SetTimerContext(monotonic_clock::now());
for (const auto &run : on_run_) {
run();
}
@@ -1100,6 +1156,7 @@
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
+ CheckCurrentThread();
// Force everything with a registered fd with epoll to be destroyed now.
timers_.clear();
phased_loops_.clear();
@@ -1109,6 +1166,7 @@
}
void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+ CheckCurrentThread();
if (is_running()) {
LOG(FATAL) << "Cannot set realtime priority while running.";
}
@@ -1116,6 +1174,7 @@
}
void ShmEventLoop::SetRuntimeAffinity(const cpu_set_t &cpuset) {
+ CheckCurrentThread();
if (is_running()) {
LOG(FATAL) << "Cannot set affinity while running.";
}
@@ -1123,18 +1182,21 @@
}
void ShmEventLoop::set_name(const std::string_view name) {
+ CheckCurrentThread();
name_ = std::string(name);
UpdateTimingReport();
}
absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
const Channel *channel) {
+ CheckCurrentThread();
ShmWatcherState *const watcher_state =
static_cast<ShmWatcherState *>(GetWatcherState(channel));
return watcher_state->GetSharedMemory();
}
int ShmEventLoop::NumberBuffers(const Channel *channel) {
+ CheckCurrentThread();
return MakeQueueConfiguration(
channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
configuration()->channel_storage_duration())))
@@ -1143,14 +1205,19 @@
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
const aos::RawSender *sender) const {
+ CheckCurrentThread();
return static_cast<const ShmSender *>(sender)->GetSharedMemory();
}
absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
const aos::RawFetcher *fetcher) const {
+ CheckCurrentThread();
return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
}
-pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+pid_t ShmEventLoop::GetTid() {
+ CheckCurrentThread();
+ return syscall(SYS_gettid);
+}
} // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 845857c..3245f11 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -4,11 +4,11 @@
#include <vector>
#include "absl/types/span.h"
-
#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/event_loop_generated.h"
#include "aos/ipc_lib/signalfd.h"
+#include "aos/stl_mutex/stl_mutex.h"
DECLARE_string(application_name);
DECLARE_string(shm_base);
@@ -92,6 +92,7 @@
// Returns the local mapping of the shared memory used by the provided Sender.
template <typename T>
absl::Span<char> GetSenderSharedMemory(aos::Sender<T> *sender) const {
+ CheckCurrentThread();
return GetShmSenderSharedMemory(GetRawSender(sender));
}
@@ -103,11 +104,30 @@
template <typename T>
absl::Span<const char> GetFetcherPrivateMemory(
aos::Fetcher<T> *fetcher) const {
+ CheckCurrentThread();
return GetShmFetcherPrivateMemory(GetRawFetcher(fetcher));
}
int NumberBuffers(const Channel *channel) override;
+  // All public-facing APIs will verify this mutex is held when they are called.
+  // For normal use with everything in a single thread, this is unnecessary.
+  //
+  // This is helpful as a safety check when using a ShmEventLoop with external
+  // synchronization across multiple threads. It will NOT reliably catch race
+  // conditions, but if you have a race condition triggered repeatedly it'll
+  // probably catch it eventually.
+  //
+  // Only the pointer is stored, so the mutex must stay valid for as long as
+  // any checked ShmEventLoop function (including the destructor) may run.
+  // Passing nullptr disables the check.
+  void CheckForMutex(aos::stl_mutex *check_mutex) {
+    check_mutex_ = check_mutex;
+  }
+
+  // All public-facing APIs will verify they are called in this thread.
+  // For normal use with the whole program in a single thread, this is
+  // unnecessary. It's helpful as a safety check for programs with multiple
+  // threads, where the EventLoop should only be interacted with from a single
+  // one.
+  //
+  // Records the TID of the *calling* thread (via GetTid()), so call this from
+  // the thread that will own the event loop.
+  void LockToThread() { check_tid_ = GetTid(); }
+
private:
friend class shm_event_loop_internal::ShmWatcherState;
friend class shm_event_loop_internal::ShmTimerHandler;
@@ -126,6 +146,8 @@
return result;
}
+ void CheckCurrentThread() const;
+
void HandleEvent();
// Returns the TID of the event loop.
@@ -151,6 +173,9 @@
std::string name_;
const Node *const node_;
+ aos::stl_mutex *check_mutex_ = nullptr;
+ std::optional<pid_t> check_tid_;
+
internal::EPoll epoll_;
// Only set during Run().
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index 5524597..4c12213 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -3,13 +3,12 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/test_message_generated.h"
+#include "aos/network/team_number.h"
#include "aos/realtime.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
-#include "aos/events/test_message_generated.h"
-#include "aos/network/team_number.h"
-
namespace aos {
namespace testing {
namespace {
@@ -123,6 +122,36 @@
using ShmEventLoopDeathTest = ShmEventLoopTest;
+// Tests that we don't leave the calling thread realtime when calling Send
+// before Run.
+//
+// ShmSender::DoSend passes priority 0 to the wake-upper when its event loop is
+// not running; this test exercises that path.
+TEST_P(ShmEventLoopTest, SendBeforeRun) {
+  auto loop = factory()->MakePrimary("primary");
+  loop->SetRuntimeRealtimePriority(1);
+
+  auto loop2 = factory()->Make("loop2");
+  loop2->SetRuntimeRealtimePriority(2);
+  loop2->MakeWatcher("/test", [](const TestMessage &) {});
+  // Need the other one running for its watcher to record in SHM that it wants
+  // wakers to boost their priority, so leave it running in a thread for this
+  // test.
+  std::thread loop2_thread(
+      [&loop2]() { static_cast<ShmEventLoop *>(loop2.get())->Run(); });
+  // NOTE(review): this fixed sleep is the only synchronization ensuring loop2
+  // has entered Run() before the send below and before Exit() at the end. On
+  // a heavily loaded machine it may not be long enough; consider an explicit
+  // handshake (e.g. signaled from loop2's OnRun) instead.
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+  auto sender = loop->MakeSender<TestMessage>("/test");
+  // The calling thread is expected to be non-realtime before sending...
+  EXPECT_FALSE(IsRealtime());
+  {
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    builder.add_value(200);
+    msg.Send(builder.Finish());
+  }
+  // ...and must still be non-realtime after sending.
+  EXPECT_FALSE(IsRealtime());
+
+  static_cast<ShmEventLoop *>(loop2.get())->Exit();
+  loop2_thread.join();
+}
+
// Tests that every handler type is realtime and runs. There are threads
// involved and it's easy to miss one.
TEST_P(ShmEventLoopTest, AllHandlersAreRealtime) {
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 51021b6..4dc37fa 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -295,13 +295,13 @@
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) override;
+ const UUID &source_boot_uuid) override;
bool DoSend(const void *msg, size_t size,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) override;
+ const UUID &source_boot_uuid) override;
int buffer_index() override {
// First, ensure message_ is allocated.
@@ -542,6 +542,7 @@
CHECK(!is_running()) << ": Cannot register OnRun callback while running.";
scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
ScopedMarkRealtimeRestorer rt(priority() > 0);
+ SetTimerContext(monotonic_now());
on_run();
});
}
@@ -852,7 +853,7 @@
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) {
+ const UUID &source_boot_uuid) {
// The allocations in here are due to infrastructure and don't count in the
// no mallocs in RT code.
ScopedNotRealtime nrt;
@@ -862,7 +863,7 @@
message_->context.remote_queue_index = remote_queue_index;
message_->context.realtime_event_time = event_loop_->realtime_now();
message_->context.realtime_remote_time = realtime_remote_time;
- message_->context.remote_boot_uuid = remote_boot_uuid;
+ message_->context.source_boot_uuid = source_boot_uuid;
CHECK_LE(length, message_->context.size);
message_->context.size = length;
@@ -882,7 +883,7 @@
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
- const UUID &remote_boot_uuid) {
+ const UUID &source_boot_uuid) {
CHECK_LE(size, this->size())
<< ": Attempting to send too big a message on "
<< configuration::CleanedChannelToString(simulated_channel_->channel());
@@ -899,7 +900,7 @@
msg, size);
return DoSend(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index, remote_boot_uuid);
+ remote_queue_index, source_boot_uuid);
}
SimulatedTimerHandler::SimulatedTimerHandler(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 7191365..4cb51de 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1492,13 +1492,13 @@
pi1_remote_timestamp->MakeWatcher(
"/pi2/aos", [&expected_boot_uuid,
&pi1_remote_timestamp](const message_bridge::Timestamp &) {
- EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+ EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
expected_boot_uuid);
});
pi1_remote_timestamp->MakeWatcher(
"/test",
[&expected_boot_uuid, &pi1_remote_timestamp](const examples::Pong &) {
- EXPECT_EQ(pi1_remote_timestamp->context().remote_boot_uuid,
+ EXPECT_EQ(pi1_remote_timestamp->context().source_boot_uuid,
expected_boot_uuid);
});
pi1_remote_timestamp->MakeWatcher(
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 40b468f..11e8c56 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -137,7 +137,7 @@
fetcher_->context().monotonic_event_time,
fetcher_->context().realtime_event_time,
fetcher_->context().queue_index,
- fetcher_->context().remote_boot_uuid);
+ fetcher_->context().source_boot_uuid);
// And simulate message_bridge's offset recovery.
client_status_->SampleFilter(client_index_,
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 98701b4..fe86f6c 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -908,7 +908,7 @@
const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
CHECK_LE(length, size());
@@ -917,14 +917,14 @@
// adhere to this convention and place it at the end.
memcpy((reinterpret_cast<char *>(Data()) + size() - length), data, length);
return Send(length, monotonic_remote_time, realtime_remote_time,
- remote_queue_index, remote_boot_uuid, monotonic_sent_time,
+ remote_queue_index, source_boot_uuid, monotonic_sent_time,
realtime_sent_time, queue_index);
}
bool LocklessQueueSender::Send(
size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time,
realtime_clock::time_point *realtime_sent_time, uint32_t *queue_index) {
const size_t queue_size = memory_->queue_size();
@@ -949,7 +949,7 @@
// Pass these through. Any alternative behavior can be implemented out a
// layer.
message->header.remote_queue_index = remote_queue_index;
- message->header.remote_boot_uuid = remote_boot_uuid;
+ message->header.source_boot_uuid = source_boot_uuid;
message->header.monotonic_remote_time = monotonic_remote_time;
message->header.realtime_remote_time = realtime_remote_time;
@@ -1210,7 +1210,7 @@
realtime_clock::time_point *realtime_sent_time,
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, UUID *remote_boot_uuid, size_t *length,
+ uint32_t *remote_queue_index, UUID *source_boot_uuid, size_t *length,
char *data) const {
const size_t queue_size = memory_->queue_size();
@@ -1294,7 +1294,7 @@
}
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
- *remote_boot_uuid = m->header.remote_boot_uuid;
+ *source_boot_uuid = m->header.source_boot_uuid;
if (data) {
memcpy(data, m->data(memory_->message_data_size()),
memory_->message_data_size());
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 41aa0fb..b0fc425 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -94,7 +94,7 @@
uint32_t remote_queue_index;
// Remote boot UUID for this message.
- UUID remote_boot_uuid;
+ UUID source_boot_uuid;
size_t length;
} header;
@@ -309,7 +309,7 @@
void *Data();
bool Send(size_t length, monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time = nullptr,
realtime_clock::time_point *realtime_sent_time = nullptr,
uint32_t *queue_index = nullptr);
@@ -318,7 +318,7 @@
bool Send(const char *data, size_t length,
monotonic_clock::time_point monotonic_remote_time,
realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index, const UUID &remote_boot_uuid,
+ uint32_t remote_queue_index, const UUID &source_boot_uuid,
monotonic_clock::time_point *monotonic_sent_time = nullptr,
realtime_clock::time_point *realtime_sent_time = nullptr,
uint32_t *queue_index = nullptr);
@@ -404,7 +404,7 @@
realtime_clock::time_point *realtime_sent_time,
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
- uint32_t *remote_queue_index, UUID *remote_boot_uuid,
+ uint32_t *remote_queue_index, UUID *source_boot_uuid,
size_t *length, char *data) const;
// Returns the index to the latest queue message. Returns empty_queue_index()
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index b521d9e..ae780b8 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -622,14 +622,14 @@
monotonic_clock::time_point monotonic_remote_time;
realtime_clock::time_point realtime_remote_time;
uint32_t remote_queue_index;
- UUID remote_boot_uuid;
+ UUID source_boot_uuid;
char read_data[1024];
size_t length;
LocklessQueueReader::Result read_result = reader.Read(
i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time,
- &remote_queue_index, &remote_boot_uuid, &length, &(read_data[0]));
+ &remote_queue_index, &source_boot_uuid, &length, &(read_data[0]));
if (read_result != LocklessQueueReader::Result::GOOD) {
if (read_result == LocklessQueueReader::Result::TOO_OLD) {
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index 91f995b..57867e4 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -251,7 +251,7 @@
monotonic_clock::time_point monotonic_remote_time;
realtime_clock::time_point realtime_remote_time;
uint32_t remote_queue_index;
- UUID remote_boot_uuid;
+ UUID source_boot_uuid;
char read_data[1024];
size_t length;
@@ -264,7 +264,7 @@
LocklessQueueReader::Result read_result = reader.Read(
index.index(), &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &remote_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]));
// This should either return GOOD, or TOO_OLD if it is before the start of
// the queue.
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index cf46807..d14a638 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -267,7 +267,7 @@
realtime_clock::time_point realtime_sent_time;
monotonic_clock::time_point monotonic_remote_time;
realtime_clock::time_point realtime_remote_time;
- UUID remote_boot_uuid;
+ UUID source_boot_uuid;
uint32_t remote_queue_index;
size_t length;
char read_data[1024];
@@ -279,7 +279,7 @@
LocklessQueueReader::Result read_result = reader.Read(
wrapped_i, &monotonic_sent_time, &realtime_sent_time,
&monotonic_remote_time, &realtime_remote_time, &remote_queue_index,
- &remote_boot_uuid, &length, &(read_data[0]));
+ &source_boot_uuid, &length, &(read_data[0]));
if (race_reads) {
if (read_result == LocklessQueueReader::Result::NOTHING_NEW) {
@@ -302,7 +302,7 @@
EXPECT_EQ(monotonic_remote_time, aos::monotonic_clock::min_time);
EXPECT_EQ(realtime_remote_time, aos::realtime_clock::min_time);
- EXPECT_EQ(remote_boot_uuid, UUID::Zero());
+ EXPECT_EQ(source_boot_uuid, UUID::Zero());
ThreadPlusCount tpc;
ASSERT_EQ(length, sizeof(ThreadPlusCount));
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 08350a8..1044b2f 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -186,24 +186,26 @@
}
void SctpClientConnection::SendConnect() {
+ VLOG(1) << "Sending Connect";
// Try to send the connect message. If that fails, retry.
- if (!client_.Send(kConnectStream(),
- std::string_view(reinterpret_cast<const char *>(
- connect_message_.span().data()),
- connect_message_.span().size()),
- 0)) {
+ if (client_.Send(kConnectStream(),
+ std::string_view(reinterpret_cast<const char *>(
+ connect_message_.span().data()),
+ connect_message_.span().size()),
+ 0)) {
+ ScheduleConnectTimeout();
+ } else {
NodeDisconnected();
}
}
void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
- connect_timer_->Disable();
+ ScheduleConnectTimeout();
// We want to tell the kernel to schedule the packets on this new stream with
// the priority scheduler. This only needs to be done once per stream.
client_.SetPriorityScheduler(assoc_id);
- remote_assoc_id_ = assoc_id;
connection_->mutate_state(State::CONNECTED);
client_status_->SampleReset(client_index_);
}
@@ -212,13 +214,14 @@
connect_timer_->Setup(
event_loop_->monotonic_now() + chrono::milliseconds(100),
chrono::milliseconds(100));
- remote_assoc_id_ = 0;
connection_->mutate_state(State::DISCONNECTED);
connection_->mutate_monotonic_offset(0);
client_status_->SampleReset(client_index_);
}
void SctpClientConnection::HandleData(const Message *message) {
+ ScheduleConnectTimeout();
+
const RemoteData *remote_data =
flatbuffers::GetSizePrefixedRoot<RemoteData>(message->data());
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 2b48906..0552f79 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -53,6 +53,14 @@
void NodeDisconnected();
void HandleData(const Message *message);
+ // Schedules connect_timer_ to fire 1 second in the future. If one of our
+ // messages gets dropped, the server might be waiting for it, so if we don't
+ // hear from the server for a while we'll try sending the connect again.
+ void ScheduleConnectTimeout() {
+ connect_timer_->Setup(event_loop_->context().monotonic_event_time +
+ std::chrono::seconds(1));
+ }
+
// Event loop to register the server on.
aos::ShmEventLoop *const event_loop_;
@@ -81,10 +89,6 @@
// Timer which fires to handle reconnections.
aos::TimerHandler *connect_timer_;
- // id of the server once known. This is only valid if connection_ says
- // connected.
- sctp_assoc_t remote_assoc_id_ = 0;
-
// ClientConnection statistics message to modify. This will be published
// periodicially.
MessageBridgeClientStatus *client_status_;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index ea75693..c929e31 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -40,7 +40,7 @@
context.size);
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
- context.remote_boot_uuid.PackVector(&fbb);
+ context.source_boot_uuid.PackVector(&fbb);
RemoteData::Builder remote_data_builder(fbb);
remote_data_builder.add_channel_index(channel_index_);
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 48b442a..a662990 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -359,7 +359,7 @@
context.realtime_event_time = timestamp_sender_.realtime_sent_time();
context.queue_index = timestamp_sender_.sent_queue_index();
context.size = timestamp_copy.span().size();
- context.remote_boot_uuid = event_loop_->boot_uuid();
+ context.source_boot_uuid = event_loop_->boot_uuid();
context.data = timestamp_copy.span().data();
// Since we are building up the timestamp to send here, we need to trigger the
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 1953dc2..8222760 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -160,7 +160,7 @@
pi1_test_event_loop->MakeWatcher(
"/pi2/aos", [this](const Timestamp ×tamp) {
VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
- EXPECT_EQ(pi1_test_event_loop->context().remote_boot_uuid,
+ EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
pi2_boot_uuid_);
});
}
@@ -275,7 +275,7 @@
pi2_test_event_loop->MakeWatcher(
"/pi1/aos", [this](const Timestamp ×tamp) {
VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
- EXPECT_EQ(pi2_test_event_loop->context().remote_boot_uuid,
+ EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
pi1_boot_uuid_);
});
pi2_test_event_loop->MakeWatcher(
@@ -405,7 +405,7 @@
int pong_count = 0;
pong_event_loop.MakeWatcher("/test", [&pong_count, &pong_event_loop,
this](const examples::Ping &ping) {
- EXPECT_EQ(pong_event_loop.context().remote_boot_uuid, pi1_boot_uuid_);
+ EXPECT_EQ(pong_event_loop.context().source_boot_uuid, pi1_boot_uuid_);
++pong_count;
VLOG(1) << "Got ping back " << FlatbufferToJson(&ping);
});
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index a43fdfd..ebb7a19 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -353,11 +353,26 @@
CHECK(!times_.empty())
<< ": Found no times to do timestamp estimation, please investigate.";
+}
+
+void InterpolatedTimeConverter::ObserveTimePassed(
+ distributed_clock::time_point time) {
// Keep at least 500 points and time_estimation_buffer_seconds seconds of
// time. This should be enough to handle any reasonable amount of history.
- while (times_.size() > kHistoryMinCount &&
- std::get<0>(times_.front()) + time_estimation_buffer_seconds_ <
- std::get<0>(times_.back())) {
+ while (true) {
+ if (times_.size() < kHistoryMinCount) {
+ return;
+ }
+ if (std::get<0>(times_[1]) + time_estimation_buffer_seconds_ > time) {
+ VLOG(1) << "Not popping because "
+ << std::get<0>(times_[1]) + time_estimation_buffer_seconds_
+ << " > " << time;
+ return;
+ }
+
+ VLOG(1) << "Popping sample because " << times_.size() << " > "
+ << kHistoryMinCount << " && " << std::get<0>(times_[1]) << " < "
+ << time - time_estimation_buffer_seconds_;
times_.pop_front();
have_popped_ = true;
}
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 929c837..96f7228 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -177,6 +177,9 @@
monotonic_clock::time_point FromDistributedClock(
size_t node_index, distributed_clock::time_point time) override;
+ // Called once time passes this point; older samples may then be forgotten.
+ void ObserveTimePassed(distributed_clock::time_point time) override;
+
private:
// Returns the next timestamp, or nullopt if there isn't one. It is assumed
// that if there isn't one, there never will be one.
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index f1cf0aa..bdc1992 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -223,13 +223,8 @@
EXPECT_EQ(me + chrono::milliseconds(10),
time_converter.FromDistributedClock(0, de + kDt));
- // Force 10.1 seconds now. This will forget the 0th point at the origin.
- EXPECT_EQ(
- de + kDefaultHistoryDuration + kDt,
- time_converter.ToDistributedClock(0, me + kDefaultHistoryDuration + kDt));
- EXPECT_EQ(me + kDefaultHistoryDuration + kDt,
- time_converter.FromDistributedClock(
- 0, de + kDefaultHistoryDuration + kDt));
+ // Now force ourselves to forget.
+ time_converter.ObserveTimePassed(de + kDefaultHistoryDuration + kDt * 3 / 2);
// Yup, can't read the origin anymore.
EXPECT_DEATH({ LOG(INFO) << time_converter.ToDistributedClock(0, me); },
diff --git a/aos/stl_mutex/stl_mutex.h b/aos/stl_mutex/stl_mutex.h
index 86a5988..e3a930e 100644
--- a/aos/stl_mutex/stl_mutex.h
+++ b/aos/stl_mutex/stl_mutex.h
@@ -61,6 +61,11 @@
bool owner_died() const { return owner_died_; }
void consistent() { owner_died_ = false; }
+ // Returns whether this mutex is locked by the current thread. This is very
+ // hard to use reliably, please think very carefully before using it for
+ // anything beyond probabilistic assertion checks.
+ bool is_locked() const { return mutex_islocked(&native_handle_); }
+
private:
aos_mutex native_handle_;
@@ -82,7 +87,7 @@
constexpr stl_recursive_mutex() {}
void lock() {
- if (mutex_islocked(mutex_.native_handle())) {
+ if (mutex_.is_locked()) {
CHECK(!owner_died());
++recursive_locks_;
} else {
@@ -95,7 +100,7 @@
}
}
bool try_lock() {
- if (mutex_islocked(mutex_.native_handle())) {
+ if (mutex_.is_locked()) {
CHECK(!owner_died());
++recursive_locks_;
return true;
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
index fb685f3..7680688 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
@@ -122,7 +122,6 @@
dt_config_.make_hybrid_drivetrain_velocity_loop()))) {
Reinitialize();
last_U_.setZero();
-
event_loop_->AddPhasedLoop(
[this](int) {
// Skip this the first time.
@@ -144,10 +143,11 @@
first_ = false;
SendPositionMessage();
SendTruthMessage();
+ SendImuMessage();
},
dt_config_.dt);
-
- event_loop_->AddPhasedLoop([this](int) { SendImuMessage(); },
+ // TODO(milind): We should be able to get IMU readings at 1 kHz instead of 2 kHz.
+ event_loop_->AddPhasedLoop([this](int) { ReadImu(); },
std::chrono::microseconds(500));
}
@@ -188,54 +188,78 @@
}
}
-void DrivetrainSimulation::SendImuMessage() {
+void DrivetrainSimulation::ReadImu() {
+ // Don't accumulate readings when we aren't sending them.
if (!send_messages_) {
return;
}
- auto builder = imu_sender_.MakeBuilder();
- frc971::ADIS16470DiagStat::Builder diag_stat_builder =
- builder.MakeBuilder<frc971::ADIS16470DiagStat>();
- diag_stat_builder.add_clock_error(false);
- diag_stat_builder.add_memory_failure(imu_faulted_);
- diag_stat_builder.add_sensor_failure(false);
- diag_stat_builder.add_standby_mode(false);
- diag_stat_builder.add_spi_communication_error(false);
- diag_stat_builder.add_flash_memory_update_error(false);
- diag_stat_builder.add_data_path_overrun(false);
-
- const auto diag_stat_offset = diag_stat_builder.Finish();
-
- frc971::IMUValues::Builder imu_builder =
- builder.MakeBuilder<frc971::IMUValues>();
- imu_builder.add_self_test_diag_stat(diag_stat_offset);
const Eigen::Vector3d gyro =
dt_config_.imu_transform.inverse() *
Eigen::Vector3d(0.0, 0.0,
(drivetrain_plant_.X(3, 0) - drivetrain_plant_.X(1, 0)) /
(dt_config_.robot_radius * 2.0));
- imu_builder.add_gyro_x(gyro.x());
- imu_builder.add_gyro_y(gyro.y());
- imu_builder.add_gyro_z(gyro.z());
+
// Acceleration due to gravity, in m/s/s.
constexpr double kG = 9.807;
const Eigen::Vector3d accel =
dt_config_.imu_transform.inverse() *
Eigen::Vector3d(last_acceleration_.x() / kG, last_acceleration_.y() / kG,
1.0);
- imu_builder.add_accelerometer_x(accel.x());
- imu_builder.add_accelerometer_y(accel.y());
- imu_builder.add_accelerometer_z(accel.z());
- imu_builder.add_monotonic_timestamp_ns(
+ const int64_t timestamp =
std::chrono::duration_cast<std::chrono::nanoseconds>(
event_loop_->monotonic_now().time_since_epoch())
- .count());
- flatbuffers::Offset<frc971::IMUValues> imu_values_offsets =
- imu_builder.Finish();
+ .count();
+ imu_readings_.push({.gyro = gyro,
+ .accel = accel,
+ .timestamp = timestamp,
+ .faulted = imu_faulted_});
+}
+
+void DrivetrainSimulation::SendImuMessage() {
+ if (!send_messages_) {
+ return;
+ }
+
+ std::vector<flatbuffers::Offset<IMUValues>> imu_values;
+ auto builder = imu_sender_.MakeBuilder();
+
+ // Send all the IMU readings and pop the ones we have sent
+ while (!imu_readings_.empty()) {
+ const auto imu_reading = imu_readings_.front();
+ imu_readings_.pop();
+
+ frc971::ADIS16470DiagStat::Builder diag_stat_builder =
+ builder.MakeBuilder<frc971::ADIS16470DiagStat>();
+ diag_stat_builder.add_clock_error(false);
+ diag_stat_builder.add_memory_failure(imu_reading.faulted);
+ diag_stat_builder.add_sensor_failure(false);
+ diag_stat_builder.add_standby_mode(false);
+ diag_stat_builder.add_spi_communication_error(false);
+ diag_stat_builder.add_flash_memory_update_error(false);
+ diag_stat_builder.add_data_path_overrun(false);
+
+ const auto diag_stat_offset = diag_stat_builder.Finish();
+
+ frc971::IMUValues::Builder imu_builder =
+ builder.MakeBuilder<frc971::IMUValues>();
+ imu_builder.add_self_test_diag_stat(diag_stat_offset);
+
+ imu_builder.add_gyro_x(imu_reading.gyro.x());
+ imu_builder.add_gyro_y(imu_reading.gyro.y());
+ imu_builder.add_gyro_z(imu_reading.gyro.z());
+
+ imu_builder.add_accelerometer_x(imu_reading.accel.x());
+ imu_builder.add_accelerometer_y(imu_reading.accel.y());
+ imu_builder.add_accelerometer_z(imu_reading.accel.z());
+ imu_builder.add_monotonic_timestamp_ns(imu_reading.timestamp);
+
+ imu_values.push_back(imu_builder.Finish());
+ }
+
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<frc971::IMUValues>>>
- imu_values_offset = builder.fbb()->CreateVector(&imu_values_offsets, 1);
-
+ imu_values_offset = builder.fbb()->CreateVector(imu_values);
frc971::IMUValuesBatch::Builder imu_values_batch_builder =
builder.MakeBuilder<frc971::IMUValuesBatch>();
imu_values_batch_builder.add_readings(imu_values_offset);
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.h b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
index 2075e66..b98711b 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.h
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
@@ -1,6 +1,9 @@
#ifndef FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
#define FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
+#include <queue>
+#include <vector>
+
#include "aos/events/event_loop.h"
#include "frc971/control_loops/control_loops_generated.h"
#include "frc971/control_loops/drivetrain/drivetrain_config.h"
@@ -76,16 +79,27 @@
// Set whether we should send out the drivetrain Position and IMU messages
// (this will keep sending the "truth" message).
void set_send_messages(const bool send_messages) {
+ if (!send_messages && !imu_readings_.empty()) {
+ // Flush current IMU readings
+ SendImuMessage();
+ }
send_messages_ = send_messages;
}
- void set_imu_faulted(const bool fault_imu) {
- imu_faulted_ = fault_imu;
- }
+ void set_imu_faulted(const bool fault_imu) { imu_faulted_ = fault_imu; }
private:
+ struct ImuReading {
+ Eigen::Vector3d gyro;
+ Eigen::Vector3d accel;
+ int64_t timestamp;
+ bool faulted;
+ };
+
// Sends out the position queue messages.
void SendPositionMessage();
+ // Reads and stores the IMU state
+ void ReadImu();
// Sends out the IMU messages.
void SendImuMessage();
// Sends out the "truth" status message.
@@ -109,6 +123,8 @@
bool imu_faulted_ = false;
+ std::queue<ImuReading> imu_readings_;
+
DrivetrainConfig<double> dt_config_;
DrivetrainPlant drivetrain_plant_;
diff --git a/y2021_bot3/control_loops/python/drivetrain.py b/y2021_bot3/control_loops/python/drivetrain.py
index e5b001a..fdd08c2 100644
--- a/y2021_bot3/control_loops/python/drivetrain.py
+++ b/y2021_bot3/control_loops/python/drivetrain.py
@@ -16,10 +16,11 @@
J=6.0,
mass=58.0,
# TODO(austin): Measure radius a bit better.
- robot_radius=0.7 / 2.0,
- wheel_radius=6.0 * 0.0254 / 2.0,
+ robot_radius= 0.39,
+ wheel_radius= 3/39.37,
motor_type=control_loop.Falcon(),
- G=(8.0 / 70.0) * (17.0 / 24.0),
+ num_motors = 3,
+ G=8.0 / 80.0,
q_pos=0.24,
q_vel=2.5,
efficiency=0.80,