blob: dafc157e54e59dc347c53e0fba189138fe1534aa [file] [log] [blame]
Austin Schuh20b2b082019-09-11 20:42:56 -07001#ifndef AOS_IPC_LIB_LOCKLESS_QUEUE_H_
2#define AOS_IPC_LIB_LOCKLESS_QUEUE_H_
3
4#include <signal.h>
5#include <sys/signalfd.h>
6#include <sys/types.h>
7#include <atomic>
8#include <vector>
9
10#include "aos/ipc_lib/aos_sync.h"
11#include "aos/ipc_lib/index.h"
12#include "aos/time/time.h"
13
14namespace aos {
15namespace ipc_lib {
16
17// Structure to hold the state required to wake a watcher.
18struct Watcher {
19 // Mutex that the watcher locks. If the futex is 0 (or FUTEX_OWNER_DIED),
20 // then this watcher is invalid. The futex variable will then hold the tid of
21 // the watcher, or FUTEX_OWNER_DIED if the task died.
22 //
23 // Note: this is modified with a lock held, but is always able to be read.
24 // Any state modification should happen before the lock is acquired.
25 aos_mutex tid;
26
27 // PID of the watcher.
28 pid_t pid;
29
30 // RT priority of the watcher.
31 int priority;
32};
33
34// Structure to hold the state required to send messages.
35struct Sender {
36 // Mutex that the sender locks. If the futex is 0 (or FUTEX_OWNER_DIED), then
37 // this sender is invalid. The futex variable will then hold the tid of the
38 // sender, or FUTEX_OWNER_DIED if the task died.
39 //
40 // Note: this is modified with a lock held, but is always able to be read.
41 aos_mutex tid;
42
43 // Index of the message we will be filling out.
44 AtomicIndex scratch_index;
45
46 // Index of the element being swapped with scratch_index, or Invalid if there
47 // is nothing to do.
48 AtomicIndex to_replace;
49};
50
51// Structure representing a message.
52struct Message {
53 struct Header {
54 // Index of this message in the queue. Needs to match the index this
55 // message is written into the queue at. The data in this message is only
56 // valid if it matches the index in the queue both before and after all the
57 // data is read.
58 //
59 // Note: a value of 0xffffffff always means that the contents aren't valid.
60 AtomicQueueIndex queue_index;
61
62 // Timestamp of the message. Needs to be atomically incrementing in the
63 // queue, which means that time needs to be re-sampled every time a write
64 // fails.
65 ::aos::monotonic_clock::time_point monotonic_sent_time;
66 ::aos::realtime_clock::time_point realtime_sent_time;
67
68 size_t length;
69 } header;
70
71 char data[];
72};
73
74struct LocklessQueueConfiguration {
75 // Size of the watchers list.
76 size_t num_watchers;
77 // Size of the sender list.
78 size_t num_senders;
79
80 // Size of the list of pointers into the messages list.
81 size_t queue_size;
82 // Size in bytes of the data stored in each Message.
83 size_t message_data_size;
84
85 size_t message_size() const { return message_data_size + sizeof(Message); }
86
87 size_t num_messages() const { return num_senders + queue_size; }
88};
89
90// Structure to hold the state of the queue.
91//
92// Reads and writes are lockless and constant time.
93//
94// Adding a new watcher doesn't need to be constant time for the watcher (this
95// is done before the watcher goes RT), but needs to be RT for the sender.
96struct LocklessQueueMemory;
97
98// Initializes the queue memory. memory must be either a valid pointer to the
99// queue datastructure, or must be zero initialized.
100LocklessQueueMemory *InitializeLocklessQueueMemory(
101 LocklessQueueMemory *memory, LocklessQueueConfiguration config);
102
103// Returns the size of the LocklessQueueMemory.
104size_t LocklessQueueMemorySize(LocklessQueueConfiguration config);
105
106// Prints to stdout the data inside the queue for debugging.
107void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
108
109const static int kWakeupSignal = SIGRTMIN + 2;
110
111// Class to manage sending and receiving data in the lockless queue. This is
112// separate from the actual memory backing the queue so that memory can be
113// managed with mmap to share across the process boundary.
114class LocklessQueue {
115 public:
116 LocklessQueue(LocklessQueueMemory *memory, LocklessQueueConfiguration config);
117 LocklessQueue(const LocklessQueue &) = delete;
118 LocklessQueue &operator=(const LocklessQueue &) = delete;
119
120 ~LocklessQueue();
121
122 // Returns the number of messages in the queue.
123 size_t QueueSize() const;
124
125 // Registers this thread to receive the kWakeupSignal signal when Wakeup is
126 // called. Returns false if there was an error in registration.
127 bool RegisterWakeup(int priority);
128 // Unregisters the wakeup.
129 void UnregisterWakeup();
130
131 // Sends the kWakeupSignal to all threads which have called RegisterWakeup.
132 //
133 // priority of 0 means nonrt. nonrt could have issues, so we don't PI boost
134 // if nonrt.
135 int Wakeup(int current_priority);
136
137 // If you ask for a queue index 2 past the newest, you will still get
138 // NOTHING_NEW until that gets overwritten with new data. If you ask for an
139 // element newer than QueueSize() from the current message, we consider it
140 // behind by a large amount and return TOO_OLD.
141 enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW };
142 ReadResult Read(uint32_t queue_index,
143 ::aos::monotonic_clock::time_point *monotonic_sent_time,
144 ::aos::realtime_clock::time_point *realtime_sent_time,
145 size_t *length, char *data);
146
147 // Returns the index to the latest queue message. Returns empty_queue_index()
148 // if there are no messages in the queue. Do note that this index wraps if
149 // more than 2^32 messages are sent.
150 uint32_t LatestQueueIndex();
151 static constexpr uint32_t empty_queue_index() { return 0xffffffff; }
152
153 // TODO(austin): Return the oldest queue index. This lets us catch up nicely
154 // if we got behind.
155 // The easiest way to implement this is likely going to be to reserve the
156 // first modulo of values for the initial time around, and never reuse them.
157 // That lets us do a simple atomic read of the next index and deduce what has
158 // happened. It will involve the simplest atomic operations.
159
160 // TODO(austin): Make it so we can find the indices which were sent just
161 // before and after a time with a binary search.
162
163 // Sender for blocks of data. The resources associated with a sender are
164 // scoped to this object's lifetime.
165 class Sender {
166 public:
167 Sender(const Sender &) = delete;
168 Sender &operator=(const Sender &) = delete;
169 Sender(Sender &&other)
170 : memory_(other.memory_), sender_index_(other.sender_index_) {
171 other.memory_ = nullptr;
172 other.sender_index_ = -1;
173 }
174 Sender &operator=(Sender &&other) {
175 memory_ = other.memory_;
176 sender_index_ = other.sender_index_;
177 other.memory_ = nullptr;
178 other.sender_index_ = -1;
179 return *this;
180 }
181
182 ~Sender();
183
184 // Sends up to length data. Does not wakeup the target.
185 void Send(const char *data, size_t length);
186
187 private:
188 friend class LocklessQueue;
189
190 Sender(LocklessQueueMemory *memory);
191
192 // Pointer to the backing memory.
193 LocklessQueueMemory *memory_ = nullptr;
194
195 // Index into the sender list.
196 int sender_index_ = -1;
197 };
198
199 // Creates a sender.
200 Sender MakeSender();
201
202 private:
203 LocklessQueueMemory *memory_ = nullptr;
204
205 // Memory and datastructure used to sort a list of watchers to wake
206 // up. This isn't a copy of Watcher since tid is simpler to work with here
207 // than the futex above.
208 struct WatcherCopy {
209 pid_t tid;
210 pid_t pid;
211 int priority;
212 };
213 // TODO(austin): Don't allocate this memory if we aren't going to send.
214 ::std::vector<WatcherCopy> watcher_copy_;
215
216 // Index in the watcher list that our entry is, or -1 if no watcher is
217 // registered.
218 int watcher_index_ = -1;
219
220 const int pid_;
221 const uid_t uid_;
222};
223
224} // namespace ipc_lib
225} // namespace aos
226
227#endif // AOS_IPC_LIB_LOCKLESS_QUEUE_H_