blob: a3d16f7e879efeacd3de4932437c175638476ef5 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
4#include <sys/uio.h>
5
Austin Schuh97789fc2020-08-01 14:42:45 -07006#include <chrono>
Austin Schuh05b70472020-01-01 17:11:17 -08007#include <deque>
Austin Schuh97789fc2020-08-01 14:42:45 -07008#include <limits>
9#include <memory>
Austin Schuh05b70472020-01-01 17:11:17 -080010#include <optional>
Austin Schuhfa895892020-01-07 20:07:41 -080011#include <string>
Austin Schuha36c8902019-12-30 18:07:15 -080012#include <string_view>
Brian Silverman98360e22020-04-28 16:51:20 -070013#include <tuple>
Austin Schuh97789fc2020-08-01 14:42:45 -070014#include <utility>
Austin Schuha36c8902019-12-30 18:07:15 -080015#include <vector>
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "absl/types/span.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070018#include "aos/containers/resizeable_buffer.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019#include "aos/events/event_loop.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070020#include "aos/events/logging/buffer_encoder.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021#include "aos/events/logging/logger_generated.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070022#include "aos/flatbuffers.h"
Austin Schuha36c8902019-12-30 18:07:15 -080023#include "flatbuffers/flatbuffers.h"
24
Brian Silvermanf51499a2020-09-21 12:49:08 -070025namespace aos::logger {
Austin Schuha36c8902019-12-30 18:07:15 -080026
// Describes how a message on a channel is logged on a particular node. The
// underlying type is uint8_t so the value stays one byte wide.
enum class LogType : uint8_t {
  // The message originated on this node and should be logged here.
  kLogMessage,
  // The message originated on another node, but only the delivery times are
  // logged here.
  kLogDeliveryTimeOnly,
  // The message originated on another node. Log it and the delivery times
  // together. The message_gateway is responsible for logging any messages
  // which didn't get delivered.
  kLogMessageAndDeliveryTime,
  // The message originated on the other node and should be logged on this node.
  kLogRemoteMessage
};
40
Austin Schuha36c8902019-12-30 18:07:15 -080041// This class manages efficiently writing a sequence of detached buffers to a
Brian Silvermanf51499a2020-09-21 12:49:08 -070042// file. It encodes them, queues them up, and batches the write operation.
Austin Schuha36c8902019-12-30 18:07:15 -080043class DetachedBufferWriter {
44 public:
Brian Silvermanf51499a2020-09-21 12:49:08 -070045 DetachedBufferWriter(std::string_view filename,
46 std::unique_ptr<DetachedBufferEncoder> encoder);
Austin Schuh2f8fd752020-09-01 22:38:28 -070047 DetachedBufferWriter(DetachedBufferWriter &&other);
48 DetachedBufferWriter(const DetachedBufferWriter &) = delete;
49
Austin Schuha36c8902019-12-30 18:07:15 -080050 ~DetachedBufferWriter();
51
Austin Schuh2f8fd752020-09-01 22:38:28 -070052 DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
Brian Silverman98360e22020-04-28 16:51:20 -070053 DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
54
Austin Schuh6f3babe2020-01-26 20:34:50 -080055 std::string_view filename() const { return filename_; }
56
Brian Silvermanf51499a2020-09-21 12:49:08 -070057 // Queues up a finished FlatBufferBuilder to be encoded and written.
58 //
59 // Triggers a flush if there's enough data queued up.
60 //
61 // Steals the detached buffer from it.
62 void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
63 QueueSizedFlatbuffer(fbb->Release());
64 }
65 // May steal the backing storage of buffer, or may leave it alone.
66 void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
67 encoder_->Encode(std::move(buffer));
68 FlushAtThreshold();
69 }
Austin Schuha36c8902019-12-30 18:07:15 -080070
Brian Silvermanf51499a2020-09-21 12:49:08 -070071 // Queues up data in span. May copy or may write it to disk immediately.
72 void QueueSpan(absl::Span<const uint8_t> span);
Austin Schuha36c8902019-12-30 18:07:15 -080073
Brian Silverman0465fcf2020-09-24 00:29:18 -070074 // Indicates we got ENOSPC when trying to write. After this returns true, no
75 // further data is written.
76 bool ran_out_of_space() const { return ran_out_of_space_; }
77
78 // To avoid silently failing to write logfiles, you must call this before
79 // destruction if ran_out_of_space() is true and the situation has been
80 // handled.
81 void acknowledge_out_of_space() {
82 CHECK(ran_out_of_space_);
83 acknowledge_ran_out_of_space_ = true;
84 }
85
86 // Fully flushes and closes the underlying file now. No additional data may be
87 // enqueued after calling this.
88 //
89 // This will be performed in the destructor automatically.
90 //
91 // Note that this may set ran_out_of_space().
92 void Close();
93
Brian Silvermanf51499a2020-09-21 12:49:08 -070094 // Returns the total number of bytes written and currently queued.
95 size_t total_bytes() const { return encoder_->total_bytes(); }
Austin Schuha36c8902019-12-30 18:07:15 -080096
Brian Silvermanf51499a2020-09-21 12:49:08 -070097 // The maximum time for a single write call, or 0 if none have been performed.
98 std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
99 // The number of bytes in the longest write call, or -1 if none have been
100 // performed.
101 int max_write_time_bytes() const { return max_write_time_bytes_; }
102 // The number of buffers in the longest write call, or -1 if none have been
103 // performed.
104 int max_write_time_messages() const { return max_write_time_messages_; }
105 // The total time spent in write calls.
106 std::chrono::nanoseconds total_write_time() const {
107 return total_write_time_;
108 }
109 // The total number of writes which have been performed.
110 int total_write_count() const { return total_write_count_; }
111 // The total number of messages which have been written.
112 int total_write_messages() const { return total_write_messages_; }
113 // The total number of bytes which have been written.
114 int total_write_bytes() const { return total_write_bytes_; }
115 void ResetStatistics() {
116 max_write_time_ = std::chrono::nanoseconds::zero();
117 max_write_time_bytes_ = -1;
118 max_write_time_messages_ = -1;
119 total_write_time_ = std::chrono::nanoseconds::zero();
120 total_write_count_ = 0;
121 total_write_messages_ = 0;
122 total_write_bytes_ = 0;
123 }
Brian Silverman98360e22020-04-28 16:51:20 -0700124
Austin Schuha36c8902019-12-30 18:07:15 -0800125 private:
Brian Silvermanf51499a2020-09-21 12:49:08 -0700126 // Performs a single writev call with as much of the data we have queued up as
127 // possible.
128 //
129 // This will normally take all of the data we have queued up, unless an
130 // encoder has spit out a big enough chunk all at once that we can't manage
131 // all of it.
132 void Flush();
133
Brian Silverman0465fcf2020-09-24 00:29:18 -0700134 // write_return is what write(2) or writev(2) returned. write_size is the
135 // number of bytes we expected it to write.
136 void HandleWriteReturn(ssize_t write_return, size_t write_size);
137
Brian Silvermanf51499a2020-09-21 12:49:08 -0700138 void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
139 ssize_t written, int iovec_size);
140
141 // Flushes data if we've reached the threshold to do that as part of normal
142 // operation.
143 void FlushAtThreshold();
144
Austin Schuh2f8fd752020-09-01 22:38:28 -0700145 std::string filename_;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700146 std::unique_ptr<DetachedBufferEncoder> encoder_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800147
Austin Schuha36c8902019-12-30 18:07:15 -0800148 int fd_ = -1;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700149 bool ran_out_of_space_ = false;
150 bool acknowledge_ran_out_of_space_ = false;
Austin Schuha36c8902019-12-30 18:07:15 -0800151
Austin Schuha36c8902019-12-30 18:07:15 -0800152 // List of iovecs to use with writev. This is a member variable to avoid
153 // churn.
154 std::vector<struct iovec> iovec_;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155
156 std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
157 int max_write_time_bytes_ = -1;
158 int max_write_time_messages_ = -1;
159 std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
160 int total_write_count_ = 0;
161 int total_write_messages_ = 0;
162 int total_write_bytes_ = 0;
Austin Schuha36c8902019-12-30 18:07:15 -0800163};
164
165// Packes a message pointed to by the context into a MessageHeader.
166flatbuffers::Offset<MessageHeader> PackMessage(
167 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
168 int channel_index, LogType log_type);
169
Austin Schuh6f3babe2020-01-26 20:34:50 -0800170FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
Austin Schuh5212cad2020-09-09 23:12:09 -0700171FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
172 size_t n);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800173
Austin Schuh05b70472020-01-01 17:11:17 -0800174// Class to read chunks out of a log file.
175class SpanReader {
176 public:
177 SpanReader(std::string_view filename);
Austin Schuha36c8902019-12-30 18:07:15 -0800178
Austin Schuh6f3babe2020-01-26 20:34:50 -0800179 std::string_view filename() const { return filename_; }
180
Austin Schuh05b70472020-01-01 17:11:17 -0800181 // Returns a span with the data for a message from the log file, excluding
182 // the size.
183 absl::Span<const uint8_t> ReadMessage();
184
185 // Returns true if there is a full message available in the buffer, or if we
186 // will have to read more data from disk.
187 bool MessageAvailable();
188
189 private:
190 // TODO(austin): Optimization:
191 // Allocate the 256k blocks like we do today. But, refcount them with
192 // shared_ptr pointed to by the messageheader that is returned. This avoids
193 // the copy. Need to do more benchmarking.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700194 // And (Brian): Consider just mmapping the file and handing out refcounted
195 // pointers into that too.
Austin Schuh05b70472020-01-01 17:11:17 -0800196
197 // Reads a chunk of data into data_. Returns false if no data was read.
198 bool ReadBlock();
199
Austin Schuh6f3babe2020-01-26 20:34:50 -0800200 const std::string filename_;
201
Brian Silvermanf51499a2020-09-21 12:49:08 -0700202 // File reader and data decoder.
203 std::unique_ptr<DataDecoder> decoder_;
Austin Schuh05b70472020-01-01 17:11:17 -0800204
Brian Silvermanf51499a2020-09-21 12:49:08 -0700205 // Vector to read into.
206 ResizeableBuffer data_;
Austin Schuh05b70472020-01-01 17:11:17 -0800207
208 // Amount of data consumed already in data_.
209 size_t consumed_data_ = 0;
Austin Schuh05b70472020-01-01 17:11:17 -0800210};
211
212// Class which handles reading the header and messages from the log file. This
213// handles any per-file state left before merging below.
214class MessageReader {
215 public:
216 MessageReader(std::string_view filename);
217
Austin Schuh6f3babe2020-01-26 20:34:50 -0800218 std::string_view filename() const { return span_reader_.filename(); }
219
Austin Schuh05b70472020-01-01 17:11:17 -0800220 // Returns the header from the log file.
221 const LogFileHeader *log_file_header() const {
Austin Schuh97789fc2020-08-01 14:42:45 -0700222 return &raw_log_file_header_.message();
223 }
224
225 // Returns the raw data of the header from the log file.
226 const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
227 return raw_log_file_header_;
Austin Schuh05b70472020-01-01 17:11:17 -0800228 }
229
230 // Returns the minimum maount of data needed to queue up for sorting before
231 // ware guarenteed to not see data out of order.
232 std::chrono::nanoseconds max_out_of_order_duration() const {
233 return max_out_of_order_duration_;
234 }
235
Austin Schuhcde938c2020-02-02 17:30:07 -0800236 // Returns the newest timestamp read out of the log file.
Austin Schuh05b70472020-01-01 17:11:17 -0800237 monotonic_clock::time_point newest_timestamp() const {
238 return newest_timestamp_;
239 }
240
241 // Returns the next message if there is one.
242 std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
243
244 // The time at which we need to read another chunk from the logfile.
245 monotonic_clock::time_point queue_data_time() const {
246 return newest_timestamp() - max_out_of_order_duration();
247 }
248
249 private:
250 // Log chunk reader.
251 SpanReader span_reader_;
252
Austin Schuh97789fc2020-08-01 14:42:45 -0700253 // Vector holding the raw data for the log file header.
254 FlatbufferVector<LogFileHeader> raw_log_file_header_;
Austin Schuh05b70472020-01-01 17:11:17 -0800255
256 // Minimum amount of data to queue up for sorting before we are guarenteed
257 // to not see data out of order.
258 std::chrono::nanoseconds max_out_of_order_duration_;
259
260 // Timestamp of the newest message in a channel queue.
261 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
262};
263
Austin Schuh6f3babe2020-01-26 20:34:50 -0800264class TimestampMerger;
Austin Schuh05b70472020-01-01 17:11:17 -0800265
Austin Schuh6f3babe2020-01-26 20:34:50 -0800266// A design requirement is that the relevant data for a channel is not more than
267// max_out_of_order_duration out of order. We approach sorting in layers.
268//
269// 1) Split each (maybe chunked) log file into one queue per channel. Read this
270// log file looking for data pertaining to a specific node.
271// (SplitMessageReader)
272// 2) Merge all the data per channel from the different log files into a sorted
273// list of timestamps and messages. (TimestampMerger)
274// 3) Combine the timestamps and messages. (TimestampMerger)
275// 4) Merge all the channels to produce the next message on a node.
276// (ChannelMerger)
277// 5) Duplicate this entire stack per node.
278
279// This class splits messages and timestamps up into a queue per channel, and
280// handles reading data from multiple chunks.
281class SplitMessageReader {
282 public:
283 SplitMessageReader(const std::vector<std::string> &filenames);
284
285 // Sets the TimestampMerger that gets notified for each channel. The node
286 // that the TimestampMerger is merging as needs to be passed in.
287 void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
288 const Node *target_node);
289
Austin Schuh2f8fd752020-09-01 22:38:28 -0700290 // Returns the (timestamp, queue_index, message_header) for the oldest message
291 // in a channel, or max_time if there is nothing in the channel.
Austin Schuhcde938c2020-02-02 17:30:07 -0800292 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
293 oldest_message(int channel) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800294 return channels_[channel].data.front_timestamp();
295 }
296
Austin Schuh2f8fd752020-09-01 22:38:28 -0700297 // Returns the (timestamp, queue_index, message_header) for the oldest
298 // delivery time in a channel, or max_time if there is nothing in the channel.
Austin Schuhcde938c2020-02-02 17:30:07 -0800299 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
300 oldest_message(int channel, int destination_node) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800301 return channels_[channel].timestamps[destination_node].front_timestamp();
302 }
303
304 // Returns the timestamp, queue_index, and message for the oldest data on a
305 // channel. Requeues data as needed.
306 std::tuple<monotonic_clock::time_point, uint32_t,
307 FlatbufferVector<MessageHeader>>
308 PopOldest(int channel_index);
309
310 // Returns the timestamp, queue_index, and message for the oldest timestamp on
311 // a channel delivered to a node. Requeues data as needed.
312 std::tuple<monotonic_clock::time_point, uint32_t,
313 FlatbufferVector<MessageHeader>>
Austin Schuh2f8fd752020-09-01 22:38:28 -0700314 PopOldestTimestamp(int channel, int node_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800315
316 // Returns the header for the log files.
Austin Schuh05b70472020-01-01 17:11:17 -0800317 const LogFileHeader *log_file_header() const {
Austin Schuhfa895892020-01-07 20:07:41 -0800318 return &log_file_header_.message();
Austin Schuh05b70472020-01-01 17:11:17 -0800319 }
320
Austin Schuh97789fc2020-08-01 14:42:45 -0700321 const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
322 return log_file_header_;
323 }
324
Austin Schuh6f3babe2020-01-26 20:34:50 -0800325 // Returns the starting time for this set of log files.
Austin Schuh05b70472020-01-01 17:11:17 -0800326 monotonic_clock::time_point monotonic_start_time() {
327 return monotonic_clock::time_point(
328 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
329 }
330 realtime_clock::time_point realtime_start_time() {
331 return realtime_clock::time_point(
332 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
333 }
334
Austin Schuh6f3babe2020-01-26 20:34:50 -0800335 // Returns the configuration from the log file header.
336 const Configuration *configuration() const {
337 return log_file_header()->configuration();
338 }
339
Austin Schuh05b70472020-01-01 17:11:17 -0800340 // Returns the node who's point of view this log file is from. Make sure this
341 // is a pointer in the configuration() nodes list so it can be consumed
342 // elsewhere.
343 const Node *node() const {
344 if (configuration()->has_nodes()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800345 return configuration::GetNodeOrDie(configuration(),
346 log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800347 } else {
348 CHECK(!log_file_header()->has_node());
349 return nullptr;
350 }
351 }
352
Austin Schuh6f3babe2020-01-26 20:34:50 -0800353 // Returns the timestamp of the newest message read from the log file, and the
354 // timestamp that we need to re-queue data.
355 monotonic_clock::time_point newest_timestamp() const {
Austin Schuhcde938c2020-02-02 17:30:07 -0800356 return newest_timestamp_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800357 }
358
Austin Schuhcde938c2020-02-02 17:30:07 -0800359 // Returns the next time to trigger a requeue.
360 monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
361
362 // Returns the minimum amount of data needed to queue up for sorting before
363 // ware guarenteed to not see data out of order.
364 std::chrono::nanoseconds max_out_of_order_duration() const {
365 return message_reader_->max_out_of_order_duration();
366 }
367
368 std::string_view filename() const { return message_reader_->filename(); }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800369
370 // Adds more messages to the sorted list. This reads enough data such that
371 // oldest_message_time can be replayed safely. Returns false if the log file
372 // has all been read.
373 bool QueueMessages(monotonic_clock::time_point oldest_message_time);
Austin Schuh05b70472020-01-01 17:11:17 -0800374
Austin Schuhcde938c2020-02-02 17:30:07 -0800375 // Returns debug strings for a channel, and timestamps for a node.
376 std::string DebugString(int channel) const;
377 std::string DebugString(int channel, int node_index) const;
378
Austin Schuh8bd96322020-02-13 21:18:22 -0800379 // Returns true if all the messages have been queued from the last log file in
380 // the list of log files chunks.
381 bool at_end() const { return at_end_; }
382
Austin Schuh05b70472020-01-01 17:11:17 -0800383 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800384 // TODO(austin): Need to copy or refcount the message instead of running
385 // multiple copies of the reader. Or maybe have a "as_node" index and hide it
386 // inside.
387
Austin Schuhfa895892020-01-07 20:07:41 -0800388 // Moves to the next log file in the list.
389 bool NextLogFile();
390
Austin Schuh6f3babe2020-01-26 20:34:50 -0800391 // Filenames of the log files.
392 std::vector<std::string> filenames_;
393 // And the index of the next file to open.
394 size_t next_filename_index_ = 0;
Austin Schuh05b70472020-01-01 17:11:17 -0800395
Austin Schuhee711052020-08-24 16:06:09 -0700396 // Node we are reading as.
397 const Node *target_node_ = nullptr;
398
Austin Schuh6f3babe2020-01-26 20:34:50 -0800399 // Log file header to report. This is a copy.
Austin Schuh97789fc2020-08-01 14:42:45 -0700400 FlatbufferVector<LogFileHeader> log_file_header_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800401 // Current log file being read.
402 std::unique_ptr<MessageReader> message_reader_;
Austin Schuh05b70472020-01-01 17:11:17 -0800403
404 // Datastructure to hold the list of messages, cached timestamp for the
405 // oldest message, and sender to send with.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800406 struct MessageHeaderQueue {
407 // If true, this is a timestamp queue.
408 bool timestamps = false;
Austin Schuh05b70472020-01-01 17:11:17 -0800409
Austin Schuh6f3babe2020-01-26 20:34:50 -0800410 // Returns a reference to the the oldest message.
411 FlatbufferVector<MessageHeader> &front() {
412 CHECK_GT(data_.size(), 0u);
413 return data_.front();
Austin Schuh05b70472020-01-01 17:11:17 -0800414 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800415
Austin Schuhcde938c2020-02-02 17:30:07 -0800416 // Adds a message to the back of the queue. Returns true if it was actually
417 // emplaced.
418 bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800419
420 // Drops the front message. Invalidates the front() reference.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700421 void PopFront();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800422
423 // The size of the queue.
424 size_t size() { return data_.size(); }
425
Austin Schuhcde938c2020-02-02 17:30:07 -0800426 // Returns a debug string with info about each message in the queue.
427 std::string DebugString() const;
428
Austin Schuh2f8fd752020-09-01 22:38:28 -0700429 // Returns the (timestamp, queue_index, message_header) for the oldest
430 // message.
Austin Schuhcde938c2020-02-02 17:30:07 -0800431 const std::tuple<monotonic_clock::time_point, uint32_t,
432 const MessageHeader *>
433 front_timestamp() {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700434 const MessageHeader &message = front().message();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800435 return std::make_tuple(
Austin Schuh2f8fd752020-09-01 22:38:28 -0700436 monotonic_clock::time_point(
437 std::chrono::nanoseconds(message.monotonic_sent_time())),
438 message.queue_index(), &message);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800439 }
440
441 // Pointer to the timestamp merger for this queue if available.
442 TimestampMerger *timestamp_merger = nullptr;
443 // Pointer to the reader which feeds this queue.
444 SplitMessageReader *split_reader = nullptr;
445
446 private:
447 // The data.
448 std::deque<FlatbufferVector<MessageHeader>> data_;
Austin Schuh05b70472020-01-01 17:11:17 -0800449 };
450
Austin Schuh6f3babe2020-01-26 20:34:50 -0800451 // All the queues needed for a channel. There isn't going to be data in all
452 // of these.
453 struct ChannelData {
454 // The data queue for the channel.
455 MessageHeaderQueue data;
456 // Queues for timestamps for each node.
457 std::vector<MessageHeaderQueue> timestamps;
458 };
Austin Schuhfa895892020-01-07 20:07:41 -0800459
Austin Schuh6f3babe2020-01-26 20:34:50 -0800460 // Data for all the channels.
Austin Schuh05b70472020-01-01 17:11:17 -0800461 std::vector<ChannelData> channels_;
462
Austin Schuh6f3babe2020-01-26 20:34:50 -0800463 // Once we know the node that this SplitMessageReader will be writing as,
464 // there will be only one MessageHeaderQueue that a specific channel matches.
465 // Precompute this here for efficiency.
466 std::vector<MessageHeaderQueue *> channels_to_write_;
467
Austin Schuhcde938c2020-02-02 17:30:07 -0800468 monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
469
470 // Latches true when we hit the end of the last log file and there is no sense
471 // poking it further.
472 bool at_end_ = false;
473
474 // Timestamp of the newest message that was read and actually queued. We want
475 // to track this independently from the log file because we need the
476 // timestamps here to be timestamps of messages that are queued.
477 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800478};
479
480class ChannelMerger;
481
482// Sorts channels (and timestamps) from multiple log files for a single channel.
483class TimestampMerger {
484 public:
485 TimestampMerger(const Configuration *configuration,
486 std::vector<SplitMessageReader *> split_message_readers,
487 int channel_index, const Node *target_node,
488 ChannelMerger *channel_merger);
489
490 // Metadata used to schedule the message.
491 struct DeliveryTimestamp {
492 monotonic_clock::time_point monotonic_event_time =
493 monotonic_clock::min_time;
494 realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
495
496 monotonic_clock::time_point monotonic_remote_time =
497 monotonic_clock::min_time;
498 realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
499 uint32_t remote_queue_index = 0xffffffff;
500 };
501
502 // Pushes SplitMessageReader onto the timestamp heap. This should only be
503 // called when timestamps are placed in the channel this class is merging for
504 // the reader.
505 void UpdateTimestamp(
506 SplitMessageReader *split_message_reader,
Austin Schuhcde938c2020-02-02 17:30:07 -0800507 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
508 oldest_message_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800509 PushTimestampHeap(oldest_message_time, split_message_reader);
510 }
511 // Pushes SplitMessageReader onto the message heap. This should only be
512 // called when data is placed in the channel this class is merging for the
513 // reader.
514 void Update(
515 SplitMessageReader *split_message_reader,
Austin Schuhcde938c2020-02-02 17:30:07 -0800516 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
517 oldest_message_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800518 PushMessageHeap(oldest_message_time, split_message_reader);
519 }
520
Austin Schuhcde938c2020-02-02 17:30:07 -0800521 // Returns the oldest combined timestamp and data for this channel. If there
522 // isn't a matching piece of data, returns only the timestamp with no data.
523 // The caller can determine what the appropriate action is to recover.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800524 std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
525
526 // Tracks if the channel merger has pushed this onto it's heap or not.
527 bool pushed() { return pushed_; }
528 // Sets if this has been pushed to the channel merger heap. Should only be
529 // called by the channel merger.
530 void set_pushed(bool pushed) { pushed_ = pushed; }
531
Austin Schuhcde938c2020-02-02 17:30:07 -0800532 // Returns a debug string with the heaps printed out.
533 std::string DebugString() const;
534
Austin Schuh8bd96322020-02-13 21:18:22 -0800535 // Returns true if we have timestamps.
536 bool has_timestamps() const { return has_timestamps_; }
537
538 // Records that one of the log files ran out of data. This should only be
539 // called by a SplitMessageReader.
540 void NoticeAtEnd();
541
Austin Schuh2f8fd752020-09-01 22:38:28 -0700542 aos::monotonic_clock::time_point channel_merger_time() {
543 if (has_timestamps_) {
544 return std::get<0>(timestamp_heap_[0]);
545 } else {
546 return std::get<0>(message_heap_[0]);
547 }
548 }
549
Austin Schuh6f3babe2020-01-26 20:34:50 -0800550 private:
551 // Pushes messages and timestamps to the corresponding heaps.
552 void PushMessageHeap(
Austin Schuhcde938c2020-02-02 17:30:07 -0800553 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
554 timestamp,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800555 SplitMessageReader *split_message_reader);
556 void PushTimestampHeap(
Austin Schuhcde938c2020-02-02 17:30:07 -0800557 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
558 timestamp,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800559 SplitMessageReader *split_message_reader);
560
561 // Pops a message from the message heap. This automatically triggers the
562 // split message reader to re-fetch any new data.
563 std::tuple<monotonic_clock::time_point, uint32_t,
564 FlatbufferVector<MessageHeader>>
565 PopMessageHeap();
Austin Schuhcde938c2020-02-02 17:30:07 -0800566
567 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
568 oldest_message() const;
569 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
570 oldest_timestamp() const;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800571 // Pops a message from the timestamp heap. This automatically triggers the
572 // split message reader to re-fetch any new data.
573 std::tuple<monotonic_clock::time_point, uint32_t,
574 FlatbufferVector<MessageHeader>>
575 PopTimestampHeap();
576
577 const Configuration *configuration_;
578
579 // If true, this is a forwarded channel and timestamps should be matched.
580 bool has_timestamps_ = false;
581
582 // Tracks if the ChannelMerger has pushed this onto it's queue.
583 bool pushed_ = false;
584
585 // The split message readers used for source data.
586 std::vector<SplitMessageReader *> split_message_readers_;
587
588 // The channel to merge.
589 int channel_index_;
590
591 // Our node.
592 int node_index_;
593
594 // Heaps for messages and timestamps.
595 std::vector<
596 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
597 message_heap_;
598 std::vector<
599 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
600 timestamp_heap_;
601
602 // Parent channel merger.
603 ChannelMerger *channel_merger_;
604};
605
606// This class handles constructing all the split message readers, channel
607// mergers, and combining the results.
608class ChannelMerger {
609 public:
610 // Builds a ChannelMerger around a set of log files. These are of the format:
611 // {
612 // {log1_part0, log1_part1, ...},
613 // {log2}
614 // }
615 // The inner vector is a list of log file chunks which form up a log file.
616 // The outer vector is a list of log files with subsets of the messages, or
617 // messages from different nodes.
618 ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
619
620 // Returns the nodes that we know how to merge.
621 const std::vector<const Node *> nodes() const;
622 // Sets the node that we will return messages as. Returns true if the node
623 // has log files and will produce data. This can only be called once, and
624 // will likely corrupt state if called a second time.
625 bool SetNode(const Node *target_node);
626
627 // Everything else needs the node set before it works.
628
629 // Returns a timestamp for the oldest message in this group of logfiles.
Austin Schuh858c9f32020-08-31 16:56:12 -0700630 monotonic_clock::time_point OldestMessageTime() const;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800631 // Pops the oldest message.
632 std::tuple<TimestampMerger::DeliveryTimestamp, int,
633 FlatbufferVector<MessageHeader>>
634 PopOldest();
635
636 // Returns the config for this set of log files.
637 const Configuration *configuration() const {
638 return log_file_header()->configuration();
639 }
640
641 const LogFileHeader *log_file_header() const {
642 return &log_file_header_.message();
643 }
644
645 // Returns the start times for the configured node's log files.
Austin Schuhcde938c2020-02-02 17:30:07 -0800646 monotonic_clock::time_point monotonic_start_time() const {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800647 return monotonic_clock::time_point(
648 std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
649 }
Austin Schuhcde938c2020-02-02 17:30:07 -0800650 realtime_clock::time_point realtime_start_time() const {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800651 return realtime_clock::time_point(
652 std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
653 }
654
655 // Returns the node set by SetNode above.
656 const Node *node() const { return node_; }
657
658 // Called by the TimestampMerger when new data is available with the provided
659 // timestamp and channel_index.
660 void Update(monotonic_clock::time_point timestamp, int channel_index) {
661 PushChannelHeap(timestamp, channel_index);
662 }
663
Austin Schuhcde938c2020-02-02 17:30:07 -0800664 // Returns a debug string with all the heaps in it. Generally only useful for
665 // debugging what went wrong.
666 std::string DebugString() const;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800667
Austin Schuh8bd96322020-02-13 21:18:22 -0800668 // Returns true if one of the log files has finished reading everything. When
669 // log file chunks are involved, this means that the last chunk in a log file
670 // has been read. It is acceptable to be missing data at this point in time.
671 bool at_end() const { return at_end_; }
672
673 // Marks that one of the log files is at the end. This should only be called
674 // by timestamp mergers.
675 void NoticeAtEnd() { at_end_ = true; }
676
Austin Schuhcde938c2020-02-02 17:30:07 -0800677 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800678 // Pushes the timestamp for new data on the provided channel.
679 void PushChannelHeap(monotonic_clock::time_point timestamp,
680 int channel_index);
681
Austin Schuh2f8fd752020-09-01 22:38:28 -0700682 // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
683 void VerifyHeaps();
684
Austin Schuh6f3babe2020-01-26 20:34:50 -0800685 // All the message readers.
686 std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
687
688 // The log header we are claiming to be.
Austin Schuh97789fc2020-08-01 14:42:45 -0700689 FlatbufferVector<LogFileHeader> log_file_header_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800690
691 // The timestamp mergers which combine data from the split message readers.
692 std::vector<TimestampMerger> timestamp_mergers_;
693
694 // A heap of the channel readers and timestamps for the oldest data in each.
Austin Schuh05b70472020-01-01 17:11:17 -0800695 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
696
Austin Schuh6f3babe2020-01-26 20:34:50 -0800697 // Configured node.
698 const Node *node_;
699
Austin Schuh8bd96322020-02-13 21:18:22 -0800700 bool at_end_ = false;
701
Austin Schuh6f3babe2020-01-26 20:34:50 -0800702 // Cached copy of the list of nodes.
703 std::vector<const Node *> nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700704
705 // Last time popped. Used to detect events being returned out of order.
706 monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800707};
Austin Schuha36c8902019-12-30 18:07:15 -0800708
Austin Schuhee711052020-08-24 16:06:09 -0700709// Returns the node name with a trailing space, or an empty string if we are on
710// a single node.
711std::string MaybeNodeName(const Node *);
712
Brian Silvermanf51499a2020-09-21 12:49:08 -0700713} // namespace aos::logger
Austin Schuha36c8902019-12-30 18:07:15 -0800714
715#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_