blob: fcc5d79f8bd37a33137b37c1861d4e34b2854def [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
7#include <atomic>
8#include <vector>
9
10#include "aos/ipc_lib/aos_sync.h"
11#include "aos/ipc_lib/index.h"
12#include "aos/time/time.h"
13
14namespace aos {
15namespace ipc_lib {
16
17// Structure to hold the state required to wake a watcher.
18struct Watcher {
19 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
20 // then this watcher is invalid. The futex variable will then hold the tid of
21 // the watcher, or FUTEX_OWNER_DIED if the task died.
22 //
23 // Note: this is modified with a lock held, but is always able to be read.
24 // Any state modification should happen before the lock is acquired.
25 aos_mutex tid;
26
27 // PID of the watcher.
28 pid_t pid;
29
30 // RT priority of the watcher.
31 int priority;
32};
33
34// Structure to hold the state required to send messages.
35struct Sender {
36 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
37 // this sender is invalid. The futex variable will then hold the tid of the
38 // sender, or FUTEX_OWNER_DIED if the task died.
39 //
40 // Note: this is modified with a lock held, but is always able to be read.
41 aos_mutex tid;
42
43 // Index of the message we will be filling out.
44 AtomicIndex scratch_index;
45
46 // Index of the element being swapped with scratch_index, or Invalid if there
47 // is nothing to do.
48 AtomicIndex to_replace;
49};
50
51// Structure representing a message.
52struct Message {
53 struct Header {
54 // Index of this message in the queue. Needs to match the index this
55 // message is written into the queue at. The data in this message is only
56 // valid if it matches the index in the queue both before and after all the
57 // data is read.
58 //
59 // Note: a value of 0xffffffff always means that the contents aren't valid.
60 AtomicQueueIndex queue_index;
61
62 // Timestamp of the message. Needs to be atomically incrementing in the
63 // queue, which means that time needs to be re-sampled every time a write
64 // fails.
65 ::aos::monotonic_clock::time_point monotonic_sent_time;
66 ::aos::realtime_clock::time_point realtime_sent_time;
67
68 size_t length;
69 } header;
70
71 char data[];
72};
73
74struct LocklessQueueConfiguration {
75 // Size of the watchers list.
76 size_t num_watchers;
77 // Size of the sender list.
78 size_t num_senders;
79
80 // Size of the list of pointers into the messages list.
81 size_t queue_size;
82 // Size in bytes of the data stored in each Message.
83 size_t message_data_size;
84
85 size_t message_size() const { return message_data_size + sizeof(Message); }
86
87 size_t num_messages() const { return num_senders + queue_size; }
88};
89
// State of the queue itself. Only forward-declared in this header.
//
// Reads and writes are lockless and constant time.
//
// Registering a new watcher does not have to be constant time for the watcher
// (it happens before the watcher goes RT), but it must be RT for the sender.
struct LocklessQueueMemory;
97
98// Initializes the queue memory. memory must be either a valid pointer to the
99// queue datastructure, or must be zero initialized.
100LocklessQueueMemory *InitializeLocklessQueueMemory(
101 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
102
103// Returns the size of the LocklessQueueMemory.
104size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
105
106// Prints to stdout the data inside the queue for debugging.
107void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
108
// Signal delivered to registered watchers on wakeup: SIGRTMIN + 2.
static const unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700110
111// Class to manage sending and receiving data in the lockless queue. This is
112// separate from the actual memory backing the queue so that memory can be
113// managed with mmap to share across the process boundary.
114class LocklessQueue {
115 public:
116 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
117 LocklessQueue(const LocklessQueue &) = delete;
118 LocklessQueue &operator=(const LocklessQueue &) = delete;
119
120 ~LocklessQueue();
121
122 // Returns the number of messages in the queue.
123 size_t QueueSize() const;
124
Alex Perrycb7da4b2019-08-28 19:35:56 -0700125 size_t message_data_size() const;
126
Austin Schuh20b2b082019-09-11 20:42:56 -0700127 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
128 // called. Returns false if there was an error in registration.
129 bool RegisterWakeup(int priority);
130 // Unregisters the wakeup.
131 void UnregisterWakeup();
132
133 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
134 //
135 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
136 // if nonrt.
137 int Wakeup(int current_priority);
138
139 // If you ask for a queue index 2 past the newest, you will still get
140 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
141 // element newer than QueueSize() from the current message, we consider it
Alex Perrycb7da4b2019-08-28 19:35:56 -0700142 // behind by a large amount and return TOO_OLD. If the message is modified
143 // out from underneath us as we read it, return OVERWROTE.
144 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
Austin Schuh20b2b082019-09-11 20:42:56 -0700145 ReadResult Read(uint32_t queue_index,
146 ::aos::monotonic_clock::time_point *monotonic_sent_time,
147 ::aos::realtime_clock::time_point *realtime_sent_time,
148 size_t *length, char *data);
149
150 // Returns the index to the latest queue message. Returns empty_queue_index()
151 // if there are no messages in the queue. Do note that this index wraps if
152 // more than 2^32 messages are sent.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700153 QueueIndex LatestQueueIndex();
154 static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
155
156 // Returns the size of the queue. This is mostly useful for manipulating
157 // QueueIndex.
158 size_t queue_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700159
160 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
161 // if we got behind.
162 // The easiest way to implement this is likely going to be to reserve the
163 // first modulo of values for the initial time around, and never reuse them.
164 // That lets us do a simple atomic read of the next index and deduce what has
165 // happened. It will involve the simplest atomic operations.
166
167 // TODO(austin): Make it so we can find the indices which were sent just
168 // before and after a time with a binary search.
169
170 // Sender for blocks of data. The resources associated with a sender are
171 // scoped to this object's lifetime.
172 class Sender {
173 public:
174 Sender(const Sender &) = delete;
175 Sender &operator=(const Sender &) = delete;
176 Sender(Sender &&other)
177 : memory_(other.memory_), sender_index_(other.sender_index_) {
178 other.memory_ = nullptr;
179 other.sender_index_ = -1;
180 }
181 Sender &operator=(Sender &&other) {
182 memory_ = other.memory_;
183 sender_index_ = other.sender_index_;
184 other.memory_ = nullptr;
185 other.sender_index_ = -1;
186 return *this;
187 }
188
189 ~Sender();
190
Alex Perrycb7da4b2019-08-28 19:35:56 -0700191 // Sends a message without copying the data.
192 // Copy at most size() bytes of data into the memory pointed to by Data(),
193 // and then call Send().
194 // Note: calls to Data() are expensive enough that you should cache it.
195 size_t size();
196 void *Data();
197 void Send(size_t length);
198
Austin Schuh20b2b082019-09-11 20:42:56 -0700199 // Sends up to length data. Does not wakeup the target.
200 void Send(const char *data, size_t length);
201
202 private:
203 friend class LocklessQueue;
204
205 Sender(LocklessQueueMemory *memory);
206
207 // Pointer to the backing memory.
208 LocklessQueueMemory *memory_ = nullptr;
209
210 // Index into the sender list.
211 int sender_index_ = -1;
212 };
213
214 // Creates a sender.
215 Sender MakeSender();
216
217 private:
218 LocklessQueueMemory *memory_ = nullptr;
219
220 // Memory and datastructure used to sort a list of watchers to wake
221 // up. This isn't a copy of Watcher since tid is simpler to work with here
222 // than the futex above.
223 struct WatcherCopy {
224 pid_t tid;
225 pid_t pid;
226 int priority;
227 };
228 // TODO(austin): Don't allocate this memory if we aren't going to send.
229 ::std::vector<WatcherCopy> watcher_copy_;
230
231 // Index in the watcher list that our entry is, or -1 if no watcher is
232 // registered.
233 int watcher_index_ = -1;
234
235 const int pid_;
236 const uid_t uid_;
237};
238
239} // namespace ipc_lib
240} // namespace aos
241
242#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_