blob: 285fe05b09076f04bcabe399b763cef2c323507b [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
4#include <sys/uio.h>
5
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <deque>
7#include <optional>
Austin Schuha36c8902019-12-30 18:07:15 -08008#include <string_view>
9#include <vector>
10
Austin Schuh05b70472020-01-01 17:11:17 -080011#include "absl/types/span.h"
Austin Schuha36c8902019-12-30 18:07:15 -080012#include "aos/events/event_loop.h"
13#include "aos/events/logging/logger_generated.h"
14#include "flatbuffers/flatbuffers.h"
15
16namespace aos {
17namespace logger {
18
// Selects what gets logged for a message, based on where it originated.
enum class LogType : uint8_t {
  // The message originated on this node and should be logged here.
  kLogMessage,
  // The message originated on another node, but only the delivery times are
  // logged here.
  kLogDeliveryTimeOnly,
  // The message originated on another node.  Log it and the delivery times
  // together.  The message_gateway is responsible for logging any messages
  // which didn't get delivered.
  kLogMessageAndDeliveryTime
};
30
31
32// This class manages efficiently writing a sequence of detached buffers to a
33// file. It queues them up and batches the write operation.
34class DetachedBufferWriter {
35 public:
36 DetachedBufferWriter(std::string_view filename);
37 ~DetachedBufferWriter();
38
39 // TODO(austin): Snappy compress the log file if it ends with .snappy!
40
41 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
42 // buffer from it.
43 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
44 // Queues up a detached buffer directly.
45 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
46
47 // Triggers data to be provided to the kernel and written.
48 void Flush();
49
50 private:
51 int fd_ = -1;
52
53 // Size of all the data in the queue.
54 size_t queued_size_ = 0;
55
56 // List of buffers to flush.
57 std::vector<flatbuffers::DetachedBuffer> queue_;
58 // List of iovecs to use with writev. This is a member variable to avoid
59 // churn.
60 std::vector<struct iovec> iovec_;
61};
62
63// Packes a message pointed to by the context into a MessageHeader.
64flatbuffers::Offset<MessageHeader> PackMessage(
65 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
66 int channel_index, LogType log_type);
67
Austin Schuh05b70472020-01-01 17:11:17 -080068// Class to read chunks out of a log file.
69class SpanReader {
70 public:
71 SpanReader(std::string_view filename);
Austin Schuha36c8902019-12-30 18:07:15 -080072
Austin Schuh05b70472020-01-01 17:11:17 -080073 ~SpanReader() { close(fd_); }
74
75 // Returns a span with the data for a message from the log file, excluding
76 // the size.
77 absl::Span<const uint8_t> ReadMessage();
78
79 // Returns true if there is a full message available in the buffer, or if we
80 // will have to read more data from disk.
81 bool MessageAvailable();
82
83 private:
84 // TODO(austin): Optimization:
85 // Allocate the 256k blocks like we do today. But, refcount them with
86 // shared_ptr pointed to by the messageheader that is returned. This avoids
87 // the copy. Need to do more benchmarking.
88
89 // Reads a chunk of data into data_. Returns false if no data was read.
90 bool ReadBlock();
91
92 // File descriptor for the log file.
93 int fd_ = -1;
94
95 // Allocator which doesn't zero initialize memory.
96 template <typename T>
97 struct DefaultInitAllocator {
98 typedef T value_type;
99
100 template <typename U>
101 void construct(U *p) {
102 ::new (static_cast<void *>(p)) U;
103 }
104
105 template <typename U, typename... Args>
106 void construct(U *p, Args &&... args) {
107 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
108 }
109
110 T *allocate(std::size_t n) {
111 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
112 }
113
114 template <typename U>
115 void deallocate(U *p, std::size_t /*n*/) {
116 ::operator delete(static_cast<void *>(p));
117 }
118 };
119
120 // Vector to read into. This uses an allocator which doesn't zero
121 // initialize the memory.
122 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
123
124 // Amount of data consumed already in data_.
125 size_t consumed_data_ = 0;
126
127 // Cached bit for if we have reached the end of the file. Otherwise we will
128 // hammer on the kernel asking for more data each time we send.
129 bool end_of_file_ = false;
130};
131
132// Class which handles reading the header and messages from the log file. This
133// handles any per-file state left before merging below.
134class MessageReader {
135 public:
136 MessageReader(std::string_view filename);
137
138 // Returns the header from the log file.
139 const LogFileHeader *log_file_header() const {
140 return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
141 configuration_.data());
142 }
143
144 // Returns the minimum maount of data needed to queue up for sorting before
145 // ware guarenteed to not see data out of order.
146 std::chrono::nanoseconds max_out_of_order_duration() const {
147 return max_out_of_order_duration_;
148 }
149
150 monotonic_clock::time_point newest_timestamp() const {
151 return newest_timestamp_;
152 }
153
154 // Returns the next message if there is one.
155 std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
156
157 // The time at which we need to read another chunk from the logfile.
158 monotonic_clock::time_point queue_data_time() const {
159 return newest_timestamp() - max_out_of_order_duration();
160 }
161
162 private:
163 // Log chunk reader.
164 SpanReader span_reader_;
165
166 // Vector holding the data for the configuration.
167 std::vector<uint8_t> configuration_;
168
169 // Minimum amount of data to queue up for sorting before we are guarenteed
170 // to not see data out of order.
171 std::chrono::nanoseconds max_out_of_order_duration_;
172
173 // Timestamp of the newest message in a channel queue.
174 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
175};
176
177// We need to read a large chunk at a time, then kit it up into parts and
178// sort.
179//
180// We want to read 256 KB chunks at a time. This is the fastest read size.
181// This leaves us with a fragmentation problem though.
182//
183// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
184// chunks into single flatbuffer messages and manage them in a sorted queue.
185// Everything is copied three times (into 256 kb buffer, then into separate
186// buffer, then into sender), but none of it is all that expensive. We can
187// optimize if it is slow later.
188//
189// As we place the elements in the sorted list of times, keep doing this
190// until we read a message that is newer than the threshold.
191//
192// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
193// small state machine so we can resume), and keep pulling messages back out
194// and sending.
195//
196// For sorting, we want to use the fact that each channel is sorted, and
197// then merge sort the channels. Have a vector of deques, and then hold a
198// sorted list of pointers to those.
199class SortedMessageReader {
200 public:
201 SortedMessageReader(std::string_view filename);
202
203 // Returns the header from the log file.
204 const LogFileHeader *log_file_header() const {
205 return message_reader_.log_file_header();
206 }
207
208 // Returns a pointer to the channel with the oldest message in it, and the
209 // timestamp.
210 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
211 return channel_heap_.front();
212 }
213
214 // Returns the number of channels with data still in them.
215 size_t active_channel_count() const { return channel_heap_.size(); }
216
217 // Returns the configuration from the log file header.
218 const Configuration *configuration() const {
219 return log_file_header()->configuration();
220 }
221
222 // Returns the start time on both the monotonic and realtime clocks.
223 monotonic_clock::time_point monotonic_start_time() {
224 return monotonic_clock::time_point(
225 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
226 }
227 realtime_clock::time_point realtime_start_time() {
228 return realtime_clock::time_point(
229 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
230 }
231
232 // Returns the node who's point of view this log file is from. Make sure this
233 // is a pointer in the configuration() nodes list so it can be consumed
234 // elsewhere.
235 const Node *node() const {
236 if (configuration()->has_nodes()) {
237 CHECK(log_file_header()->has_node());
238 CHECK(log_file_header()->node()->has_name());
239 return configuration::GetNode(
240 configuration(), log_file_header()->node()->name()->string_view());
241 } else {
242 CHECK(!log_file_header()->has_node());
243 return nullptr;
244 }
245 }
246
247 // Pops a pointer to the channel with the oldest message in it, and the
248 // timestamp.
249 std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
250 PopOldestChannel();
251
252 private:
253 // Adds more messages to the sorted list.
254 void QueueMessages();
255
256 // Moves the message to the correct channel queue.
257 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
258
259 // Pushes a pointer to the channel for the given timestamp to the sorted
260 // channel list.
261 void PushChannelHeap(monotonic_clock::time_point timestamp,
262 int channel_index);
263
264
265 // Datastructure to hold the list of messages, cached timestamp for the
266 // oldest message, and sender to send with.
267 struct ChannelData {
268 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
269 std::deque<FlatbufferVector<MessageHeader>> data;
270 std::unique_ptr<RawSender> raw_sender;
271
272 // Returns the oldest message.
273 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
274
275 // Returns the timestamp for the oldest message.
276 const monotonic_clock::time_point front_timestamp() {
277 return monotonic_clock::time_point(
278 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
279 }
280 };
281
282 MessageReader message_reader_;
283
284 // TODO(austin): Multithreaded read at some point. Gotta go faster!
285 // Especially if we start compressing.
286
287 // List of channels and messages for them.
288 std::vector<ChannelData> channels_;
289
290 // Heap of channels so we can track which channel to send next.
291 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
292
293};
Austin Schuha36c8902019-12-30 18:07:15 -0800294
295} // namespace logger
296} // namespace aos
297
298#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_