blob: 6b8e9aa5d15824e0df9fe867ca4fcef7786f852e [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
#include <sys/uio.h>
#include <unistd.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
16
17namespace aos {
18namespace logger {
19
// Describes how a channel's messages are logged on this node.
enum class LogType : uint8_t {
  // The message originated on this node and should be logged here.
  kLogMessage,
  // The message originated on another node, but only the delivery times are
  // logged here.
  kLogDeliveryTimeOnly,
  // The message originated on another node.  Log it and the delivery times
  // together.  The message_gateway is responsible for logging any messages
  // which didn't get delivered.
  kLogMessageAndDeliveryTime,
};
31
32
33// This class manages efficiently writing a sequence of detached buffers to a
34// file. It queues them up and batches the write operation.
35class DetachedBufferWriter {
36 public:
37 DetachedBufferWriter(std::string_view filename);
38 ~DetachedBufferWriter();
39
40 // TODO(austin): Snappy compress the log file if it ends with .snappy!
41
42 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
43 // buffer from it.
44 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
45 // Queues up a detached buffer directly.
46 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
47
48 // Triggers data to be provided to the kernel and written.
49 void Flush();
50
51 private:
52 int fd_ = -1;
53
54 // Size of all the data in the queue.
55 size_t queued_size_ = 0;
56
57 // List of buffers to flush.
58 std::vector<flatbuffers::DetachedBuffer> queue_;
59 // List of iovecs to use with writev. This is a member variable to avoid
60 // churn.
61 std::vector<struct iovec> iovec_;
62};
63
64// Packes a message pointed to by the context into a MessageHeader.
65flatbuffers::Offset<MessageHeader> PackMessage(
66 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
67 int channel_index, LogType log_type);
68
Austin Schuh05b70472020-01-01 17:11:17 -080069// Class to read chunks out of a log file.
70class SpanReader {
71 public:
72 SpanReader(std::string_view filename);
Austin Schuha36c8902019-12-30 18:07:15 -080073
Austin Schuh05b70472020-01-01 17:11:17 -080074 ~SpanReader() { close(fd_); }
75
76 // Returns a span with the data for a message from the log file, excluding
77 // the size.
78 absl::Span<const uint8_t> ReadMessage();
79
80 // Returns true if there is a full message available in the buffer, or if we
81 // will have to read more data from disk.
82 bool MessageAvailable();
83
84 private:
85 // TODO(austin): Optimization:
86 // Allocate the 256k blocks like we do today. But, refcount them with
87 // shared_ptr pointed to by the messageheader that is returned. This avoids
88 // the copy. Need to do more benchmarking.
89
90 // Reads a chunk of data into data_. Returns false if no data was read.
91 bool ReadBlock();
92
93 // File descriptor for the log file.
94 int fd_ = -1;
95
96 // Allocator which doesn't zero initialize memory.
97 template <typename T>
98 struct DefaultInitAllocator {
99 typedef T value_type;
100
101 template <typename U>
102 void construct(U *p) {
103 ::new (static_cast<void *>(p)) U;
104 }
105
106 template <typename U, typename... Args>
107 void construct(U *p, Args &&... args) {
108 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
109 }
110
111 T *allocate(std::size_t n) {
112 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
113 }
114
115 template <typename U>
116 void deallocate(U *p, std::size_t /*n*/) {
117 ::operator delete(static_cast<void *>(p));
118 }
119 };
120
121 // Vector to read into. This uses an allocator which doesn't zero
122 // initialize the memory.
123 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
124
125 // Amount of data consumed already in data_.
126 size_t consumed_data_ = 0;
127
128 // Cached bit for if we have reached the end of the file. Otherwise we will
129 // hammer on the kernel asking for more data each time we send.
130 bool end_of_file_ = false;
131};
132
133// Class which handles reading the header and messages from the log file. This
134// handles any per-file state left before merging below.
135class MessageReader {
136 public:
137 MessageReader(std::string_view filename);
138
139 // Returns the header from the log file.
140 const LogFileHeader *log_file_header() const {
141 return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
142 configuration_.data());
143 }
144
145 // Returns the minimum maount of data needed to queue up for sorting before
146 // ware guarenteed to not see data out of order.
147 std::chrono::nanoseconds max_out_of_order_duration() const {
148 return max_out_of_order_duration_;
149 }
150
151 monotonic_clock::time_point newest_timestamp() const {
152 return newest_timestamp_;
153 }
154
155 // Returns the next message if there is one.
156 std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
157
158 // The time at which we need to read another chunk from the logfile.
159 monotonic_clock::time_point queue_data_time() const {
160 return newest_timestamp() - max_out_of_order_duration();
161 }
162
163 private:
164 // Log chunk reader.
165 SpanReader span_reader_;
166
167 // Vector holding the data for the configuration.
168 std::vector<uint8_t> configuration_;
169
170 // Minimum amount of data to queue up for sorting before we are guarenteed
171 // to not see data out of order.
172 std::chrono::nanoseconds max_out_of_order_duration_;
173
174 // Timestamp of the newest message in a channel queue.
175 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
176};
177
178// We need to read a large chunk at a time, then kit it up into parts and
179// sort.
180//
181// We want to read 256 KB chunks at a time. This is the fastest read size.
182// This leaves us with a fragmentation problem though.
183//
184// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
185// chunks into single flatbuffer messages and manage them in a sorted queue.
186// Everything is copied three times (into 256 kb buffer, then into separate
187// buffer, then into sender), but none of it is all that expensive. We can
188// optimize if it is slow later.
189//
190// As we place the elements in the sorted list of times, keep doing this
191// until we read a message that is newer than the threshold.
192//
193// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
194// small state machine so we can resume), and keep pulling messages back out
195// and sending.
196//
197// For sorting, we want to use the fact that each channel is sorted, and
198// then merge sort the channels. Have a vector of deques, and then hold a
199// sorted list of pointers to those.
200class SortedMessageReader {
201 public:
Austin Schuhfa895892020-01-07 20:07:41 -0800202 SortedMessageReader(const std::vector<std::string> &filenames);
Austin Schuh05b70472020-01-01 17:11:17 -0800203
204 // Returns the header from the log file.
205 const LogFileHeader *log_file_header() const {
Austin Schuhfa895892020-01-07 20:07:41 -0800206 return &log_file_header_.message();
Austin Schuh05b70472020-01-01 17:11:17 -0800207 }
208
209 // Returns a pointer to the channel with the oldest message in it, and the
210 // timestamp.
211 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
212 return channel_heap_.front();
213 }
214
215 // Returns the number of channels with data still in them.
216 size_t active_channel_count() const { return channel_heap_.size(); }
217
218 // Returns the configuration from the log file header.
219 const Configuration *configuration() const {
220 return log_file_header()->configuration();
221 }
222
223 // Returns the start time on both the monotonic and realtime clocks.
224 monotonic_clock::time_point monotonic_start_time() {
225 return monotonic_clock::time_point(
226 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
227 }
228 realtime_clock::time_point realtime_start_time() {
229 return realtime_clock::time_point(
230 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
231 }
232
233 // Returns the node who's point of view this log file is from. Make sure this
234 // is a pointer in the configuration() nodes list so it can be consumed
235 // elsewhere.
236 const Node *node() const {
237 if (configuration()->has_nodes()) {
238 CHECK(log_file_header()->has_node());
239 CHECK(log_file_header()->node()->has_name());
240 return configuration::GetNode(
241 configuration(), log_file_header()->node()->name()->string_view());
242 } else {
243 CHECK(!log_file_header()->has_node());
244 return nullptr;
245 }
246 }
247
248 // Pops a pointer to the channel with the oldest message in it, and the
249 // timestamp.
250 std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
251 PopOldestChannel();
252
253 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800254 // Moves to the next log file in the list.
255 bool NextLogFile();
256
Austin Schuh05b70472020-01-01 17:11:17 -0800257 // Adds more messages to the sorted list.
258 void QueueMessages();
259
260 // Moves the message to the correct channel queue.
261 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
262
263 // Pushes a pointer to the channel for the given timestamp to the sorted
264 // channel list.
265 void PushChannelHeap(monotonic_clock::time_point timestamp,
266 int channel_index);
267
268
269 // Datastructure to hold the list of messages, cached timestamp for the
270 // oldest message, and sender to send with.
271 struct ChannelData {
272 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
273 std::deque<FlatbufferVector<MessageHeader>> data;
274 std::unique_ptr<RawSender> raw_sender;
275
276 // Returns the oldest message.
277 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
278
279 // Returns the timestamp for the oldest message.
280 const monotonic_clock::time_point front_timestamp() {
281 return monotonic_clock::time_point(
282 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
283 }
284 };
285
Austin Schuhfa895892020-01-07 20:07:41 -0800286 std::vector<std::string> filenames_;
287 size_t next_filename_index_ = 0;
288
289 FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
290 std::unique_ptr<MessageReader> message_reader_;
Austin Schuh05b70472020-01-01 17:11:17 -0800291
292 // TODO(austin): Multithreaded read at some point. Gotta go faster!
293 // Especially if we start compressing.
294
295 // List of channels and messages for them.
296 std::vector<ChannelData> channels_;
297
298 // Heap of channels so we can track which channel to send next.
299 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
300
301};
Austin Schuha36c8902019-12-30 18:07:15 -0800302
303} // namespace logger
304} // namespace aos
305
306#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_