blob: 2eeab52f529deb72f24cb015785baeb489f08546 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
31
Austin Schuh7fbf5a72020-09-21 16:28:13 -070032DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080033 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070034DEFINE_double(
35 flush_period, 5.0,
36 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080037
Austin Schuha040c3f2021-02-13 16:09:07 -080038DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080039 max_network_delay, 1.0,
40 "Max time to assume a message takes to cross the network before we are "
41 "willing to drop it from our buffers and assume it didn't make it. "
42 "Increasing this number can increase memory usage depending on the packet "
43 "loss of your network or if the timestamps aren't logged for a message.");
44
45DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080046 max_out_of_order, -1,
47 "If set, this overrides the max out of order duration for a log file.");
48
Austin Schuh0e8db662021-07-06 10:43:47 -070049DEFINE_bool(workaround_double_headers, true,
50 "Some old log files have two headers at the beginning. Use the "
51 "last header as the actual header.");
52
Brian Smarttea913d42021-12-10 15:02:38 -080053DEFINE_bool(crash_on_corrupt_message, true,
54 "When true, MessageReader will crash the first time a message "
55 "with corrupted format is found. When false, the crash will be "
56 "suppressed, and any remaining readable messages will be "
57 "evaluated to present verified vs corrupted stats.");
58
59DEFINE_bool(ignore_corrupt_messages, false,
60 "When true, and crash_on_corrupt_message is false, then any "
61 "corrupt message found by MessageReader be silently ignored, "
62 "providing access to all uncorrupted messages in a logfile.");
63
Brian Silvermanf51499a2020-09-21 12:49:08 -070064namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Streams the contained value of `t` to `os`, or the literal text "null" when
// `t` is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << t.value();
}

}  // namespace
78
Brian Silvermanf51499a2020-09-21 12:49:08 -070079DetachedBufferWriter::DetachedBufferWriter(
80 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
81 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070082 if (!util::MkdirPIfSpace(filename, 0777)) {
83 ran_out_of_space_ = true;
84 } else {
85 fd_ = open(std::string(filename).c_str(),
86 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
87 if (fd_ == -1 && errno == ENOSPC) {
88 ran_out_of_space_ = true;
89 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070090 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
91 << " for writing";
92 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070093 }
94 }
Austin Schuha36c8902019-12-30 18:07:15 -080095}
96
97DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070098 Close();
99 if (ran_out_of_space_) {
100 CHECK(acknowledge_ran_out_of_space_)
101 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700102 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103}
104
Brian Silvermand90905f2020-09-23 14:42:56 -0700105DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700106 *this = std::move(other);
107}
108
Brian Silverman87ac0402020-09-17 14:47:01 -0700109// When other is destroyed "soon" (which it should be because we're getting an
110// rvalue reference to it), it will flush etc all the data we have queued up
111// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700112DetachedBufferWriter &DetachedBufferWriter::operator=(
113 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700114 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700117 std::swap(ran_out_of_space_, other.ran_out_of_space_);
118 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700119 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120 std::swap(max_write_time_, other.max_write_time_);
121 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
122 std::swap(max_write_time_messages_, other.max_write_time_messages_);
123 std::swap(total_write_time_, other.total_write_time_);
124 std::swap(total_write_count_, other.total_write_count_);
125 std::swap(total_write_messages_, other.total_write_messages_);
126 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700127 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800128}
129
Brian Silvermanf51499a2020-09-21 12:49:08 -0700130void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700131 if (ran_out_of_space_) {
132 // We don't want any later data to be written after space becomes
133 // available, so refuse to write anything more once we've dropped data
134 // because we ran out of space.
135 VLOG(1) << "Ignoring span: " << span.size();
136 return;
137 }
138
Austin Schuhbd06ae42021-03-31 22:48:21 -0700139 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700140 if (encoder_->may_bypass() && span.size() > 4096u) {
141 // Over this threshold, we'll assume it's cheaper to add an extra
142 // syscall to write the data immediately instead of copying it to
143 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800144
Brian Silvermanf51499a2020-09-21 12:49:08 -0700145 // First, flush everything.
146 while (encoder_->queue_size() > 0u) {
147 Flush();
148 }
Austin Schuhde031b72020-01-10 19:34:41 -0800149
Brian Silvermanf51499a2020-09-21 12:49:08 -0700150 // Then, write it directly.
151 const auto start = aos::monotonic_clock::now();
152 const ssize_t written = write(fd_, span.data(), span.size());
153 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700154 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700156 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700157 } else {
158 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700159 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800160 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700161
Austin Schuhbd06ae42021-03-31 22:48:21 -0700162 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800163}
164
Brian Silverman0465fcf2020-09-24 00:29:18 -0700165void DetachedBufferWriter::Close() {
166 if (fd_ == -1) {
167 return;
168 }
169 encoder_->Finish();
170 while (encoder_->queue_size() > 0) {
171 Flush();
172 }
173 if (close(fd_) == -1) {
174 if (errno == ENOSPC) {
175 ran_out_of_space_ = true;
176 } else {
177 PLOG(ERROR) << "Closing log file failed";
178 }
179 }
180 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700181 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700182}
183
Austin Schuha36c8902019-12-30 18:07:15 -0800184void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700185 if (ran_out_of_space_) {
186 // We don't want any later data to be written after space becomes available,
187 // so refuse to write anything more once we've dropped data because we ran
188 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700189 if (encoder_) {
190 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
191 encoder_->Clear(encoder_->queue().size());
192 } else {
193 VLOG(1) << "No queue to ignore";
194 }
195 return;
196 }
197
198 const auto queue = encoder_->queue();
199 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700200 return;
201 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700202
Austin Schuha36c8902019-12-30 18:07:15 -0800203 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700204 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
205 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800206 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700207 for (size_t i = 0; i < iovec_size; ++i) {
208 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
209 iovec_[i].iov_len = queue[i].size();
210 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800211 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700212
213 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800214 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700215 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700216 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700217
218 encoder_->Clear(iovec_size);
219
220 UpdateStatsForWrite(end - start, written, iovec_size);
221}
222
Brian Silverman0465fcf2020-09-24 00:29:18 -0700223void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
224 size_t write_size) {
225 if (write_return == -1 && errno == ENOSPC) {
226 ran_out_of_space_ = true;
227 return;
228 }
229 PCHECK(write_return >= 0) << ": write failed";
230 if (write_return < static_cast<ssize_t>(write_size)) {
231 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
232 // never seems to happen in any other case. If we ever want to log to a
233 // socket, this will happen more often. However, until we get there, we'll
234 // just assume it means we ran out of space.
235 ran_out_of_space_ = true;
236 return;
237 }
238}
239
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240void DetachedBufferWriter::UpdateStatsForWrite(
241 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
242 if (duration > max_write_time_) {
243 max_write_time_ = duration;
244 max_write_time_bytes_ = written;
245 max_write_time_messages_ = iovec_size;
246 }
247 total_write_time_ += duration;
248 ++total_write_count_;
249 total_write_messages_ += iovec_size;
250 total_write_bytes_ += written;
251}
252
Austin Schuhbd06ae42021-03-31 22:48:21 -0700253void DetachedBufferWriter::FlushAtThreshold(
254 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700255 if (ran_out_of_space_) {
256 // We don't want any later data to be written after space becomes available,
257 // so refuse to write anything more once we've dropped data because we ran
258 // out of space.
259 if (encoder_) {
260 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
261 encoder_->Clear(encoder_->queue().size());
262 } else {
263 VLOG(1) << "No queue to ignore";
264 }
265 return;
266 }
267
Austin Schuhbd06ae42021-03-31 22:48:21 -0700268 // We don't want to flush the first time through. Otherwise we will flush as
269 // the log file header might be compressing, defeating any parallelism and
270 // queueing there.
271 if (last_flush_time_ == aos::monotonic_clock::min_time) {
272 last_flush_time_ = now;
273 }
274
Brian Silvermanf51499a2020-09-21 12:49:08 -0700275 // Flush if we are at the max number of iovs per writev, because there's no
276 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700277 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700278 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700279 encoder_->queue_size() >= IOV_MAX ||
280 now > last_flush_time_ +
281 chrono::duration_cast<chrono::nanoseconds>(
282 chrono::duration<double>(FLAGS_flush_period))) {
283 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700284 Flush();
285 }
Austin Schuha36c8902019-12-30 18:07:15 -0800286}
287
288flatbuffers::Offset<MessageHeader> PackMessage(
289 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
290 int channel_index, LogType log_type) {
291 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
292
293 switch (log_type) {
294 case LogType::kLogMessage:
295 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800296 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700297 // Since the timestamps are 8 byte aligned, we are going to end up adding
298 // padding in the middle of the message to pad everything out to 8 byte
299 // alignment. That's rather wasteful. To make things efficient to mmap
300 // while reading uncompressed logs, we'd actually rather the message be
301 // aligned. So, force 8 byte alignment (enough to preserve alignment
302 // inside the nested message so that we can read it without moving it)
303 // here.
304 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700305 data_offset = fbb->CreateVector(
306 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800307 break;
308
309 case LogType::kLogDeliveryTimeOnly:
310 break;
311 }
312
313 MessageHeader::Builder message_header_builder(*fbb);
314 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800315
Austin Schuhfa30c352022-10-16 11:12:02 -0700316 // These are split out into very explicit serialization calls because the
317 // order here changes the order things are written out on the wire, and we
318 // want to control and understand it here. Changing the order can increase
319 // the amount of padding bytes in the middle.
320 //
321 // It is also easier to follow... And doesn't actually make things much bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800322 switch (log_type) {
323 case LogType::kLogRemoteMessage:
324 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700325 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800326 message_header_builder.add_monotonic_sent_time(
327 context.monotonic_remote_time.time_since_epoch().count());
328 message_header_builder.add_realtime_sent_time(
329 context.realtime_remote_time.time_since_epoch().count());
330 break;
331
Austin Schuh6f3babe2020-01-26 20:34:50 -0800332 case LogType::kLogDeliveryTimeOnly:
333 message_header_builder.add_queue_index(context.queue_index);
334 message_header_builder.add_monotonic_sent_time(
335 context.monotonic_event_time.time_since_epoch().count());
336 message_header_builder.add_realtime_sent_time(
337 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800338 message_header_builder.add_monotonic_remote_time(
339 context.monotonic_remote_time.time_since_epoch().count());
340 message_header_builder.add_realtime_remote_time(
341 context.realtime_remote_time.time_since_epoch().count());
342 message_header_builder.add_remote_queue_index(context.remote_queue_index);
343 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700344
345 case LogType::kLogMessage:
346 message_header_builder.add_queue_index(context.queue_index);
347 message_header_builder.add_data(data_offset);
348 message_header_builder.add_monotonic_sent_time(
349 context.monotonic_event_time.time_since_epoch().count());
350 message_header_builder.add_realtime_sent_time(
351 context.realtime_event_time.time_since_epoch().count());
352 break;
353
354 case LogType::kLogMessageAndDeliveryTime:
355 message_header_builder.add_queue_index(context.queue_index);
356 message_header_builder.add_remote_queue_index(context.remote_queue_index);
357 message_header_builder.add_monotonic_sent_time(
358 context.monotonic_event_time.time_since_epoch().count());
359 message_header_builder.add_realtime_sent_time(
360 context.realtime_event_time.time_since_epoch().count());
361 message_header_builder.add_monotonic_remote_time(
362 context.monotonic_remote_time.time_since_epoch().count());
363 message_header_builder.add_realtime_remote_time(
364 context.realtime_remote_time.time_since_epoch().count());
365 message_header_builder.add_data(data_offset);
366 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800367 }
368
369 return message_header_builder.Finish();
370}
371
Austin Schuhfa30c352022-10-16 11:12:02 -0700372flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
373 switch (log_type) {
374 case LogType::kLogMessage:
375 return
376 // Root table size + offset.
377 sizeof(flatbuffers::uoffset_t) * 2 +
378 // 6 padding bytes to pad the header out properly.
379 6 +
380 // vtable header (size + size of table)
381 sizeof(flatbuffers::voffset_t) * 2 +
382 // offsets to all the fields.
383 sizeof(flatbuffers::voffset_t) * 5 +
384 // pointer to vtable
385 sizeof(flatbuffers::soffset_t) +
386 // pointer to data
387 sizeof(flatbuffers::uoffset_t) +
388 // realtime_sent_time, monotonic_sent_time
389 sizeof(int64_t) * 2 +
390 // queue_index, channel_index
391 sizeof(uint32_t) * 2;
392
393 case LogType::kLogDeliveryTimeOnly:
394 return
395 // Root table size + offset.
396 sizeof(flatbuffers::uoffset_t) * 2 +
397 // 6 padding bytes to pad the header out properly.
398 4 +
399 // vtable header (size + size of table)
400 sizeof(flatbuffers::voffset_t) * 2 +
401 // offsets to all the fields.
402 sizeof(flatbuffers::voffset_t) * 8 +
403 // pointer to vtable
404 sizeof(flatbuffers::soffset_t) +
405 // remote_queue_index
406 sizeof(uint32_t) +
407 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
408 // monotonic_sent_time
409 sizeof(int64_t) * 4 +
410 // queue_index, channel_index
411 sizeof(uint32_t) * 2;
412
413 case LogType::kLogMessageAndDeliveryTime:
414 return
415 // Root table size + offset.
416 sizeof(flatbuffers::uoffset_t) * 2 +
417 // 4 padding bytes to pad the header out properly.
418 4 +
419 // vtable header (size + size of table)
420 sizeof(flatbuffers::voffset_t) * 2 +
421 // offsets to all the fields.
422 sizeof(flatbuffers::voffset_t) * 8 +
423 // pointer to vtable
424 sizeof(flatbuffers::soffset_t) +
425 // pointer to data
426 sizeof(flatbuffers::uoffset_t) +
427 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
428 // monotonic_sent_time
429 sizeof(int64_t) * 4 +
430 // remote_queue_index, queue_index, channel_index
431 sizeof(uint32_t) * 3;
432
433 case LogType::kLogRemoteMessage:
434 return
435 // Root table size + offset.
436 sizeof(flatbuffers::uoffset_t) * 2 +
437 // 6 padding bytes to pad the header out properly.
438 6 +
439 // vtable header (size + size of table)
440 sizeof(flatbuffers::voffset_t) * 2 +
441 // offsets to all the fields.
442 sizeof(flatbuffers::voffset_t) * 5 +
443 // pointer to vtable
444 sizeof(flatbuffers::soffset_t) +
445 // realtime_sent_time, monotonic_sent_time
446 sizeof(int64_t) * 2 +
447 // pointer to data
448 sizeof(flatbuffers::uoffset_t) +
449 // queue_index, channel_index
450 sizeof(uint32_t) * 2;
451 }
452 LOG(FATAL);
453}
454
455flatbuffers::uoffset_t PackMessageSize(LogType log_type,
456 const Context &context) {
457 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
458 "Update size logic please.");
459 const flatbuffers::uoffset_t aligned_data_length =
460 ((context.size + 7) & 0xfffffff8u);
461 switch (log_type) {
462 case LogType::kLogDeliveryTimeOnly:
463 return PackMessageHeaderSize(log_type);
464
465 case LogType::kLogMessage:
466 case LogType::kLogMessageAndDeliveryTime:
467 case LogType::kLogRemoteMessage:
468 return PackMessageHeaderSize(log_type) +
469 // Vector...
470 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
471 }
472 LOG(FATAL);
473}
474
475// Do the magic dance to convert the endianness of the data and append it to the
476// buffer.
477namespace {
478
479// TODO(austin): Look at the generated code to see if building the header is
480// efficient or not.
481template <typename T>
482uint8_t *Push(uint8_t *buffer, const T data) {
483 const T endian_data = flatbuffers::EndianScalar<T>(data);
484 std::memcpy(buffer, &endian_data, sizeof(T));
485 return buffer + sizeof(T);
486}
487
488uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
489 std::memcpy(buffer, data, size);
490 return buffer + size;
491}
492
493uint8_t *Pad(uint8_t *buffer, size_t padding) {
494 std::memset(buffer, 0, padding);
495 return buffer + padding;
496}
497} // namespace
498
499size_t PackMessageInline(uint8_t *buffer, const Context &context,
500 int channel_index, LogType log_type) {
501 const flatbuffers::uoffset_t message_size =
502 PackMessageSize(log_type, context);
503
504 buffer = Push<flatbuffers::uoffset_t>(
505 buffer, message_size - sizeof(flatbuffers::uoffset_t));
506
507 // Pack all the data in. This is brittle but easy to change. Use the
508 // InlinePackMessage.Equivilent unit test to verify everything matches.
509 switch (log_type) {
510 case LogType::kLogMessage:
511 // clang-format off
512 // header:
513 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
514 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
515 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
516 //
517 // padding:
518 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
519 buffer = Pad(buffer, 6);
520 //
521 // vtable (aos.logger.MessageHeader):
522 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
523 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
524 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
525 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
526 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
527 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
528 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
529 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
530 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
531 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
532 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
533 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
534 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
535 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
536 //
537 // root_table (aos.logger.MessageHeader):
538 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
539 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
540 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
541 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
542 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
543 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
544 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
545 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
546 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
547 buffer = Push<uint32_t>(buffer, context.queue_index);
548 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
549 buffer = Push<uint32_t>(buffer, channel_index);
550 //
551 // vector (aos.logger.MessageHeader.data):
552 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
553 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
554 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
555 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
556 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
557 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
558 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
559 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
560 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
561 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
562 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
563 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
564 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
565 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
566 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
567 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
568 buffer = PushBytes(buffer, context.data, context.size);
569 //
570 // padding:
571 // +0x4E | 00 00 | uint8_t[2] | .. | padding
572 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
573 // clang-format on
574 break;
575
576 case LogType::kLogDeliveryTimeOnly:
577 // clang-format off
578 // header:
579 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
580 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
581 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
582 //
583 // padding:
584 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
585 buffer = Pad(buffer, 4);
586 //
587 // vtable (aos.logger.MessageHeader):
588 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
589 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
590 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
591 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
592 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
593 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
594 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
595 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
596 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
597 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
598 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
599 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
600 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
601 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
602 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
603 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
604 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
605 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
606 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
607 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
608 //
609 // root_table (aos.logger.MessageHeader):
610 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
611 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
612 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
613 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
614 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
615 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
616 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
617 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
618 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
619 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
620 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
621 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
622 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
623 buffer = Push<uint32_t>(buffer, context.queue_index);
624 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
625 buffer = Push<uint32_t>(buffer, channel_index);
626
627 // clang-format on
628 break;
629
630 case LogType::kLogMessageAndDeliveryTime:
631 // clang-format off
632 // header:
633 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
634 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
635 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
636 //
637 // padding:
638 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
639 buffer = Pad(buffer, 4);
640 //
641 // vtable (aos.logger.MessageHeader):
642 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
643 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
644 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
645 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
646 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
647 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
648 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
649 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
650 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
651 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
652 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
653 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
654 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
655 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
656 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
657 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
658 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
659 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
660 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
661 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
662 //
663 // root_table (aos.logger.MessageHeader):
664 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
665 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
666 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
667 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
668 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
669 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
670 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
671 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
672 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
673 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
674 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
675 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
676 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
677 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
678 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
679 buffer = Push<uint32_t>(buffer, context.queue_index);
680 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
681 buffer = Push<uint32_t>(buffer, channel_index);
682 //
683 // vector (aos.logger.MessageHeader.data):
684 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
685 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
686 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
687 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
688 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
689 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
690 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
691 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
692 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
693 buffer = PushBytes(buffer, context.data, context.size);
694 //
695 // padding:
696 // +0x5F | 00 | uint8_t[1] | . | padding
697 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
698 // clang-format on
699
700 break;
701
702 case LogType::kLogRemoteMessage:
703 // This is the message we need to recreate.
704 //
705 // clang-format off
706 // header:
707 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
708 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
709 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
710 //
711 // padding:
712 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
713 buffer = Pad(buffer, 6);
714 //
715 // vtable (aos.logger.MessageHeader):
716 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
717 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
718 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
719 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
720 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
721 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
722 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
723 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
724 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
725 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
726 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
727 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
728 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
729 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
730 //
731 // root_table (aos.logger.MessageHeader):
732 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
733 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
734 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
735 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
736 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
737 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
738 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
739 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
740 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
741 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
742 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
743 buffer = Push<uint32_t>(buffer, channel_index);
744 //
745 // vector (aos.logger.MessageHeader.data):
746 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
747 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
748 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
749 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
750 // ...
751 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
752 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
753 buffer = PushBytes(buffer, context.data, context.size);
754 //
755 // padding:
756 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
757 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
758 // clang-format on
759 }
760
761 return message_size;
762}
763
Austin Schuhcd368422021-11-22 21:23:29 -0800764SpanReader::SpanReader(std::string_view filename, bool quiet)
765 : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700766 decoder_ = std::make_unique<DummyDecoder>(filename);
767
768 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700769 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700770 if (filename.substr(filename.size() - kXz.size()) == kXz) {
771#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -0800772 decoder_ =
773 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700774#else
Austin Schuhcd368422021-11-22 21:23:29 -0800775 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700776 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
777#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700778 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
779 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700780 }
Austin Schuh05b70472020-01-01 17:11:17 -0800781}
782
Austin Schuhcf5f6442021-07-06 10:43:28 -0700783absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800784 // Make sure we have enough for the size.
785 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
786 if (!ReadBlock()) {
787 return absl::Span<const uint8_t>();
788 }
789 }
790
791 // Now make sure we have enough for the message.
792 const size_t data_size =
793 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
794 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800795 if (data_size == sizeof(flatbuffers::uoffset_t)) {
796 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
797 LOG(ERROR) << " Rest of log file is "
798 << absl::BytesToHexString(std::string_view(
799 reinterpret_cast<const char *>(data_.data() +
800 consumed_data_),
801 data_.size() - consumed_data_));
802 return absl::Span<const uint8_t>();
803 }
Austin Schuh05b70472020-01-01 17:11:17 -0800804 while (data_.size() < consumed_data_ + data_size) {
805 if (!ReadBlock()) {
806 return absl::Span<const uint8_t>();
807 }
808 }
809
810 // And return it, consuming the data.
811 const uint8_t *data_ptr = data_.data() + consumed_data_;
812
Austin Schuh05b70472020-01-01 17:11:17 -0800813 return absl::Span<const uint8_t>(data_ptr, data_size);
814}
815
Austin Schuhcf5f6442021-07-06 10:43:28 -0700816void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -0800817 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -0700818 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
819 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -0800820 consumed_data_ += consumed_size;
821 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700822}
823
824absl::Span<const uint8_t> SpanReader::ReadMessage() {
825 absl::Span<const uint8_t> result = PeekMessage();
826 if (result != absl::Span<const uint8_t>()) {
827 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -0800828 } else {
829 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700830 }
831 return result;
832}
833
Austin Schuh05b70472020-01-01 17:11:17 -0800834bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700835 // This is the amount of data we grab at a time. Doing larger chunks minimizes
836 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800837 constexpr size_t kReadSize = 256 * 1024;
838
839 // Strip off any unused data at the front.
840 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700841 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800842 consumed_data_ = 0;
843 }
844
845 const size_t starting_size = data_.size();
846
847 // This should automatically grow the backing store. It won't shrink if we
848 // get a small chunk later. This reduces allocations when we want to append
849 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700850 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800851
Brian Silvermanf51499a2020-09-21 12:49:08 -0700852 const size_t count =
853 decoder_->Read(data_.begin() + starting_size, data_.end());
854 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800855 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800856 return false;
857 }
Austin Schuh05b70472020-01-01 17:11:17 -0800858
Brian Smarttea913d42021-12-10 15:02:38 -0800859 total_read_ += count;
860
Austin Schuh05b70472020-01-01 17:11:17 -0800861 return true;
862}
863
Austin Schuhadd6eb32020-11-09 21:24:26 -0800864std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700865 SpanReader *span_reader) {
866 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800867
868 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800869 if (config_data == absl::Span<const uint8_t>()) {
870 return std::nullopt;
871 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800872
Austin Schuh5212cad2020-09-09 23:12:09 -0700873 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700874 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800875 if (!result.Verify()) {
876 return std::nullopt;
877 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700878
879 if (FLAGS_workaround_double_headers) {
880 while (true) {
881 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
882 if (maybe_header_data == absl::Span<const uint8_t>()) {
883 break;
884 }
885
886 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
887 maybe_header_data);
888 if (maybe_header.Verify()) {
889 LOG(WARNING) << "Found duplicate LogFileHeader in "
890 << span_reader->filename();
891 ResizeableBuffer header_data_copy;
892 header_data_copy.resize(maybe_header_data.size());
893 memcpy(header_data_copy.data(), maybe_header_data.begin(),
894 header_data_copy.size());
895 result = SizePrefixedFlatbufferVector<LogFileHeader>(
896 std::move(header_data_copy));
897
898 span_reader->ConsumeMessage();
899 } else {
900 break;
901 }
902 }
903 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800904 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800905}
906
Austin Schuh0e8db662021-07-06 10:43:47 -0700907std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
908 std::string_view filename) {
909 SpanReader span_reader(filename);
910 return ReadHeader(&span_reader);
911}
912
Austin Schuhadd6eb32020-11-09 21:24:26 -0800913std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800914 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700915 SpanReader span_reader(filename);
916 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
917 for (size_t i = 0; i < n + 1; ++i) {
918 data_span = span_reader.ReadMessage();
919
920 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800921 if (data_span == absl::Span<const uint8_t>()) {
922 return std::nullopt;
923 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700924 }
925
Brian Silverman354697a2020-09-22 21:06:32 -0700926 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700927 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800928 if (!result.Verify()) {
929 return std::nullopt;
930 }
931 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700932}
933
Austin Schuh05b70472020-01-01 17:11:17 -0800934MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700935 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800936 raw_log_file_header_(
937 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -0800938 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
939 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
940
Austin Schuh0e8db662021-07-06 10:43:47 -0700941 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
942 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800943
944 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700945 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800946
Austin Schuh0e8db662021-07-06 10:43:47 -0700947 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800948
Austin Schuh5b728b72021-06-16 14:57:15 -0700949 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
950
Brian Smarttea913d42021-12-10 15:02:38 -0800951 total_verified_before_ = span_reader_.TotalConsumed();
952
Austin Schuhcde938c2020-02-02 17:30:07 -0800953 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800954 FLAGS_max_out_of_order > 0
955 ? chrono::duration_cast<chrono::nanoseconds>(
956 chrono::duration<double>(FLAGS_max_out_of_order))
957 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800958
959 VLOG(1) << "Opened " << filename << " as node "
960 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800961}
962
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700963std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800964 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
965 if (msg_data == absl::Span<const uint8_t>()) {
Brian Smarttea913d42021-12-10 15:02:38 -0800966 if (is_corrupted()) {
967 LOG(ERROR) << "Total corrupted volumes: before = "
968 << total_verified_before_
969 << " | corrupted = " << total_corrupted_
970 << " | during = " << total_verified_during_
971 << " | after = " << total_verified_after_ << std::endl;
972 }
973
974 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -0700975 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
976 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -0800977 << span_reader_.TotalConsumed() << " bytes usable."
978 << std::endl;
979 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700980 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -0800981 }
982
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700983 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -0800984
985 if (crash_on_corrupt_message_flag_) {
986 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -0700987 << total_verified_before_ << " found within "
988 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -0800989 << "; set --nocrash_on_corrupt_message to see summary;"
990 << " also set --ignore_corrupt_messages to process"
991 << " anyway";
992
993 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -0700994 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -0800995 << " from " << filename() << std::endl;
996
997 total_corrupted_ += msg_data.size();
998
999 while (true) {
1000 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1001
1002 if (msg_data == absl::Span<const uint8_t>()) {
1003 if (!ignore_corrupt_messages_flag_) {
1004 LOG(ERROR) << "Total corrupted volumes: before = "
1005 << total_verified_before_
1006 << " | corrupted = " << total_corrupted_
1007 << " | during = " << total_verified_during_
1008 << " | after = " << total_verified_after_ << std::endl;
1009
1010 if (span_reader_.IsIncomplete()) {
1011 LOG(ERROR) << "Unable to access some messages in " << filename()
1012 << " : " << span_reader_.TotalRead() << " bytes read, "
1013 << span_reader_.TotalConsumed() << " bytes usable."
1014 << std::endl;
1015 }
1016 return nullptr;
1017 }
1018 break;
1019 }
1020
1021 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1022
1023 if (!next_msg.Verify()) {
1024 total_corrupted_ += msg_data.size();
1025 total_verified_during_ += total_verified_after_;
1026 total_verified_after_ = 0;
1027
1028 } else {
1029 total_verified_after_ += msg_data.size();
1030 if (ignore_corrupt_messages_flag_) {
1031 msg = next_msg;
1032 break;
1033 }
1034 }
1035 }
1036 }
1037
1038 if (is_corrupted()) {
1039 total_verified_after_ += msg_data.size();
1040 } else {
1041 total_verified_before_ += msg_data.size();
1042 }
Austin Schuh05b70472020-01-01 17:11:17 -08001043
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001044 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001045
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001046 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001047
1048 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001049
1050 if (VLOG_IS_ON(3)) {
1051 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1052 } else if (VLOG_IS_ON(2)) {
1053 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1054 msg_copy.mutable_message()->clear_data();
1055 VLOG(2) << "Read from " << filename() << " data "
1056 << FlatbufferToJson(msg_copy);
1057 }
1058
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001059 return result;
1060}
1061
1062std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1063 const MessageHeader &message) {
1064 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1065
1066 UnpackedMessageHeader *const unpacked_message =
1067 reinterpret_cast<UnpackedMessageHeader *>(
1068 malloc(sizeof(UnpackedMessageHeader) + data_size +
1069 kChannelDataAlignment - 1));
1070
1071 CHECK(message.has_channel_index());
1072 CHECK(message.has_monotonic_sent_time());
1073
1074 absl::Span<uint8_t> span;
1075 if (data_size > 0) {
1076 span =
1077 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1078 &unpacked_message->actual_data[0], data_size)),
1079 data_size);
1080 }
1081
Austin Schuh826e6ce2021-11-18 20:33:10 -08001082 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001083 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001084 monotonic_remote_time = aos::monotonic_clock::time_point(
1085 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001086 }
1087 std::optional<realtime_clock::time_point> realtime_remote_time;
1088 if (message.has_realtime_remote_time()) {
1089 realtime_remote_time = realtime_clock::time_point(
1090 chrono::nanoseconds(message.realtime_remote_time()));
1091 }
1092
1093 std::optional<uint32_t> remote_queue_index;
1094 if (message.has_remote_queue_index()) {
1095 remote_queue_index = message.remote_queue_index();
1096 }
1097
1098 new (unpacked_message) UnpackedMessageHeader{
1099 .channel_index = message.channel_index(),
1100 .monotonic_sent_time = monotonic_clock::time_point(
1101 chrono::nanoseconds(message.monotonic_sent_time())),
1102 .realtime_sent_time = realtime_clock::time_point(
1103 chrono::nanoseconds(message.realtime_sent_time())),
1104 .queue_index = message.queue_index(),
1105 .monotonic_remote_time = monotonic_remote_time,
1106 .realtime_remote_time = realtime_remote_time,
1107 .remote_queue_index = remote_queue_index,
1108 .monotonic_timestamp_time = monotonic_clock::time_point(
1109 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
1110 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
1111 .span = span};
1112
1113 if (data_size > 0) {
1114 memcpy(span.data(), message.data()->data(), data_size);
1115 }
1116
1117 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1118 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001119}
1120
Austin Schuhc41603c2020-10-11 16:17:37 -07001121PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001122 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001123 if (parts_.parts.size() >= 2) {
1124 next_message_reader_.emplace(parts_.parts[1]);
1125 }
Austin Schuh48507722021-07-17 17:29:24 -07001126 ComputeBootCounts();
1127}
1128
1129void PartsMessageReader::ComputeBootCounts() {
1130 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1131 std::nullopt);
1132
1133 // We have 3 vintages of log files with different amounts of information.
1134 if (log_file_header()->has_boot_uuids()) {
1135 // The new hotness with the boots explicitly listed out. We can use the log
1136 // file header to compute the boot count of all relevant nodes.
1137 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1138 size_t node_index = 0;
1139 for (const flatbuffers::String *boot_uuid :
1140 *log_file_header()->boot_uuids()) {
1141 CHECK(parts_.boots);
1142 if (boot_uuid->size() != 0) {
1143 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1144 if (it != parts_.boots->boot_count_map.end()) {
1145 boot_counts_[node_index] = it->second;
1146 }
1147 } else if (parts().boots->boots[node_index].size() == 1u) {
1148 boot_counts_[node_index] = 0;
1149 }
1150 ++node_index;
1151 }
1152 } else {
1153 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1154 // single node log files with boot UUIDs in the header. We only know how to
1155 // order certain boots in certain circumstances.
1156 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1157 for (size_t node_index = 0; node_index < boot_counts_.size();
1158 ++node_index) {
1159 CHECK(parts_.boots);
1160 if (parts().boots->boots[node_index].size() == 1u) {
1161 boot_counts_[node_index] = 0;
1162 }
1163 }
1164 } else {
1165 // Really old single node logs without any UUIDs. They can't reboot.
1166 CHECK_EQ(boot_counts_.size(), 1u);
1167 boot_counts_[0] = 0u;
1168 }
1169 }
1170}
Austin Schuhc41603c2020-10-11 16:17:37 -07001171
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001172std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001173 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001174 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001175 message_reader_.ReadMessage();
1176 if (message) {
1177 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001178 const monotonic_clock::time_point monotonic_sent_time =
1179 message->monotonic_sent_time;
1180
1181 // TODO(austin): Does this work with startup? Might need to use the
1182 // start time.
1183 // TODO(austin): Does this work with startup when we don't know the
1184 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001185 if (monotonic_sent_time >
1186 parts_.monotonic_start_time + max_out_of_order_duration()) {
1187 after_start_ = true;
1188 }
1189 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001190 CHECK_GE(monotonic_sent_time,
1191 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001192 << ": Max out of order of " << max_out_of_order_duration().count()
1193 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001194 << parts_.monotonic_start_time << " currently reading "
1195 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001196 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001197 return message;
1198 }
1199 NextLog();
1200 }
Austin Schuh32f68492020-11-08 21:45:51 -08001201 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001202 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001203}
1204
1205void PartsMessageReader::NextLog() {
1206 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001207 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001208 done_ = true;
1209 return;
1210 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001211 CHECK(next_message_reader_);
1212 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001213 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001214 if (next_part_index_ + 1 < parts_.parts.size()) {
1215 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1216 } else {
1217 next_message_reader_.reset();
1218 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001219 ++next_part_index_;
1220}
1221
Austin Schuh1be0ce42020-11-29 22:43:26 -08001222bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001223 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001224
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001225 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001226 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001227 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001228 return false;
1229 }
1230
1231 if (this->channel_index < m2.channel_index) {
1232 return true;
1233 } else if (this->channel_index > m2.channel_index) {
1234 return false;
1235 }
1236
1237 return this->queue_index < m2.queue_index;
1238}
1239
1240bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001241bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001242 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001243
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001244 return timestamp.time == m2.timestamp.time &&
1245 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001246}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001247
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001248std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1249 os << "{.channel_index=" << m.channel_index
1250 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1251 << ", .realtime_sent_time=" << m.realtime_sent_time
1252 << ", .queue_index=" << m.queue_index;
1253 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001254 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001255 }
1256 os << ", .realtime_remote_time=";
1257 PrintOptionalOrNull(&os, m.realtime_remote_time);
1258 os << ", .remote_queue_index=";
1259 PrintOptionalOrNull(&os, m.remote_queue_index);
1260 if (m.has_monotonic_timestamp_time) {
1261 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1262 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001263 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001264 return os;
1265}
1266
Austin Schuh1be0ce42020-11-29 22:43:26 -08001267std::ostream &operator<<(std::ostream &os, const Message &m) {
1268 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001269 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001270 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001271 if (m.data->remote_queue_index.has_value()) {
1272 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1273 }
1274 if (m.data->monotonic_remote_time.has_value()) {
1275 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1276 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001277 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001278 }
1279 os << "}";
1280 return os;
1281}
1282
1283std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1284 os << "{.channel_index=" << m.channel_index
1285 << ", .queue_index=" << m.queue_index
1286 << ", .monotonic_event_time=" << m.monotonic_event_time
1287 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001288 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001289 os << ", .remote_queue_index=" << m.remote_queue_index;
1290 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001291 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001292 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1293 }
1294 if (m.realtime_remote_time != realtime_clock::min_time) {
1295 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1296 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001297 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001298 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1299 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001300 if (m.data != nullptr) {
1301 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001302 } else {
1303 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001304 }
1305 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001306 return os;
1307}
1308
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001309LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001310 : parts_message_reader_(log_parts),
1311 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1312}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001313
1314Message *LogPartsSorter::Front() {
1315 // Queue up data until enough data has been queued that the front message is
1316 // sorted enough to be safe to pop. This may do nothing, so we should make
1317 // sure the nothing path is checked quickly.
1318 if (sorted_until() != monotonic_clock::max_time) {
1319 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001320 if (!messages_.empty() &&
1321 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001322 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001323 break;
1324 }
1325
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001326 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001327 parts_message_reader_.ReadMessage();
1328 // No data left, sorted forever, work through what is left.
1329 if (!m) {
1330 sorted_until_ = monotonic_clock::max_time;
1331 break;
1332 }
1333
Austin Schuh48507722021-07-17 17:29:24 -07001334 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001335 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001336 monotonic_timestamp_boot = parts().logger_boot_count;
1337 }
1338 size_t monotonic_remote_boot = 0xffffff;
1339
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001340 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001341 const Node *node =
1342 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001343
Austin Schuh48507722021-07-17 17:29:24 -07001344 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001345 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001346 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001347 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001348 monotonic_remote_boot = *boot;
1349 }
1350
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001351 messages_.insert(
1352 Message{.channel_index = m->channel_index,
1353 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1354 .index = m->queue_index},
1355 .timestamp = BootTimestamp{.boot = parts().boot_count,
1356 .time = m->monotonic_sent_time},
1357 .monotonic_remote_boot = monotonic_remote_boot,
1358 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1359 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001360
1361 // Now, update sorted_until_ to match the new message.
1362 if (parts_message_reader_.newest_timestamp() >
1363 monotonic_clock::min_time +
1364 parts_message_reader_.max_out_of_order_duration()) {
1365 sorted_until_ = parts_message_reader_.newest_timestamp() -
1366 parts_message_reader_.max_out_of_order_duration();
1367 } else {
1368 sorted_until_ = monotonic_clock::min_time;
1369 }
1370 }
1371 }
1372
1373 // Now that we have enough data queued, return a pointer to the oldest piece
1374 // of data if it exists.
1375 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001376 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001377 return nullptr;
1378 }
1379
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001380 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001381 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001382 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001383 return &(*messages_.begin());
1384}
1385
1386void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1387
1388std::string LogPartsSorter::DebugString() const {
1389 std::stringstream ss;
1390 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001391 int count = 0;
1392 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001393 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001394 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1395 ss << m << "\n";
1396 } else if (no_dots) {
1397 ss << "...\n";
1398 no_dots = false;
1399 }
1400 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001401 }
1402 ss << "] <- " << parts_message_reader_.filename();
1403 return ss.str();
1404}
1405
Austin Schuhd2f96102020-12-01 20:27:29 -08001406NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1407 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001408 // Enforce that we are sorting things only from a single node from a single
1409 // boot.
1410 const std::string_view part0_node = parts[0].node;
1411 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001412 for (size_t i = 1; i < parts.size(); ++i) {
1413 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001414 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1415 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001416 }
Austin Schuh715adc12021-06-29 22:07:39 -07001417
1418 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1419
Austin Schuhd2f96102020-12-01 20:27:29 -08001420 for (LogParts &part : parts) {
1421 parts_sorters_.emplace_back(std::move(part));
1422 }
1423
Austin Schuhd2f96102020-12-01 20:27:29 -08001424 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001425 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001426 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001427 // We want to capture the earliest meaningful start time here. The start
1428 // time defaults to min_time when there's no meaningful value to report, so
1429 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001430 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1431 bool accept = false;
1432 // We want to prioritize start times from the logger node. Really, we
1433 // want to prioritize start times with a valid realtime_clock time. So,
1434 // if we have a start time without a RT clock, prefer a start time with a
1435 // RT clock, even it if is later.
1436 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1437 // We've got a good one. See if the current start time has a good RT
1438 // clock, or if we should use this one instead.
1439 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1440 accept = true;
1441 } else if (realtime_start_time_ == realtime_clock::min_time) {
1442 // The previous start time doesn't have a good RT time, so it is very
1443 // likely the start time from a remote part file. We just found a
1444 // better start time with a real RT time, so switch to that instead.
1445 accept = true;
1446 }
1447 } else if (realtime_start_time_ == realtime_clock::min_time) {
1448 // We don't have a RT time, so take the oldest.
1449 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1450 accept = true;
1451 }
1452 }
1453
1454 if (accept) {
1455 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1456 realtime_start_time_ = parts_sorter.realtime_start_time();
1457 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001458 }
1459 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001460
1461 // If there was no meaningful start time reported, just use min_time.
1462 if (monotonic_start_time_ == monotonic_clock::max_time) {
1463 monotonic_start_time_ = monotonic_clock::min_time;
1464 realtime_start_time_ = realtime_clock::min_time;
1465 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001466}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001467
Austin Schuh0ca51f32020-12-25 21:51:45 -08001468std::vector<const LogParts *> NodeMerger::Parts() const {
1469 std::vector<const LogParts *> p;
1470 p.reserve(parts_sorters_.size());
1471 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1472 p.emplace_back(&parts_sorter.parts());
1473 }
1474 return p;
1475}
1476
Austin Schuh8f52ed52020-11-30 23:12:39 -08001477Message *NodeMerger::Front() {
1478 // Return the current Front if we have one, otherwise go compute one.
1479 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001480 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001481 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001482 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001483 }
1484
1485 // Otherwise, do a simple search for the oldest message, deduplicating any
1486 // duplicates.
1487 Message *oldest = nullptr;
1488 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001489 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1490 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001491 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001492 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001493 continue;
1494 }
1495 if (oldest == nullptr || *m < *oldest) {
1496 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001497 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001498 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001499 // Found a duplicate. If there is a choice, we want the one which has
1500 // the timestamp time.
1501 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001502 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001503 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001504 current_->PopFront();
1505 current_ = &parts_sorter;
1506 oldest = m;
1507 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001508 CHECK_EQ(m->data->monotonic_timestamp_time,
1509 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001510 parts_sorter.PopFront();
1511 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001512 }
1513
1514 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001515 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001516 }
1517
Austin Schuhb000de62020-12-03 22:00:40 -08001518 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001519 CHECK_GE(oldest->timestamp.time, last_message_time_);
1520 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001521 monotonic_oldest_time_ =
1522 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001523 } else {
1524 last_message_time_ = monotonic_clock::max_time;
1525 }
1526
Austin Schuh8f52ed52020-11-30 23:12:39 -08001527 // Return the oldest message found. This will be nullptr if nothing was
1528 // found, indicating there is nothing left.
1529 return oldest;
1530}
1531
1532void NodeMerger::PopFront() {
1533 CHECK(current_ != nullptr) << "Popping before calling Front()";
1534 current_->PopFront();
1535 current_ = nullptr;
1536}
1537
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001538BootMerger::BootMerger(std::vector<LogParts> files) {
1539 std::vector<std::vector<LogParts>> boots;
1540
1541 // Now, we need to split things out by boot.
1542 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001543 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001544 if (boot_count + 1 > boots.size()) {
1545 boots.resize(boot_count + 1);
1546 }
1547 boots[boot_count].emplace_back(std::move(files[i]));
1548 }
1549
1550 node_mergers_.reserve(boots.size());
1551 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001552 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001553 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001554 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001555 }
1556 node_mergers_.emplace_back(
1557 std::make_unique<NodeMerger>(std::move(boots[i])));
1558 }
1559}
1560
1561Message *BootMerger::Front() {
1562 Message *result = node_mergers_[index_]->Front();
1563
1564 if (result != nullptr) {
1565 return result;
1566 }
1567
1568 if (index_ + 1u == node_mergers_.size()) {
1569 // At the end of the last node merger, just return.
1570 return nullptr;
1571 } else {
1572 ++index_;
1573 return Front();
1574 }
1575}
1576
1577void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1578
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001579std::vector<const LogParts *> BootMerger::Parts() const {
1580 std::vector<const LogParts *> results;
1581 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1582 std::vector<const LogParts *> node_parts = node_merger->Parts();
1583
1584 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1585 std::make_move_iterator(node_parts.end()));
1586 }
1587
1588 return results;
1589}
1590
Austin Schuhd2f96102020-12-01 20:27:29 -08001591TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001592 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001593 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001594 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001595 if (!configuration_) {
1596 configuration_ = part->config;
1597 } else {
1598 CHECK_EQ(configuration_.get(), part->config.get());
1599 }
1600 }
1601 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001602 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1603 // pretty simple.
1604 if (configuration::MultiNode(config)) {
1605 nodes_data_.resize(config->nodes()->size());
1606 const Node *my_node = config->nodes()->Get(node());
1607 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1608 const Node *node = config->nodes()->Get(node_index);
1609 NodeData *node_data = &nodes_data_[node_index];
1610 node_data->channels.resize(config->channels()->size());
1611 // We should save the channel if it is delivered to the node represented
1612 // by the NodeData, but not sent by that node. That combo means it is
1613 // forwarded.
1614 size_t channel_index = 0;
1615 node_data->any_delivered = false;
1616 for (const Channel *channel : *config->channels()) {
1617 node_data->channels[channel_index].delivered =
1618 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001619 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1620 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001621 node_data->any_delivered = node_data->any_delivered ||
1622 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08001623 if (node_data->channels[channel_index].delivered) {
1624 const Connection *connection =
1625 configuration::ConnectionToNode(channel, node);
1626 node_data->channels[channel_index].time_to_live =
1627 chrono::nanoseconds(connection->time_to_live());
1628 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001629 ++channel_index;
1630 }
1631 }
1632
1633 for (const Channel *channel : *config->channels()) {
1634 source_node_.emplace_back(configuration::GetNodeIndex(
1635 config, channel->source_node()->string_view()));
1636 }
1637 }
1638}
1639
1640void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001641 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001642 CHECK_NE(timestamp_mapper->node(), node());
1643 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1644
1645 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001646 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001647 // we could needlessly save data.
1648 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001649 VLOG(1) << "Registering on node " << node() << " for peer node "
1650 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001651 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1652
1653 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001654
1655 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001656 }
1657}
1658
Austin Schuh79b30942021-01-24 22:32:21 -08001659void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07001660 matched_messages_.emplace_back(
1661 TimestampedMessage{.channel_index = m->channel_index,
1662 .queue_index = m->queue_index,
1663 .monotonic_event_time = m->timestamp,
1664 .realtime_event_time = m->data->realtime_sent_time,
1665 .remote_queue_index = BootQueueIndex::Invalid(),
1666 .monotonic_remote_time = BootTimestamp::min_time(),
1667 .realtime_remote_time = realtime_clock::min_time,
1668 .monotonic_timestamp_time = BootTimestamp::min_time(),
1669 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001670}
1671
1672TimestampedMessage *TimestampMapper::Front() {
1673 // No need to fetch anything new. A previous message still exists.
1674 switch (first_message_) {
1675 case FirstMessage::kNeedsUpdate:
1676 break;
1677 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001678 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001679 case FirstMessage::kNullptr:
1680 return nullptr;
1681 }
1682
Austin Schuh79b30942021-01-24 22:32:21 -08001683 if (matched_messages_.empty()) {
1684 if (!QueueMatched()) {
1685 first_message_ = FirstMessage::kNullptr;
1686 return nullptr;
1687 }
1688 }
1689 first_message_ = FirstMessage::kInMessage;
1690 return &matched_messages_.front();
1691}
1692
1693bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08001694 if (nodes_data_.empty()) {
1695 // Simple path. We are single node, so there are no timestamps to match!
1696 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001697 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001698 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -08001699 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001700 }
Austin Schuh79b30942021-01-24 22:32:21 -08001701 // Enqueue this message into matched_messages_ so we have a place to
1702 // associate remote timestamps, and return it.
1703 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08001704
Austin Schuh79b30942021-01-24 22:32:21 -08001705 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1706 last_message_time_ = matched_messages_.back().monotonic_event_time;
1707
1708 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001709 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08001710 timestamp_callback_(&matched_messages_.back());
1711 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001712 }
1713
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001714 // We need to only add messages to the list so they get processed for
1715 // messages which are delivered. Reuse the flow below which uses messages_
1716 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08001717 if (messages_.empty()) {
1718 if (!Queue()) {
1719 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -08001720 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001721 }
1722
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001723 // Now that it has been added (and cannibalized), forget about it
1724 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001725 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001726 }
1727
1728 Message *m = &(messages_.front());
1729
1730 if (source_node_[m->channel_index] == node()) {
1731 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08001732 QueueMessage(m);
1733 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1734 last_message_time_ = matched_messages_.back().monotonic_event_time;
1735 messages_.pop_front();
1736 timestamp_callback_(&matched_messages_.back());
1737 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001738 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001739 // Got a timestamp, find the matching remote data, match it, and return
1740 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08001741 Message data = MatchingMessageFor(*m);
1742
1743 // Return the data from the remote. The local message only has timestamp
1744 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08001745 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001746 .channel_index = m->channel_index,
1747 .queue_index = m->queue_index,
1748 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001749 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001750 .remote_queue_index =
1751 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001752 .index = m->data->remote_queue_index.value()},
1753 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001754 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001755 .realtime_remote_time = m->data->realtime_remote_time.value(),
1756 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
1757 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08001758 .data = std::move(data.data)});
1759 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1760 last_message_time_ = matched_messages_.back().monotonic_event_time;
1761 // Since messages_ holds the data, drop it.
1762 messages_.pop_front();
1763 timestamp_callback_(&matched_messages_.back());
1764 return true;
1765 }
1766}
1767
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001768void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001769 while (last_message_time_ <= queue_time) {
1770 if (!QueueMatched()) {
1771 return;
1772 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001773 }
1774}
1775
Austin Schuhe639ea12021-01-25 13:00:22 -08001776void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001777 // Note: queueing for time doesn't really work well across boots. So we
1778 // just assume that if you are using this, you only care about the current
1779 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001780 //
1781 // TODO(austin): Is that the right concept?
1782 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001783 // Make sure we have something queued first. This makes the end time
1784 // calculation simpler, and is typically what folks want regardless.
1785 if (matched_messages_.empty()) {
1786 if (!QueueMatched()) {
1787 return;
1788 }
1789 }
1790
1791 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001792 std::max(monotonic_start_time(
1793 matched_messages_.front().monotonic_event_time.boot),
1794 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001795 time_estimation_buffer;
1796
1797 // Place sorted messages on the list until we have
1798 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1799 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001800 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001801 if (!QueueMatched()) {
1802 return;
1803 }
1804 }
1805}
1806
Austin Schuhd2f96102020-12-01 20:27:29 -08001807void TimestampMapper::PopFront() {
1808 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08001809 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001810 first_message_ = FirstMessage::kNeedsUpdate;
1811
Austin Schuh79b30942021-01-24 22:32:21 -08001812 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001813}
1814
1815Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001816 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001817 CHECK_NOTNULL(message.data);
1818 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001819 const BootQueueIndex remote_queue_index =
1820 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001821 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001822
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001823 CHECK(message.data->monotonic_remote_time.has_value());
1824 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001825
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001826 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001827 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001828 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001829 const realtime_clock::time_point realtime_remote_time =
1830 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001831
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001832 TimestampMapper *peer =
1833 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001834
1835 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001836 // asked to pull a timestamp from a peer which doesn't exist, return an
1837 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001838 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001839 // TODO(austin): Make sure the tests hit all these paths with a boot count
1840 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001841 return Message{.channel_index = message.channel_index,
1842 .queue_index = remote_queue_index,
1843 .timestamp = monotonic_remote_time,
1844 .monotonic_remote_boot = 0xffffff,
1845 .monotonic_timestamp_boot = 0xffffff,
1846 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001847 }
1848
1849 // The queue which will have the matching data, if available.
1850 std::deque<Message> *data_queue =
1851 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1852
Austin Schuh79b30942021-01-24 22:32:21 -08001853 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001854
1855 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001856 return Message{.channel_index = message.channel_index,
1857 .queue_index = remote_queue_index,
1858 .timestamp = monotonic_remote_time,
1859 .monotonic_remote_boot = 0xffffff,
1860 .monotonic_timestamp_boot = 0xffffff,
1861 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001862 }
1863
Austin Schuhd2f96102020-12-01 20:27:29 -08001864 if (remote_queue_index < data_queue->front().queue_index ||
1865 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07001866 return Message{.channel_index = message.channel_index,
1867 .queue_index = remote_queue_index,
1868 .timestamp = monotonic_remote_time,
1869 .monotonic_remote_boot = 0xffffff,
1870 .monotonic_timestamp_boot = 0xffffff,
1871 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001872 }
1873
Austin Schuh993ccb52020-12-12 15:59:32 -08001874 // The algorithm below is constant time with some assumptions. We need there
1875 // to be no missing messages in the data stream. This also assumes a queue
1876 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001877 if (data_queue->back().queue_index.boot ==
1878 data_queue->front().queue_index.boot &&
1879 (data_queue->back().queue_index.index -
1880 data_queue->front().queue_index.index + 1u ==
1881 data_queue->size())) {
1882 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001883 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001884 //
1885 // TODO(austin): Move if not reliable.
1886 Message result = (*data_queue)[remote_queue_index.index -
1887 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001888
1889 CHECK_EQ(result.timestamp, monotonic_remote_time)
1890 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08001891 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08001892 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1893 // Now drop the data off the front. We have deduplicated timestamps, so we
1894 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001895 data_queue->erase(
1896 data_queue->begin(),
1897 data_queue->begin() +
1898 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001899 return result;
1900 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07001901 // TODO(austin): Binary search.
1902 auto it = std::find_if(
1903 data_queue->begin(), data_queue->end(),
1904 [remote_queue_index,
1905 remote_boot = monotonic_remote_time.boot](const Message &m) {
1906 return m.queue_index == remote_queue_index &&
1907 m.timestamp.boot == remote_boot;
1908 });
Austin Schuh993ccb52020-12-12 15:59:32 -08001909 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001910 return Message{.channel_index = message.channel_index,
1911 .queue_index = remote_queue_index,
1912 .timestamp = monotonic_remote_time,
1913 .monotonic_remote_boot = 0xffffff,
1914 .monotonic_timestamp_boot = 0xffffff,
1915 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08001916 }
1917
1918 Message result = std::move(*it);
1919
1920 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001921 << ": Queue index matches, but timestamp doesn't. Please "
1922 "investigate!";
1923 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
1924 << ": Queue index matches, but timestamp doesn't. Please "
1925 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08001926
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08001927 // Erase everything up to this message. We want to keep 1 message in the
1928 // queue so we can handle reliable messages forwarded across boots.
1929 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08001930
1931 return result;
1932 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001933}
1934
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001935void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001936 if (queued_until_ > t) {
1937 return;
1938 }
1939 while (true) {
1940 if (!messages_.empty() && messages_.back().timestamp > t) {
1941 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1942 return;
1943 }
1944
1945 if (!Queue()) {
1946 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001947 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001948 return;
1949 }
1950
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001951 // Now that it has been added (and cannibalized), forget about it
1952 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001953 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001954 }
1955}
1956
1957bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001958 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001959 if (m == nullptr) {
1960 return false;
1961 }
1962 for (NodeData &node_data : nodes_data_) {
1963 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001964 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001965 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08001966 // If we have data but no timestamps (logs where the timestamps didn't get
1967 // logged are classic), we can grow this indefinitely. We don't need to
1968 // keep anything that is older than the last message returned.
1969
1970 // We have the time on the source node.
1971 // We care to wait until we have the time on the destination node.
1972 std::deque<Message> &messages =
1973 node_data.channels[m->channel_index].messages;
1974 // Max delay over the network is the TTL, so let's take the queue time and
1975 // add TTL to it. Don't forget any messages which are reliable until
1976 // someone can come up with a good reason to forget those too.
1977 if (node_data.channels[m->channel_index].time_to_live >
1978 chrono::nanoseconds(0)) {
1979 // We need to make *some* assumptions about network delay for this to
1980 // work. We want to only look at the RX side. This means we need to
1981 // track the last time a message was popped from any channel from the
1982 // node sending this message, and compare that to the max time we expect
1983 // that a message will take to be delivered across the network. This
1984 // assumes that messages are popped in time order as a proxy for
1985 // measuring the distributed time at this layer.
1986 //
1987 // Leave at least 1 message in here so we can handle reboots and
1988 // messages getting sent twice.
1989 while (messages.size() > 1u &&
1990 messages.begin()->timestamp +
1991 node_data.channels[m->channel_index].time_to_live +
1992 chrono::duration_cast<chrono::nanoseconds>(
1993 chrono::duration<double>(FLAGS_max_network_delay)) <
1994 last_popped_message_time_) {
1995 messages.pop_front();
1996 }
1997 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001998 node_data.channels[m->channel_index].messages.emplace_back(*m);
1999 }
2000 }
2001
2002 messages_.emplace_back(std::move(*m));
2003 return true;
2004}
2005
2006std::string TimestampMapper::DebugString() const {
2007 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002008 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002009 for (const Message &message : messages_) {
2010 ss << " " << message << "\n";
2011 }
2012 ss << "] queued_until " << queued_until_;
2013 for (const NodeData &ns : nodes_data_) {
2014 if (ns.peer == nullptr) continue;
2015 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2016 size_t channel_index = 0;
2017 for (const NodeData::ChannelData &channel_data :
2018 ns.peer->nodes_data_[node()].channels) {
2019 if (channel_data.messages.empty()) {
2020 continue;
2021 }
Austin Schuhb000de62020-12-03 22:00:40 -08002022
Austin Schuhd2f96102020-12-01 20:27:29 -08002023 ss << " channel " << channel_index << " [\n";
2024 for (const Message &m : channel_data.messages) {
2025 ss << " " << m << "\n";
2026 }
2027 ss << " ]\n";
2028 ++channel_index;
2029 }
2030 ss << "] queued_until " << ns.peer->queued_until_;
2031 }
2032 return ss.str();
2033}
2034
Austin Schuhee711052020-08-24 16:06:09 -07002035std::string MaybeNodeName(const Node *node) {
2036 if (node != nullptr) {
2037 return node->name()->str() + " ";
2038 }
2039 return "";
2040}
2041
Brian Silvermanf51499a2020-09-21 12:49:08 -07002042} // namespace aos::logger