blob: 9a849b24c35d421f979867fe5a56c0d94df0ec5e [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
#include <sys/uio.h>
#include <unistd.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
16
17namespace aos {
18namespace logger {
19
// Describes how a message on a channel should be logged on this node.
enum class LogType : uint8_t {
  // Sent from this node; the message itself is logged here.
  kLogMessage = 0,
  // Sent from another node; only the delivery times are logged here.
  kLogDeliveryTimeOnly = 1,
  // Sent from another node; the message and its delivery times are logged
  // together.  Any messages which didn't get delivered are the
  // message_gateway's responsibility to log.
  kLogMessageAndDeliveryTime = 2,
  // Sent from the other node; the message should be logged on this node.
  kLogRemoteMessage = 3
};
33
Austin Schuha36c8902019-12-30 18:07:15 -080034// This class manages efficiently writing a sequence of detached buffers to a
35// file. It queues them up and batches the write operation.
36class DetachedBufferWriter {
37 public:
38 DetachedBufferWriter(std::string_view filename);
39 ~DetachedBufferWriter();
40
Austin Schuh6f3babe2020-01-26 20:34:50 -080041 std::string_view filename() const { return filename_; }
42
Austin Schuha36c8902019-12-30 18:07:15 -080043 // TODO(austin): Snappy compress the log file if it ends with .snappy!
44
45 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
46 // buffer from it.
47 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
48 // Queues up a detached buffer directly.
49 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
Austin Schuhde031b72020-01-10 19:34:41 -080050 // Writes a Span. This is not terribly optimized right now.
51 void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
Austin Schuha36c8902019-12-30 18:07:15 -080052
53 // Triggers data to be provided to the kernel and written.
54 void Flush();
55
56 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -080057 const std::string filename_;
58
Austin Schuha36c8902019-12-30 18:07:15 -080059 int fd_ = -1;
60
61 // Size of all the data in the queue.
62 size_t queued_size_ = 0;
63
64 // List of buffers to flush.
65 std::vector<flatbuffers::DetachedBuffer> queue_;
66 // List of iovecs to use with writev. This is a member variable to avoid
67 // churn.
68 std::vector<struct iovec> iovec_;
69};
70
71// Packes a message pointed to by the context into a MessageHeader.
72flatbuffers::Offset<MessageHeader> PackMessage(
73 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
74 int channel_index, LogType log_type);
75
Austin Schuh6f3babe2020-01-26 20:34:50 -080076FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
77
Austin Schuh05b70472020-01-01 17:11:17 -080078// Class to read chunks out of a log file.
79class SpanReader {
80 public:
81 SpanReader(std::string_view filename);
Austin Schuha36c8902019-12-30 18:07:15 -080082
Austin Schuh05b70472020-01-01 17:11:17 -080083 ~SpanReader() { close(fd_); }
84
Austin Schuh6f3babe2020-01-26 20:34:50 -080085 std::string_view filename() const { return filename_; }
86
Austin Schuh05b70472020-01-01 17:11:17 -080087 // Returns a span with the data for a message from the log file, excluding
88 // the size.
89 absl::Span<const uint8_t> ReadMessage();
90
91 // Returns true if there is a full message available in the buffer, or if we
92 // will have to read more data from disk.
93 bool MessageAvailable();
94
95 private:
96 // TODO(austin): Optimization:
97 // Allocate the 256k blocks like we do today. But, refcount them with
98 // shared_ptr pointed to by the messageheader that is returned. This avoids
99 // the copy. Need to do more benchmarking.
100
101 // Reads a chunk of data into data_. Returns false if no data was read.
102 bool ReadBlock();
103
Austin Schuh6f3babe2020-01-26 20:34:50 -0800104 const std::string filename_;
105
Austin Schuh05b70472020-01-01 17:11:17 -0800106 // File descriptor for the log file.
107 int fd_ = -1;
108
109 // Allocator which doesn't zero initialize memory.
110 template <typename T>
111 struct DefaultInitAllocator {
112 typedef T value_type;
113
114 template <typename U>
115 void construct(U *p) {
116 ::new (static_cast<void *>(p)) U;
117 }
118
119 template <typename U, typename... Args>
120 void construct(U *p, Args &&... args) {
121 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
122 }
123
124 T *allocate(std::size_t n) {
125 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
126 }
127
128 template <typename U>
129 void deallocate(U *p, std::size_t /*n*/) {
130 ::operator delete(static_cast<void *>(p));
131 }
132 };
133
134 // Vector to read into. This uses an allocator which doesn't zero
135 // initialize the memory.
136 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
137
138 // Amount of data consumed already in data_.
139 size_t consumed_data_ = 0;
140
141 // Cached bit for if we have reached the end of the file. Otherwise we will
142 // hammer on the kernel asking for more data each time we send.
143 bool end_of_file_ = false;
144};
145
146// Class which handles reading the header and messages from the log file. This
147// handles any per-file state left before merging below.
148class MessageReader {
149 public:
150 MessageReader(std::string_view filename);
151
Austin Schuh6f3babe2020-01-26 20:34:50 -0800152 std::string_view filename() const { return span_reader_.filename(); }
153
Austin Schuh05b70472020-01-01 17:11:17 -0800154 // Returns the header from the log file.
155 const LogFileHeader *log_file_header() const {
156 return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
157 configuration_.data());
158 }
159
160 // Returns the minimum maount of data needed to queue up for sorting before
161 // ware guarenteed to not see data out of order.
162 std::chrono::nanoseconds max_out_of_order_duration() const {
163 return max_out_of_order_duration_;
164 }
165
166 monotonic_clock::time_point newest_timestamp() const {
167 return newest_timestamp_;
168 }
169
170 // Returns the next message if there is one.
171 std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
172
173 // The time at which we need to read another chunk from the logfile.
174 monotonic_clock::time_point queue_data_time() const {
175 return newest_timestamp() - max_out_of_order_duration();
176 }
177
178 private:
179 // Log chunk reader.
180 SpanReader span_reader_;
181
182 // Vector holding the data for the configuration.
183 std::vector<uint8_t> configuration_;
184
185 // Minimum amount of data to queue up for sorting before we are guarenteed
186 // to not see data out of order.
187 std::chrono::nanoseconds max_out_of_order_duration_;
188
189 // Timestamp of the newest message in a channel queue.
190 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
191};
192
// Forward declaration: SplitMessageReader refers to TimestampMerger by pointer
// before the class is defined further down in this header.
class TimestampMerger;
Austin Schuh05b70472020-01-01 17:11:17 -0800194
// A design requirement is that the relevant data for a channel is not more than
// max_out_of_order_duration out of order.  We approach sorting in layers.
//
// 1) Split each (maybe chunked) log file into one queue per channel.  Read this
//    log file looking for data pertaining to a specific node.
//    (SplitMessageReader)
// 2) Merge all the data per channel from the different log files into a sorted
//    list of timestamps and messages.  (TimestampMerger)
// 3) Combine the timestamps and messages.  (TimestampMerger)
// 4) Merge all the channels to produce the next message on a node.
//    (ChannelMerger)
// 5) Duplicate this entire stack per node.
207
208// This class splits messages and timestamps up into a queue per channel, and
209// handles reading data from multiple chunks.
210class SplitMessageReader {
211 public:
212 SplitMessageReader(const std::vector<std::string> &filenames);
213
214 // Sets the TimestampMerger that gets notified for each channel. The node
215 // that the TimestampMerger is merging as needs to be passed in.
216 void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
217 const Node *target_node);
218
219 // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
220 // max_time if there is nothing in the channel.
221 std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
222 int channel) {
223 return channels_[channel].data.front_timestamp();
224 }
225
226 // Returns the (timestamp, queue_index) for the oldest delivery time in a
227 // channel, or max_time if there is nothing in the channel.
228 std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
229 int channel, int destination_node) {
230 return channels_[channel].timestamps[destination_node].front_timestamp();
231 }
232
233 // Returns the timestamp, queue_index, and message for the oldest data on a
234 // channel. Requeues data as needed.
235 std::tuple<monotonic_clock::time_point, uint32_t,
236 FlatbufferVector<MessageHeader>>
237 PopOldest(int channel_index);
238
239 // Returns the timestamp, queue_index, and message for the oldest timestamp on
240 // a channel delivered to a node. Requeues data as needed.
241 std::tuple<monotonic_clock::time_point, uint32_t,
242 FlatbufferVector<MessageHeader>>
243 PopOldest(int channel, int node_index);
244
245 // Returns the header for the log files.
Austin Schuh05b70472020-01-01 17:11:17 -0800246 const LogFileHeader *log_file_header() const {
Austin Schuhfa895892020-01-07 20:07:41 -0800247 return &log_file_header_.message();
Austin Schuh05b70472020-01-01 17:11:17 -0800248 }
249
Austin Schuh6f3babe2020-01-26 20:34:50 -0800250 // Returns the starting time for this set of log files.
Austin Schuh05b70472020-01-01 17:11:17 -0800251 monotonic_clock::time_point monotonic_start_time() {
252 return monotonic_clock::time_point(
253 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
254 }
255 realtime_clock::time_point realtime_start_time() {
256 return realtime_clock::time_point(
257 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
258 }
259
Austin Schuh6f3babe2020-01-26 20:34:50 -0800260 // Returns the configuration from the log file header.
261 const Configuration *configuration() const {
262 return log_file_header()->configuration();
263 }
264
Austin Schuh05b70472020-01-01 17:11:17 -0800265 // Returns the node who's point of view this log file is from. Make sure this
266 // is a pointer in the configuration() nodes list so it can be consumed
267 // elsewhere.
268 const Node *node() const {
269 if (configuration()->has_nodes()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800270 return configuration::GetNodeOrDie(configuration(),
271 log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800272 } else {
273 CHECK(!log_file_header()->has_node());
274 return nullptr;
275 }
276 }
277
Austin Schuh6f3babe2020-01-26 20:34:50 -0800278 // Returns the timestamp of the newest message read from the log file, and the
279 // timestamp that we need to re-queue data.
280 monotonic_clock::time_point newest_timestamp() const {
281 return message_reader_->newest_timestamp();
282 }
283 monotonic_clock::time_point queue_data_time() const {
284 return message_reader_->queue_data_time();
285 }
286
287
288 // Adds more messages to the sorted list. This reads enough data such that
289 // oldest_message_time can be replayed safely. Returns false if the log file
290 // has all been read.
291 bool QueueMessages(monotonic_clock::time_point oldest_message_time);
Austin Schuh05b70472020-01-01 17:11:17 -0800292
293 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800294 // TODO(austin): Need to copy or refcount the message instead of running
295 // multiple copies of the reader. Or maybe have a "as_node" index and hide it
296 // inside.
297
Austin Schuhfa895892020-01-07 20:07:41 -0800298 // Moves to the next log file in the list.
299 bool NextLogFile();
300
Austin Schuh6f3babe2020-01-26 20:34:50 -0800301 // Filenames of the log files.
302 std::vector<std::string> filenames_;
303 // And the index of the next file to open.
304 size_t next_filename_index_ = 0;
Austin Schuh05b70472020-01-01 17:11:17 -0800305
Austin Schuh6f3babe2020-01-26 20:34:50 -0800306 // Log file header to report. This is a copy.
307 FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
308 // Current log file being read.
309 std::unique_ptr<MessageReader> message_reader_;
Austin Schuh05b70472020-01-01 17:11:17 -0800310
311 // Datastructure to hold the list of messages, cached timestamp for the
312 // oldest message, and sender to send with.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800313 struct MessageHeaderQueue {
314 // If true, this is a timestamp queue.
315 bool timestamps = false;
Austin Schuh05b70472020-01-01 17:11:17 -0800316
Austin Schuh6f3babe2020-01-26 20:34:50 -0800317 // Returns a reference to the the oldest message.
318 FlatbufferVector<MessageHeader> &front() {
319 CHECK_GT(data_.size(), 0u);
320 return data_.front();
Austin Schuh05b70472020-01-01 17:11:17 -0800321 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800322
323 // Adds a message to the back of the queue.
324 void emplace_back(FlatbufferVector<MessageHeader> &&msg);
325
326 // Drops the front message. Invalidates the front() reference.
327 void pop_front();
328
329 // The size of the queue.
330 size_t size() { return data_.size(); }
331
332 // Returns the (timestamp, queue_index) for the oldest message.
333 const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
334 CHECK_GT(data_.size(), 0u);
335 return std::make_tuple(
336 monotonic_clock::time_point(std::chrono::nanoseconds(
337 front().message().monotonic_sent_time())),
338 front().message().queue_index());
339 }
340
341 // Pointer to the timestamp merger for this queue if available.
342 TimestampMerger *timestamp_merger = nullptr;
343 // Pointer to the reader which feeds this queue.
344 SplitMessageReader *split_reader = nullptr;
345
346 private:
347 // The data.
348 std::deque<FlatbufferVector<MessageHeader>> data_;
Austin Schuh05b70472020-01-01 17:11:17 -0800349 };
350
Austin Schuh6f3babe2020-01-26 20:34:50 -0800351 // All the queues needed for a channel. There isn't going to be data in all
352 // of these.
353 struct ChannelData {
354 // The data queue for the channel.
355 MessageHeaderQueue data;
356 // Queues for timestamps for each node.
357 std::vector<MessageHeaderQueue> timestamps;
358 };
Austin Schuhfa895892020-01-07 20:07:41 -0800359
Austin Schuh6f3babe2020-01-26 20:34:50 -0800360 // Data for all the channels.
Austin Schuh05b70472020-01-01 17:11:17 -0800361 std::vector<ChannelData> channels_;
362
Austin Schuh6f3babe2020-01-26 20:34:50 -0800363 // Once we know the node that this SplitMessageReader will be writing as,
364 // there will be only one MessageHeaderQueue that a specific channel matches.
365 // Precompute this here for efficiency.
366 std::vector<MessageHeaderQueue *> channels_to_write_;
367
368 // Number of messages queued.
369 size_t queued_messages_ = 0;
370};
371
// Forward declaration: TimestampMerger refers to its parent ChannelMerger by
// pointer before the class is defined further down in this header.
class ChannelMerger;
373
374// Sorts channels (and timestamps) from multiple log files for a single channel.
375class TimestampMerger {
376 public:
377 TimestampMerger(const Configuration *configuration,
378 std::vector<SplitMessageReader *> split_message_readers,
379 int channel_index, const Node *target_node,
380 ChannelMerger *channel_merger);
381
382 // Metadata used to schedule the message.
383 struct DeliveryTimestamp {
384 monotonic_clock::time_point monotonic_event_time =
385 monotonic_clock::min_time;
386 realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
387
388 monotonic_clock::time_point monotonic_remote_time =
389 monotonic_clock::min_time;
390 realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
391 uint32_t remote_queue_index = 0xffffffff;
392 };
393
394 // Pushes SplitMessageReader onto the timestamp heap. This should only be
395 // called when timestamps are placed in the channel this class is merging for
396 // the reader.
397 void UpdateTimestamp(
398 SplitMessageReader *split_message_reader,
399 std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
400 PushTimestampHeap(oldest_message_time, split_message_reader);
401 }
402 // Pushes SplitMessageReader onto the message heap. This should only be
403 // called when data is placed in the channel this class is merging for the
404 // reader.
405 void Update(
406 SplitMessageReader *split_message_reader,
407 std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
408 PushMessageHeap(oldest_message_time, split_message_reader);
409 }
410
411 // Returns the oldest combined timestamp and data for this channel.
412 std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
413
414 // Tracks if the channel merger has pushed this onto it's heap or not.
415 bool pushed() { return pushed_; }
416 // Sets if this has been pushed to the channel merger heap. Should only be
417 // called by the channel merger.
418 void set_pushed(bool pushed) { pushed_ = pushed; }
419
420 private:
421 // Pushes messages and timestamps to the corresponding heaps.
422 void PushMessageHeap(
423 std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
424 SplitMessageReader *split_message_reader);
425 void PushTimestampHeap(
426 std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
427 SplitMessageReader *split_message_reader);
428
429 // Pops a message from the message heap. This automatically triggers the
430 // split message reader to re-fetch any new data.
431 std::tuple<monotonic_clock::time_point, uint32_t,
432 FlatbufferVector<MessageHeader>>
433 PopMessageHeap();
434 // Pops a message from the timestamp heap. This automatically triggers the
435 // split message reader to re-fetch any new data.
436 std::tuple<monotonic_clock::time_point, uint32_t,
437 FlatbufferVector<MessageHeader>>
438 PopTimestampHeap();
439
440 const Configuration *configuration_;
441
442 // If true, this is a forwarded channel and timestamps should be matched.
443 bool has_timestamps_ = false;
444
445 // Tracks if the ChannelMerger has pushed this onto it's queue.
446 bool pushed_ = false;
447
448 // The split message readers used for source data.
449 std::vector<SplitMessageReader *> split_message_readers_;
450
451 // The channel to merge.
452 int channel_index_;
453
454 // Our node.
455 int node_index_;
456
457 // Heaps for messages and timestamps.
458 std::vector<
459 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
460 message_heap_;
461 std::vector<
462 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
463 timestamp_heap_;
464
465 // Parent channel merger.
466 ChannelMerger *channel_merger_;
467};
468
469// This class handles constructing all the split message readers, channel
470// mergers, and combining the results.
471class ChannelMerger {
472 public:
473 // Builds a ChannelMerger around a set of log files. These are of the format:
474 // {
475 // {log1_part0, log1_part1, ...},
476 // {log2}
477 // }
478 // The inner vector is a list of log file chunks which form up a log file.
479 // The outer vector is a list of log files with subsets of the messages, or
480 // messages from different nodes.
481 ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
482
483 // Returns the nodes that we know how to merge.
484 const std::vector<const Node *> nodes() const;
485 // Sets the node that we will return messages as. Returns true if the node
486 // has log files and will produce data. This can only be called once, and
487 // will likely corrupt state if called a second time.
488 bool SetNode(const Node *target_node);
489
490 // Everything else needs the node set before it works.
491
492 // Returns a timestamp for the oldest message in this group of logfiles.
493 monotonic_clock::time_point OldestMessage() const;
494 // Pops the oldest message.
495 std::tuple<TimestampMerger::DeliveryTimestamp, int,
496 FlatbufferVector<MessageHeader>>
497 PopOldest();
498
499 // Returns the config for this set of log files.
500 const Configuration *configuration() const {
501 return log_file_header()->configuration();
502 }
503
504 const LogFileHeader *log_file_header() const {
505 return &log_file_header_.message();
506 }
507
508 // Returns the start times for the configured node's log files.
509 monotonic_clock::time_point monotonic_start_time() {
510 return monotonic_clock::time_point(
511 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
512 }
513 realtime_clock::time_point realtime_start_time() {
514 return realtime_clock::time_point(
515 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
516 }
517
518 // Returns the node set by SetNode above.
519 const Node *node() const { return node_; }
520
521 // Called by the TimestampMerger when new data is available with the provided
522 // timestamp and channel_index.
523 void Update(monotonic_clock::time_point timestamp, int channel_index) {
524 PushChannelHeap(timestamp, channel_index);
525 }
526
527 private:
528 // Queues messages from each SplitMessageReader until enough data is queued
529 // such that we can guarentee all sorting has happened.
530 void QueueMessages(monotonic_clock::time_point oldest_message_time);
531
532 // Pushes the timestamp for new data on the provided channel.
533 void PushChannelHeap(monotonic_clock::time_point timestamp,
534 int channel_index);
535
536 // All the message readers.
537 std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
538
539 // The log header we are claiming to be.
540 FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
541
542 // The timestamp mergers which combine data from the split message readers.
543 std::vector<TimestampMerger> timestamp_mergers_;
544
545 // A heap of the channel readers and timestamps for the oldest data in each.
Austin Schuh05b70472020-01-01 17:11:17 -0800546 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
547
Austin Schuh6f3babe2020-01-26 20:34:50 -0800548 // This holds a heap of split_message_readers sorted by the time at which they
549 // need to have QueueMessages called on them.
550 std::vector<std::pair<monotonic_clock::time_point, int>>
551 split_message_reader_heap_;
552
553 // Configured node.
554 const Node *node_;
555
556 // Cached copy of the list of nodes.
557 std::vector<const Node *> nodes_;
Austin Schuh05b70472020-01-01 17:11:17 -0800558};
Austin Schuha36c8902019-12-30 18:07:15 -0800559
560} // namespace logger
561} // namespace aos
562
563#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_