Implement no-arg watchers efficiently in ShmEventLoop

Change-Id: I0efd8d3a639a1c6bb959b2ec263ffe8a3a84917d
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index a74e65b..ba356c4 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -484,7 +484,8 @@
   // Like MakeWatcher, but doesn't have access to the message data. This may be
   // implemented to use less resources than an equivalent MakeWatcher.
   //
-  // The function will still have access to context().
+  // The function will still have access to context(), although that will have
+  // its data field set to nullptr.
   template <typename MessageType>
   void MakeNoArgWatcher(const std::string_view channel_name,
                         std::function<void()> w);
@@ -541,7 +542,9 @@
       const Channel *channel,
       std::function<void(const Context &context)> watcher) {
     MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
-      watcher(context);
+      Context new_context = context;
+      new_context.data = nullptr;
+      watcher(new_context);
     });
   }
 
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index b4cfeb2..f0fb3d3 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -43,6 +43,34 @@
   EXPECT_TRUE(happened);
 }
 
+// Verifies that a no-arg watcher will not have a data pointer.
+TEST_P(AbstractEventLoopTest, NoArgNoData) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+  bool happened = false;
+
+  loop2->OnRun([&]() {
+    happened = true;
+
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  });
+
+  loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+    EXPECT_GT(loop2->context().size, 0u);
+    EXPECT_EQ(nullptr, loop2->context().data);
+    this->Exit();
+  });
+
+  EXPECT_FALSE(happened);
+  Run();
+  EXPECT_TRUE(happened);
+}
+
 // Tests that no-arg watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, BasicNoArg) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 6c3944c..5534c83 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -218,7 +218,9 @@
   // context.
   void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
                       Context context) {
-    CheckChannelDataAlignment(context.data, context.size);
+    if (context.data) {
+      CheckChannelDataAlignment(context.data, context.size);
+    }
     const monotonic_clock::time_point monotonic_start_time = get_time();
     {
       const float start_latency =
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7b4daf8..afd65a3 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,17 +164,19 @@
 
 class SimpleShmFetcher {
  public:
-  explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
+  explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel,
+                            bool copy_data)
       : channel_(channel),
         lockless_queue_memory_(
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()),
-        data_storage_(static_cast<char *>(malloc(channel->max_size() +
-                                                 kChannelDataAlignment - 1)),
-                      &free) {
+                        lockless_queue_memory_.config()) {
+    if (copy_data) {
+      data_storage_.reset(static_cast<char *>(
+          malloc(channel->max_size() + kChannelDataAlignment - 1)));
+    }
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -217,8 +219,12 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = data_storage_start() +
-                      lockless_queue_.message_data_size() - context_.size;
+      if (copy_data()) {
+        context_.data = data_storage_start() +
+                        lockless_queue_.message_data_size() - context_.size;
+      } else {
+        context_.data = nullptr;
+      }
       actual_queue_index_ = actual_queue_index_.Increment();
     }
 
@@ -267,8 +273,12 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = data_storage_start() +
-                      lockless_queue_.message_data_size() - context_.size;
+      if (copy_data()) {
+        context_.data = data_storage_start() +
+                        lockless_queue_.message_data_size() - context_.size;
+      } else {
+        context_.data = nullptr;
+      }
       actual_queue_index_ = queue_index.Increment();
     }
 
@@ -308,8 +318,10 @@
 
  private:
   char *data_storage_start() {
+    if (!copy_data()) return nullptr;
     return RoundChannelData(data_storage_.get(), channel_->max_size());
   }
+  bool copy_data() const { return static_cast<bool>(data_storage_); }
 
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
@@ -318,7 +330,8 @@
   ipc_lib::QueueIndex actual_queue_index_ =
       ipc_lib::LocklessQueue::empty_queue_index();
 
-  std::unique_ptr<char, decltype(&free)> data_storage_;
+  // This being empty indicates we're not going to copy data.
+  std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
   Context context_;
 };
@@ -327,7 +340,7 @@
  public:
   explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
       : RawFetcher(event_loop, channel),
-        simple_shm_fetcher_(event_loop, channel) {}
+        simple_shm_fetcher_(event_loop, channel, true) {}
 
   ~ShmFetcher() { context_.data = nullptr; }
 
@@ -406,11 +419,12 @@
  public:
   WatcherState(
       ShmEventLoop *event_loop, const Channel *channel,
-      std::function<void(const Context &context, const void *message)> fn)
+      std::function<void(const Context &context, const void *message)> fn,
+      bool copy_data)
       : aos::WatcherState(event_loop, channel, std::move(fn)),
         event_loop_(event_loop),
         event_(this),
-        simple_shm_fetcher_(event_loop, channel) {}
+        simple_shm_fetcher_(event_loop, channel, copy_data) {}
 
   ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
 
@@ -615,7 +629,18 @@
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
-      new internal::WatcherState(this, channel, std::move(watcher))));
+      new internal::WatcherState(this, channel, std::move(watcher), true)));
+}
+
+void ShmEventLoop::MakeRawNoArgWatcher(
+    const Channel *channel,
+    std::function<void(const Context &context)> watcher) {
+  TakeWatcher(channel);
+
+  NewWatcher(::std::unique_ptr<WatcherState>(new internal::WatcherState(
+      this, channel,
+      [watcher](const Context &context, const void *) { watcher(context); },
+      false)));
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index fa870b8..d3f1295 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -52,6 +52,9 @@
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> watcher)
       override;
+  void MakeRawNoArgWatcher(
+      const Channel *channel,
+      std::function<void(const Context &context)> watcher) override;
 
   TimerHandler *AddTimer(std::function<void()> callback) override;
   aos::PhasedLoopHandler *AddPhasedLoop(
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 7126ffd..f31d80d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -792,7 +792,9 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+  if (data) {
+    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+  }
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 0384aa8..550485f 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -162,6 +162,8 @@
   // element newer than QueueSize() from the current message, we consider it
   // behind by a large amount and return TOO_OLD.  If the message is modified
   // out from underneath us as we read it, return OVERWROTE.
+  //
+  // data may be nullptr to indicate the data should not be copied.
   enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
   ReadResult Read(uint32_t queue_index,
                   ::aos::monotonic_clock::time_point *monotonic_sent_time,