blob: de80f3dccf0f56b74c8010540d6e7d3b2f440a9f [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
Austin Schuh20b2b082019-09-11 20:42:56 -07007#include <vector>
Austin Schuhe516ab02020-05-06 21:37:04 -07008#include <optional>
Austin Schuh20b2b082019-09-11 20:42:56 -07009
10#include "aos/ipc_lib/aos_sync.h"
Brian Silvermana1652f32020-01-29 20:41:44 -080011#include "aos/ipc_lib/data_alignment.h"
Austin Schuh20b2b082019-09-11 20:42:56 -070012#include "aos/ipc_lib/index.h"
13#include "aos/time/time.h"
14
15namespace aos {
16namespace ipc_lib {
17
18// Structure to hold the state required to wake a watcher.
19struct Watcher {
20 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
21 // then this watcher is invalid. The futex variable will then hold the tid of
22 // the watcher, or FUTEX_OWNER_DIED if the task died.
23 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080024 // Note: this is only modified with the queue_setup_lock lock held, but may
25 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070026 // Any state modification should happen before the lock is acquired.
27 aos_mutex tid;
28
29 // PID of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080030 std::atomic<pid_t> pid;
Austin Schuh20b2b082019-09-11 20:42:56 -070031
32 // RT priority of the watcher.
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080033 std::atomic<int> priority;
Austin Schuh20b2b082019-09-11 20:42:56 -070034};
35
36// Structure to hold the state required to send messages.
37struct Sender {
38 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
39 // this sender is invalid. The futex variable will then hold the tid of the
40 // sender, or FUTEX_OWNER_DIED if the task died.
41 //
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080042 // Note: this is only modified with the queue_setup_lock lock held, but may
43 // always be read.
Austin Schuh20b2b082019-09-11 20:42:56 -070044 aos_mutex tid;
45
46 // Index of the message we will be filling out.
47 AtomicIndex scratch_index;
48
49 // Index of the element being swapped with scratch_index, or Invalid if there
50 // is nothing to do.
51 AtomicIndex to_replace;
52};
53
54// Structure representing a message.
55struct Message {
56 struct Header {
57 // Index of this message in the queue. Needs to match the index this
58 // message is written into the queue at. The data in this message is only
59 // valid if it matches the index in the queue both before and after all the
60 // data is read.
61 //
62 // Note: a value of 0xffffffff always means that the contents aren't valid.
63 AtomicQueueIndex queue_index;
64
Brian Silvermanfafe1fa2019-12-18 21:42:18 -080065 // Timestamp of the message. Needs to be monotonically incrementing in the
Austin Schuh20b2b082019-09-11 20:42:56 -070066 // queue, which means that time needs to be re-sampled every time a write
67 // fails.
68 ::aos::monotonic_clock::time_point monotonic_sent_time;
69 ::aos::realtime_clock::time_point realtime_sent_time;
Austin Schuhad154822019-12-27 15:45:13 -080070 // Timestamps of the message from the remote node. These are transparently
71 // passed through.
72 ::aos::monotonic_clock::time_point monotonic_remote_time;
73 ::aos::realtime_clock::time_point realtime_remote_time;
74
75 // Queue index from the remote node.
76 uint32_t remote_queue_index;
Austin Schuh20b2b082019-09-11 20:42:56 -070077
78 size_t length;
79 } header;
80
Brian Silvermana1652f32020-01-29 20:41:44 -080081 char *data(size_t message_size) { return RoundedData(message_size); }
82 const char *data(size_t message_size) const {
83 return RoundedData(message_size);
84 }
85
86 private:
87 // This returns a non-const pointer into a const object. Be very careful about
88 // const correctness in publicly accessible APIs using it.
89 char *RoundedData(size_t message_size) const {
90 return RoundChannelData(const_cast<char *>(&data_pointer[0]), message_size);
91 }
92
93 char data_pointer[];
Austin Schuh20b2b082019-09-11 20:42:56 -070094};
95
96struct LocklessQueueConfiguration {
97 // Size of the watchers list.
98 size_t num_watchers;
99 // Size of the sender list.
100 size_t num_senders;
101
102 // Size of the list of pointers into the messages list.
103 size_t queue_size;
104 // Size in bytes of the data stored in each Message.
105 size_t message_data_size;
106
Austin Schuh4bc4f902019-12-23 18:04:51 -0800107 size_t message_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700108
109 size_t num_messages() const { return num_senders + queue_size; }
110};
111
112// Structure to hold the state of the queue.
113//
114// Reads and writes are lockless and constant time.
115//
116// Adding a new watcher doesn't need to be constant time for the watcher (this
117// is done before the watcher goes RT), but needs to be RT for the sender.
118struct LocklessQueueMemory;
119
120// Initializes the queue memory. memory must be either a valid pointer to the
121// queue datastructure, or must be zero initialized.
122LocklessQueueMemory *InitializeLocklessQueueMemory(
123 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
124
125// Returns the size of the LocklessQueueMemory.
126size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
127
128// Prints to stdout the data inside the queue for debugging.
129void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
130
Alex Perrycb7da4b2019-08-28 19:35:56 -0700131const static unsigned int kWakeupSignal = SIGRTMIN + 2;
Austin Schuh20b2b082019-09-11 20:42:56 -0700132
133// Class to manage sending and receiving data in the lockless queue. This is
134// separate from the actual memory backing the queue so that memory can be
135// managed with mmap to share across the process boundary.
136class LocklessQueue {
137 public:
138 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
139 LocklessQueue(const LocklessQueue &) = delete;
140 LocklessQueue &operator=(const LocklessQueue &) = delete;
141
142 ~LocklessQueue();
143
144 // Returns the number of messages in the queue.
145 size_t QueueSize() const;
146
Alex Perrycb7da4b2019-08-28 19:35:56 -0700147 size_t message_data_size() const;
148
Austin Schuh20b2b082019-09-11 20:42:56 -0700149 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
150 // called. Returns false if there was an error in registration.
151 bool RegisterWakeup(int priority);
152 // Unregisters the wakeup.
153 void UnregisterWakeup();
154
155 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
156 //
157 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
158 // if nonrt.
159 int Wakeup(int current_priority);
160
161 // If you ask for a queue index 2 past the newest, you will still get
162 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
163 // element newer than QueueSize() from the current message, we consider it
Alex Perrycb7da4b2019-08-28 19:35:56 -0700164 // behind by a large amount and return TOO_OLD. If the message is modified
165 // out from underneath us as we read it, return OVERWROTE.
Brian Silverman6b8a3c32020-03-06 11:26:14 -0800166 //
167 // data may be nullptr to indicate the data should not be copied.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700168 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
Austin Schuh20b2b082019-09-11 20:42:56 -0700169 ReadResult Read(uint32_t queue_index,
170 ::aos::monotonic_clock::time_point *monotonic_sent_time,
171 ::aos::realtime_clock::time_point *realtime_sent_time,
Austin Schuhad154822019-12-27 15:45:13 -0800172 ::aos::monotonic_clock::time_point *monotonic_remote_time,
173 ::aos::realtime_clock::time_point *realtime_remote_time,
174 uint32_t *remote_queue_index, size_t *length, char *data);
Austin Schuh20b2b082019-09-11 20:42:56 -0700175
176 // Returns the index to the latest queue message. Returns empty_queue_index()
177 // if there are no messages in the queue. Do note that this index wraps if
178 // more than 2^32 messages are sent.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700179 QueueIndex LatestQueueIndex();
180 static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
181
182 // Returns the size of the queue. This is mostly useful for manipulating
183 // QueueIndex.
184 size_t queue_size() const;
Austin Schuh20b2b082019-09-11 20:42:56 -0700185
186 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
187 // if we got behind.
188 // The easiest way to implement this is likely going to be to reserve the
189 // first modulo of values for the initial time around, and never reuse them.
190 // That lets us do a simple atomic read of the next index and deduce what has
191 // happened. It will involve the simplest atomic operations.
192
193 // TODO(austin): Make it so we can find the indices which were sent just
194 // before and after a time with a binary search.
195
196 // Sender for blocks of data. The resources associated with a sender are
197 // scoped to this object's lifetime.
198 class Sender {
199 public:
200 Sender(const Sender &) = delete;
201 Sender &operator=(const Sender &) = delete;
202 Sender(Sender &&other)
203 : memory_(other.memory_), sender_index_(other.sender_index_) {
204 other.memory_ = nullptr;
205 other.sender_index_ = -1;
206 }
207 Sender &operator=(Sender &&other) {
208 memory_ = other.memory_;
209 sender_index_ = other.sender_index_;
210 other.memory_ = nullptr;
211 other.sender_index_ = -1;
212 return *this;
213 }
214
215 ~Sender();
216
Alex Perrycb7da4b2019-08-28 19:35:56 -0700217 // Sends a message without copying the data.
218 // Copy at most size() bytes of data into the memory pointed to by Data(),
219 // and then call Send().
220 // Note: calls to Data() are expensive enough that you should cache it.
221 size_t size();
222 void *Data();
Austin Schuhad154822019-12-27 15:45:13 -0800223 void Send(size_t length,
224 aos::monotonic_clock::time_point monotonic_remote_time =
225 aos::monotonic_clock::min_time,
226 aos::realtime_clock::time_point realtime_remote_time =
227 aos::realtime_clock::min_time,
228 uint32_t remote_queue_index = 0xffffffff,
229 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
230 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
231 uint32_t *queue_index = nullptr);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700232
Austin Schuh20b2b082019-09-11 20:42:56 -0700233 // Sends up to length data. Does not wakeup the target.
Austin Schuhad154822019-12-27 15:45:13 -0800234 void Send(const char *data, size_t length,
235 aos::monotonic_clock::time_point monotonic_remote_time =
236 aos::monotonic_clock::min_time,
237 aos::realtime_clock::time_point realtime_remote_time =
238 aos::realtime_clock::min_time,
239 uint32_t remote_queue_index = 0xffffffff,
240 aos::monotonic_clock::time_point *monotonic_sent_time = nullptr,
241 aos::realtime_clock::time_point *realtime_sent_time = nullptr,
242 uint32_t *queue_index = nullptr);
Austin Schuh20b2b082019-09-11 20:42:56 -0700243
244 private:
245 friend class LocklessQueue;
246
247 Sender(LocklessQueueMemory *memory);
248
Austin Schuhe516ab02020-05-06 21:37:04 -0700249 // Returns true if this sender is valid. If it isn't valid, any of the
250 // other methods won't work. This is here to allow the lockless queue to
251 // only build a sender if there was one available.
252 bool valid() const { return sender_index_ != -1 && memory_ != nullptr; }
253
Austin Schuh20b2b082019-09-11 20:42:56 -0700254 // Pointer to the backing memory.
255 LocklessQueueMemory *memory_ = nullptr;
256
257 // Index into the sender list.
258 int sender_index_ = -1;
259 };
260
Austin Schuhe516ab02020-05-06 21:37:04 -0700261 // Creates a sender. If we couldn't allocate a sender, returns nullopt.
262 // TODO(austin): Change the API if we find ourselves with more errors.
263 std::optional<Sender> MakeSender();
Austin Schuh20b2b082019-09-11 20:42:56 -0700264
265 private:
266 LocklessQueueMemory *memory_ = nullptr;
267
268 // Memory and datastructure used to sort a list of watchers to wake
269 // up. This isn't a copy of Watcher since tid is simpler to work with here
270 // than the futex above.
271 struct WatcherCopy {
272 pid_t tid;
273 pid_t pid;
274 int priority;
275 };
276 // TODO(austin): Don't allocate this memory if we aren't going to send.
277 ::std::vector<WatcherCopy> watcher_copy_;
278
279 // Index in the watcher list that our entry is, or -1 if no watcher is
280 // registered.
281 int watcher_index_ = -1;
282
283 const int pid_;
284 const uid_t uid_;
285};
286
287} // namespace ipc_lib
288} // namespace aos
289
290#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_