Add should_fetch to lockless_queue so we can conditionally fetch
For logging, we want to fetch the next message only if it is time to do
so. This adds a callback that is given all the context information at
the right point to decide whether we actually fetch or not.
Next step is to actually expose it through EventLoop.
Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index f83b558..2cafb48 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -10,6 +10,7 @@
#include "absl/types/span.h"
+#include "aos/events/context.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
@@ -406,7 +407,19 @@
class LocklessQueueReader {
public:
- enum class Result { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
+ enum class Result {
+ // The requested message was too old and is no longer in the queue.
+ TOO_OLD,
+ // Success!
+ GOOD,
+ // The message is in the future and has not been written yet.
+ NOTHING_NEW,
+ // There is a message, but should_read() returned false so we didn't fetch
+ // it.
+ FILTERED,
+ // The message got overwritten while we were reading it.
+ OVERWROTE,
+ };
LocklessQueueReader(LocklessQueue queue) : memory_(queue.const_memory()) {
queue.Initialize();
@@ -416,7 +429,8 @@
// NOTHING_NEW until that gets overwritten with new data. If you ask for an
// element newer than QueueSize() from the current message, we consider it
// behind by a large amount and return TOO_OLD. If the message is modified
- // out from underneath us as we read it, return OVERWROTE.
+ // out from underneath us as we read it, return OVERWROTE. If we found a new
+ // message but should_read() returned false for it, return FILTERED.
//
// data may be nullptr to indicate the data should not be copied.
Result Read(uint32_t queue_index,
@@ -425,7 +439,8 @@
monotonic_clock::time_point *monotonic_remote_time,
realtime_clock::time_point *realtime_remote_time,
uint32_t *remote_queue_index, UUID *source_boot_uuid,
- size_t *length, char *data) const;
+ size_t *length, char *data,
+ std::function<bool(const Context &context)> should_read) const;
// Returns the index to the latest queue message. Returns empty_queue_index()
// if there are no messages in the queue. Do note that this index wraps if