Implement no-arg watchers efficiently in ShmEventLoop
Change-Id: I0efd8d3a639a1c6bb959b2ec263ffe8a3a84917d
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7b4daf8..afd65a3 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,17 +164,19 @@
class SimpleShmFetcher {
public:
- explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
+ explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel,
+ bool copy_data)
: channel_(channel),
lockless_queue_memory_(
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- data_storage_(static_cast<char *>(malloc(channel->max_size() +
- kChannelDataAlignment - 1)),
- &free) {
+ lockless_queue_memory_.config()) {
+ if (copy_data) {
+ data_storage_.reset(static_cast<char *>(
+ malloc(channel->max_size() + kChannelDataAlignment - 1)));
+ }
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -217,8 +219,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -267,8 +273,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = queue_index.Increment();
}
@@ -308,8 +318,10 @@
private:
char *data_storage_start() {
+ // In no-copy mode data_storage_ is empty, so there is no buffer to point at.
+ if (!copy_data()) return nullptr;
return RoundChannelData(data_storage_.get(), channel_->max_size());
}
+ // True when data_storage_ holds a buffer, i.e. this fetcher was constructed
+ // with copy_data set and the allocation returned non-null.
+ bool copy_data() const { return static_cast<bool>(data_storage_); }
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
@@ -318,7 +330,8 @@
ipc_lib::QueueIndex actual_queue_index_ =
ipc_lib::LocklessQueue::empty_queue_index();
- std::unique_ptr<char, decltype(&free)> data_storage_;
+ // This being empty indicates we're not going to copy data.
+ std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
Context context_;
};
@@ -327,7 +340,7 @@
public:
explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, /*copy_data=*/true) {}
~ShmFetcher() { context_.data = nullptr; }
@@ -406,11 +419,12 @@
public:
WatcherState(
ShmEventLoop *event_loop, const Channel *channel,
- std::function<void(const Context &context, const void *message)> fn)
+ std::function<void(const Context &context, const void *message)> fn,
+ // Forwarded to the SimpleShmFetcher; when false it allocates no copy buffer.
+ bool copy_data)
: aos::WatcherState(event_loop, channel, std::move(fn)),
event_loop_(event_loop),
event_(this),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, copy_data) {}
~WatcherState() override { event_loop_->RemoveEvent(&event_); }
@@ -615,7 +629,18 @@
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
- new internal::WatcherState(this, channel, std::move(watcher))));
+ new internal::WatcherState(this, channel, std::move(watcher), true)));
+}
+
+// Registers `watcher` on `channel` as a watcher that is handed only the
+// Context. The WatcherState is created with copy_data=false, so its
+// SimpleShmFetcher allocates no message copy buffer and sets its context's
+// data pointer to nullptr instead of pointing at a copy.
+void ShmEventLoop::MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) {
+ TakeWatcher(channel);
+
+ NewWatcher(::std::unique_ptr<WatcherState>(new internal::WatcherState(
+ this, channel,
+ // Move the callback into the adapter (as MakeRawWatcher does) rather than
+ // copying the std::function; the message pointer argument is ignored.
+ [watcher = std::move(watcher)](const Context &context, const void *) {
+ watcher(context);
+ },
+ /*copy_data=*/false)));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {