blob: c75c8fb259dc026b55104eaff60a0707a693a19f [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
2#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
3
4#include <sys/uio.h>
5
Austin Schuh97789fc2020-08-01 14:42:45 -07006#include <chrono>
Austin Schuh05b70472020-01-01 17:11:17 -08007#include <deque>
Austin Schuh97789fc2020-08-01 14:42:45 -07008#include <limits>
9#include <memory>
Austin Schuh05b70472020-01-01 17:11:17 -080010#include <optional>
Austin Schuhfa895892020-01-07 20:07:41 -080011#include <string>
Austin Schuha36c8902019-12-30 18:07:15 -080012#include <string_view>
Brian Silverman98360e22020-04-28 16:51:20 -070013#include <tuple>
Austin Schuh97789fc2020-08-01 14:42:45 -070014#include <utility>
Austin Schuha36c8902019-12-30 18:07:15 -080015#include <vector>
16
Austin Schuh4b5c22a2020-11-30 22:58:43 -080017#include "absl/container/btree_set.h"
Austin Schuh05b70472020-01-01 17:11:17 -080018#include "absl/types/span.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070019#include "aos/containers/resizeable_buffer.h"
Austin Schuha36c8902019-12-30 18:07:15 -080020#include "aos/events/event_loop.h"
Austin Schuh2dc8c7d2021-07-01 17:41:28 -070021#include "aos/events/logging/boot_timestamp.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070022#include "aos/events/logging/buffer_encoder.h"
Alexei Strots01395492023-03-20 13:59:56 -070023#include "aos/events/logging/log_backend.h"
Austin Schuhc41603c2020-10-11 16:17:37 -070024#include "aos/events/logging/logfile_sorting.h"
Austin Schuha36c8902019-12-30 18:07:15 -080025#include "aos/events/logging/logger_generated.h"
Brian Silvermanf51499a2020-09-21 12:49:08 -070026#include "aos/flatbuffers.h"
Austin Schuhf2d0e682022-10-16 14:20:58 -070027#include "aos/network/remote_message_generated.h"
Austin Schuha36c8902019-12-30 18:07:15 -080028#include "flatbuffers/flatbuffers.h"
29
Brian Silvermanf51499a2020-09-21 12:49:08 -070030namespace aos::logger {
Austin Schuha36c8902019-12-30 18:07:15 -080031
// Describes how a message on a channel should be logged on this node.
enum class LogType : uint8_t {
  // The message originated on this node and should be logged here.
  kLogMessage,
  // The message originated on another node, but only the delivery times are
  // logged here.
  kLogDeliveryTimeOnly,
  // The message originated on another node. Log it and the delivery times
  // together.  The message_gateway is responsible for logging any messages
  // which didn't get delivered.
  kLogMessageAndDeliveryTime,
  // The message originated on the other node and should be logged on this
  // node.
  kLogRemoteMessage
};
45
Austin Schuha36c8902019-12-30 18:07:15 -080046// This class manages efficiently writing a sequence of detached buffers to a
Brian Silvermanf51499a2020-09-21 12:49:08 -070047// file. It encodes them, queues them up, and batches the write operation.
Alexei Strots01395492023-03-20 13:59:56 -070048
Austin Schuha36c8902019-12-30 18:07:15 -080049class DetachedBufferWriter {
50 public:
Brian Silvermana9f2ec92020-10-06 18:00:53 -070051 // Marker struct for one of our constructor overloads.
52 struct already_out_of_space_t {};
53
Alexei Strots01395492023-03-20 13:59:56 -070054 DetachedBufferWriter(std::unique_ptr<FileHandler> file_handler,
Austin Schuh48d10d62022-10-16 22:19:23 -070055 std::unique_ptr<DataEncoder> encoder);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070056 // Creates a dummy instance which won't even open a file. It will act as if
57 // opening the file ran out of space immediately.
58 DetachedBufferWriter(already_out_of_space_t) : ran_out_of_space_(true) {}
Austin Schuh2f8fd752020-09-01 22:38:28 -070059 DetachedBufferWriter(DetachedBufferWriter &&other);
60 DetachedBufferWriter(const DetachedBufferWriter &) = delete;
61
Austin Schuha36c8902019-12-30 18:07:15 -080062 ~DetachedBufferWriter();
63
Austin Schuh2f8fd752020-09-01 22:38:28 -070064 DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
Brian Silverman98360e22020-04-28 16:51:20 -070065 DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
66
Alexei Strots01395492023-03-20 13:59:56 -070067 std::string_view filename() const { return file_handler_->filename(); }
Austin Schuh6f3babe2020-01-26 20:34:50 -080068
Brian Silvermana9f2ec92020-10-06 18:00:53 -070069 // This will be true until Close() is called, unless the file couldn't be
70 // created due to running out of space.
Alexei Strots01395492023-03-20 13:59:56 -070071 bool is_open() const { return file_handler_->is_open(); }
Brian Silvermana9f2ec92020-10-06 18:00:53 -070072
Brian Silvermanf51499a2020-09-21 12:49:08 -070073 // Queues up a finished FlatBufferBuilder to be encoded and written.
74 //
75 // Triggers a flush if there's enough data queued up.
76 //
77 // Steals the detached buffer from it.
Austin Schuh48d10d62022-10-16 22:19:23 -070078 void CopyMessage(DataEncoder::Copier *coppier,
79 aos::monotonic_clock::time_point now);
Austin Schuha36c8902019-12-30 18:07:15 -080080
Brian Silverman0465fcf2020-09-24 00:29:18 -070081 // Indicates we got ENOSPC when trying to write. After this returns true, no
82 // further data is written.
83 bool ran_out_of_space() const { return ran_out_of_space_; }
84
85 // To avoid silently failing to write logfiles, you must call this before
86 // destruction if ran_out_of_space() is true and the situation has been
87 // handled.
88 void acknowledge_out_of_space() {
89 CHECK(ran_out_of_space_);
90 acknowledge_ran_out_of_space_ = true;
91 }
92
93 // Fully flushes and closes the underlying file now. No additional data may be
94 // enqueued after calling this.
95 //
96 // This will be performed in the destructor automatically.
97 //
98 // Note that this may set ran_out_of_space().
99 void Close();
100
Brian Silvermanf51499a2020-09-21 12:49:08 -0700101 // Returns the total number of bytes written and currently queued.
Austin Schuha426f1f2021-03-31 22:27:41 -0700102 size_t total_bytes() const {
103 if (!encoder_) {
104 return 0;
105 }
106 return encoder_->total_bytes();
107 }
Austin Schuha36c8902019-12-30 18:07:15 -0800108
Alexei Strots01395492023-03-20 13:59:56 -0700109 WriteStats* WriteStatistics() const { return file_handler_->WriteStatistics(); }
Brian Silverman98360e22020-04-28 16:51:20 -0700110
Austin Schuha36c8902019-12-30 18:07:15 -0800111 private:
Brian Silvermanf51499a2020-09-21 12:49:08 -0700112 // Performs a single writev call with as much of the data we have queued up as
Austin Schuh8bdfc492023-02-11 12:53:13 -0800113 // possible. now is the time we flushed at, to be recorded in
114 // last_flush_time_.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 //
116 // This will normally take all of the data we have queued up, unless an
117 // encoder has spit out a big enough chunk all at once that we can't manage
118 // all of it.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800119 void Flush(aos::monotonic_clock::time_point now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120
Brian Silvermanf51499a2020-09-21 12:49:08 -0700121 // Flushes data if we've reached the threshold to do that as part of normal
Austin Schuhbd06ae42021-03-31 22:48:21 -0700122 // operation either due to the outstanding queued data, or because we have
123 // passed our flush period. now is the current time to save some CPU grabbing
124 // the current time. It just needs to be close.
125 void FlushAtThreshold(aos::monotonic_clock::time_point now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700126
Alexei Strots01395492023-03-20 13:59:56 -0700127 std::unique_ptr<FileHandler> file_handler_;
Austin Schuh48d10d62022-10-16 22:19:23 -0700128 std::unique_ptr<DataEncoder> encoder_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800129
Brian Silverman0465fcf2020-09-24 00:29:18 -0700130 bool ran_out_of_space_ = false;
131 bool acknowledge_ran_out_of_space_ = false;
Austin Schuha36c8902019-12-30 18:07:15 -0800132
Austin Schuhbd06ae42021-03-31 22:48:21 -0700133 aos::monotonic_clock::time_point last_flush_time_ =
134 aos::monotonic_clock::min_time;
Austin Schuha36c8902019-12-30 18:07:15 -0800135};
136
Alexei Strots01395492023-03-20 13:59:56 -0700137// Specialized writer to single file
138class DetachedBufferFileWriter : public FileBackend,
139 public DetachedBufferWriter {
140 public:
141 DetachedBufferFileWriter(std::string_view filename,
142 std::unique_ptr<DataEncoder> encoder)
143 : FileBackend("/"),
144 DetachedBufferWriter(FileBackend::RequestFile(filename),
145 std::move(encoder)) {}
146};
147
Austin Schuhf2d0e682022-10-16 14:20:58 -0700148// Repacks the provided RemoteMessage into fbb.
149flatbuffers::Offset<MessageHeader> PackRemoteMessage(
150 flatbuffers::FlatBufferBuilder *fbb,
151 const message_bridge::RemoteMessage *msg, int channel_index,
152 const aos::monotonic_clock::time_point monotonic_timestamp_time);
153
154constexpr flatbuffers::uoffset_t PackRemoteMessageSize() { return 96u; }
155size_t PackRemoteMessageInline(
156 uint8_t *data, const message_bridge::RemoteMessage *msg, int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800157 const aos::monotonic_clock::time_point monotonic_timestamp_time,
158 size_t start_byte, size_t end_byte);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700159
Austin Schuha36c8902019-12-30 18:07:15 -0800160// Packes a message pointed to by the context into a MessageHeader.
161flatbuffers::Offset<MessageHeader> PackMessage(
162 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
163 int channel_index, LogType log_type);
164
Austin Schuhfa30c352022-10-16 11:12:02 -0700165// Returns the size that the packed message from PackMessage or
166// PackMessageInline will be.
Austin Schuh48d10d62022-10-16 22:19:23 -0700167flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700168
169// Packs the provided message pointed to by context into the provided buffer.
170// This is equivalent to PackMessage, but doesn't require allocating a
171// FlatBufferBuilder underneath.
172size_t PackMessageInline(uint8_t *data, const Context &contex,
Austin Schuh71a40d42023-02-04 21:22:22 -0800173 int channel_index, LogType log_type, size_t start_byte,
174 size_t end_byte);
Austin Schuhfa30c352022-10-16 11:12:02 -0700175
Austin Schuh05b70472020-01-01 17:11:17 -0800176// Class to read chunks out of a log file.
177class SpanReader {
178 public:
Austin Schuhcd368422021-11-22 21:23:29 -0800179 SpanReader(std::string_view filename, bool quiet = false);
Austin Schuha36c8902019-12-30 18:07:15 -0800180
Austin Schuh6f3babe2020-01-26 20:34:50 -0800181 std::string_view filename() const { return filename_; }
182
Brian Smarttea913d42021-12-10 15:02:38 -0800183 size_t TotalRead() const { return total_read_; }
184 size_t TotalConsumed() const { return total_consumed_; }
Austin Schuh60e77942022-05-16 17:48:24 -0700185 bool IsIncomplete() const {
186 return is_finished_ && total_consumed_ < total_read_;
187 }
Brian Smarttea913d42021-12-10 15:02:38 -0800188
Austin Schuhcf5f6442021-07-06 10:43:28 -0700189 // Returns a span with the data for the next message from the log file,
190 // including the size. The result is only guarenteed to be valid until
191 // ReadMessage() or PeekMessage() is called again.
Austin Schuh05b70472020-01-01 17:11:17 -0800192 absl::Span<const uint8_t> ReadMessage();
193
Austin Schuhcf5f6442021-07-06 10:43:28 -0700194 // Returns a span with the data for the next message without consuming it.
195 // Multiple calls to PeekMessage return the same data. ReadMessage or
196 // ConsumeMessage must be called to get the next message.
197 absl::Span<const uint8_t> PeekMessage();
198 // Consumes the message so the next call to ReadMessage or PeekMessage returns
199 // new data. This does not invalidate the data.
200 void ConsumeMessage();
201
Austin Schuh05b70472020-01-01 17:11:17 -0800202 private:
203 // TODO(austin): Optimization:
204 // Allocate the 256k blocks like we do today. But, refcount them with
205 // shared_ptr pointed to by the messageheader that is returned. This avoids
206 // the copy. Need to do more benchmarking.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700207 // And (Brian): Consider just mmapping the file and handing out refcounted
208 // pointers into that too.
Austin Schuh05b70472020-01-01 17:11:17 -0800209
210 // Reads a chunk of data into data_. Returns false if no data was read.
211 bool ReadBlock();
212
Austin Schuhc41603c2020-10-11 16:17:37 -0700213 std::string filename_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800214
Brian Silvermanf51499a2020-09-21 12:49:08 -0700215 // File reader and data decoder.
216 std::unique_ptr<DataDecoder> decoder_;
Austin Schuh05b70472020-01-01 17:11:17 -0800217
Brian Silvermanf51499a2020-09-21 12:49:08 -0700218 // Vector to read into.
219 ResizeableBuffer data_;
Austin Schuh05b70472020-01-01 17:11:17 -0800220
221 // Amount of data consumed already in data_.
222 size_t consumed_data_ = 0;
Brian Smarttea913d42021-12-10 15:02:38 -0800223
224 // Accumulates the total volume of bytes read from filename_
225 size_t total_read_ = 0;
226
227 // Accumulates the total volume of read bytes that were 'consumed' into
228 // messages. May be less than total_read_, if the last message (span) is
229 // either truncated or somehow corrupt.
230 size_t total_consumed_ = 0;
231
232 // Reached the end, no more readable messages.
233 bool is_finished_ = false;
Austin Schuh05b70472020-01-01 17:11:17 -0800234};
235
Brian Silvermanfee16972021-09-14 12:06:38 -0700236// Reads the last header from a log file. This handles any duplicate headers
237// that were written.
238std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
239 SpanReader *span_reader);
240std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
241 std::string_view filename);
242// Reads the Nth message from a log file, excluding the header. Note: this
243// doesn't handle duplicate headers.
244std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
245 std::string_view filename, size_t n);
246
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700247class UnpackedMessageHeader;
248
Austin Schuh05b70472020-01-01 17:11:17 -0800249// Class which handles reading the header and messages from the log file. This
250// handles any per-file state left before merging below.
251class MessageReader {
252 public:
253 MessageReader(std::string_view filename);
254
Austin Schuh6f3babe2020-01-26 20:34:50 -0800255 std::string_view filename() const { return span_reader_.filename(); }
256
Austin Schuh05b70472020-01-01 17:11:17 -0800257 // Returns the header from the log file.
258 const LogFileHeader *log_file_header() const {
Austin Schuh97789fc2020-08-01 14:42:45 -0700259 return &raw_log_file_header_.message();
260 }
261
262 // Returns the raw data of the header from the log file.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800263 const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
264 const {
Austin Schuh97789fc2020-08-01 14:42:45 -0700265 return raw_log_file_header_;
Austin Schuh05b70472020-01-01 17:11:17 -0800266 }
267
268 // Returns the minimum maount of data needed to queue up for sorting before
269 // ware guarenteed to not see data out of order.
270 std::chrono::nanoseconds max_out_of_order_duration() const {
271 return max_out_of_order_duration_;
272 }
273
Austin Schuhcde938c2020-02-02 17:30:07 -0800274 // Returns the newest timestamp read out of the log file.
Austin Schuh05b70472020-01-01 17:11:17 -0800275 monotonic_clock::time_point newest_timestamp() const {
276 return newest_timestamp_;
277 }
278
279 // Returns the next message if there is one.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700280 std::shared_ptr<UnpackedMessageHeader> ReadMessage();
Austin Schuh05b70472020-01-01 17:11:17 -0800281
282 // The time at which we need to read another chunk from the logfile.
283 monotonic_clock::time_point queue_data_time() const {
284 return newest_timestamp() - max_out_of_order_duration();
285 }
286
Brian Smarttea913d42021-12-10 15:02:38 -0800287 // Flag value setters for testing
288 void set_crash_on_corrupt_message_flag(bool b) {
289 crash_on_corrupt_message_flag_ = b;
290 }
291 void set_ignore_corrupt_messages_flag(bool b) {
292 ignore_corrupt_messages_flag_ = b;
293 }
294
Austin Schuh05b70472020-01-01 17:11:17 -0800295 private:
296 // Log chunk reader.
297 SpanReader span_reader_;
298
Austin Schuh97789fc2020-08-01 14:42:45 -0700299 // Vector holding the raw data for the log file header.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800300 SizePrefixedFlatbufferVector<LogFileHeader> raw_log_file_header_;
Austin Schuh05b70472020-01-01 17:11:17 -0800301
302 // Minimum amount of data to queue up for sorting before we are guarenteed
303 // to not see data out of order.
304 std::chrono::nanoseconds max_out_of_order_duration_;
305
306 // Timestamp of the newest message in a channel queue.
307 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
Brian Smarttea913d42021-12-10 15:02:38 -0800308
309 // Total volume of verifiable messages from the beginning of the file.
310 // TODO - are message counts also useful?
311 size_t total_verified_before_ = 0;
312
313 // Total volume of messages with corrupted flatbuffer formatting, if any.
314 // Excludes corrupted message content.
315 // TODO - if the layout included something as simple as a CRC (relatively
316 // fast and robust enough) for each span, then corrupted content could be
317 // included in this check.
318 size_t total_corrupted_ = 0;
319
320 // Total volume of verifiable messages intermixed with corrupted messages,
321 // if any. Will be == 0 if total_corrupted_ == 0.
322 size_t total_verified_during_ = 0;
323
324 // Total volume of verifiable messages found after the last corrupted one,
325 // if any. Will be == 0 if total_corrupted_ == 0.
326 size_t total_verified_after_ = 0;
327
328 bool is_corrupted() const { return total_corrupted_ > 0; }
329
330 bool crash_on_corrupt_message_flag_ = true;
331 bool ignore_corrupt_messages_flag_ = false;
Austin Schuh05b70472020-01-01 17:11:17 -0800332};
333
Austin Schuhc41603c2020-10-11 16:17:37 -0700334// A class to seamlessly read messages from a list of part files.
335class PartsMessageReader {
336 public:
337 PartsMessageReader(LogParts log_parts);
338
339 std::string_view filename() const { return message_reader_.filename(); }
340
Austin Schuhd2f96102020-12-01 20:27:29 -0800341 // Returns the LogParts that holds the filenames we are reading.
342 const LogParts &parts() const { return parts_; }
343
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800344 const LogFileHeader *log_file_header() const {
345 return message_reader_.log_file_header();
346 }
347
Austin Schuhc41603c2020-10-11 16:17:37 -0700348 // Returns the minimum amount of data needed to queue up for sorting before
349 // we are guarenteed to not see data out of order.
350 std::chrono::nanoseconds max_out_of_order_duration() const {
351 return message_reader_.max_out_of_order_duration();
352 }
353
354 // Returns the newest timestamp read out of the log file.
355 monotonic_clock::time_point newest_timestamp() const {
356 return newest_timestamp_;
357 }
358
359 // Returns the next message if there is one, or nullopt if we have reached the
360 // end of all the files.
361 // Note: reading the next message may change the max_out_of_order_duration().
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700362 std::shared_ptr<UnpackedMessageHeader> ReadMessage();
Austin Schuhc41603c2020-10-11 16:17:37 -0700363
Austin Schuh48507722021-07-17 17:29:24 -0700364 // Returns the boot count for the requested node, or std::nullopt if we don't
365 // know.
366 std::optional<size_t> boot_count(size_t node_index) const {
367 CHECK_GE(node_index, 0u);
368 CHECK_LT(node_index, boot_counts_.size());
369 return boot_counts_[node_index];
370 }
371
Austin Schuhc41603c2020-10-11 16:17:37 -0700372 private:
373 // Opens the next log and updates message_reader_. Sets done_ if there is
374 // nothing more to do.
375 void NextLog();
Austin Schuh48507722021-07-17 17:29:24 -0700376 void ComputeBootCounts();
Austin Schuhc41603c2020-10-11 16:17:37 -0700377
378 const LogParts parts_;
379 size_t next_part_index_ = 1u;
380 bool done_ = false;
381 MessageReader message_reader_;
Brian Silvermanfee16972021-09-14 12:06:38 -0700382 // We instantiate the next one early, to allow implementations to prefetch.
383 // TODO(Brian): To get optimal performance when downloading, this needs more
384 // communication with the implementation to prioritize the next part and add
385 // more parallelism when it helps. Maybe some kind of a queue of parts in
386 // order, and the implementation gets to pull however many make sense off the
387 // front?
388 std::optional<MessageReader> next_message_reader_;
Austin Schuhc41603c2020-10-11 16:17:37 -0700389
Austin Schuh315b96b2020-12-11 21:21:12 -0800390 // True after we have seen a message after the start of the log. The
391 // guarentees on logging essentially are that all data from before the
392 // starting time of the log may be arbitrarily out of order, but once we get
393 // max_out_of_order_duration past the start, everything will remain within
394 // max_out_of_order_duration. We shouldn't see anything before the start
395 // after we've seen a message that is at least max_out_of_order_duration after
396 // the start.
397 bool after_start_ = false;
398
Austin Schuhc41603c2020-10-11 16:17:37 -0700399 monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
Austin Schuh48507722021-07-17 17:29:24 -0700400
401 // Per node boot counts.
402 std::vector<std::optional<size_t>> boot_counts_;
Austin Schuhc41603c2020-10-11 16:17:37 -0700403};
404
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700405// Stores MessageHeader as a flat header and inline, aligned block of data.
406class UnpackedMessageHeader {
407 public:
James Kuszmaul9776b392023-01-14 14:08:08 -0800408 UnpackedMessageHeader(
409 uint32_t channel_index, monotonic_clock::time_point monotonic_sent_time,
410 realtime_clock::time_point realtime_sent_time, uint32_t queue_index,
411 std::optional<monotonic_clock::time_point> monotonic_remote_time,
412 std::optional<realtime_clock::time_point> realtime_remote_time,
413 std::optional<uint32_t> remote_queue_index,
414 monotonic_clock::time_point monotonic_timestamp_time,
415 bool has_monotonic_timestamp_time, absl::Span<const uint8_t> span)
416 : channel_index(channel_index),
417 monotonic_sent_time(monotonic_sent_time),
418 realtime_sent_time(realtime_sent_time),
419 queue_index(queue_index),
420 monotonic_remote_time(monotonic_remote_time),
421 realtime_remote_time(realtime_remote_time),
422 remote_queue_index(remote_queue_index),
423 monotonic_timestamp_time(monotonic_timestamp_time),
424 has_monotonic_timestamp_time(has_monotonic_timestamp_time),
425 span(span) {}
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700426 UnpackedMessageHeader(const UnpackedMessageHeader &) = delete;
427 UnpackedMessageHeader &operator=(const UnpackedMessageHeader &) = delete;
428
429 // The channel.
430 uint32_t channel_index = 0xffffffff;
431
432 monotonic_clock::time_point monotonic_sent_time;
433 realtime_clock::time_point realtime_sent_time;
434
435 // The local queue index.
436 uint32_t queue_index = 0xffffffff;
437
Austin Schuh826e6ce2021-11-18 20:33:10 -0800438 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700439
440 std::optional<realtime_clock::time_point> realtime_remote_time;
441 std::optional<uint32_t> remote_queue_index;
442
443 // This field is defaulted in the flatbuffer, so we need to store both the
444 // possibly defaulted value and whether it is defaulted.
445 monotonic_clock::time_point monotonic_timestamp_time;
446 bool has_monotonic_timestamp_time;
447
448 static std::shared_ptr<UnpackedMessageHeader> MakeMessage(
449 const MessageHeader &message);
450
451 // Note: we are storing a span here because we need something to put in the
452 // SharedSpan pointer that RawSender takes. We are using the aliasing
453 // constructor of shared_ptr to avoid the allocation, and it needs a nice
454 // pointer to track.
455 absl::Span<const uint8_t> span;
456
457 char actual_data[];
458
459 private:
460 ~UnpackedMessageHeader() {}
461
462 static void DestroyAndFree(UnpackedMessageHeader *p) {
463 p->~UnpackedMessageHeader();
464 free(p);
465 }
466};
467
468std::ostream &operator<<(std::ostream &os,
469 const UnpackedMessageHeader &message);
470
Austin Schuh1be0ce42020-11-29 22:43:26 -0800471// Struct to hold a message as it gets sorted on a single node.
472struct Message {
473 // The channel.
474 uint32_t channel_index = 0xffffffff;
475 // The local queue index.
Austin Schuh58646e22021-08-23 23:51:46 -0700476 // TODO(austin): Technically the boot inside queue_index is redundant with
477 // timestamp. In practice, it is less error-prone to duplicate it. Maybe a
478 // function to return the combined struct?
479 BootQueueIndex queue_index;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700480 // The local timestamp.
481 BootTimestamp timestamp;
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700482
Austin Schuh48507722021-07-17 17:29:24 -0700483 // Remote boot when this is a timestamp.
484 size_t monotonic_remote_boot = 0xffffff;
485
486 size_t monotonic_timestamp_boot = 0xffffff;
487
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700488 std::shared_ptr<UnpackedMessageHeader> data;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800489
490 bool operator<(const Message &m2) const;
491 bool operator>=(const Message &m2) const;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800492 bool operator==(const Message &m2) const;
Austin Schuh1be0ce42020-11-29 22:43:26 -0800493};
494
495std::ostream &operator<<(std::ostream &os, const Message &m);
496
Austin Schuhd2f96102020-12-01 20:27:29 -0800497// Structure to hold a full message and all the timestamps, which may or may not
498// have been sent from a remote node. The remote_queue_index will be invalid if
499// this message is from the point of view of the node which sent it.
500struct TimestampedMessage {
501 uint32_t channel_index = 0xffffffff;
502
Austin Schuh58646e22021-08-23 23:51:46 -0700503 BootQueueIndex queue_index;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700504 BootTimestamp monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800505 realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
506
Austin Schuh58646e22021-08-23 23:51:46 -0700507 BootQueueIndex remote_queue_index;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700508 BootTimestamp monotonic_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800509 realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
510
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700511 BootTimestamp monotonic_timestamp_time;
Austin Schuh8bf1e632021-01-02 22:41:04 -0800512
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700513 std::shared_ptr<UnpackedMessageHeader> data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800514};
515
516std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
517
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800518// Class to sort the resulting messages from a PartsMessageReader.
519class LogPartsSorter {
520 public:
521 LogPartsSorter(LogParts log_parts);
522
Austin Schuh0ca51f32020-12-25 21:51:45 -0800523 // Returns the parts that this is sorting messages from.
524 const LogParts &parts() const { return parts_message_reader_.parts(); }
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800525
Austin Schuhd2f96102020-12-01 20:27:29 -0800526 monotonic_clock::time_point monotonic_start_time() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800527 return parts().monotonic_start_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800528 }
529 realtime_clock::time_point realtime_start_time() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800530 return parts().realtime_start_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800531 }
532
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800533 // The time this data is sorted until.
534 monotonic_clock::time_point sorted_until() const { return sorted_until_; }
535
536 // Returns the next sorted message from the log file. It is safe to call
537 // std::move() on the result to move the data flatbuffer from it.
538 Message *Front();
539 // Pops the front message. This should only be called after a call to
540 // Front().
541 void PopFront();
542
543 // Returns a debug string representing the contents of this sorter.
544 std::string DebugString() const;
545
546 private:
547 // Log parts reader we are wrapping.
548 PartsMessageReader parts_message_reader_;
549 // Cache of the time we are sorted until.
550 aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
551
Austin Schuhb000de62020-12-03 22:00:40 -0800552 // Timestamp of the last message returned. Used to make sure nothing goes
553 // backwards.
554 monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
555
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800556 // Set used for efficient sorting of messages. We can benchmark and evaluate
557 // other data structures if this proves to be the bottleneck.
558 absl::btree_set<Message> messages_;
Austin Schuh48507722021-07-17 17:29:24 -0700559
560 // Mapping from channel to source node.
561 // TODO(austin): Should we put this in Boots so it can be cached for everyone?
562 std::vector<size_t> source_node_index_;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800563};
564
Austin Schuh8f52ed52020-11-30 23:12:39 -0800565// Class to run merge sort on the messages from multiple LogPartsSorter
566// instances.
567class NodeMerger {
568 public:
Austin Schuhd2f96102020-12-01 20:27:29 -0800569 NodeMerger(std::vector<LogParts> parts);
570
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700571 // Copying and moving will mess up the internal raw pointers. Just don't do
572 // it.
573 NodeMerger(NodeMerger const &) = delete;
574 NodeMerger(NodeMerger &&) = delete;
575 void operator=(NodeMerger const &) = delete;
576 void operator=(NodeMerger &&) = delete;
577
Austin Schuhd2f96102020-12-01 20:27:29 -0800578 // Node index in the configuration of this node.
579 int node() const { return node_; }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800580
Austin Schuh0ca51f32020-12-25 21:51:45 -0800581 // List of parts being sorted together.
582 std::vector<const LogParts *> Parts() const;
583
584 const Configuration *configuration() const {
585 return parts_sorters_[0].parts().config.get();
Austin Schuhd2f96102020-12-01 20:27:29 -0800586 }
587
588 monotonic_clock::time_point monotonic_start_time() const {
589 return monotonic_start_time_;
590 }
591 realtime_clock::time_point realtime_start_time() const {
592 return realtime_start_time_;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800593 }
Austin Schuh5dd22842021-11-17 16:09:39 -0800594 monotonic_clock::time_point monotonic_oldest_time() const {
595 return monotonic_oldest_time_;
596 }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800597
598 // The time this data is sorted until.
599 monotonic_clock::time_point sorted_until() const { return sorted_until_; }
600
601 // Returns the next sorted message from the set of log files. It is safe to
602 // call std::move() on the result to move the data flatbuffer from it.
603 Message *Front();
604 // Pops the front message. This should only be called after a call to
605 // Front().
606 void PopFront();
607
608 private:
609 // Unsorted list of all parts sorters.
Austin Schuhd2f96102020-12-01 20:27:29 -0800610 std::vector<LogPartsSorter> parts_sorters_;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800611 // Pointer to the parts sorter holding the current Front message if one
612 // exists, or nullptr if a new one needs to be found.
613 LogPartsSorter *current_ = nullptr;
614 // Cached sorted_until value.
615 aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800616
617 // Cached node.
618 int node_;
619
Austin Schuhb000de62020-12-03 22:00:40 -0800620 // Timestamp of the last message returned. Used to make sure nothing goes
621 // backwards.
622 monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
623
Austin Schuhd2f96102020-12-01 20:27:29 -0800624 realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
625 monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh60e77942022-05-16 17:48:24 -0700626 monotonic_clock::time_point monotonic_oldest_time_ =
627 monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800628};
629
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700630// Class to concatenate multiple boots worth of logs into a single per-node
631// stream.
632class BootMerger {
633 public:
634 BootMerger(std::vector<LogParts> file);
635
636 // Copying and moving will mess up the internal raw pointers. Just don't do
637 // it.
638 BootMerger(BootMerger const &) = delete;
639 BootMerger(BootMerger &&) = delete;
640 void operator=(BootMerger const &) = delete;
641 void operator=(BootMerger &&) = delete;
642
643 // Node index in the configuration of this node.
644 int node() const { return node_mergers_[0]->node(); }
645
646 // List of parts being sorted together.
647 std::vector<const LogParts *> Parts() const;
648
649 const Configuration *configuration() const {
650 return node_mergers_[0]->configuration();
651 }
652
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700653 monotonic_clock::time_point monotonic_start_time(size_t boot) const {
654 CHECK_LT(boot, node_mergers_.size());
655 return node_mergers_[boot]->monotonic_start_time();
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700656 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700657 realtime_clock::time_point realtime_start_time(size_t boot) const {
658 CHECK_LT(boot, node_mergers_.size());
659 return node_mergers_[boot]->realtime_start_time();
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700660 }
Austin Schuh5dd22842021-11-17 16:09:39 -0800661 monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
662 CHECK_LT(boot, node_mergers_.size());
663 return node_mergers_[boot]->monotonic_oldest_time();
664 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700665
666 bool started() const {
667 return node_mergers_[index_]->sorted_until() != monotonic_clock::min_time ||
668 index_ != 0;
669 }
670
671 // Returns the next sorted message from the set of log files. It is safe to
672 // call std::move() on the result to move the data flatbuffer from it.
673 Message *Front();
674 // Pops the front message. This should only be called after a call to
675 // Front().
676 void PopFront();
677
678 private:
679 int index_ = 0;
680
681 // TODO(austin): Sanjay points out this is pretty inefficient. Don't keep so
682 // many things open.
683 std::vector<std::unique_ptr<NodeMerger>> node_mergers_;
684};
685
Austin Schuhd2f96102020-12-01 20:27:29 -0800686// Class to match timestamps with the corresponding data from other nodes.
Austin Schuh79b30942021-01-24 22:32:21 -0800687//
688// This class also buffers data for the node it represents, and supports
689// notifying when new data is queued as well as queueing until a point in time.
Austin Schuhd2f96102020-12-01 20:27:29 -0800690class TimestampMapper {
691 public:
692 TimestampMapper(std::vector<LogParts> file);
693
694 // Copying and moving will mess up the internal raw pointers. Just don't do
695 // it.
696 TimestampMapper(TimestampMapper const &) = delete;
697 TimestampMapper(TimestampMapper &&) = delete;
698 void operator=(TimestampMapper const &) = delete;
699 void operator=(TimestampMapper &&) = delete;
700
701 // TODO(austin): It would be super helpful to provide a way to queue up to
702 // time X without matching timestamps, and to then be able to pull the
703 // timestamps out of this queue. This lets us bootstrap time estimation
704 // without exploding memory usage worst case.
705
Austin Schuh0ca51f32020-12-25 21:51:45 -0800706 const Configuration *configuration() const { return configuration_.get(); }
Austin Schuhd2f96102020-12-01 20:27:29 -0800707
708 // Returns which node this is sorting for.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700709 size_t node() const { return boot_merger_.node(); }
Austin Schuhd2f96102020-12-01 20:27:29 -0800710
711 // The start time of this log.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700712 monotonic_clock::time_point monotonic_start_time(size_t boot) const {
713 return boot_merger_.monotonic_start_time(boot);
Austin Schuhd2f96102020-12-01 20:27:29 -0800714 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700715 realtime_clock::time_point realtime_start_time(size_t boot) const {
716 return boot_merger_.realtime_start_time(boot);
Austin Schuhd2f96102020-12-01 20:27:29 -0800717 }
Austin Schuh5dd22842021-11-17 16:09:39 -0800718 // Returns the oldest timestamp on a message on this boot.
719 monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
720 return boot_merger_.monotonic_oldest_time(boot);
721 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800722
723 // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
724 // for each node. Peers are used to look up the data for timestamps on this
725 // node.
726 void AddPeer(TimestampMapper *timestamp_mapper);
727
Austin Schuh24bf4972021-06-29 22:09:08 -0700728 // Returns true if anything has been queued up.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700729 bool started() const { return boot_merger_.started(); }
Austin Schuhd2f96102020-12-01 20:27:29 -0800730
731 // Returns the next message for this node.
732 TimestampedMessage *Front();
733 // Pops the next message. Front must be called first.
734 void PopFront();
735
736 // Returns debug information about this node.
737 std::string DebugString() const;
738
Austin Schuh79b30942021-01-24 22:32:21 -0800739 // Queues data the provided time.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700740 void QueueUntil(BootTimestamp queue_time);
Austin Schuhe639ea12021-01-25 13:00:22 -0800741 // Queues until we have time_estimation_buffer of data in the queue.
742 void QueueFor(std::chrono::nanoseconds time_estimation_buffer);
Austin Schuh79b30942021-01-24 22:32:21 -0800743
Austin Schuh06601222021-01-26 17:02:50 -0800744 // Queues until the condition is met.
745 template <typename T>
746 void QueueUntilCondition(T fn) {
747 while (true) {
748 if (fn()) {
749 break;
750 }
751 if (!QueueMatched()) {
752 break;
753 }
754 }
755 }
756
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700757 // Sets the callback that can be used to skip messages.
758 void set_replay_channels_callback(
759 std::function<bool(const TimestampedMessage &)> fn) {
760 replay_channels_callback_ = fn;
761 }
762
Austin Schuh79b30942021-01-24 22:32:21 -0800763 // Sets a callback to be called whenever a full message is queued.
764 void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
765 timestamp_callback_ = fn;
766 }
767
Austin Schuhd2f96102020-12-01 20:27:29 -0800768 private:
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700769 // Result of MaybeQueueMatched
770 enum class MatchResult : uint8_t {
771 kEndOfFile, // End of the log file being read
772 kQueued, // Message was queued
773 kSkipped // Message was skipped over
774 };
775
Austin Schuhd2f96102020-12-01 20:27:29 -0800776 // The state for a remote node. This holds the data that needs to be matched
777 // with the remote node's timestamps.
778 struct NodeData {
779 // True if we should save data here. This should be true if any of the
780 // bools in delivered below are true.
781 bool any_delivered = false;
782
Austin Schuh36c00932021-07-19 18:13:21 -0700783 // True if we have a peer and therefore should be saving data for it.
784 bool save_for_peer = false;
785
Austin Schuhd2f96102020-12-01 20:27:29 -0800786 // Peer pointer. This node is only to be considered if a peer is set.
787 TimestampMapper *peer = nullptr;
788
789 struct ChannelData {
790 // Deque per channel. This contains the data from the outside
791 // TimestampMapper node which is relevant for the node this NodeData
792 // points to.
793 std::deque<Message> messages;
794 // Bool tracking per channel if a message is delivered to the node this
795 // NodeData represents.
796 bool delivered = false;
Austin Schuh6a7358f2021-11-18 22:40:40 -0800797 // The TTL for delivery.
798 std::chrono::nanoseconds time_to_live = std::chrono::nanoseconds(0);
Austin Schuhd2f96102020-12-01 20:27:29 -0800799 };
800
801 // Vector with per channel data.
802 std::vector<ChannelData> channels;
803 };
804
805 // Returns (and forgets about) the data for the provided timestamp message
806 // showing when it was delivered to this node.
807 Message MatchingMessageFor(const Message &message);
808
809 // Queues up a single message into our message queue, and any nodes that this
810 // message is delivered to. Returns true if one was available, false
811 // otherwise.
812 bool Queue();
813
Austin Schuh79b30942021-01-24 22:32:21 -0800814 // Queues up a single matched message into our matched message queue. Returns
815 // true if one was queued, and false otherwise.
816 bool QueueMatched();
817
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700818 // Queues a message if the replay_channels_callback is passed and the end of
819 // the log file has not been reached.
820 MatchResult MaybeQueueMatched();
821
Austin Schuhd2f96102020-12-01 20:27:29 -0800822 // Queues up data until we have at least one message >= to time t.
823 // Useful for triggering a remote node to read enough data to have the
824 // timestamp you care about available.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700825 void QueueUnmatchedUntil(BootTimestamp t);
Austin Schuhd2f96102020-12-01 20:27:29 -0800826
Austin Schuh79b30942021-01-24 22:32:21 -0800827 // Queues m into matched_messages_.
828 void QueueMessage(Message *m);
Austin Schuhd2f96102020-12-01 20:27:29 -0800829
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700830 // If a replay_channels_callback was set and the callback returns false, a
831 // matched message is popped and true is returned. Otherwise false is
832 // returned.
833 bool CheckReplayChannelsAndMaybePop(const TimestampedMessage &message);
834
Austin Schuh58646e22021-08-23 23:51:46 -0700835 // Returns the name of the node this class is sorting for.
836 std::string_view node_name() const {
837 return configuration_->has_nodes() ? configuration_->nodes()
838 ->Get(boot_merger_.node())
839 ->name()
840 ->string_view()
841 : "(single node)";
842 }
843
Austin Schuhd2f96102020-12-01 20:27:29 -0800844 // The node merger to source messages from.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700845 BootMerger boot_merger_;
Austin Schuh0ca51f32020-12-25 21:51:45 -0800846
847 std::shared_ptr<const Configuration> configuration_;
848
Austin Schuhd2f96102020-12-01 20:27:29 -0800849 // The buffer of messages for this node. These are not matched with any
850 // remote data.
851 std::deque<Message> messages_;
852 // The node index for the source node for each channel.
853 std::vector<size_t> source_node_;
854
855 // Vector per node. Not all nodes will have anything.
856 std::vector<NodeData> nodes_data_;
857
858 // Latest message to return.
Austin Schuh79b30942021-01-24 22:32:21 -0800859 std::deque<TimestampedMessage> matched_messages_;
Austin Schuhd2f96102020-12-01 20:27:29 -0800860
Austin Schuh79b30942021-01-24 22:32:21 -0800861 // Tracks the state of the first message in matched_messages_. Do we need to
862 // update it, is it valid, or should we return nullptr?
Austin Schuhd2f96102020-12-01 20:27:29 -0800863 enum class FirstMessage {
864 kNeedsUpdate,
865 kInMessage,
866 kNullptr,
867 };
868 FirstMessage first_message_ = FirstMessage::kNeedsUpdate;
869
870 // Timestamp of the last message returned. Used to make sure nothing goes
871 // backwards.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700872 BootTimestamp last_message_time_ = BootTimestamp::min_time();
Austin Schuh6a7358f2021-11-18 22:40:40 -0800873 BootTimestamp last_popped_message_time_ = BootTimestamp::min_time();
Austin Schuhd2f96102020-12-01 20:27:29 -0800874 // Time this node is queued up until. Used for caching.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700875 BootTimestamp queued_until_ = BootTimestamp::min_time();
Austin Schuh79b30942021-01-24 22:32:21 -0800876
877 std::function<void(TimestampedMessage *)> timestamp_callback_;
Eric Schmiedebergb38477e2022-12-02 16:08:04 -0700878 std::function<bool(TimestampedMessage &)> replay_channels_callback_;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800879};
880
Austin Schuhee711052020-08-24 16:06:09 -0700881// Returns the node name with a trailing space, or an empty string if we are on
882// a single node.
883std::string MaybeNodeName(const Node *);
884
Austin Schuh71a40d42023-02-04 21:22:22 -0800885// Class to copy a RemoteMessage into the provided buffer.
886class RemoteMessageCopier : public DataEncoder::Copier {
887 public:
888 RemoteMessageCopier(const message_bridge::RemoteMessage *message,
889 int channel_index,
890 aos::monotonic_clock::time_point monotonic_timestamp_time,
891 EventLoop *event_loop)
892 : DataEncoder::Copier(PackRemoteMessageSize()),
893 message_(message),
894 channel_index_(channel_index),
895 monotonic_timestamp_time_(monotonic_timestamp_time),
896 event_loop_(event_loop) {}
897
898 monotonic_clock::time_point end_time() const { return end_time_; }
899
900 size_t Copy(uint8_t *data, size_t start_byte, size_t end_byte) final {
901 size_t result = PackRemoteMessageInline(data, message_, channel_index_,
902 monotonic_timestamp_time_,
903 start_byte, end_byte);
904 end_time_ = event_loop_->monotonic_now();
905 return result;
906 }
907
908 private:
909 const message_bridge::RemoteMessage *message_;
910 int channel_index_;
911 aos::monotonic_clock::time_point monotonic_timestamp_time_;
912 EventLoop *event_loop_;
913 monotonic_clock::time_point end_time_;
914};
915
916// Class to copy a context into the provided buffer.
917class ContextDataCopier : public DataEncoder::Copier {
918 public:
919 ContextDataCopier(const Context &context, int channel_index, LogType log_type,
920 EventLoop *event_loop)
921 : DataEncoder::Copier(PackMessageSize(log_type, context.size)),
922 context_(context),
923 channel_index_(channel_index),
924 log_type_(log_type),
925 event_loop_(event_loop) {}
926
927 monotonic_clock::time_point end_time() const { return end_time_; }
928
929 size_t Copy(uint8_t *data, size_t start_byte, size_t end_byte) final {
930 size_t result = PackMessageInline(data, context_, channel_index_, log_type_,
931 start_byte, end_byte);
932 end_time_ = event_loop_->monotonic_now();
933 return result;
934 }
935
936 private:
937 const Context &context_;
938 const int channel_index_;
939 const LogType log_type_;
940 EventLoop *event_loop_;
941 monotonic_clock::time_point end_time_;
942};
943
Brian Silvermanf51499a2020-09-21 12:49:08 -0700944} // namespace aos::logger
Austin Schuha36c8902019-12-30 18:07:15 -0800945
946#endif // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_