Merge "Use MakeRawNoArgWatcher where appropriate"
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a905728..d35f206 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -64,6 +64,7 @@
"event_loop_tmpl.h",
],
hdrs = [
+ "channel_preallocated_allocator.h",
"event_loop.h",
],
visibility = ["//visibility:public"],
diff --git a/aos/events/channel_preallocated_allocator.h b/aos/events/channel_preallocated_allocator.h
new file mode 100644
index 0000000..8a5d68f
--- /dev/null
+++ b/aos/events/channel_preallocated_allocator.h
@@ -0,0 +1,77 @@
+#ifndef AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
+#define AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
+
+#include "aos/configuration.h"
+#include "aos/configuration_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+
+class ChannelPreallocatedAllocator : public flatbuffers::Allocator {
+ public:
+ ChannelPreallocatedAllocator(uint8_t *data, size_t size,
+ const Channel *channel)
+ : data_(data), size_(size), channel_(channel) {}
+
+ ChannelPreallocatedAllocator(const ChannelPreallocatedAllocator &) = delete;
+ ChannelPreallocatedAllocator(ChannelPreallocatedAllocator &&other)
+ : data_(other.data_), size_(other.size_), channel_(other.channel_) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
+ }
+
+ ChannelPreallocatedAllocator &operator=(
+ const ChannelPreallocatedAllocator &) = delete;
+ ChannelPreallocatedAllocator &operator=(
+ ChannelPreallocatedAllocator &&other) {
+ CHECK(!is_allocated());
+ CHECK(!other.is_allocated());
+ data_ = other.data_;
+ size_ = other.size_;
+ channel_ = other.channel_;
+ return *this;
+ }
+ ~ChannelPreallocatedAllocator() override { CHECK(!is_allocated_); }
+
+ // TODO(austin): Read the contract for these.
+ uint8_t *allocate(size_t /*size*/) override {
+ if (is_allocated_) {
+ LOG(FATAL) << "Can't allocate more memory with a fixed size allocator. "
+ "Increase the memory reserved.";
+ }
+
+ is_allocated_ = true;
+ return data_;
+ }
+
+ void deallocate(uint8_t *, size_t) override { is_allocated_ = false; }
+
+ uint8_t *reallocate_downward(uint8_t * /*old_p*/, size_t /*old_size*/,
+ size_t new_size, size_t /*in_use_back*/,
+ size_t /*in_use_front*/) override {
+ LOG(FATAL) << "Requested " << new_size << " bytes, max size "
+ << channel_->max_size() << " for channel "
+ << configuration::CleanedChannelToString(channel_)
+ << ". Increase the memory reserved to at least " << new_size
+ << ".";
+ return nullptr;
+ }
+
+ void Reset() { is_allocated_ = false; }
+ bool is_allocated() const { return is_allocated_; }
+
+ bool allocated() { return is_allocated_; }
+
+ size_t size() const { return size_; }
+ const uint8_t *data() const { return data_; }
+
+ private:
+ bool is_allocated_ = false;
+ uint8_t *data_;
+ size_t size_;
+ const Channel *channel_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_CHANNEL_PREALLOCATED_ALLOCATOR_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index a74e65b..bc5a5ae 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -1,5 +1,4 @@
#ifndef AOS_EVENTS_EVENT_LOOP_H_
-
#define AOS_EVENTS_EVENT_LOOP_H_
#include <atomic>
@@ -8,6 +7,7 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
+#include "aos/events/channel_preallocated_allocator.h"
#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
@@ -145,8 +145,9 @@
// Returns the associated flatbuffers-style allocator. This must be
// deallocated before the message is sent.
- PreallocatedAllocator *fbb_allocator() {
- fbb_allocator_ = PreallocatedAllocator(data(), size());
+ ChannelPreallocatedAllocator *fbb_allocator() {
+ fbb_allocator_ = ChannelPreallocatedAllocator(
+ reinterpret_cast<uint8_t *>(data()), size(), channel());
return &fbb_allocator_;
}
@@ -176,7 +177,7 @@
internal::RawSenderTiming timing_;
- PreallocatedAllocator fbb_allocator_{nullptr, 0};
+ ChannelPreallocatedAllocator fbb_allocator_{nullptr, 0, nullptr};
};
// Fetches the newest message from a channel.
@@ -247,7 +248,7 @@
// builder.Send(t_builder.Finish());
class Builder {
public:
- Builder(RawSender *sender, PreallocatedAllocator *allocator)
+ Builder(RawSender *sender, ChannelPreallocatedAllocator *allocator)
: fbb_(allocator->size(), allocator),
allocator_(allocator),
sender_(sender) {
@@ -283,7 +284,7 @@
private:
flatbuffers::FlatBufferBuilder fbb_;
- PreallocatedAllocator *allocator_;
+ ChannelPreallocatedAllocator *allocator_;
RawSender *sender_;
};
@@ -484,7 +485,8 @@
// Like MakeWatcher, but doesn't have access to the message data. This may be
// implemented to use less resources than an equivalent MakeWatcher.
//
- // The function will still have access to context().
+ // The function will still have access to context(), although that will have
+ // its data field set to nullptr.
template <typename MessageType>
void MakeNoArgWatcher(const std::string_view channel_name,
std::function<void()> w);
@@ -541,7 +543,9 @@
const Channel *channel,
std::function<void(const Context &context)> watcher) {
MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
- watcher(context);
+ Context new_context = context;
+ new_context.data = nullptr;
+ watcher(new_context);
});
}
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index b4cfeb2..f0fb3d3 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -43,6 +43,34 @@
EXPECT_TRUE(happened);
}
+// Verifies that a no-arg watcher will not have a data pointer.
+TEST_P(AbstractEventLoopTest, NoArgNoData) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+ EXPECT_GT(loop2->context().size, 0u);
+ EXPECT_EQ(nullptr, loop2->context().data);
+ this->Exit();
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
// Tests that no-arg watcher can receive messages from a sender.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, BasicNoArg) {
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 6c3944c..5534c83 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -218,7 +218,9 @@
// context.
void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
Context context) {
- CheckChannelDataAlignment(context.data, context.size);
+ if (context.data) {
+ CheckChannelDataAlignment(context.data, context.size);
+ }
const monotonic_clock::time_point monotonic_start_time = get_time();
{
const float start_latency =
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7b4daf8..afd65a3 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,17 +164,19 @@
class SimpleShmFetcher {
public:
- explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
+ explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel,
+ bool copy_data)
: channel_(channel),
lockless_queue_memory_(
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- data_storage_(static_cast<char *>(malloc(channel->max_size() +
- kChannelDataAlignment - 1)),
- &free) {
+ lockless_queue_memory_.config()) {
+ if (copy_data) {
+ data_storage_.reset(static_cast<char *>(
+ malloc(channel->max_size() + kChannelDataAlignment - 1)));
+ }
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -217,8 +219,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -267,8 +273,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = queue_index.Increment();
}
@@ -308,8 +318,10 @@
private:
char *data_storage_start() {
+ if (!copy_data()) return nullptr;
return RoundChannelData(data_storage_.get(), channel_->max_size());
}
+ bool copy_data() const { return static_cast<bool>(data_storage_); }
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
@@ -318,7 +330,8 @@
ipc_lib::QueueIndex actual_queue_index_ =
ipc_lib::LocklessQueue::empty_queue_index();
- std::unique_ptr<char, decltype(&free)> data_storage_;
+ // This being empty indicates we're not going to copy data.
+ std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
Context context_;
};
@@ -327,7 +340,7 @@
public:
explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, true) {}
~ShmFetcher() { context_.data = nullptr; }
@@ -406,11 +419,12 @@
public:
WatcherState(
ShmEventLoop *event_loop, const Channel *channel,
- std::function<void(const Context &context, const void *message)> fn)
+ std::function<void(const Context &context, const void *message)> fn,
+ bool copy_data)
: aos::WatcherState(event_loop, channel, std::move(fn)),
event_loop_(event_loop),
event_(this),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, copy_data) {}
~WatcherState() override { event_loop_->RemoveEvent(&event_); }
@@ -615,7 +629,18 @@
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
- new internal::WatcherState(this, channel, std::move(watcher))));
+ new internal::WatcherState(this, channel, std::move(watcher), true)));
+}
+
+void ShmEventLoop::MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) {
+ TakeWatcher(channel);
+
+ NewWatcher(::std::unique_ptr<WatcherState>(new internal::WatcherState(
+ this, channel,
+ [watcher](const Context &context, const void *) { watcher(context); },
+ false)));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index fa870b8..d3f1295 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -52,6 +52,9 @@
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher)
override;
+ void MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) override;
TimerHandler *AddTimer(std::function<void()> callback) override;
aos::PhasedLoopHandler *AddPhasedLoop(
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 7126ffd..f31d80d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -792,7 +792,9 @@
}
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
- memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ if (data) {
+ memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ }
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 0384aa8..550485f 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -162,6 +162,8 @@
// element newer than QueueSize() from the current message, we consider it
// behind by a large amount and return TOO_OLD. If the message is modified
// out from underneath us as we read it, return OVERWROTE.
+ //
+ // data may be nullptr to indicate the data should not be copied.
enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
ReadResult Read(uint32_t queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index 3baa61b..86088e6 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -78,6 +78,7 @@
"//aos:init",
"//aos/events:shm_event_loop",
"//third_party:opencv",
+ "//y2020/vision/sift:sift_fbs",
],
)
diff --git a/y2020/vision/viewer.cc b/y2020/vision/viewer.cc
index be9b980..cd087c7 100644
--- a/y2020/vision/viewer.cc
+++ b/y2020/vision/viewer.cc
@@ -1,3 +1,4 @@
+#include <map>
#include <opencv2/calib3d.hpp>
#include <opencv2/features2d.hpp>
#include <opencv2/highgui/highgui.hpp>
@@ -5,6 +6,7 @@
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
+#include "y2020/vision/sift/sift_generated.h"
#include "y2020/vision/vision_generated.h"
DEFINE_string(config, "config.json", "Path to the config file to use.");
@@ -14,23 +16,62 @@
namespace {
void ViewerMain() {
+ struct TargetData {
+ float x;
+ float y;
+ float radius;
+ };
+
+ std::map<int64_t, TargetData> target_data_map;
+
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(FLAGS_config);
aos::ShmEventLoop event_loop(&config.message());
- event_loop.MakeWatcher("/camera", [](const CameraImage &image) {
- cv::Mat image_mat(image.rows(), image.cols(), CV_8U);
- CHECK(image_mat.isContinuous());
- const int number_pixels = image.rows() * image.cols();
- for (int i = 0; i < number_pixels; ++i) {
- reinterpret_cast<uint8_t *>(image_mat.data)[i] =
- image.data()->data()[i * 2];
- }
+ event_loop.MakeWatcher(
+ "/camera", [&target_data_map](const CameraImage &image) {
+ cv::Mat image_mat(image.rows(), image.cols(), CV_8U);
+ CHECK(image_mat.isContinuous());
+ const int number_pixels = image.rows() * image.cols();
+ for (int i = 0; i < number_pixels; ++i) {
+ reinterpret_cast<uint8_t *>(image_mat.data)[i] =
+ image.data()->data()[i * 2];
+ }
- cv::imshow("Display", image_mat);
- cv::waitKey(1);
- });
+ int64_t timestamp = image.monotonic_timestamp_ns();
+ auto target_it = target_data_map.find(timestamp);
+ if (target_it != target_data_map.end()) {
+ float x = target_it->second.x;
+ float y = target_it->second.y;
+ float radius = target_it->second.radius;
+ cv::circle(image_mat, cv::Point2f(x, y), radius, 255, 5);
+ } else {
+ LOG(INFO) << "Couldn't find timestamp match for timestamp: "
+ << timestamp;
+ }
+ cv::imshow("Display", image_mat);
+ int keystroke = cv::waitKey(1);
+ if ((keystroke & 0xFF) == static_cast<int>('q')) {
+ exit(0);
+ }
+ });
+
+ event_loop.MakeWatcher(
+ "/camera", [&target_data_map](const sift::ImageMatchResult &match) {
+ int64_t timestamp = match.image_monotonic_timestamp_ns();
+ if (match.camera_poses() != NULL && match.camera_poses()->size() > 0) {
+ LOG(INFO) << "Got match!\n";
+ TargetData target_data = {
+ match.camera_poses()->Get(0)->query_target_point_x(),
+ match.camera_poses()->Get(0)->query_target_point_y(),
+ match.camera_poses()->Get(0)->query_target_point_radius()};
+ target_data_map[timestamp] = target_data;
+ while (target_data_map.size() > 10u) {
+ target_data_map.erase(target_data_map.begin());
+ }
+ }
+ });
event_loop.Run();
}