Add should_fetch to lockless_queue so we can conditionally fetch
For logging, we want to be able to fetch the next message only if it is
time to do so. This adds a callback which is provided with all the context
information at the right point to decide whether we actually fetch or not.
The next step is to actually expose it through EventLoop.
Change-Id: Ic15eedc46116f8acfcb5a80ed551e89e9d6b7fa8
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 08e3998..a65c71a 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -96,6 +96,20 @@
)
cc_library(
+ name = "context",
+ hdrs = [
+ "context.h",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//aos:flatbuffers",
+ "//aos:uuid",
+ "//aos/time",
+ ],
+)
+
+cc_library(
name = "event_loop",
srcs = [
"event_loop.cc",
@@ -109,6 +123,7 @@
target_compatible_with = ["@platforms//os:linux"],
visibility = ["//visibility:public"],
deps = [
+ ":context",
":event_loop_fbs",
":timing_statistics",
"//aos:configuration",
diff --git a/aos/events/context.h b/aos/events/context.h
new file mode 100644
index 0000000..6648949
--- /dev/null
+++ b/aos/events/context.h
@@ -0,0 +1,70 @@
+#ifndef AOS_EVENTS_CONTEXT_H_
+#define AOS_EVENTS_CONTEXT_H_
+
+#include "aos/flatbuffers.h"
+#include "aos/time/time.h"
+#include "aos/uuid.h"
+
+namespace aos {
+
+// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
+// about the current message.
+struct Context {
+ // Monotonic time the message was sent on this node, or the timer triggered.
+ monotonic_clock::time_point monotonic_event_time;
+ // Realtime clock time the message was sent on this node. This is set to
+ // min_time for Timers and PhasedLoops.
+ realtime_clock::time_point realtime_event_time;
+
+ // The rest are only valid for Watchers and Fetchers.
+
+ // For a single-node configuration, these two are identical to *_event_time.
+ // In a multinode configuration, these are the times that the message was
+ // sent on the original node.
+ monotonic_clock::time_point monotonic_remote_time;
+ realtime_clock::time_point realtime_remote_time;
+
+ // Index of this message in the queue.
+ uint32_t queue_index;
+ // Index into the remote queue. Useful to determine if data was lost. In a
+ // single-node configuration, this will match queue_index.
+ uint32_t remote_queue_index;
+
+ // Size of the data sent, in bytes.
+ size_t size;
+ // Pointer to the data; CopyFlatBuffer() reads `size` bytes starting here.
+ const void *data;
+
+ // Index of the message buffer. This will be in [0, NumberBuffers) on
+ // read_method=PIN channels, and -1 for other channels.
+ //
+ // This only tells you about the underlying storage for this message, not
+ // anything about its position in the queue. This is only useful for advanced
+ // zero-copy use cases, on read_method=PIN channels.
+ //
+ // This will uniquely identify a message on this channel at a point in time.
+ // For senders, this point in time is while the sender has the message. With
+ // read_method==PIN, this point in time includes while the caller has access
+ // to this context. For other read_methods, this point in time may be before
+ // the caller has access to this context, which makes this pretty useless.
+ int buffer_index;
+
+ // UUID of the remote node which sent this message, or this node in the case
+ // of events which are local to this node.
+ UUID source_boot_uuid = UUID::Zero();
+
+ // Copies the `size` bytes at `data` into a freshly resized buffer and wraps
+ // it in a FlatbufferVector<T>, allocating memory in the process. It is vital
+ // that T matches the type of the underlying flatbuffer.
+ template <typename T>
+ FlatbufferVector<T> CopyFlatBuffer() const {
+ ResizeableBuffer buffer;
+ buffer.resize(size);
+ memcpy(buffer.data(), data, size);
+ return FlatbufferVector<T>(std::move(buffer));
+ }
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_CONTEXT_H_
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index e53bc21..17e3e00 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -15,6 +15,7 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
#include "aos/events/channel_preallocated_allocator.h"
+#include "aos/events/context.h"
#include "aos/events/event_loop_event.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
@@ -34,64 +35,6 @@
class EventLoop;
class WatcherState;
-// Struct available on Watchers, Fetchers, Timers, and PhasedLoops with context
-// about the current message.
-struct Context {
- // Time that the message was sent on this node, or the timer was triggered.
- monotonic_clock::time_point monotonic_event_time;
- // Realtime the message was sent on this node. This is set to min_time for
- // Timers and PhasedLoops.
- realtime_clock::time_point realtime_event_time;
-
- // The rest are only valid for Watchers and Fetchers.
-
- // For a single-node configuration, these two are identical to *_event_time.
- // In a multinode configuration, these are the times that the message was
- // sent on the original node.
- monotonic_clock::time_point monotonic_remote_time;
- realtime_clock::time_point realtime_remote_time;
-
- // Index in the queue.
- uint32_t queue_index;
- // Index into the remote queue. Useful to determine if data was lost. In a
- // single-node configuration, this will match queue_index.
- uint32_t remote_queue_index;
-
- // Size of the data sent.
- size_t size;
- // Pointer to the data.
- const void *data;
-
- // Index of the message buffer. This will be in [0, NumberBuffers) on
- // read_method=PIN channels, and -1 for other channels.
- //
- // This only tells you about the underlying storage for this message, not
- // anything about its position in the queue. This is only useful for advanced
- // zero-copy use cases, on read_method=PIN channels.
- //
- // This will uniquely identify a message on this channel at a point in time.
- // For senders, this point in time is while the sender has the message. With
- // read_method==PIN, this point in time includes while the caller has access
- // to this context. For other read_methods, this point in time may be before
- // the caller has access to this context, which makes this pretty useless.
- int buffer_index;
-
- // UUID of the remote node which sent this message, or this node in the case
- // of events which are local to this node.
- UUID source_boot_uuid = UUID::Zero();
-
- // Efficiently copies the flatbuffer into a FlatbufferVector, allocating
- // memory in the process. It is vital that T matches the type of the
- // underlying flatbuffer.
- template <typename T>
- FlatbufferVector<T> CopyFlatBuffer() const {
- ResizeableBuffer buffer;
- buffer.resize(size);
- memcpy(buffer.data(), data, size);
- return FlatbufferVector<T>(std::move(buffer));
- }
-};
-
// Raw version of fetcher. Contains a local variable that the fetcher will
// update. This is used for reflection and as an interface to implement typed
// fetchers.
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 314e366..8ed6e0e 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -226,7 +226,8 @@
queue_index.index(), &context_.monotonic_event_time,
&context_.realtime_event_time, &context_.monotonic_remote_time,
&context_.realtime_remote_time, &context_.remote_queue_index,
- &context_.source_boot_uuid, &context_.size, copy_buffer);
+ &context_.source_boot_uuid, &context_.size, copy_buffer,
+ std::ref(should_fetch_));
if (read_result == ipc_lib::LocklessQueueReader::Result::GOOD) {
if (pin_data()) {
@@ -320,6 +321,11 @@
std::optional<ipc_lib::LocklessQueuePinner> pinner_;
Context context_;
+
+ // Pre-allocated should_fetch function so we don't allocate.
+ std::function<bool(const Context &)> should_fetch_ = [](const Context &) {
+ return true;
+ };
};
class ShmFetcher : public RawFetcher {