blob: d539386c29c5cd2f4504ffaf082b29a2d37ebb6d [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
Austin Schuh20b2b082019-09-11 20:42:56 -07007#include <vector>
8
9#include "aos/ipc_lib/aos_sync.h"
10#include "aos/ipc_lib/index.h"
11#include "aos/time/time.h"
12
13namespace aos {
14namespace ipc_lib {
15
16// Structure to hold the state required to wake a watcher.
17struct Watcher {
18 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
19 // then this watcher is invalid. The futex variable will then hold the tid of
20 // the watcher, or FUTEX_OWNER_DIED if the task died.
21 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080022 // Note: this is only modified with the queue_setup_lock lock held, but may
23 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070024 // Any state modification should happen before the lock is acquired.
25 aos_mutex tid;
26
27 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080028 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070029
30 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080031 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070032};
33
34// Structure to hold the state required to send messages.
35struct Sender {
36 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
37 // this sender is invalid. The futex variable will then hold the tid of the
38 // sender, or FUTEX_OWNER_DIED if the task died.
39 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080040 // Note: this is only modified with the queue_setup_lock lock held, but may
41 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070042 aos_mutex tid;
43
44 // Index of the message we will be filling out.
45 AtomicIndex scratch_index;
46
47 // Index of the element being swapped with scratch_index, or Invalid if there
48 // is nothing to do.
49 AtomicIndex to_replace;
50};
51
52// Structure representing a message.
53struct Message {
54 struct Header {
55 // Index of this message in the queue. Needs to match the index this
56 // message is written into the queue at. The data in this message is only
57 // valid if it matches the index in the queue both before and after all the
58 // data is read.
59 //
60 // Note: a value of 0xffffffff always means that the contents aren't valid.
61 AtomicQueueIndex queue_index;
62
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080063 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070064 // queue, which means that time needs to be re-sampled every time a write
65 // fails.
66 ::aos::monotonic_clock::time_point monotonic_sent_time;
67 ::aos::realtime_clock::time_point realtime_sent_time;
68
69 size_t length;
70 } header;
71
72 char data[];
73};
74
75struct LocklessQueueConfiguration {
76 // Size of the watchers list.
77 size_t num_watchers;
78 // Size of the sender list.
79 size_t num_senders;
80
81 // Size of the list of pointers into the messages list.
82 size_t queue_size;
83 // Size in bytes of the data stored in each Message.
84 size_t message_data_size;
85
Austin Schuh4bc4f902019-12-23 18:04:51 -080086 size_t message_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -070087
88 size_t num_messages() const { return num_senders + queue_size; }
89};
90
91// Structure to hold the state of the queue.
92//
93// Reads and writes are lockless and constant time.
94//
95// Adding a new watcher doesn't need to be constant time for the watcher (this
96// is done before the watcher goes RT), but needs to be RT for the sender.
97struct LocklessQueueMemory;
98
99// Initializes the queue memory. memory must be either a valid pointer to the
100// queue datastructure, or must be zero initialized.
101LocklessQueueMemory *InitializeLocklessQueueMemory(
102 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
103
104// Returns the size of the LocklessQueueMemory.
105size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
106
107// Prints to stdout the data inside the queue for debugging.
108void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
109
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110const static unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700111
112// Class to manage sending and receiving data in the lockless queue. This is
113// separate from the actual memory backing the queue so that memory can be
114// managed with mmap to share across the process boundary.
115class LocklessQueue {
116 public:
117 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
118 LocklessQueue(const LocklessQueue &) = delete;
119 LocklessQueue &operator=(const LocklessQueue &) = delete;
120
121 ~LocklessQueue();
122
123 // Returns the number of messages in the queue.
124 size_t QueueSize() const;
125
Alex Perrycb7da4b2019-08-28 19:35:56 -0700126 size_t message_data_size() const;
127
Austin Schuh20b2b082019-09-11 20:42:56 -0700128 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
129 // called. Returns false if there was an error in registration.
130 bool RegisterWakeup(int priority);
131 // Unregisters the wakeup.
132 void UnregisterWakeup();
133
134 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
135 //
136 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
137 // if nonrt.
138 int Wakeup(int current_priority);
139
140 // If you ask for a queue index 2 past the newest, you will still get
141 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
142 // element newer than QueueSize() from the current message, we consider it
Alex Perrycb7da4b2019-08-28 19:35:56 -0700143 // behind by a large amount and return TOO_OLD. If the message is modified
144 // out from underneath us as we read it, return OVERWROTE.
145 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
Austin Schuh20b2b082019-09-11 20:42:56 -0700146 ReadResult Read(uint32_t queue_index,
147 ::aos::monotonic_clock::time_point *monotonic_sent_time,
148 ::aos::realtime_clock::time_point *realtime_sent_time,
149 size_t *length, char *data);
150
151 // Returns the index to the latest queue message. Returns empty_queue_index()
152 // if there are no messages in the queue. Do note that this index wraps if
153 // more than 2^32 messages are sent.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700154 QueueIndex LatestQueueIndex();
155 static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
156
157 // Returns the size of the queue. This is mostly useful for manipulating
158 // QueueIndex.
159 size_t queue_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700160
161 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
162 // if we got behind.
163 // The easiest way to implement this is likely going to be to reserve the
164 // first modulo of values for the initial time around, and never reuse them.
165 // That lets us do a simple atomic read of the next index and deduce what has
166 // happened. It will involve the simplest atomic operations.
167
168 // TODO(austin): Make it so we can find the indices which were sent just
169 // before and after a time with a binary search.
170
171 // Sender for blocks of data. The resources associated with a sender are
172 // scoped to this object's lifetime.
173 class Sender {
174 public:
175 Sender(const Sender &) = delete;
176 Sender &operator=(const Sender &) = delete;
177 Sender(Sender &&other)
178 : memory_(other.memory_), sender_index_(other.sender_index_) {
179 other.memory_ = nullptr;
180 other.sender_index_ = -1;
181 }
182 Sender &operator=(Sender &&other) {
183 memory_ = other.memory_;
184 sender_index_ = other.sender_index_;
185 other.memory_ = nullptr;
186 other.sender_index_ = -1;
187 return *this;
188 }
189
190 ~Sender();
191
Alex Perrycb7da4b2019-08-28 19:35:56 -0700192 // Sends a message without copying the data.
193 // Copy at most size() bytes of data into the memory pointed to by Data(),
194 // and then call Send().
195 // Note: calls to Data() are expensive enough that you should cache it.
196 size_t size();
197 void *Data();
198 void Send(size_t length);
199
Austin Schuh20b2b082019-09-11 20:42:56 -0700200 // Sends up to length data. Does not wakeup the target.
201 void Send(const char *data, size_t length);
202
203 private:
204 friend class LocklessQueue;
205
206 Sender(LocklessQueueMemory *memory);
207
208 // Pointer to the backing memory.
209 LocklessQueueMemory *memory_ = nullptr;
210
211 // Index into the sender list.
212 int sender_index_ = -1;
213 };
214
215 // Creates a sender.
216 Sender MakeSender();
217
218 private:
219 LocklessQueueMemory *memory_ = nullptr;
220
221 // Memory and datastructure used to sort a list of watchers to wake
222 // up. This isn't a copy of Watcher since tid is simpler to work with here
223 // than the futex above.
224 struct WatcherCopy {
225 pid_t tid;
226 pid_t pid;
227 int priority;
228 };
229 // TODO(austin): Don't allocate this memory if we aren't going to send.
230 ::std::vector<WatcherCopy> watcher_copy_;
231
232 // Index in the watcher list that our entry is, or -1 if no watcher is
233 // registered.
234 int watcher_index_ = -1;
235
236 const int pid_;
237 const uid_t uid_;
238};
239
240} // namespace ipc_lib
241} // namespace aos
242
243#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_