Add should_fetch to lockless_queue so we can conditionally fetch

For logging, we want to be able to only fetch the next message if it is
time to do so.  This adds a callback which is provided all the context
information at the right point to decide if we actually fetch or not.
Next step is to actually expose it through EventLoop.

Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 08e3998..a65c71a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -96,6 +96,20 @@
 )
 
 cc_library(
+    name = "context",
+    hdrs = [
+        "context.h",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//aos:flatbuffers",
+        "//aos:uuid",
+        "//aos/time",
+    ],
+)
+
+cc_library(
     name = "event_loop",
     srcs = [
         "event_loop.cc",
@@ -109,6 +123,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
+        ":context",
         ":event_loop_fbs",
         ":timing_statistics",
         "//aos:configuration",
diff --git a/aos/events/context.h b/aos/events/context.h
new file mode 100644
index 0000000..6648949
--- /dev/null
+++ b/aos/events/context.h
@@ -0,0 +1,70 @@
+#ifndef AOS_EVENTS_CONTEXT_H_
+#define AOS_EVENTS_CONTEXT_H_
+
+#include "aos/flatbuffers.h"
+#include "aos/time/time.h"
+#include "aos/uuid.h"
+
+namespace aos {
+
+// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
+// about the current message.
+struct Context {
+  // Time that the message was sent on this node, or the timer was triggered.
+  monotonic_clock::time_point monotonic_event_time;
+  // Realtime the message was sent on this node.  This is set to min_time for
+  // Timers and PhasedLoops.
+  realtime_clock::time_point realtime_event_time;
+
+  // The rest are only valid for Watchers and Fetchers.
+
+  // For a single-node configuration, these two are identical to *_event_time.
+  // In a multinode configuration, these are the times that the message was
+  // sent on the original node.
+  monotonic_clock::time_point monotonic_remote_time;
+  realtime_clock::time_point realtime_remote_time;
+
+  // Index in the queue.
+  uint32_t queue_index;
+  // Index into the remote queue.  Useful to determine if data was lost.  In a
+  // single-node configuration, this will match queue_index.
+  uint32_t remote_queue_index;
+
+  // Size of the data sent.
+  size_t size;
+  // Pointer to the data.
+  const void *data;
+
+  // Index of the message buffer. This will be in [0, NumberBuffers) on
+  // read_method=PIN channels, and -1 for other channels.
+  //
+  // This only tells you about the underlying storage for this message, not
+  // anything about its position in the queue. This is only useful for advanced
+  // zero-copy use cases, on read_method=PIN channels.
+  //
+  // This will uniquely identify a message on this channel at a point in time.
+  // For senders, this point in time is while the sender has the message. With
+  // read_method==PIN, this point in time includes while the caller has access
+  // to this context. For other read_methods, this point in time may be before
+  // the caller has access to this context, which makes this pretty useless.
+  int buffer_index;
+
+  // UUID of the remote node which sent this message, or this node in the case
+  // of events which are local to this node.
+  UUID source_boot_uuid = UUID::Zero();
+
+  // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
+  // memory in the process.  It is vital that T matches the type of the
+  // underlying flatbuffer.
+  template <typename T>
+  FlatbufferVector<T> CopyFlatBuffer() const {
+    ResizeableBuffer buffer;
+    buffer.resize(size);
+    memcpy(buffer.data(), data, size);
+    return FlatbufferVector<T>(std::move(buffer));
+  }
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_CONTEXT_H_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e53bc21..17e3e00 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -15,6 +15,7 @@
 #include "aos/configuration.h"
 #include "aos/configuration_generated.h"
 #include "aos/events/channel_preallocated_allocator.h"
+#include "aos/events/context.h"
 #include "aos/events/event_loop_event.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/timing_statistics.h"
@@ -34,64 +35,6 @@
 class EventLoop;
 class WatcherState;
 
-// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
-// about the current message.
-struct Context {
-  // Time that the message was sent on this node, or the timer was triggered.
-  monotonic_clock::time_point monotonic_event_time;
-  // Realtime the message was sent on this node.  This is set to min_time for
-  // Timers and PhasedLoops.
-  realtime_clock::time_point realtime_event_time;
-
-  // The rest are only valid for Watchers and Fetchers.
-
-  // For a single-node configuration, these two are identical to *_event_time.
-  // In a multinode configuration, these are the times that the message was
-  // sent on the original node.
-  monotonic_clock::time_point monotonic_remote_time;
-  realtime_clock::time_point realtime_remote_time;
-
-  // Index in the queue.
-  uint32_t queue_index;
-  // Index into the remote queue.  Useful to determine if data was lost.  In a
-  // single-node configuration, this will match queue_index.
-  uint32_t remote_queue_index;
-
-  // Size of the data sent.
-  size_t size;
-  // Pointer to the data.
-  const void *data;
-
-  // Index of the message buffer. This will be in [0, NumberBuffers) on
-  // read_method=PIN channels, and -1 for other channels.
-  //
-  // This only tells you about the underlying storage for this message, not
-  // anything about its position in the queue. This is only useful for advanced
-  // zero-copy use cases, on read_method=PIN channels.
-  //
-  // This will uniquely identify a message on this channel at a point in time.
-  // For senders, this point in time is while the sender has the message. With
-  // read_method==PIN, this point in time includes while the caller has access
-  // to this context. For other read_methods, this point in time may be before
-  // the caller has access to this context, which makes this pretty useless.
-  int buffer_index;
-
-  // UUID of the remote node which sent this message, or this node in the case
-  // of events which are local to this node.
-  UUID source_boot_uuid = UUID::Zero();
-
-  // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
-  // memory in the process.  It is vital that T matches the type of the
-  // underlying flatbuffer.
-  template <typename T>
-  FlatbufferVector<T> CopyFlatBuffer() const {
-    ResizeableBuffer buffer;
-    buffer.resize(size);
-    memcpy(buffer.data(), data, size);
-    return FlatbufferVector<T>(std::move(buffer));
-  }
-};
-
 // Raw version of fetcher. Contains a local variable that the fetcher will
 // update.  This is used for reflection and as an interface to implement typed
 // fetchers.
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 314e366..8ed6e0e 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -226,7 +226,8 @@
         queue_index.index(), &context_.monotonic_event_time,
         &context_.realtime_event_time, &context_.monotonic_remote_time,
         &context_.realtime_remote_time, &context_.remote_queue_index,
-        &context_.source_boot_uuid, &context_.size, copy_buffer);
+        &context_.source_boot_uuid, &context_.size, copy_buffer,
+        std::ref(should_fetch_));
 
     if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
       if (pin_data()) {
@@ -320,6 +321,11 @@
   std::optional<ipc_lib::LocklessQueuePinner> pinner_;
 
   Context context_;
+
+  // Pre-allocated should_fetch function so we don't allocate.
+  std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
+    return true;
+  };
 };
 
 class ShmFetcher : public RawFetcher {