blob: ccdb7b82dd67feaa3a1d0c67a98b66e2c301dbc3 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
31
Austin Schuh7fbf5a72020-09-21 16:28:13 -070032DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080033 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070034DEFINE_double(
35 flush_period, 5.0,
36 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080037
Austin Schuha040c3f2021-02-13 16:09:07 -080038DEFINE_double(
39 max_out_of_order, -1,
40 "If set, this overrides the max out of order duration for a log file.");
41
Austin Schuh0e8db662021-07-06 10:43:47 -070042DEFINE_bool(workaround_double_headers, true,
43 "Some old log files have two headers at the beginning. Use the "
44 "last header as the actual header.");
45
Brian Silvermanf51499a2020-09-21 12:49:08 -070046namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Streams the value held by `t` to `*os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}

}  // namespace
60
Brian Silvermanf51499a2020-09-21 12:49:08 -070061DetachedBufferWriter::DetachedBufferWriter(
62 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
63 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070064 if (!util::MkdirPIfSpace(filename, 0777)) {
65 ran_out_of_space_ = true;
66 } else {
67 fd_ = open(std::string(filename).c_str(),
68 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
69 if (fd_ == -1 && errno == ENOSPC) {
70 ran_out_of_space_ = true;
71 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070072 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
73 << " for writing";
74 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070075 }
76 }
Austin Schuha36c8902019-12-30 18:07:15 -080077}
78
79DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070080 Close();
81 if (ran_out_of_space_) {
82 CHECK(acknowledge_ran_out_of_space_)
83 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070084 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070085}
86
Brian Silvermand90905f2020-09-23 14:42:56 -070087DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070088 *this = std::move(other);
89}
90
Brian Silverman87ac0402020-09-17 14:47:01 -070091// When other is destroyed "soon" (which it should be because we're getting an
92// rvalue reference to it), it will flush etc all the data we have queued up
93// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -070094DetachedBufferWriter &DetachedBufferWriter::operator=(
95 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070096 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070097 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070098 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -070099 std::swap(ran_out_of_space_, other.ran_out_of_space_);
100 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700101 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700102 std::swap(max_write_time_, other.max_write_time_);
103 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
104 std::swap(max_write_time_messages_, other.max_write_time_messages_);
105 std::swap(total_write_time_, other.total_write_time_);
106 std::swap(total_write_count_, other.total_write_count_);
107 std::swap(total_write_messages_, other.total_write_messages_);
108 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700109 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800110}
111
Brian Silvermanf51499a2020-09-21 12:49:08 -0700112void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700113 if (ran_out_of_space_) {
114 // We don't want any later data to be written after space becomes
115 // available, so refuse to write anything more once we've dropped data
116 // because we ran out of space.
117 VLOG(1) << "Ignoring span: " << span.size();
118 return;
119 }
120
Austin Schuhbd06ae42021-03-31 22:48:21 -0700121 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700122 if (encoder_->may_bypass() && span.size() > 4096u) {
123 // Over this threshold, we'll assume it's cheaper to add an extra
124 // syscall to write the data immediately instead of copying it to
125 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800126
Brian Silvermanf51499a2020-09-21 12:49:08 -0700127 // First, flush everything.
128 while (encoder_->queue_size() > 0u) {
129 Flush();
130 }
Austin Schuhde031b72020-01-10 19:34:41 -0800131
Brian Silvermanf51499a2020-09-21 12:49:08 -0700132 // Then, write it directly.
133 const auto start = aos::monotonic_clock::now();
134 const ssize_t written = write(fd_, span.data(), span.size());
135 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700136 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700137 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700138 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700139 } else {
140 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700141 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800142 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700143
Austin Schuhbd06ae42021-03-31 22:48:21 -0700144 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800145}
146
Brian Silverman0465fcf2020-09-24 00:29:18 -0700147void DetachedBufferWriter::Close() {
148 if (fd_ == -1) {
149 return;
150 }
151 encoder_->Finish();
152 while (encoder_->queue_size() > 0) {
153 Flush();
154 }
155 if (close(fd_) == -1) {
156 if (errno == ENOSPC) {
157 ran_out_of_space_ = true;
158 } else {
159 PLOG(ERROR) << "Closing log file failed";
160 }
161 }
162 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700163 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700164}
165
Austin Schuha36c8902019-12-30 18:07:15 -0800166void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700167 if (ran_out_of_space_) {
168 // We don't want any later data to be written after space becomes available,
169 // so refuse to write anything more once we've dropped data because we ran
170 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700171 if (encoder_) {
172 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
173 encoder_->Clear(encoder_->queue().size());
174 } else {
175 VLOG(1) << "No queue to ignore";
176 }
177 return;
178 }
179
180 const auto queue = encoder_->queue();
181 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700182 return;
183 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700184
Austin Schuha36c8902019-12-30 18:07:15 -0800185 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700186 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
187 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800188 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700189 for (size_t i = 0; i < iovec_size; ++i) {
190 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
191 iovec_[i].iov_len = queue[i].size();
192 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800193 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700194
195 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800196 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700197 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700198 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700199
200 encoder_->Clear(iovec_size);
201
202 UpdateStatsForWrite(end - start, written, iovec_size);
203}
204
Brian Silverman0465fcf2020-09-24 00:29:18 -0700205void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
206 size_t write_size) {
207 if (write_return == -1 && errno == ENOSPC) {
208 ran_out_of_space_ = true;
209 return;
210 }
211 PCHECK(write_return >= 0) << ": write failed";
212 if (write_return < static_cast<ssize_t>(write_size)) {
213 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
214 // never seems to happen in any other case. If we ever want to log to a
215 // socket, this will happen more often. However, until we get there, we'll
216 // just assume it means we ran out of space.
217 ran_out_of_space_ = true;
218 return;
219 }
220}
221
Brian Silvermanf51499a2020-09-21 12:49:08 -0700222void DetachedBufferWriter::UpdateStatsForWrite(
223 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
224 if (duration > max_write_time_) {
225 max_write_time_ = duration;
226 max_write_time_bytes_ = written;
227 max_write_time_messages_ = iovec_size;
228 }
229 total_write_time_ += duration;
230 ++total_write_count_;
231 total_write_messages_ += iovec_size;
232 total_write_bytes_ += written;
233}
234
Austin Schuhbd06ae42021-03-31 22:48:21 -0700235void DetachedBufferWriter::FlushAtThreshold(
236 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700237 if (ran_out_of_space_) {
238 // We don't want any later data to be written after space becomes available,
239 // so refuse to write anything more once we've dropped data because we ran
240 // out of space.
241 if (encoder_) {
242 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
243 encoder_->Clear(encoder_->queue().size());
244 } else {
245 VLOG(1) << "No queue to ignore";
246 }
247 return;
248 }
249
Austin Schuhbd06ae42021-03-31 22:48:21 -0700250 // We don't want to flush the first time through. Otherwise we will flush as
251 // the log file header might be compressing, defeating any parallelism and
252 // queueing there.
253 if (last_flush_time_ == aos::monotonic_clock::min_time) {
254 last_flush_time_ = now;
255 }
256
Brian Silvermanf51499a2020-09-21 12:49:08 -0700257 // Flush if we are at the max number of iovs per writev, because there's no
258 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700259 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700260 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700261 encoder_->queue_size() >= IOV_MAX ||
262 now > last_flush_time_ +
263 chrono::duration_cast<chrono::nanoseconds>(
264 chrono::duration<double>(FLAGS_flush_period))) {
265 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700266 Flush();
267 }
Austin Schuha36c8902019-12-30 18:07:15 -0800268}
269
270flatbuffers::Offset<MessageHeader> PackMessage(
271 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
272 int channel_index, LogType log_type) {
273 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
274
275 switch (log_type) {
276 case LogType::kLogMessage:
277 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800278 case LogType::kLogRemoteMessage:
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700279 data_offset = fbb->CreateVector(
280 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800281 break;
282
283 case LogType::kLogDeliveryTimeOnly:
284 break;
285 }
286
287 MessageHeader::Builder message_header_builder(*fbb);
288 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800289
290 switch (log_type) {
291 case LogType::kLogRemoteMessage:
292 message_header_builder.add_queue_index(context.remote_queue_index);
293 message_header_builder.add_monotonic_sent_time(
294 context.monotonic_remote_time.time_since_epoch().count());
295 message_header_builder.add_realtime_sent_time(
296 context.realtime_remote_time.time_since_epoch().count());
297 break;
298
299 case LogType::kLogMessage:
300 case LogType::kLogMessageAndDeliveryTime:
301 case LogType::kLogDeliveryTimeOnly:
302 message_header_builder.add_queue_index(context.queue_index);
303 message_header_builder.add_monotonic_sent_time(
304 context.monotonic_event_time.time_since_epoch().count());
305 message_header_builder.add_realtime_sent_time(
306 context.realtime_event_time.time_since_epoch().count());
307 break;
308 }
Austin Schuha36c8902019-12-30 18:07:15 -0800309
310 switch (log_type) {
311 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800312 case LogType::kLogRemoteMessage:
Austin Schuha36c8902019-12-30 18:07:15 -0800313 message_header_builder.add_data(data_offset);
314 break;
315
316 case LogType::kLogMessageAndDeliveryTime:
317 message_header_builder.add_data(data_offset);
318 [[fallthrough]];
319
320 case LogType::kLogDeliveryTimeOnly:
321 message_header_builder.add_monotonic_remote_time(
322 context.monotonic_remote_time.time_since_epoch().count());
323 message_header_builder.add_realtime_remote_time(
324 context.realtime_remote_time.time_since_epoch().count());
325 message_header_builder.add_remote_queue_index(context.remote_queue_index);
326 break;
327 }
328
329 return message_header_builder.Finish();
330}
331
Brian Silvermanf51499a2020-09-21 12:49:08 -0700332SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700333 decoder_ = std::make_unique<DummyDecoder>(filename);
334
335 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700336 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700337 if (filename.substr(filename.size() - kXz.size()) == kXz) {
338#if ENABLE_LZMA
Tyler Chatow2015bc62021-08-04 21:15:09 -0700339 decoder_ = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700340#else
341 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
342#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700343 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
344 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700345 }
Austin Schuh05b70472020-01-01 17:11:17 -0800346}
347
Austin Schuhcf5f6442021-07-06 10:43:28 -0700348absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800349 // Make sure we have enough for the size.
350 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
351 if (!ReadBlock()) {
352 return absl::Span<const uint8_t>();
353 }
354 }
355
356 // Now make sure we have enough for the message.
357 const size_t data_size =
358 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
359 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800360 if (data_size == sizeof(flatbuffers::uoffset_t)) {
361 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
362 LOG(ERROR) << " Rest of log file is "
363 << absl::BytesToHexString(std::string_view(
364 reinterpret_cast<const char *>(data_.data() +
365 consumed_data_),
366 data_.size() - consumed_data_));
367 return absl::Span<const uint8_t>();
368 }
Austin Schuh05b70472020-01-01 17:11:17 -0800369 while (data_.size() < consumed_data_ + data_size) {
370 if (!ReadBlock()) {
371 return absl::Span<const uint8_t>();
372 }
373 }
374
375 // And return it, consuming the data.
376 const uint8_t *data_ptr = data_.data() + consumed_data_;
377
Austin Schuh05b70472020-01-01 17:11:17 -0800378 return absl::Span<const uint8_t>(data_ptr, data_size);
379}
380
Austin Schuhcf5f6442021-07-06 10:43:28 -0700381void SpanReader::ConsumeMessage() {
382 consumed_data_ +=
383 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
384 sizeof(flatbuffers::uoffset_t);
385}
386
387absl::Span<const uint8_t> SpanReader::ReadMessage() {
388 absl::Span<const uint8_t> result = PeekMessage();
389 if (result != absl::Span<const uint8_t>()) {
390 ConsumeMessage();
391 }
392 return result;
393}
394
Austin Schuh05b70472020-01-01 17:11:17 -0800395bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700396 // This is the amount of data we grab at a time. Doing larger chunks minimizes
397 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800398 constexpr size_t kReadSize = 256 * 1024;
399
400 // Strip off any unused data at the front.
401 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700402 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800403 consumed_data_ = 0;
404 }
405
406 const size_t starting_size = data_.size();
407
408 // This should automatically grow the backing store. It won't shrink if we
409 // get a small chunk later. This reduces allocations when we want to append
410 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700411 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800412
Brian Silvermanf51499a2020-09-21 12:49:08 -0700413 const size_t count =
414 decoder_->Read(data_.begin() + starting_size, data_.end());
415 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800416 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800417 return false;
418 }
Austin Schuh05b70472020-01-01 17:11:17 -0800419
420 return true;
421}
422
Austin Schuhadd6eb32020-11-09 21:24:26 -0800423std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700424 SpanReader *span_reader) {
425 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800426
427 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800428 if (config_data == absl::Span<const uint8_t>()) {
429 return std::nullopt;
430 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800431
Austin Schuh5212cad2020-09-09 23:12:09 -0700432 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700433 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800434 if (!result.Verify()) {
435 return std::nullopt;
436 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700437
438 if (FLAGS_workaround_double_headers) {
439 while (true) {
440 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
441 if (maybe_header_data == absl::Span<const uint8_t>()) {
442 break;
443 }
444
445 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
446 maybe_header_data);
447 if (maybe_header.Verify()) {
448 LOG(WARNING) << "Found duplicate LogFileHeader in "
449 << span_reader->filename();
450 ResizeableBuffer header_data_copy;
451 header_data_copy.resize(maybe_header_data.size());
452 memcpy(header_data_copy.data(), maybe_header_data.begin(),
453 header_data_copy.size());
454 result = SizePrefixedFlatbufferVector<LogFileHeader>(
455 std::move(header_data_copy));
456
457 span_reader->ConsumeMessage();
458 } else {
459 break;
460 }
461 }
462 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800463 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800464}
465
Austin Schuh0e8db662021-07-06 10:43:47 -0700466std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
467 std::string_view filename) {
468 SpanReader span_reader(filename);
469 return ReadHeader(&span_reader);
470}
471
Austin Schuhadd6eb32020-11-09 21:24:26 -0800472std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800473 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700474 SpanReader span_reader(filename);
475 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
476 for (size_t i = 0; i < n + 1; ++i) {
477 data_span = span_reader.ReadMessage();
478
479 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800480 if (data_span == absl::Span<const uint8_t>()) {
481 return std::nullopt;
482 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700483 }
484
Brian Silverman354697a2020-09-22 21:06:32 -0700485 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700486 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800487 if (!result.Verify()) {
488 return std::nullopt;
489 }
490 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700491}
492
Austin Schuh05b70472020-01-01 17:11:17 -0800493MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700494 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800495 raw_log_file_header_(
496 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -0700497 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
498 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800499
500 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700501 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800502
Austin Schuh0e8db662021-07-06 10:43:47 -0700503 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800504
Austin Schuh5b728b72021-06-16 14:57:15 -0700505 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
506
Austin Schuhcde938c2020-02-02 17:30:07 -0800507 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800508 FLAGS_max_out_of_order > 0
509 ? chrono::duration_cast<chrono::nanoseconds>(
510 chrono::duration<double>(FLAGS_max_out_of_order))
511 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800512
513 VLOG(1) << "Opened " << filename << " as node "
514 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800515}
516
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700517std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800518 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
519 if (msg_data == absl::Span<const uint8_t>()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700520 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -0800521 }
522
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700523 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
524 CHECK(msg.Verify()) << ": Corrupted message from " << filename();
Austin Schuh05b70472020-01-01 17:11:17 -0800525
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700526 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -0700527
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700528 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800529
530 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700531 VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
532 return result;
533}
534
535std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
536 const MessageHeader &message) {
537 const size_t data_size = message.has_data() ? message.data()->size() : 0;
538
539 UnpackedMessageHeader *const unpacked_message =
540 reinterpret_cast<UnpackedMessageHeader *>(
541 malloc(sizeof(UnpackedMessageHeader) + data_size +
542 kChannelDataAlignment - 1));
543
544 CHECK(message.has_channel_index());
545 CHECK(message.has_monotonic_sent_time());
546
547 absl::Span<uint8_t> span;
548 if (data_size > 0) {
549 span =
550 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
551 &unpacked_message->actual_data[0], data_size)),
552 data_size);
553 }
554
Austin Schuh826e6ce2021-11-18 20:33:10 -0800555 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700556 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800557 monotonic_remote_time = aos::monotonic_clock::time_point(
558 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700559 }
560 std::optional<realtime_clock::time_point> realtime_remote_time;
561 if (message.has_realtime_remote_time()) {
562 realtime_remote_time = realtime_clock::time_point(
563 chrono::nanoseconds(message.realtime_remote_time()));
564 }
565
566 std::optional<uint32_t> remote_queue_index;
567 if (message.has_remote_queue_index()) {
568 remote_queue_index = message.remote_queue_index();
569 }
570
571 new (unpacked_message) UnpackedMessageHeader{
572 .channel_index = message.channel_index(),
573 .monotonic_sent_time = monotonic_clock::time_point(
574 chrono::nanoseconds(message.monotonic_sent_time())),
575 .realtime_sent_time = realtime_clock::time_point(
576 chrono::nanoseconds(message.realtime_sent_time())),
577 .queue_index = message.queue_index(),
578 .monotonic_remote_time = monotonic_remote_time,
579 .realtime_remote_time = realtime_remote_time,
580 .remote_queue_index = remote_queue_index,
581 .monotonic_timestamp_time = monotonic_clock::time_point(
582 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
583 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
584 .span = span};
585
586 if (data_size > 0) {
587 memcpy(span.data(), message.data()->data(), data_size);
588 }
589
590 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
591 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -0800592}
593
Austin Schuhc41603c2020-10-11 16:17:37 -0700594PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700595 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700596 if (parts_.parts.size() >= 2) {
597 next_message_reader_.emplace(parts_.parts[1]);
598 }
Austin Schuh48507722021-07-17 17:29:24 -0700599 ComputeBootCounts();
600}
601
602void PartsMessageReader::ComputeBootCounts() {
603 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
604 std::nullopt);
605
606 // We have 3 vintages of log files with different amounts of information.
607 if (log_file_header()->has_boot_uuids()) {
608 // The new hotness with the boots explicitly listed out. We can use the log
609 // file header to compute the boot count of all relevant nodes.
610 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
611 size_t node_index = 0;
612 for (const flatbuffers::String *boot_uuid :
613 *log_file_header()->boot_uuids()) {
614 CHECK(parts_.boots);
615 if (boot_uuid->size() != 0) {
616 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
617 if (it != parts_.boots->boot_count_map.end()) {
618 boot_counts_[node_index] = it->second;
619 }
620 } else if (parts().boots->boots[node_index].size() == 1u) {
621 boot_counts_[node_index] = 0;
622 }
623 ++node_index;
624 }
625 } else {
626 // Older multi-node logs which are guarenteed to have UUIDs logged, or
627 // single node log files with boot UUIDs in the header. We only know how to
628 // order certain boots in certain circumstances.
629 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
630 for (size_t node_index = 0; node_index < boot_counts_.size();
631 ++node_index) {
632 CHECK(parts_.boots);
633 if (parts().boots->boots[node_index].size() == 1u) {
634 boot_counts_[node_index] = 0;
635 }
636 }
637 } else {
638 // Really old single node logs without any UUIDs. They can't reboot.
639 CHECK_EQ(boot_counts_.size(), 1u);
640 boot_counts_[0] = 0u;
641 }
642 }
643}
Austin Schuhc41603c2020-10-11 16:17:37 -0700644
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700645std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -0700646 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700647 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700648 message_reader_.ReadMessage();
649 if (message) {
650 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700651 const monotonic_clock::time_point monotonic_sent_time =
652 message->monotonic_sent_time;
653
654 // TODO(austin): Does this work with startup? Might need to use the
655 // start time.
656 // TODO(austin): Does this work with startup when we don't know the
657 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800658 if (monotonic_sent_time >
659 parts_.monotonic_start_time + max_out_of_order_duration()) {
660 after_start_ = true;
661 }
662 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800663 CHECK_GE(monotonic_sent_time,
664 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800665 << ": Max out of order of " << max_out_of_order_duration().count()
666 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800667 << parts_.monotonic_start_time << " currently reading "
668 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800669 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700670 return message;
671 }
672 NextLog();
673 }
Austin Schuh32f68492020-11-08 21:45:51 -0800674 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700675 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -0700676}
677
678void PartsMessageReader::NextLog() {
679 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700680 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -0700681 done_ = true;
682 return;
683 }
Brian Silvermanfee16972021-09-14 12:06:38 -0700684 CHECK(next_message_reader_);
685 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -0700686 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -0700687 if (next_part_index_ + 1 < parts_.parts.size()) {
688 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
689 } else {
690 next_message_reader_.reset();
691 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700692 ++next_part_index_;
693}
694
Austin Schuh1be0ce42020-11-29 22:43:26 -0800695bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700696 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700697
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700698 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800699 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700700 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800701 return false;
702 }
703
704 if (this->channel_index < m2.channel_index) {
705 return true;
706 } else if (this->channel_index > m2.channel_index) {
707 return false;
708 }
709
710 return this->queue_index < m2.queue_index;
711}
712
713bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800714bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700715 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700716
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700717 return timestamp.time == m2.timestamp.time &&
718 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800719}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800720
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700721std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
722 os << "{.channel_index=" << m.channel_index
723 << ", .monotonic_sent_time=" << m.monotonic_sent_time
724 << ", .realtime_sent_time=" << m.realtime_sent_time
725 << ", .queue_index=" << m.queue_index;
726 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800727 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700728 }
729 os << ", .realtime_remote_time=";
730 PrintOptionalOrNull(&os, m.realtime_remote_time);
731 os << ", .remote_queue_index=";
732 PrintOptionalOrNull(&os, m.remote_queue_index);
733 if (m.has_monotonic_timestamp_time) {
734 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
735 }
736 return os;
737}
738
Austin Schuh1be0ce42020-11-29 22:43:26 -0800739std::ostream &operator<<(std::ostream &os, const Message &m) {
740 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700741 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700742 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800743 if (m.data->remote_queue_index.has_value()) {
744 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
745 }
746 if (m.data->monotonic_remote_time.has_value()) {
747 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
748 }
Austin Schuhfb1b3292021-11-16 21:20:15 -0800749 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800750 }
751 os << "}";
752 return os;
753}
754
755std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
756 os << "{.channel_index=" << m.channel_index
757 << ", .queue_index=" << m.queue_index
758 << ", .monotonic_event_time=" << m.monotonic_event_time
759 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -0700760 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800761 os << ", .remote_queue_index=" << m.remote_queue_index;
762 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700763 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800764 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
765 }
766 if (m.realtime_remote_time != realtime_clock::min_time) {
767 os << ", .realtime_remote_time=" << m.realtime_remote_time;
768 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700769 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800770 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
771 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700772 if (m.data != nullptr) {
773 os << ", .data=" << *m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800774 }
775 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800776 return os;
777}
778
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800779LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700780 : parts_message_reader_(log_parts),
781 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
782}
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800783
// Returns a pointer to the oldest message held in messages_, first reading
// from parts_message_reader_ until that message is older than sorted_until()
// (or the reader runs out of data).  Returns nullptr when messages_ is empty.
// The returned timestamp is CHECKed to never go backwards between calls.
784Message *LogPartsSorter::Front() {
785 // Queue up data until enough data has been queued that the front message is
786 // sorted enough to be safe to pop. This may do nothing, so we should make
787 // sure the nothing path is checked quickly.
788 if (sorted_until() != monotonic_clock::max_time) {
789 while (true) {
// Stop as soon as the front message is older than the sorted horizon and the
// horizon has reached the start time.
Austin Schuh48507722021-07-17 17:29:24 -0700790 if (!messages_.empty() &&
791 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -0800792 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800793 break;
794 }
795
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700796 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800797 parts_message_reader_.ReadMessage();
798 // No data left, sorted forever, work through what is left.
799 if (!m) {
800 sorted_until_ = monotonic_clock::max_time;
801 break;
802 }
803
// The timestamp boot is only filled in (from the logger's boot count) when the
// message actually carries a monotonic_timestamp_time; otherwise it stays 0.
Austin Schuh48507722021-07-17 17:29:24 -0700804 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700805 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -0700806 monotonic_timestamp_boot = parts().logger_boot_count;
807 }
// NOTE(review): 0xffffff looks like an "unknown boot" placeholder used when
// the message has no remote time -- confirm against the header.
808 size_t monotonic_remote_boot = 0xffffff;
809
// For messages with a remote time, look up the boot count of the channel's
// source node; it is fatal for that boot to be unknown.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700810 if (m->monotonic_remote_time.has_value()) {
milind-ua50344f2021-08-25 18:22:20 -0700811 const Node *node = parts().config->nodes()->Get(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700812 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700813
Austin Schuh48507722021-07-17 17:29:24 -0700814 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700815 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700816 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
817 << ", with index "
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700818 << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -0700819 monotonic_remote_boot = *boot;
820 }
821
// Every field read from m is listed before .data, which moves m away last.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700822 messages_.insert(
823 Message{.channel_index = m->channel_index,
824 .queue_index = BootQueueIndex{.boot = parts().boot_count,
825 .index = m->queue_index},
826 .timestamp = BootTimestamp{.boot = parts().boot_count,
827 .time = m->monotonic_sent_time},
828 .monotonic_remote_boot = monotonic_remote_boot,
829 .monotonic_timestamp_boot = monotonic_timestamp_boot,
830 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800831
832 // Now, update sorted_until_ to match the new message.
// The comparison against min_time + duration keeps the subtraction below from
// going past min_time; in that case sorted_until_ is clamped to min_time.
833 if (parts_message_reader_.newest_timestamp() >
834 monotonic_clock::min_time +
835 parts_message_reader_.max_out_of_order_duration()) {
836 sorted_until_ = parts_message_reader_.newest_timestamp() -
837 parts_message_reader_.max_out_of_order_duration();
838 } else {
839 sorted_until_ = monotonic_clock::min_time;
840 }
841 }
842 }
843
844 // Now that we have enough data queued, return a pointer to the oldest piece
845 // of data if it exists.
846 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -0800847 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800848 return nullptr;
849 }
850
// Sanity check that the message we hand back never goes backwards in time.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700851 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -0800852 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700853 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800854 return &(*messages_.begin());
855}
856
857void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
858
859std::string LogPartsSorter::DebugString() const {
860 std::stringstream ss;
861 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800862 int count = 0;
863 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800864 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800865 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
866 ss << m << "\n";
867 } else if (no_dots) {
868 ss << "...\n";
869 no_dots = false;
870 }
871 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800872 }
873 ss << "] <- " << parts_message_reader_.filename();
874 return ss.str();
875}
876
Austin Schuhd2f96102020-12-01 20:27:29 -0800877NodeMerger::NodeMerger(std::vector<LogParts> parts) {
878 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -0700879 // Enforce that we are sorting things only from a single node from a single
880 // boot.
881 const std::string_view part0_node = parts[0].node;
882 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -0800883 for (size_t i = 1; i < parts.size(); ++i) {
884 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -0700885 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
886 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -0800887 }
Austin Schuh715adc12021-06-29 22:07:39 -0700888
889 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
890
Austin Schuhd2f96102020-12-01 20:27:29 -0800891 for (LogParts &part : parts) {
892 parts_sorters_.emplace_back(std::move(part));
893 }
894
Austin Schuhd2f96102020-12-01 20:27:29 -0800895 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -0700896 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800897 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700898 // We want to capture the earliest meaningful start time here. The start
899 // time defaults to min_time when there's no meaningful value to report, so
900 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -0700901 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
902 bool accept = false;
903 // We want to prioritize start times from the logger node. Really, we
904 // want to prioritize start times with a valid realtime_clock time. So,
905 // if we have a start time without a RT clock, prefer a start time with a
906 // RT clock, even it if is later.
907 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
908 // We've got a good one. See if the current start time has a good RT
909 // clock, or if we should use this one instead.
910 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
911 accept = true;
912 } else if (realtime_start_time_ == realtime_clock::min_time) {
913 // The previous start time doesn't have a good RT time, so it is very
914 // likely the start time from a remote part file. We just found a
915 // better start time with a real RT time, so switch to that instead.
916 accept = true;
917 }
918 } else if (realtime_start_time_ == realtime_clock::min_time) {
919 // We don't have a RT time, so take the oldest.
920 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
921 accept = true;
922 }
923 }
924
925 if (accept) {
926 monotonic_start_time_ = parts_sorter.monotonic_start_time();
927 realtime_start_time_ = parts_sorter.realtime_start_time();
928 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800929 }
930 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700931
932 // If there was no meaningful start time reported, just use min_time.
933 if (monotonic_start_time_ == monotonic_clock::max_time) {
934 monotonic_start_time_ = monotonic_clock::min_time;
935 realtime_start_time_ = realtime_clock::min_time;
936 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800937}
Austin Schuh8f52ed52020-11-30 23:12:39 -0800938
Austin Schuh0ca51f32020-12-25 21:51:45 -0800939std::vector<const LogParts *> NodeMerger::Parts() const {
940 std::vector<const LogParts *> p;
941 p.reserve(parts_sorters_.size());
942 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
943 p.emplace_back(&parts_sorter.parts());
944 }
945 return p;
946}
947
// Returns the oldest message across all of parts_sorters_, or nullptr when
// none of them has anything left.  The sorter which produced the result is
// remembered in current_ so that repeated calls reuse it, and duplicates of
// the oldest message found in other sorters are popped along the way.
Austin Schuh8f52ed52020-11-30 23:12:39 -0800948Message *NodeMerger::Front() {
949 // Return the current Front if we have one, otherwise go compute one.
950 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -0800951 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700952 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -0800953 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800954 }
955
956 // Otherwise, do a simple search for the oldest message, deduplicating any
957 // duplicates.
958 Message *oldest = nullptr;
// sorted_until_ is rebuilt as the minimum over every sorter during the scan.
959 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800960 for (LogPartsSorter &parts_sorter : parts_sorters_) {
961 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -0800962 if (!m) {
// This sorter has no message, but it still bounds how far we are sorted.
Austin Schuhd2f96102020-12-01 20:27:29 -0800963 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800964 continue;
965 }
966 if (oldest == nullptr || *m < *oldest) {
967 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -0800968 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800969 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700970 // Found a duplicate. If there is a choice, we want the one which has
971 // the timestamp time.
972 if (!m->data->has_monotonic_timestamp_time) {
// The new copy has no timestamp time; drop it and keep the current oldest.
Austin Schuh8bf1e632021-01-02 22:41:04 -0800973 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700974 } else if (!oldest->data->has_monotonic_timestamp_time) {
// Only the new copy has a timestamp time; drop the old one and switch to it.
Austin Schuh8bf1e632021-01-02 22:41:04 -0800975 current_->PopFront();
976 current_ = &parts_sorter;
977 oldest = m;
978 } else {
// Both copies carry a timestamp time; they must agree, then drop the new one.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700979 CHECK_EQ(m->data->monotonic_timestamp_time,
980 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800981 parts_sorter.PopFront();
982 }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800983 }
984
985 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -0800986 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800987 }
988
// Messages must come out in non-decreasing time order; with nothing left,
// last_message_time_ is pinned to max_time.
Austin Schuhb000de62020-12-03 22:00:40 -0800989 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700990 CHECK_GE(oldest->timestamp.time, last_message_time_);
991 last_message_time_ = oldest->timestamp.time;
Austin Schuhb000de62020-12-03 22:00:40 -0800992 } else {
993 last_message_time_ = monotonic_clock::max_time;
994 }
995
Austin Schuh8f52ed52020-11-30 23:12:39 -0800996 // Return the oldest message found. This will be nullptr if nothing was
997 // found, indicating there is nothing left.
998 return oldest;
999}
1000
1001void NodeMerger::PopFront() {
1002 CHECK(current_ != nullptr) << "Popping before calling Front()";
1003 current_->PopFront();
1004 current_ = nullptr;
1005}
1006
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001007BootMerger::BootMerger(std::vector<LogParts> files) {
1008 std::vector<std::vector<LogParts>> boots;
1009
1010 // Now, we need to split things out by boot.
1011 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001012 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001013 if (boot_count + 1 > boots.size()) {
1014 boots.resize(boot_count + 1);
1015 }
1016 boots[boot_count].emplace_back(std::move(files[i]));
1017 }
1018
1019 node_mergers_.reserve(boots.size());
1020 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001021 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001022 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001023 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001024 }
1025 node_mergers_.emplace_back(
1026 std::make_unique<NodeMerger>(std::move(boots[i])));
1027 }
1028}
1029
1030Message *BootMerger::Front() {
1031 Message *result = node_mergers_[index_]->Front();
1032
1033 if (result != nullptr) {
1034 return result;
1035 }
1036
1037 if (index_ + 1u == node_mergers_.size()) {
1038 // At the end of the last node merger, just return.
1039 return nullptr;
1040 } else {
1041 ++index_;
1042 return Front();
1043 }
1044}
1045
1046void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1047
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001048std::vector<const LogParts *> BootMerger::Parts() const {
1049 std::vector<const LogParts *> results;
1050 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1051 std::vector<const LogParts *> node_parts = node_merger->Parts();
1052
1053 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1054 std::make_move_iterator(node_parts.end()));
1055 }
1056
1057 return results;
1058}
1059
Austin Schuhd2f96102020-12-01 20:27:29 -08001060TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001061 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001062 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001063 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001064 if (!configuration_) {
1065 configuration_ = part->config;
1066 } else {
1067 CHECK_EQ(configuration_.get(), part->config.get());
1068 }
1069 }
1070 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001071 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1072 // pretty simple.
1073 if (configuration::MultiNode(config)) {
1074 nodes_data_.resize(config->nodes()->size());
1075 const Node *my_node = config->nodes()->Get(node());
1076 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1077 const Node *node = config->nodes()->Get(node_index);
1078 NodeData *node_data = &nodes_data_[node_index];
1079 node_data->channels.resize(config->channels()->size());
1080 // We should save the channel if it is delivered to the node represented
1081 // by the NodeData, but not sent by that node. That combo means it is
1082 // forwarded.
1083 size_t channel_index = 0;
1084 node_data->any_delivered = false;
1085 for (const Channel *channel : *config->channels()) {
1086 node_data->channels[channel_index].delivered =
1087 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001088 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1089 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001090 node_data->any_delivered = node_data->any_delivered ||
1091 node_data->channels[channel_index].delivered;
1092 ++channel_index;
1093 }
1094 }
1095
1096 for (const Channel *channel : *config->channels()) {
1097 source_node_.emplace_back(configuration::GetNodeIndex(
1098 config, channel->source_node()->string_view()));
1099 }
1100 }
1101}
1102
1103void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001104 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001105 CHECK_NE(timestamp_mapper->node(), node());
1106 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1107
1108 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001109 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001110 // we could needlessly save data.
1111 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001112 VLOG(1) << "Registering on node " << node() << " for peer node "
1113 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001114 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1115
1116 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001117
1118 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001119 }
1120}
1121
Austin Schuh79b30942021-01-24 22:32:21 -08001122void TimestampMapper::QueueMessage(Message *m) {
1123 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001124 .channel_index = m->channel_index,
1125 .queue_index = m->queue_index,
1126 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001127 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001128 .remote_queue_index = BootQueueIndex::Invalid(),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001129 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -08001130 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001131 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -08001132 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001133}
1134
1135TimestampedMessage *TimestampMapper::Front() {
1136 // No need to fetch anything new. A previous message still exists.
1137 switch (first_message_) {
1138 case FirstMessage::kNeedsUpdate:
1139 break;
1140 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001141 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001142 case FirstMessage::kNullptr:
1143 return nullptr;
1144 }
1145
Austin Schuh79b30942021-01-24 22:32:21 -08001146 if (matched_messages_.empty()) {
1147 if (!QueueMatched()) {
1148 first_message_ = FirstMessage::kNullptr;
1149 return nullptr;
1150 }
1151 }
1152 first_message_ = FirstMessage::kInMessage;
1153 return &matched_messages_.front();
1154}
1155
// Queues exactly one more message onto matched_messages_ and invokes
// timestamp_callback_ on it.  Returns false when there is no more data to
// queue, true otherwise.  Every queued message is CHECKed to not go backwards
// in time relative to last_message_time_.
1156bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08001157 if (nodes_data_.empty()) {
1158 // Simple path. We are single node, so there are no timestamps to match!
1159 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001160 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001161 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -08001162 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001163 }
Austin Schuh79b30942021-01-24 22:32:21 -08001164 // Enqueue this message into matched_messages_ so we have a place to
1165 // associate remote timestamps, and return it.
1166 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08001167
Austin Schuh79b30942021-01-24 22:32:21 -08001168 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1169 last_message_time_ = matched_messages_.back().monotonic_event_time;
1170
1171 // We are a thin wrapper around boot_merger_. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001172 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08001173 timestamp_callback_(&matched_messages_.back());
1174 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001175 }
1176
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001177 // We need to only add messages to the list so they get processed for
1178 // messages which are delivered. Reuse the flow below which uses messages_
1179 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08001180 if (messages_.empty()) {
1181 if (!Queue()) {
1182 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -08001183 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001184 }
1185
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001186 // Now that it has been added (and cannibalized), forget about it
1187 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001188 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001189 }
1190
1191 Message *m = &(messages_.front());
1192
// A channel whose source node is our node carries local data; anything else is
// treated as a timestamp which needs matching remote data.
1193 if (source_node_[m->channel_index] == node()) {
1194 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08001195 QueueMessage(m);
1196 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1197 last_message_time_ = matched_messages_.back().monotonic_event_time;
1198 messages_.pop_front();
1199 timestamp_callback_(&matched_messages_.back());
1200 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001201 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001202 // Got a timestamp, find the matching remote data, match it, and return
1203 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08001204 Message data = MatchingMessageFor(*m);
1205
1206 // Return the data from the remote. The local message only has timestamp
1207 // info which isn't relevant anymore once extracted.
// The timing and index fields come from the local message m, while the payload
// (.data) comes from the matched message, which may be nullptr.
Austin Schuh79b30942021-01-24 22:32:21 -08001208 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001209 .channel_index = m->channel_index,
1210 .queue_index = m->queue_index,
1211 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001212 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001213 .remote_queue_index =
1214 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001215 .index = m->data->remote_queue_index.value()},
1216 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001217 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001218 .realtime_remote_time = m->data->realtime_remote_time.value(),
1219 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
1220 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08001221 .data = std::move(data.data)});
1222 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1223 last_message_time_ = matched_messages_.back().monotonic_event_time;
1224 // Since messages_ holds the data, drop it.
1225 messages_.pop_front();
1226 timestamp_callback_(&matched_messages_.back());
1227 return true;
1228 }
1229}
1230
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001231void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001232 while (last_message_time_ <= queue_time) {
1233 if (!QueueMatched()) {
1234 return;
1235 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001236 }
1237}
1238
Austin Schuhe639ea12021-01-25 13:00:22 -08001239void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001240 // Note: queueing for time doesn't really work well across boots. So we
1241 // just assume that if you are using this, you only care about the current
1242 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001243 //
1244 // TODO(austin): Is that the right concept?
1245 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001246 // Make sure we have something queued first. This makes the end time
1247 // calculation simpler, and is typically what folks want regardless.
1248 if (matched_messages_.empty()) {
1249 if (!QueueMatched()) {
1250 return;
1251 }
1252 }
1253
1254 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001255 std::max(monotonic_start_time(
1256 matched_messages_.front().monotonic_event_time.boot),
1257 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001258 time_estimation_buffer;
1259
1260 // Place sorted messages on the list until we have
1261 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1262 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001263 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001264 if (!QueueMatched()) {
1265 return;
1266 }
1267 }
1268}
1269
Austin Schuhd2f96102020-12-01 20:27:29 -08001270void TimestampMapper::PopFront() {
1271 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
1272 first_message_ = FirstMessage::kNeedsUpdate;
1273
Austin Schuh79b30942021-01-24 22:32:21 -08001274 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001275}
1276
1277Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001278 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001279 CHECK_NOTNULL(message.data);
1280 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001281 const BootQueueIndex remote_queue_index =
1282 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001283 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001284
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001285 CHECK(message.data->monotonic_remote_time.has_value());
1286 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001287
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001288 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001289 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001290 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001291 const realtime_clock::time_point realtime_remote_time =
1292 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001293
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001294 TimestampMapper *peer =
1295 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001296
1297 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001298 // asked to pull a timestamp from a peer which doesn't exist, return an
1299 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001300 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001301 // TODO(austin): Make sure the tests hit all these paths with a boot count
1302 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001303 return Message{.channel_index = message.channel_index,
1304 .queue_index = remote_queue_index,
1305 .timestamp = monotonic_remote_time,
1306 .monotonic_remote_boot = 0xffffff,
1307 .monotonic_timestamp_boot = 0xffffff,
1308 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001309 }
1310
1311 // The queue which will have the matching data, if available.
1312 std::deque<Message> *data_queue =
1313 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1314
Austin Schuh79b30942021-01-24 22:32:21 -08001315 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001316
1317 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001318 return Message{.channel_index = message.channel_index,
1319 .queue_index = remote_queue_index,
1320 .timestamp = monotonic_remote_time,
1321 .monotonic_remote_boot = 0xffffff,
1322 .monotonic_timestamp_boot = 0xffffff,
1323 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001324 }
1325
Austin Schuhd2f96102020-12-01 20:27:29 -08001326 if (remote_queue_index < data_queue->front().queue_index ||
1327 remote_queue_index > data_queue->back().queue_index) {
1328 return Message{
1329 .channel_index = message.channel_index,
1330 .queue_index = remote_queue_index,
1331 .timestamp = monotonic_remote_time,
Austin Schuh48507722021-07-17 17:29:24 -07001332 .monotonic_remote_boot = 0xffffff,
1333 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001334 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001335 }
1336
Austin Schuh993ccb52020-12-12 15:59:32 -08001337 // The algorithm below is constant time with some assumptions. We need there
1338 // to be no missing messages in the data stream. This also assumes a queue
1339 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001340 if (data_queue->back().queue_index.boot ==
1341 data_queue->front().queue_index.boot &&
1342 (data_queue->back().queue_index.index -
1343 data_queue->front().queue_index.index + 1u ==
1344 data_queue->size())) {
1345 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001346 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001347 //
1348 // TODO(austin): Move if not reliable.
1349 Message result = (*data_queue)[remote_queue_index.index -
1350 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001351
1352 CHECK_EQ(result.timestamp, monotonic_remote_time)
1353 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001354 CHECK_EQ(result.data->realtime_sent_time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001355 realtime_remote_time)
1356 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1357 // Now drop the data off the front. We have deduplicated timestamps, so we
1358 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001359 data_queue->erase(
1360 data_queue->begin(),
1361 data_queue->begin() +
1362 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001363 return result;
1364 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07001365 // TODO(austin): Binary search.
1366 auto it = std::find_if(
1367 data_queue->begin(), data_queue->end(),
1368 [remote_queue_index,
1369 remote_boot = monotonic_remote_time.boot](const Message &m) {
1370 return m.queue_index == remote_queue_index &&
1371 m.timestamp.boot == remote_boot;
1372 });
Austin Schuh993ccb52020-12-12 15:59:32 -08001373 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001374 return Message{.channel_index = message.channel_index,
1375 .queue_index = remote_queue_index,
1376 .timestamp = monotonic_remote_time,
1377 .monotonic_remote_boot = 0xffffff,
1378 .monotonic_timestamp_boot = 0xffffff,
1379 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08001380 }
1381
1382 Message result = std::move(*it);
1383
1384 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001385 << ": Queue index matches, but timestamp doesn't. Please "
1386 "investigate!";
1387 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
1388 << ": Queue index matches, but timestamp doesn't. Please "
1389 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08001390
Austin Schuh58646e22021-08-23 23:51:46 -07001391 // TODO(austin): We still go in order, so we can erase from the beginning to
1392 // our iterator minus 1. That'll keep 1 in the queue.
Austin Schuh993ccb52020-12-12 15:59:32 -08001393 data_queue->erase(it);
1394
1395 return result;
1396 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001397}
1398
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001399void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001400 if (queued_until_ > t) {
1401 return;
1402 }
1403 while (true) {
1404 if (!messages_.empty() && messages_.back().timestamp > t) {
1405 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1406 return;
1407 }
1408
1409 if (!Queue()) {
1410 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001412 return;
1413 }
1414
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001415 // Now that it has been added (and cannibalized), forget about it
1416 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001417 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001418 }
1419}
1420
1421bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001422 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001423 if (m == nullptr) {
1424 return false;
1425 }
1426 for (NodeData &node_data : nodes_data_) {
1427 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001428 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001429 if (node_data.channels[m->channel_index].delivered) {
1430 // TODO(austin): This copies the data... Probably not worth stressing
1431 // about yet.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001432 // TODO(austin): Bound how big this can get. We tend not to send
1433 // massive data, so we can probably ignore this for a bit.
Austin Schuhd2f96102020-12-01 20:27:29 -08001434 node_data.channels[m->channel_index].messages.emplace_back(*m);
1435 }
1436 }
1437
1438 messages_.emplace_back(std::move(*m));
1439 return true;
1440}
1441
1442std::string TimestampMapper::DebugString() const {
1443 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07001444 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08001445 for (const Message &message : messages_) {
1446 ss << " " << message << "\n";
1447 }
1448 ss << "] queued_until " << queued_until_;
1449 for (const NodeData &ns : nodes_data_) {
1450 if (ns.peer == nullptr) continue;
1451 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1452 size_t channel_index = 0;
1453 for (const NodeData::ChannelData &channel_data :
1454 ns.peer->nodes_data_[node()].channels) {
1455 if (channel_data.messages.empty()) {
1456 continue;
1457 }
Austin Schuhb000de62020-12-03 22:00:40 -08001458
Austin Schuhd2f96102020-12-01 20:27:29 -08001459 ss << " channel " << channel_index << " [\n";
1460 for (const Message &m : channel_data.messages) {
1461 ss << " " << m << "\n";
1462 }
1463 ss << " ]\n";
1464 ++channel_index;
1465 }
1466 ss << "] queued_until " << ns.peer->queued_until_;
1467 }
1468 return ss.str();
1469}
1470
Austin Schuhee711052020-08-24 16:06:09 -07001471std::string MaybeNodeName(const Node *node) {
1472 if (node != nullptr) {
1473 return node->name()->str() + " ";
1474 }
1475 return "";
1476}
1477
Brian Silvermanf51499a2020-09-21 12:49:08 -07001478} // namespace aos::logger