Merge changes I0efd8d3a,Ica7beba9
* changes:
Implement no-arg watchers efficiently in ShmEventLoop
Add an API for watchers without an argument
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 4ea6724..bc5a5ae 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -303,9 +303,7 @@
// Returns the name of the underlying queue.
const Channel *channel() const { return sender_->channel(); }
- operator bool() {
- return sender_ ? true : false;
- }
+ operator bool() { return sender_ ? true : false; }
// Returns the time_points that the last message was sent at.
aos::monotonic_clock::time_point monotonic_sent_time() const {
@@ -465,13 +463,33 @@
// This will watch messages sent to the provided channel.
//
- // Watch is a functor that have a call signature like so:
- // void Event(const MessageType& type);
+ // w must have a non-polymorphic operator() (aka it can only be called with a
+ // single set of arguments; no overloading or templates). It must be callable
+ // with this signature:
+ // void(const MessageType &);
//
- // TODO(parker): Need to support ::std::bind. For now, use lambdas.
- // TODO(austin): Do we need a functor? Or is a std::function good enough?
+ // Lambdas are a common form for w. A std::function will work too.
+ //
+ // Note that bind expressions have polymorphic call operators, so they are not
+ // allowed.
+ //
+ // We template Watch as a whole instead of using std::function<void(const T
+ // &)> to allow deducing MessageType from lambdas and other things which are
+ // implicitly convertible to std::function, but not actually std::function
+ // instantiations. Template deduction guides might allow solving this
+ // differently in newer versions of C++, but those have their own corner
+ // cases.
template <typename Watch>
- void MakeWatcher(const std::string_view name, Watch &&w);
+ void MakeWatcher(const std::string_view channel_name, Watch &&w);
+
+ // Like MakeWatcher, but doesn't have access to the message data. This may be
+ // implemented to use less resources than an equivalent MakeWatcher.
+ //
+ // The function will still have access to context(), although that will have
+ // its data field set to nullptr.
+ template <typename MessageType>
+ void MakeNoArgWatcher(const std::string_view channel_name,
+ std::function<void()> w);
// The passed in function will be called when the event loop starts.
// Use this to run code once the thread goes into "real-time-mode",
@@ -519,6 +537,18 @@
std::function<void(const Context &context, const void *message)>
watcher) = 0;
+ // Watches channel (name, type) for new messages, without needing to extract
+ // the message contents. Default implementation simply re-uses MakeRawWatcher.
+ virtual void MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) {
+ MakeRawWatcher(channel, [watcher](const Context &context, const void *) {
+ Context new_context = context;
+ new_context.data = nullptr;
+ watcher(new_context);
+ });
+ }
+
// Creates a raw sender for the provided channel. This is used for reflection
// based sending.
// Note: this ignores any node constraints. Ignore at your own peril.
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 096d84a..f0fb3d3 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -43,6 +43,94 @@
EXPECT_TRUE(happened);
}
+// Verifies that a no-arg watcher will not have a data pointer.
+TEST_P(AbstractEventLoopTest, NoArgNoData) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+ EXPECT_GT(loop2->context().size, 0u);
+ EXPECT_EQ(nullptr, loop2->context().data);
+ this->Exit();
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
+// Tests that no-arg watcher can receive messages from a sender.
+// Also tests that OnRun() works.
+TEST_P(AbstractEventLoopTest, BasicNoArg) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ aos::Fetcher<TestMessage> fetcher = loop2->MakeFetcher<TestMessage>("/test");
+ loop2->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+ ASSERT_TRUE(fetcher.Fetch());
+ EXPECT_EQ(fetcher->value(), 200);
+ this->Exit();
+ });
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
+// Tests that a watcher can be created with an std::function.
+TEST_P(AbstractEventLoopTest, BasicFunction) {
+ auto loop1 = Make();
+ auto loop2 = MakePrimary();
+
+ aos::Sender<TestMessage> sender = loop1->MakeSender<TestMessage>("/test");
+
+ bool happened = false;
+
+ loop2->OnRun([&]() {
+ happened = true;
+
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ loop2->MakeWatcher("/test", std::function<void(const TestMessage &)>(
+ [&](const TestMessage &message) {
+ EXPECT_EQ(message.value(), 200);
+ this->Exit();
+ }));
+
+ EXPECT_FALSE(happened);
+ Run();
+ EXPECT_TRUE(happened);
+}
+
// Tests that watcher can receive messages from two senders.
// Also tests that OnRun() works.
TEST_P(AbstractEventLoopTest, BasicTwoSenders) {
@@ -463,6 +551,9 @@
EXPECT_DEATH(
{ loop->MakeWatcher("/test/invalid", [&](const TestMessage &) {}); },
"/test/invalid");
+ EXPECT_DEATH(
+ { loop->MakeNoArgWatcher<TestMessage>("/test/invalid", [&]() {}); },
+ "/test/invalid");
}
// Verify that registering a watcher twice for "/test" fails.
@@ -471,6 +562,16 @@
loop->MakeWatcher("/test", [&](const TestMessage &) {});
EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
"/test");
+ EXPECT_DEATH(loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {}), "/test");
+}
+
+// Verify that registering a no-arg watcher twice for "/test" fails.
+TEST_P(AbstractEventLoopDeathTest, TwoNoArgWatcher) {
+ auto loop = Make();
+ loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {});
+ EXPECT_DEATH(loop->MakeWatcher("/test", [&](const TestMessage &) {}),
+ "/test");
+ EXPECT_DEATH(loop->MakeNoArgWatcher<TestMessage>("/test", [&]() {}), "/test");
}
// Verify that SetRuntimeRealtimePriority fails while running.
@@ -512,6 +613,16 @@
EXPECT_DEATH(Run(), "running");
}
+// Verify that we can't create a no-arg watcher inside OnRun.
+TEST_P(AbstractEventLoopDeathTest, NoArgWatcherInOnRun) {
+ auto loop1 = MakePrimary();
+
+ loop1->OnRun(
+ [&]() { loop1->MakeNoArgWatcher<TestMessage>("/test", [&]() {}); });
+
+ EXPECT_DEATH(Run(), "running");
+}
+
// Verify that Quit() works when there are multiple watchers.
TEST_P(AbstractEventLoopTest, MultipleWatcherQuit) {
auto loop1 = Make();
@@ -722,7 +833,7 @@
"Channel pointer not found in configuration\\(\\)->channels\\(\\)");
}
-// Verify that the send time on a message is roughly right.
+// Verify that the send time on a message is roughly right when using a watcher.
TEST_P(AbstractEventLoopTest, MessageSendTime) {
auto loop1 = MakePrimary();
auto loop2 = Make();
@@ -737,7 +848,7 @@
});
bool triggered = false;
- loop1->MakeWatcher("/test", [&triggered, &loop1](const TestMessage &msg) {
+ loop1->MakeWatcher("/test", [&](const TestMessage &msg) {
// Confirm that the data pointer makes sense from a watcher, and all the
// timestamps look right.
EXPECT_GT(&msg, loop1->context().data);
@@ -770,7 +881,92 @@
EXPECT_TRUE(triggered);
- EXPECT_TRUE(fetcher.Fetch());
+ ASSERT_TRUE(fetcher.Fetch());
+
+ monotonic_clock::duration monotonic_time_offset =
+ fetcher.context().monotonic_event_time -
+ (loop1->monotonic_now() - ::std::chrono::seconds(1));
+ realtime_clock::duration realtime_time_offset =
+ fetcher.context().realtime_event_time -
+ (loop1->realtime_now() - ::std::chrono::seconds(1));
+
+ EXPECT_EQ(fetcher.context().realtime_event_time,
+ fetcher.context().realtime_remote_time);
+ EXPECT_EQ(fetcher.context().monotonic_event_time,
+ fetcher.context().monotonic_remote_time);
+
+ EXPECT_TRUE(monotonic_time_offset > ::std::chrono::milliseconds(-500))
+ << ": Got "
+ << fetcher.context().monotonic_event_time.time_since_epoch().count()
+ << " expected " << loop1->monotonic_now().time_since_epoch().count();
+ // Confirm that the data pointer makes sense.
+ EXPECT_GT(fetcher.get(), fetcher.context().data);
+ EXPECT_LT(fetcher.get(),
+ reinterpret_cast<void *>(
+ reinterpret_cast<char *>(fetcher.context().data) +
+ fetcher.context().size));
+ EXPECT_TRUE(monotonic_time_offset < ::std::chrono::milliseconds(500))
+ << ": Got "
+ << fetcher.context().monotonic_event_time.time_since_epoch().count()
+ << " expected " << loop1->monotonic_now().time_since_epoch().count();
+
+ EXPECT_TRUE(realtime_time_offset > ::std::chrono::milliseconds(-500))
+ << ": Got "
+ << fetcher.context().realtime_event_time.time_since_epoch().count()
+ << " expected " << loop1->realtime_now().time_since_epoch().count();
+ EXPECT_TRUE(realtime_time_offset < ::std::chrono::milliseconds(500))
+ << ": Got "
+ << fetcher.context().realtime_event_time.time_since_epoch().count()
+ << " expected " << loop1->realtime_now().time_since_epoch().count();
+}
+
+// Verify that the send time on a message is roughly right when using a no-arg
+// watcher. To get a message, we need to use a fetcher to actually access the
+// message. This is also the main use case for no-arg fetchers.
+TEST_P(AbstractEventLoopTest, MessageSendTimeNoArg) {
+ auto loop1 = MakePrimary();
+ auto loop2 = Make();
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ auto fetcher = loop1->MakeFetcher<TestMessage>("/test");
+
+ auto test_timer = loop1->AddTimer([&sender]() {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ });
+
+ bool triggered = false;
+ loop1->MakeNoArgWatcher<TestMessage>("/test", [&]() {
+ // Confirm that we can indeed use a fetcher on this channel from this
+ // context, and it results in a sane data pointer and timestamps.
+ ASSERT_TRUE(fetcher.Fetch());
+
+ EXPECT_EQ(loop1->context().monotonic_remote_time,
+ loop1->context().monotonic_event_time);
+ EXPECT_EQ(loop1->context().realtime_remote_time,
+ loop1->context().realtime_event_time);
+
+ const aos::monotonic_clock::time_point monotonic_now =
+ loop1->monotonic_now();
+ const aos::realtime_clock::time_point realtime_now = loop1->realtime_now();
+
+ EXPECT_LE(loop1->context().monotonic_event_time, monotonic_now);
+ EXPECT_LE(loop1->context().realtime_event_time, realtime_now);
+ EXPECT_GE(loop1->context().monotonic_event_time + chrono::milliseconds(500),
+ monotonic_now);
+ EXPECT_GE(loop1->context().realtime_event_time + chrono::milliseconds(500),
+ realtime_now);
+
+ triggered = true;
+ });
+
+ test_timer->Setup(loop1->monotonic_now() + ::std::chrono::seconds(1));
+
+ EndEventLoop(loop1.get(), ::std::chrono::seconds(2));
+ Run();
+
+ ASSERT_TRUE(triggered);
monotonic_clock::duration monotonic_time_offset =
fetcher.context().monotonic_event_time -
@@ -1336,6 +1532,19 @@
[](const Context &, const void *) {});
}
+// Tests that no-arg watchers work with a node setup.
+TEST_P(AbstractEventLoopTest, NodeNoArgWatcher) {
+ EnableNodes("me");
+
+ auto loop1 = Make();
+ auto loop2 = Make();
+ loop1->MakeWatcher("/test", [](const TestMessage &) {});
+ loop2->MakeRawNoArgWatcher(
+ configuration::GetChannel(configuration(), "/test", "aos.TestMessage", "",
+ nullptr),
+ [](const Context &) {});
+}
+
// Tests that fetcher work with a node setup.
TEST_P(AbstractEventLoopTest, NodeFetcher) {
EnableNodes("me");
@@ -1370,6 +1579,16 @@
[](const Context &, const void *) {});
},
"node");
+ EXPECT_DEATH({ loop1->MakeNoArgWatcher<TestMessage>("/test", []() {}); },
+ "node");
+ EXPECT_DEATH(
+ {
+ loop2->MakeRawNoArgWatcher(
+ configuration::GetChannel(configuration(), "/test",
+ "aos.TestMessage", "", nullptr),
+ [](const Context &) {});
+ },
+ "node");
}
// Tests that fetchers fail when created on the wrong node.
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 1daf88c..5534c83 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -6,13 +6,16 @@
#include "glog/logging.h"
namespace aos {
+namespace event_loop_internal {
-// From a watch functor, this will extract the message type of the argument.
-// This is the template forward declaration, and it extracts the call operator
-// as a PTMF to be used by the following specialization.
+// From a watch functor, specializations of this will extract the message type
+// of the template argument. If T is not a valid message type, there will be no
+// matching specialization.
+//
+// This is just the forward declaration, which will be used by one of the
+// following specializations to match valid argument types.
template <class T>
-struct watch_message_type_trait
- : watch_message_type_trait<decltype(&T::operator())> {};
+struct watch_message_type_trait;
// From a watch functor, this will extract the message type of the argument.
// This is the template specialization.
@@ -21,6 +24,8 @@
using message_type = typename std::decay<A1>::type;
};
+} // namespace event_loop_internal
+
template <typename T>
typename Sender<T>::Builder Sender<T>::MakeBuilder() {
return Builder(sender_.get(), sender_->fbb_allocator());
@@ -28,19 +33,38 @@
template <typename Watch>
void EventLoop::MakeWatcher(const std::string_view channel_name, Watch &&w) {
- using T = typename watch_message_type_trait<Watch>::message_type;
+ using MessageType =
+ typename event_loop_internal::watch_message_type_trait<decltype(
+ &Watch::operator())>::message_type;
const Channel *channel = configuration::GetChannel(
- configuration_, channel_name, T::GetFullyQualifiedName(), name(), node());
+ configuration_, channel_name, MessageType::GetFullyQualifiedName(),
+ name(), node());
CHECK(channel != nullptr)
<< ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
- << T::GetFullyQualifiedName() << "\" } not found in config.";
+ << MessageType::GetFullyQualifiedName() << "\" } not found in config.";
- return MakeRawWatcher(
- channel, [this, w](const Context &context, const void *message) {
- context_ = context;
- w(*flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(message)));
- });
+ MakeRawWatcher(channel,
+ [this, w](const Context &context, const void *message) {
+ context_ = context;
+ w(*flatbuffers::GetRoot<MessageType>(
+ reinterpret_cast<const char *>(message)));
+ });
+}
+
+template <typename MessageType>
+void EventLoop::MakeNoArgWatcher(const std::string_view channel_name,
+ std::function<void()> w) {
+ const Channel *channel = configuration::GetChannel(
+ configuration_, channel_name, MessageType::GetFullyQualifiedName(),
+ name(), node());
+ CHECK(channel != nullptr)
+ << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+ << MessageType::GetFullyQualifiedName() << "\" } not found in config.";
+ MakeRawNoArgWatcher(channel, [this, w](const Context &context) {
+ context_ = context;
+ w();
+ });
}
inline bool RawFetcher::FetchNext() {
@@ -194,7 +218,9 @@
// context.
void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
Context context) {
- CheckChannelDataAlignment(context.data, context.size);
+ if (context.data) {
+ CheckChannelDataAlignment(context.data, context.size);
+ }
const monotonic_clock::time_point monotonic_start_time = get_time();
{
const float start_latency =
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 7b4daf8..afd65a3 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -164,17 +164,19 @@
class SimpleShmFetcher {
public:
- explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
+ explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel,
+ bool copy_data)
: channel_(channel),
lockless_queue_memory_(
channel,
chrono::ceil<chrono::seconds>(chrono::nanoseconds(
event_loop->configuration()->channel_storage_duration()))),
lockless_queue_(lockless_queue_memory_.memory(),
- lockless_queue_memory_.config()),
- data_storage_(static_cast<char *>(malloc(channel->max_size() +
- kChannelDataAlignment - 1)),
- &free) {
+ lockless_queue_memory_.config()) {
+ if (copy_data) {
+ data_storage_.reset(static_cast<char *>(
+ malloc(channel->max_size() + kChannelDataAlignment - 1)));
+ }
context_.data = nullptr;
// Point the queue index at the next index to read starting now. This
// makes it such that FetchNext will read the next message sent after
@@ -217,8 +219,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -267,8 +273,12 @@
if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
context_.realtime_remote_time = context_.realtime_event_time;
}
- context_.data = data_storage_start() +
- lockless_queue_.message_data_size() - context_.size;
+ if (copy_data()) {
+ context_.data = data_storage_start() +
+ lockless_queue_.message_data_size() - context_.size;
+ } else {
+ context_.data = nullptr;
+ }
actual_queue_index_ = queue_index.Increment();
}
@@ -308,8 +318,10 @@
private:
char *data_storage_start() {
+ if (!copy_data()) return nullptr;
return RoundChannelData(data_storage_.get(), channel_->max_size());
}
+ bool copy_data() const { return static_cast<bool>(data_storage_); }
const Channel *const channel_;
MMapedQueue lockless_queue_memory_;
@@ -318,7 +330,8 @@
ipc_lib::QueueIndex actual_queue_index_ =
ipc_lib::LocklessQueue::empty_queue_index();
- std::unique_ptr<char, decltype(&free)> data_storage_;
+ // This being empty indicates we're not going to copy data.
+ std::unique_ptr<char, decltype(&free)> data_storage_{nullptr, &free};
Context context_;
};
@@ -327,7 +340,7 @@
public:
explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
: RawFetcher(event_loop, channel),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, true) {}
~ShmFetcher() { context_.data = nullptr; }
@@ -406,11 +419,12 @@
public:
WatcherState(
ShmEventLoop *event_loop, const Channel *channel,
- std::function<void(const Context &context, const void *message)> fn)
+ std::function<void(const Context &context, const void *message)> fn,
+ bool copy_data)
: aos::WatcherState(event_loop, channel, std::move(fn)),
event_loop_(event_loop),
event_(this),
- simple_shm_fetcher_(event_loop, channel) {}
+ simple_shm_fetcher_(event_loop, channel, copy_data) {}
~WatcherState() override { event_loop_->RemoveEvent(&event_); }
@@ -615,7 +629,18 @@
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
- new internal::WatcherState(this, channel, std::move(watcher))));
+ new internal::WatcherState(this, channel, std::move(watcher), true)));
+}
+
+void ShmEventLoop::MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) {
+ TakeWatcher(channel);
+
+ NewWatcher(::std::unique_ptr<WatcherState>(new internal::WatcherState(
+ this, channel,
+ [watcher](const Context &context, const void *) { watcher(context); },
+ false)));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index fa870b8..d3f1295 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -52,6 +52,9 @@
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher)
override;
+ void MakeRawNoArgWatcher(
+ const Channel *channel,
+ std::function<void(const Context &context)> watcher) override;
TimerHandler *AddTimer(std::function<void()> callback) override;
aos::PhasedLoopHandler *AddPhasedLoop(
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index 7126ffd..f31d80d 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -792,7 +792,9 @@
}
*monotonic_remote_time = m->header.monotonic_remote_time;
*realtime_remote_time = m->header.realtime_remote_time;
- memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ if (data) {
+ memcpy(data, m->data(memory_->message_data_size()), message_data_size());
+ }
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index 0384aa8..550485f 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -162,6 +162,8 @@
// element newer than QueueSize() from the current message, we consider it
// behind by a large amount and return TOO_OLD. If the message is modified
// out from underneath us as we read it, return OVERWROTE.
+ //
+ // data may be nullptr to indicate the data should not be copied.
enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
ReadResult Read(uint32_t queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,