blob: 5b2bfa653e78f089bac1ed867e439df9ca381295 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
#include <sys/uio.h>
#include <unistd.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <new>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
16
17namespace aos {
18namespace logger {
19
// Selects what gets written to the log for a message, depending on which node
// the message came from.
enum class LogType : uint8_t {
  // Log the message itself. It originated on this node.
  kLogMessage = 0,
  // Log only when the message was delivered. It originated on another node.
  kLogDeliveryTimeOnly = 1,
  // Log both the message and its delivery times. It originated on another
  // node; any message that never got delivered is left for the
  // message_gateway to log.
  kLogMessageAndDeliveryTime = 2,
};
31
32
33// This class manages efficiently writing a sequence of detached buffers to a
34// file. It queues them up and batches the write operation.
35class DetachedBufferWriter {
36 public:
37 DetachedBufferWriter(std::string_view filename);
38 ~DetachedBufferWriter();
39
40 // TODO(austin): Snappy compress the log file if it ends with .snappy!
41
42 // Queues up a finished FlatBufferBuilder to be written. Steals the detached
43 // buffer from it.
44 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
45 // Queues up a detached buffer directly.
46 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
Austin Schuhde031b72020-01-10 19:34:41 -080047 // Writes a Span. This is not terribly optimized right now.
48 void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
Austin Schuha36c8902019-12-30 18:07:15 -080049
50 // Triggers data to be provided to the kernel and written.
51 void Flush();
52
53 private:
54 int fd_ = -1;
55
56 // Size of all the data in the queue.
57 size_t queued_size_ = 0;
58
59 // List of buffers to flush.
60 std::vector<flatbuffers::DetachedBuffer> queue_;
61 // List of iovecs to use with writev. This is a member variable to avoid
62 // churn.
63 std::vector<struct iovec> iovec_;
64};
65
66// Packes a message pointed to by the context into a MessageHeader.
67flatbuffers::Offset<MessageHeader> PackMessage(
68 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
69 int channel_index, LogType log_type);
70
Austin Schuh05b70472020-01-01 17:11:17 -080071// Class to read chunks out of a log file.
72class SpanReader {
73 public:
74 SpanReader(std::string_view filename);
Austin Schuha36c8902019-12-30 18:07:15 -080075
Austin Schuh05b70472020-01-01 17:11:17 -080076 ~SpanReader() { close(fd_); }
77
78 // Returns a span with the data for a message from the log file, excluding
79 // the size.
80 absl::Span<const uint8_t> ReadMessage();
81
82 // Returns true if there is a full message available in the buffer, or if we
83 // will have to read more data from disk.
84 bool MessageAvailable();
85
86 private:
87 // TODO(austin): Optimization:
88 // Allocate the 256k blocks like we do today. But, refcount them with
89 // shared_ptr pointed to by the messageheader that is returned. This avoids
90 // the copy. Need to do more benchmarking.
91
92 // Reads a chunk of data into data_. Returns false if no data was read.
93 bool ReadBlock();
94
95 // File descriptor for the log file.
96 int fd_ = -1;
97
98 // Allocator which doesn't zero initialize memory.
99 template <typename T>
100 struct DefaultInitAllocator {
101 typedef T value_type;
102
103 template <typename U>
104 void construct(U *p) {
105 ::new (static_cast<void *>(p)) U;
106 }
107
108 template <typename U, typename... Args>
109 void construct(U *p, Args &&... args) {
110 ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
111 }
112
113 T *allocate(std::size_t n) {
114 return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
115 }
116
117 template <typename U>
118 void deallocate(U *p, std::size_t /*n*/) {
119 ::operator delete(static_cast<void *>(p));
120 }
121 };
122
123 // Vector to read into. This uses an allocator which doesn't zero
124 // initialize the memory.
125 std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
126
127 // Amount of data consumed already in data_.
128 size_t consumed_data_ = 0;
129
130 // Cached bit for if we have reached the end of the file. Otherwise we will
131 // hammer on the kernel asking for more data each time we send.
132 bool end_of_file_ = false;
133};
134
135// Class which handles reading the header and messages from the log file. This
136// handles any per-file state left before merging below.
137class MessageReader {
138 public:
139 MessageReader(std::string_view filename);
140
141 // Returns the header from the log file.
142 const LogFileHeader *log_file_header() const {
143 return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
144 configuration_.data());
145 }
146
147 // Returns the minimum maount of data needed to queue up for sorting before
148 // ware guarenteed to not see data out of order.
149 std::chrono::nanoseconds max_out_of_order_duration() const {
150 return max_out_of_order_duration_;
151 }
152
153 monotonic_clock::time_point newest_timestamp() const {
154 return newest_timestamp_;
155 }
156
157 // Returns the next message if there is one.
158 std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
159
160 // The time at which we need to read another chunk from the logfile.
161 monotonic_clock::time_point queue_data_time() const {
162 return newest_timestamp() - max_out_of_order_duration();
163 }
164
165 private:
166 // Log chunk reader.
167 SpanReader span_reader_;
168
169 // Vector holding the data for the configuration.
170 std::vector<uint8_t> configuration_;
171
172 // Minimum amount of data to queue up for sorting before we are guarenteed
173 // to not see data out of order.
174 std::chrono::nanoseconds max_out_of_order_duration_;
175
176 // Timestamp of the newest message in a channel queue.
177 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
178};
179
180// We need to read a large chunk at a time, then kit it up into parts and
181// sort.
182//
183// We want to read 256 KB chunks at a time. This is the fastest read size.
184// This leaves us with a fragmentation problem though.
185//
186// The easy answer is to read 256 KB chunks. Then, malloc and memcpy those
187// chunks into single flatbuffer messages and manage them in a sorted queue.
188// Everything is copied three times (into 256 kb buffer, then into separate
189// buffer, then into sender), but none of it is all that expensive. We can
190// optimize if it is slow later.
191//
192// As we place the elements in the sorted list of times, keep doing this
193// until we read a message that is newer than the threshold.
194//
195// Then repeat. Keep filling up the sorted list with 256 KB chunks (need a
196// small state machine so we can resume), and keep pulling messages back out
197// and sending.
198//
199// For sorting, we want to use the fact that each channel is sorted, and
200// then merge sort the channels. Have a vector of deques, and then hold a
201// sorted list of pointers to those.
202class SortedMessageReader {
203 public:
Austin Schuhfa895892020-01-07 20:07:41 -0800204 SortedMessageReader(const std::vector<std::string> &filenames);
Austin Schuh05b70472020-01-01 17:11:17 -0800205
206 // Returns the header from the log file.
207 const LogFileHeader *log_file_header() const {
Austin Schuhfa895892020-01-07 20:07:41 -0800208 return &log_file_header_.message();
Austin Schuh05b70472020-01-01 17:11:17 -0800209 }
210
211 // Returns a pointer to the channel with the oldest message in it, and the
212 // timestamp.
213 const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
214 return channel_heap_.front();
215 }
216
217 // Returns the number of channels with data still in them.
218 size_t active_channel_count() const { return channel_heap_.size(); }
219
220 // Returns the configuration from the log file header.
221 const Configuration *configuration() const {
222 return log_file_header()->configuration();
223 }
224
225 // Returns the start time on both the monotonic and realtime clocks.
226 monotonic_clock::time_point monotonic_start_time() {
227 return monotonic_clock::time_point(
228 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
229 }
230 realtime_clock::time_point realtime_start_time() {
231 return realtime_clock::time_point(
232 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
233 }
234
235 // Returns the node who's point of view this log file is from. Make sure this
236 // is a pointer in the configuration() nodes list so it can be consumed
237 // elsewhere.
238 const Node *node() const {
239 if (configuration()->has_nodes()) {
240 CHECK(log_file_header()->has_node());
241 CHECK(log_file_header()->node()->has_name());
242 return configuration::GetNode(
243 configuration(), log_file_header()->node()->name()->string_view());
244 } else {
245 CHECK(!log_file_header()->has_node());
246 return nullptr;
247 }
248 }
249
250 // Pops a pointer to the channel with the oldest message in it, and the
251 // timestamp.
252 std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
253 PopOldestChannel();
254
255 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800256 // Moves to the next log file in the list.
257 bool NextLogFile();
258
Austin Schuh05b70472020-01-01 17:11:17 -0800259 // Adds more messages to the sorted list.
260 void QueueMessages();
261
262 // Moves the message to the correct channel queue.
263 void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
264
265 // Pushes a pointer to the channel for the given timestamp to the sorted
266 // channel list.
267 void PushChannelHeap(monotonic_clock::time_point timestamp,
268 int channel_index);
269
270
271 // Datastructure to hold the list of messages, cached timestamp for the
272 // oldest message, and sender to send with.
273 struct ChannelData {
274 monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
275 std::deque<FlatbufferVector<MessageHeader>> data;
276 std::unique_ptr<RawSender> raw_sender;
277
278 // Returns the oldest message.
279 const FlatbufferVector<MessageHeader> &front() { return data.front(); }
280
281 // Returns the timestamp for the oldest message.
282 const monotonic_clock::time_point front_timestamp() {
283 return monotonic_clock::time_point(
284 std::chrono::nanoseconds(front().message().monotonic_sent_time()));
285 }
286 };
287
Austin Schuhfa895892020-01-07 20:07:41 -0800288 std::vector<std::string> filenames_;
289 size_t next_filename_index_ = 0;
290
291 FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
292 std::unique_ptr<MessageReader> message_reader_;
Austin Schuh05b70472020-01-01 17:11:17 -0800293
294 // TODO(austin): Multithreaded read at some point. Gotta go faster!
295 // Especially if we start compressing.
296
297 // List of channels and messages for them.
298 std::vector<ChannelData> channels_;
299
300 // Heap of channels so we can track which channel to send next.
301 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
302
303};
Austin Schuha36c8902019-12-30 18:07:15 -0800304
305} // namespace logger
306} // namespace aos
307
308#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_