blob: 550485f7df5460507686fad5ef63f6b3fdbbc1a5 [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
#include <signal.h>
#include <sys/signalfd.h>
#include <sys/types.h>

#include <atomic>
#include <utility>
#include <vector>

#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/data_alignment.h"
#include "aos/ipc_lib/index.h"
#include "aos/time/time.h"
13
14namespace aos {
15namespace ipc_lib {
16
17// Structure to hold the state required to wake a watcher.
18struct Watcher {
19 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
20 // then this watcher is invalid. The futex variable will then hold the tid of
21 // the watcher, or FUTEX_OWNER_DIED if the task died.
22 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080023 // Note: this is only modified with the queue_setup_lock lock held, but may
24 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070025 // Any state modification should happen before the lock is acquired.
26 aos_mutex tid;
27
28 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080029 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070030
31 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080032 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070033};
34
35// Structure to hold the state required to send messages.
36struct Sender {
37 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
38 // this sender is invalid. The futex variable will then hold the tid of the
39 // sender, or FUTEX_OWNER_DIED if the task died.
40 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080041 // Note: this is only modified with the queue_setup_lock lock held, but may
42 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070043 aos_mutex tid;
44
45 // Index of the message we will be filling out.
46 AtomicIndex scratch_index;
47
48 // Index of the element being swapped with scratch_index, or Invalid if there
49 // is nothing to do.
50 AtomicIndex to_replace;
51};
52
53// Structure representing a message.
54struct Message {
55 struct Header {
56 // Index of this message in the queue. Needs to match the index this
57 // message is written into the queue at. The data in this message is only
58 // valid if it matches the index in the queue both before and after all the
59 // data is read.
60 //
61 // Note: a value of 0xffffffff always means that the contents aren't valid.
62 AtomicQueueIndex queue_index;
63
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080064 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070065 // queue, which means that time needs to be re-sampled every time a write
66 // fails.
67 ::aos::monotonic_clock::time_point monotonic_sent_time;
68 ::aos::realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080069 // Timestamps of the message from the remote node. These are transparently
70 // passed through.
71 ::aos::monotonic_clock::time_point monotonic_remote_time;
72 ::aos::realtime_clock::time_point realtime_remote_time;
73
74 // Queue index from the remote node.
75 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070076
77 size_t length;
78 } header;
79
Brian Silvermana1652f32020-01-29 20:41:44 -080080 char *data(size_t message_size) { return RoundedData(message_size); }
81 const char *data(size_t message_size) const {
82 return RoundedData(message_size);
83 }
84
85 private:
86 // This returns a non-const pointer into a const object. Be very careful about
87 // const correctness in publicly accessible APIs using it.
88 char *RoundedData(size_t message_size) const {
89 return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
90 }
91
92 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -070093};
94
// Sizing parameters for a lockless queue.
struct LocklessQueueConfiguration {
  // Size of the watchers list.
  size_t num_watchers;
  // Size of the sender list.
  size_t num_senders;

  // Size of the list of pointers into the messages list.
  size_t queue_size;
  // Size in bytes of the data stored in each Message.
  size_t message_data_size;

  // Size of a single message.  Defined out of line.
  // NOTE(review): presumably the header plus message_data_size with any
  // alignment padding -- confirm against the definition.
  size_t message_size() const;

  // Total number of messages backing the queue: one per queue slot plus one
  // per sender.
  size_t num_messages() const { return num_senders + queue_size; }
};
110
111// Structure to hold the state of the queue.
112//
113// Reads and writes are lockless and constant time.
114//
115// Adding a new watcher doesn't need to be constant time for the watcher (this
116// is done before the watcher goes RT), but needs to be RT for the sender.
117struct LocklessQueueMemory;
118
119// Initializes the queue memory. memory must be either a valid pointer to the
120// queue datastructure, or must be zero initialized.
121LocklessQueueMemory *InitializeLocklessQueueMemory(
122 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
123
124// Returns the size of the LocklessQueueMemory.
125size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
126
127// Prints to stdout the data inside the queue for debugging.
128void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
129
// Signal delivered to threads which have registered for wakeups.  This cannot
// be constexpr: SIGRTMIN is not required to be a compile time constant.
static const unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700131
132// Class to manage sending and receiving data in the lockless queue. This is
133// separate from the actual memory backing the queue so that memory can be
134// managed with mmap to share across the process boundary.
135class LocklessQueue {
136 public:
137 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
138 LocklessQueue(const LocklessQueue &) = delete;
139 LocklessQueue &operator=(const LocklessQueue &) = delete;
140
141 ~LocklessQueue();
142
143 // Returns the number of messages in the queue.
144 size_t QueueSize() const;
145
Alex Perrycb7da4b2019-08-28 19:35:56 -0700146 size_t message_data_size() const;
147
Austin Schuh20b2b082019-09-11 20:42:56 -0700148 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
149 // called. Returns false if there was an error in registration.
150 bool RegisterWakeup(int priority);
151 // Unregisters the wakeup.
152 void UnregisterWakeup();
153
154 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
155 //
156 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
157 // if nonrt.
158 int Wakeup(int current_priority);
159
160 // If you ask for a queue index 2 past the newest, you will still get
161 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
162 // element newer than QueueSize() from the current message, we consider it
Alex Perrycb7da4b2019-08-28 19:35:56 -0700163 // behind by a large amount and return TOO_OLD. If the message is modified
164 // out from underneath us as we read it, return OVERWROTE.
Brian Silverman6b8a3c32020-03-06 11:26:14 -0800165 //
166 // data may be nullptr to indicate the data should not be copied.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700167 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
Austin Schuh20b2b082019-09-11 20:42:56 -0700168 ReadResult Read(uint32_t queue_index,
169 ::aos::monotonic_clock::time_point *monotonic_sent_time,
170 ::aos::realtime_clock::time_point *realtime_sent_time,
Austin Schuhad154822019-12-27 15:45:13 -0800171 ::aos::monotonic_clock::time_point *monotonic_remote_time,
172 ::aos::realtime_clock::time_point *realtime_remote_time,
173 uint32_t *remote_queue_index, size_t *length, char *data);
Austin Schuh20b2b082019-09-11 20:42:56 -0700174
175 // Returns the index to the latest queue message. Returns empty_queue_index()
176 // if there are no messages in the queue. Do note that this index wraps if
177 // more than 2^32 messages are sent.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700178 QueueIndex LatestQueueIndex();
179 static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
180
181 // Returns the size of the queue. This is mostly useful for manipulating
182 // QueueIndex.
183 size_t queue_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700184
185 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
186 // if we got behind.
187 // The easiest way to implement this is likely going to be to reserve the
188 // first modulo of values for the initial time around, and never reuse them.
189 // That lets us do a simple atomic read of the next index and deduce what has
190 // happened. It will involve the simplest atomic operations.
191
192 // TODO(austin): Make it so we can find the indices which were sent just
193 // before and after a time with a binary search.
194
195 // Sender for blocks of data. The resources associated with a sender are
196 // scoped to this object's lifetime.
197 class Sender {
198 public:
199 Sender(const Sender &) = delete;
200 Sender &operator=(const Sender &) = delete;
201 Sender(Sender &&other)
202 : memory_(other.memory_), sender_index_(other.sender_index_) {
203 other.memory_ = nullptr;
204 other.sender_index_ = -1;
205 }
206 Sender &operator=(Sender &&other) {
207 memory_ = other.memory_;
208 sender_index_ = other.sender_index_;
209 other.memory_ = nullptr;
210 other.sender_index_ = -1;
211 return *this;
212 }
213
214 ~Sender();
215
Alex Perrycb7da4b2019-08-28 19:35:56 -0700216 // Sends a message without copying the data.
217 // Copy at most size() bytes of data into the memory pointed to by Data(),
218 // and then call Send().
219 // Note: calls to Data() are expensive enough that you should cache it.
220 size_t size();
221 void *Data();
Austin Schuhad154822019-12-27 15:45:13 -0800222 void Send(size_t length,
223 aos::monotonic_clock::time_point monotonic_remote_time =
224 aos::monotonic_clock::min_time,
225 aos::realtime_clock::time_point realtime_remote_time =
226 aos::realtime_clock::min_time,
227 uint32_t remote_queue_index = 0xffffffff,
228 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
229 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
230 uint32_t *queue_index = nullptr);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700231
Austin Schuh20b2b082019-09-11 20:42:56 -0700232 // Sends up to length data. Does not wakeup the target.
Austin Schuhad154822019-12-27 15:45:13 -0800233 void Send(const char *data, size_t length,
234 aos::monotonic_clock::time_point monotonic_remote_time =
235 aos::monotonic_clock::min_time,
236 aos::realtime_clock::time_point realtime_remote_time =
237 aos::realtime_clock::min_time,
238 uint32_t remote_queue_index = 0xffffffff,
239 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
240 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
241 uint32_t *queue_index = nullptr);
Austin Schuh20b2b082019-09-11 20:42:56 -0700242
243 private:
244 friend class LocklessQueue;
245
246 Sender(LocklessQueueMemory *memory);
247
248 // Pointer to the backing memory.
249 LocklessQueueMemory *memory_ = nullptr;
250
251 // Index into the sender list.
252 int sender_index_ = -1;
253 };
254
255 // Creates a sender.
256 Sender MakeSender();
257
258 private:
259 LocklessQueueMemory *memory_ = nullptr;
260
261 // Memory and datastructure used to sort a list of watchers to wake
262 // up. This isn't a copy of Watcher since tid is simpler to work with here
263 // than the futex above.
264 struct WatcherCopy {
265 pid_t tid;
266 pid_t pid;
267 int priority;
268 };
269 // TODO(austin): Don't allocate this memory if we aren't going to send.
270 ::std::vector<WatcherCopy> watcher_copy_;
271
272 // Index in the watcher list that our entry is, or -1 if no watcher is
273 // registered.
274 int watcher_index_ = -1;
275
276 const int pid_;
277 const uid_t uid_;
278};
279
280} // namespace ipc_lib
281} // namespace aos
282
283#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_