Merge "Use MakeRawNoArgWatcher where appropriate"
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a905728..d35f206 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -64,6 +64,7 @@
         "event_loop_tmpl.h",
     ],
     hdrs = [
+        "channel_preallocated_allocator.h",
         "event_loop.h",
     ],
     visibility = ["//visibility:public"],
diff --git a/aos/events/channel_preallocated_allocator.h b/aos/events/channel_preallocated_allocator.h
new file mode 100644
index 0000000..8a5d68f
--- /dev/null
+++ b/aos/events/channel_preallocated_allocator.h
@@ -0,0 +1,77 @@
+#ifndef AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
+#define AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
+
+#include "aos/configuration.h"
+#include "aos/configuration_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+
+class ChannelPreallocatedAllocator : public flatbuffers::Allocator {
+ public:
+  ChannelPreallocatedAllocator(uint8_t *data, size_t size,
+                               const Channel *channel)
+      : data_(data), size_(size), channel_(channel) {}
+
+  ChannelPreallocatedAllocator(const ChannelPreallocatedAllocator &) = delete;
+  ChannelPreallocatedAllocator(ChannelPreallocatedAllocator &&other)
+      : data_(other.data_), size_(other.size_), channel_(other.channel_) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
+  }
+
+  ChannelPreallocatedAllocator &operator=(
+      const ChannelPreallocatedAllocator &) = delete;
+  ChannelPreallocatedAllocator &operator=(
+      ChannelPreallocatedAllocator &&other) {
+    CHECK(!is_allocated());
+    CHECK(!other.is_allocated());
+    data_ = other.data_;
+    size_ = other.size_;
+    channel_ = other.channel_;
+    return *this;
+  }
+  ~ChannelPreallocatedAllocator() override { CHECK(!is_allocated_); }
+
+  // TODO(austin): Read the contract for these.
+  uint8_t *allocate(size_t /*size*/) override {
+    if (is_allocated_) {
+      LOG(FATAL) << "Can't allocate more memory with a fixed size allocator.  "
+                    "Increase the memory reserved.";
+    }
+
+    is_allocated_ = true;
+    return data_;
+  }
+
+  void deallocate(uint8_t *, size_t) override { is_allocated_ = false; }
+
+  uint8_t *reallocate_downward(uint8_t * /*old_p*/, size_t /*old_size*/,
+                               size_t new_size, size_t /*in_use_back*/,
+                               size_t /*in_use_front*/) override {
+    LOG(FATAL) << "Requested " << new_size << " bytes, max size "
+               << channel_->max_size() << " for channel "
+               << configuration::CleanedChannelToString(channel_)
+               << ".  Increase the memory reserved to at least " << new_size
+               << ".";
+    return nullptr;
+  }
+
+  void Reset() { is_allocated_ = false; }
+  bool is_allocated() const { return is_allocated_; }
+
+  bool allocated() { return is_allocated_; }
+
+  size_t size() const { return size_; }
+  const uint8_t *data() const { return data_; }
+
+ private:
+  bool is_allocated_ = false;
+  uint8_t *data_;
+  size_t size_;
+  const Channel *channel_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index a74e65b..bc5a5ae 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -1,5 +1,4 @@
 #ifndef AOS_EVENTS_EVENT_LOOP_H_
-
 #define AOS_EVENTS_EVENT_LOOP_H_
 
 #include <atomic>
@@ -8,6 +7,7 @@
 
 #include "aos/configuration.h"
 #include "aos/configuration_generated.h"
+#include "aos/events/channel_preallocated_allocator.h"
 #include "aos/events/event_loop_event.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
@@ -145,8 +145,9 @@
 
   // Returns the associated flatbuffers-style allocator. This must be
   // deallocated before the message is sent.
-  PreallocatedAllocator *fbb_allocator() {
-    fbb_allocator_ = PreallocatedAllocator(data(), size());
+  ChannelPreallocatedAllocator *fbb_allocator() {
+    fbb_allocator_ = ChannelPreallocatedAllocator(
+        reinterpret_cast<uint8_t *>(data()), size(), channel());
     return &fbb_allocator_;
   }
 
@@ -176,7 +177,7 @@
 
   internal::RawSenderTiming timing_;
 
-  PreallocatedAllocator fbb_allocator_{nullptr, 0};
+  ChannelPreallocatedAllocator fbb_allocator_{nullptr, 0, nullptr};
 };
 
 // Fetches the newest message from a channel.
@@ -247,7 +248,7 @@
   // builder.Send(t_builder.Finish());
   class Builder {
    public:
-    Builder(RawSender *sender, PreallocatedAllocator *allocator)
+    Builder(RawSender *sender, ChannelPreallocatedAllocator *allocator)
         : fbb_(allocator->size(), allocator),
           allocator_(allocator),
           sender_(sender) {
@@ -283,7 +284,7 @@
 
    private:
     flatbuffers::FlatBufferBuilder fbb_;
-    PreallocatedAllocator *allocator_;
+    ChannelPreallocatedAllocator *allocator_;
     RawSender *sender_;
   };
 
@@ -484,7 +485,8 @@
   // Like MakeWatcher, but doesn't have access to the message data. This may be
   // implemented to use less resources than an equivalent MakeWatcher.
   //
-  // The function will still have access to context().
+  // The function will still have access to context(), although that will have
+  // its data field set to nullptr.
   template <typename MessageType>
   void MakeNoArgWatcher(const std::string_view channel_name,
                         std::function<void()> w);
@@ -541,7 +543,9 @@
       const Channel *channel,
       std::function<void(const Context &context)> watcher) {
     MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
-      watcher(context);
+      Context new_context = context;
+      new_context.data = nullptr;
+      watcher(new_context);
     });
   }
 
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index b4cfeb2..f0fb3d3 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -43,6 +43,34 @@
   EXPECT_TRUE(happened);
 }
 
+// Verifies that a no-arg watcher will not have a data pointer.
+TEST_P(AbstractEventLoopTest, NoArgNoData) {
+  auto loop1 = Make();
+  auto loop2 = MakePrimary();
+
+  aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+  bool happened = false;
+
+  loop2->OnRun([&]() {
+    happened = true;
+
+    aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+    TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+    ASSERT_TRUE(msg.Send(builder.Finish()));
+  });
+
+  loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+    EXPECT_GT(loop2->context().size, 0u);
+    EXPECT_EQ(nullptr, loop2->context().data);
+    this->Exit();
+  });
+
+  EXPECT_FALSE(happened);
+  Run();
+  EXPECT_TRUE(happened);
+}
+
 // Tests that no-arg watcher can receive messages from a sender.
 // Also tests that OnRun() works.
 TEST_P(AbstractEventLoopTest, BasicNoArg) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 6c3944c..5534c83 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -218,7 +218,9 @@
   // context.
   void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
                       Context context) {
-    CheckChannelDataAlignment(context.data, context.size);
+    if (context.data) {
+      CheckChannelDataAlignment(context.data, context.size);
+    }
     const monotonic_clock::time_point monotonic_start_time = get_time();
     {
       const float start_latency =
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7b4daf8..afd65a3 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,17 +164,19 @@
 
 class SimpleShmFetcher {
  public:
-  explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
+  explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel,
+                            bool copy_data)
       : channel_(channel),
         lockless_queue_memory_(
             channel,
             chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                 event_loop->configuration()->channel_storage_duration()))),
         lockless_queue_(lockless_queue_memory_.memory(),
-                        lockless_queue_memory_.config()),
-        data_storage_(static_cast<char *>(malloc(channel->max_size() +
-                                                 kChannelDataAlignment - 1)),
-                      &free) {
+                        lockless_queue_memory_.config()) {
+    if (copy_data) {
+      data_storage_.reset(static_cast<char *>(
+          malloc(channel->max_size() + kChannelDataAlignment - 1)));
+    }
     context_.data = nullptr;
     // Point the queue index at the next index to read starting now.  This
     // makes it such that FetchNext will read the next message sent after
@@ -217,8 +219,12 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = data_storage_start() +
-                      lockless_queue_.message_data_size() - context_.size;
+      if (copy_data()) {
+        context_.data = data_storage_start() +
+                        lockless_queue_.message_data_size() - context_.size;
+      } else {
+        context_.data = nullptr;
+      }
       actual_queue_index_ = actual_queue_index_.Increment();
     }
 
@@ -267,8 +273,12 @@
       if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
         context_.realtime_remote_time = context_.realtime_event_time;
       }
-      context_.data = data_storage_start() +
-                      lockless_queue_.message_data_size() - context_.size;
+      if (copy_data()) {
+        context_.data = data_storage_start() +
+                        lockless_queue_.message_data_size() - context_.size;
+      } else {
+        context_.data = nullptr;
+      }
       actual_queue_index_ = queue_index.Increment();
     }
 
@@ -308,8 +318,10 @@
 
  private:
   char *data_storage_start() {
+    if (!copy_data()) return nullptr;
     return RoundChannelData(data_storage_.get(), channel_->max_size());
   }
+  bool copy_data() const { return static_cast<bool>(data_storage_); }
 
   const Channel *const channel_;
   MMapedQueue lockless_queue_memory_;
@@ -318,7 +330,8 @@
   ipc_lib::QueueIndex actual_queue_index_ =
       ipc_lib::LocklessQueue::empty_queue_index();
 
-  std::unique_ptr<char, decltype(&free)> data_storage_;
+  // This being empty indicates we're not going to copy data.
+  std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
 
   Context context_;
 };
@@ -327,7 +340,7 @@
  public:
   explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
       : RawFetcher(event_loop, channel),
-        simple_shm_fetcher_(event_loop, channel) {}
+        simple_shm_fetcher_(event_loop, channel, true) {}
 
   ~ShmFetcher() { context_.data = nullptr; }
 
@@ -406,11 +419,12 @@
  public:
   WatcherState(
       ShmEventLoop *event_loop, const Channel *channel,
-      std::function<void(const Context &context, const void *message)> fn)
+      std::function<void(const Context &context, const void *message)> fn,
+      bool copy_data)
       : aos::WatcherState(event_loop, channel, std::move(fn)),
         event_loop_(event_loop),
         event_(this),
-        simple_shm_fetcher_(event_loop, channel) {}
+        simple_shm_fetcher_(event_loop, channel, copy_data) {}
 
   ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
 
@@ -615,7 +629,18 @@
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
-      new internal::WatcherState(this, channel, std::move(watcher))));
+      new internal::WatcherState(this, channel, std::move(watcher), true)));
+}
+
+void ShmEventLoop::MakeRawNoArgWatcher(
+    const Channel *channel,
+    std::function<void(const Context &context)> watcher) {
+  TakeWatcher(channel);
+
+  NewWatcher(::std::unique_ptr<WatcherState>(new internal::WatcherState(
+      this, channel,
+      [watcher](const Context &context, const void *) { watcher(context); },
+      false)));
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index fa870b8..d3f1295 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -52,6 +52,9 @@
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> watcher)
       override;
+  void MakeRawNoArgWatcher(
+      const Channel *channel,
+      std::function<void(const Context &context)> watcher) override;
 
   TimerHandler *AddTimer(std::function<void()> callback) override;
   aos::PhasedLoopHandler *AddPhasedLoop(
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 7126ffd..f31d80d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -792,7 +792,9 @@
   }
   *monotonic_remote_time = m->header.monotonic_remote_time;
   *realtime_remote_time = m->header.realtime_remote_time;
-  memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+  if (data) {
+    memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+  }
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 0384aa8..550485f 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -162,6 +162,8 @@
   // element newer than QueueSize() from the current message, we consider it
   // behind by a large amount and return TOO_OLD.  If the message is modified
   // out from underneath us as we read it, return OVERWROTE.
+  //
+  // data may be nullptr to indicate the data should not be copied.
   enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
   ReadResult Read(uint32_t queue_index,
                   ::aos::monotonic_clock::time_point *monotonic_sent_time,
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index 3baa61b..86088e6 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -78,6 +78,7 @@
         "//aos:init",
         "//aos/events:shm_event_loop",
         "//third_party:opencv",
+        "//y2020/vision/sift:sift_fbs",
     ],
 )
 
diff --git a/y2020/vision/viewer.cc b/y2020/vision/viewer.cc
index be9b980..cd087c7 100644
--- a/y2020/vision/viewer.cc
+++ b/y2020/vision/viewer.cc
@@ -1,3 +1,4 @@
+#include <map>
 #include <opencv2/calib3d.hpp>
 #include <opencv2/features2d.hpp>
 #include <opencv2/highgui/highgui.hpp>
@@ -5,6 +6,7 @@
 
 #include "aos/events/shm_event_loop.h"
 #include "aos/init.h"
+#include "y2020/vision/sift/sift_generated.h"
 #include "y2020/vision/vision_generated.h"
 
 DEFINE_string(config, "config.json", "Path to the config file to use.");
@@ -14,23 +16,62 @@
 namespace {
 
 void ViewerMain() {
+  struct TargetData {
+    float x;
+    float y;
+    float radius;
+  };
+
+  std::map<int64_t, TargetData> target_data_map;
+
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
       aos::configuration::ReadConfig(FLAGS_config);
 
   aos::ShmEventLoop event_loop(&config.message());
 
-  event_loop.MakeWatcher("/camera", [](const CameraImage &image) {
-    cv::Mat image_mat(image.rows(), image.cols(), CV_8U);
-    CHECK(image_mat.isContinuous());
-    const int number_pixels = image.rows() * image.cols();
-    for (int i = 0; i < number_pixels; ++i) {
-      reinterpret_cast<uint8_t *>(image_mat.data)[i] =
-          image.data()->data()[i * 2];
-    }
+  event_loop.MakeWatcher(
+      "/camera", [&target_data_map](const CameraImage &image) {
+        cv::Mat image_mat(image.rows(), image.cols(), CV_8U);
+        CHECK(image_mat.isContinuous());
+        const int number_pixels = image.rows() * image.cols();
+        for (int i = 0; i < number_pixels; ++i) {
+          reinterpret_cast<uint8_t *>(image_mat.data)[i] =
+              image.data()->data()[i * 2];
+        }
 
-    cv::imshow("Display", image_mat);
-    cv::waitKey(1);
-  });
+        int64_t timestamp = image.monotonic_timestamp_ns();
+        auto target_it = target_data_map.find(timestamp);
+        if (target_it != target_data_map.end()) {
+          float x = target_it->second.x;
+          float y = target_it->second.y;
+          float radius = target_it->second.radius;
+          cv::circle(image_mat, cv::Point2f(x, y), radius, 255, 5);
+        } else {
+          LOG(INFO) << "Couldn't find timestamp match for timestamp: "
+                    << timestamp;
+        }
+        cv::imshow("Display", image_mat);
+        int keystroke = cv::waitKey(1);
+        if ((keystroke & 0xFF) == static_cast<int>('q')) {
+          exit(0);
+        }
+      });
+
+  event_loop.MakeWatcher(
+      "/camera", [&target_data_map](const sift::ImageMatchResult &match) {
+        int64_t timestamp = match.image_monotonic_timestamp_ns();
+        if (match.camera_poses() != NULL && match.camera_poses()->size() > 0) {
+          LOG(INFO) << "Got match!\n";
+          TargetData target_data = {
+              match.camera_poses()->Get(0)->query_target_point_x(),
+              match.camera_poses()->Get(0)->query_target_point_y(),
+              match.camera_poses()->Get(0)->query_target_point_radius()};
+          target_data_map[timestamp] = target_data;
+          while (target_data_map.size() > 10u) {
+            target_data_map.erase(target_data_map.begin());
+          }
+        }
+      });
 
   event_loop.Run();
 }