blob: dd824188bc350666e456b2f74962392cb5ecec44 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
Austin Schuhfa895892020-01-07 20:07:41 -080013#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080014#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080015#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080016#include "gflags/gflags.h"
17#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080018
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070019#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070020#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070021#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070022#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070023#else
24#define ENABLE_LZMA 0
25#endif
26
27#if ENABLE_LZMA
28#include "aos/events/logging/lzma_encoder.h"
29#endif
30
Austin Schuh7fbf5a72020-09-21 16:28:13 -070031DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080032 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070033DEFINE_double(
34 flush_period, 5.0,
35 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080036
Austin Schuha040c3f2021-02-13 16:09:07 -080037DEFINE_double(
38 max_out_of_order, -1,
39 "If set, this overrides the max out of order duration for a log file.");
40
Austin Schuh0e8db662021-07-06 10:43:47 -070041DEFINE_bool(workaround_double_headers, true,
42 "Some old log files have two headers at the beginning. Use the "
43 "last header as the actual header.");
44
Brian Silvermanf51499a2020-09-21 12:49:08 -070045namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070046namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080047
Austin Schuh05b70472020-01-01 17:11:17 -080048namespace chrono = std::chrono;
49
// Streams the value held by `t` to `*os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}
58} // namespace
59
Brian Silvermanf51499a2020-09-21 12:49:08 -070060DetachedBufferWriter::DetachedBufferWriter(
61 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
62 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070063 if (!util::MkdirPIfSpace(filename, 0777)) {
64 ran_out_of_space_ = true;
65 } else {
66 fd_ = open(std::string(filename).c_str(),
67 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
68 if (fd_ == -1 && errno == ENOSPC) {
69 ran_out_of_space_ = true;
70 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070071 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
72 << " for writing";
73 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070074 }
75 }
Austin Schuha36c8902019-12-30 18:07:15 -080076}
77
78DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070079 Close();
80 if (ran_out_of_space_) {
81 CHECK(acknowledge_ran_out_of_space_)
82 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070083 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070084}
85
Brian Silvermand90905f2020-09-23 14:42:56 -070086DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070087 *this = std::move(other);
88}
89
Brian Silverman87ac0402020-09-17 14:47:01 -070090// When other is destroyed "soon" (which it should be because we're getting an
91// rvalue reference to it), it will flush etc all the data we have queued up
92// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -070093DetachedBufferWriter &DetachedBufferWriter::operator=(
94 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070095 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070096 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070097 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -070098 std::swap(ran_out_of_space_, other.ran_out_of_space_);
99 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700100 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700101 std::swap(max_write_time_, other.max_write_time_);
102 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
103 std::swap(max_write_time_messages_, other.max_write_time_messages_);
104 std::swap(total_write_time_, other.total_write_time_);
105 std::swap(total_write_count_, other.total_write_count_);
106 std::swap(total_write_messages_, other.total_write_messages_);
107 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700108 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800109}
110
Brian Silvermanf51499a2020-09-21 12:49:08 -0700111void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700112 if (ran_out_of_space_) {
113 // We don't want any later data to be written after space becomes
114 // available, so refuse to write anything more once we've dropped data
115 // because we ran out of space.
116 VLOG(1) << "Ignoring span: " << span.size();
117 return;
118 }
119
Austin Schuhbd06ae42021-03-31 22:48:21 -0700120 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700121 if (encoder_->may_bypass() && span.size() > 4096u) {
122 // Over this threshold, we'll assume it's cheaper to add an extra
123 // syscall to write the data immediately instead of copying it to
124 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800125
Brian Silvermanf51499a2020-09-21 12:49:08 -0700126 // First, flush everything.
127 while (encoder_->queue_size() > 0u) {
128 Flush();
129 }
Austin Schuhde031b72020-01-10 19:34:41 -0800130
Brian Silvermanf51499a2020-09-21 12:49:08 -0700131 // Then, write it directly.
132 const auto start = aos::monotonic_clock::now();
133 const ssize_t written = write(fd_, span.data(), span.size());
134 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700135 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700136 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700137 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700138 } else {
139 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700140 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800141 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700142
Austin Schuhbd06ae42021-03-31 22:48:21 -0700143 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800144}
145
Brian Silverman0465fcf2020-09-24 00:29:18 -0700146void DetachedBufferWriter::Close() {
147 if (fd_ == -1) {
148 return;
149 }
150 encoder_->Finish();
151 while (encoder_->queue_size() > 0) {
152 Flush();
153 }
154 if (close(fd_) == -1) {
155 if (errno == ENOSPC) {
156 ran_out_of_space_ = true;
157 } else {
158 PLOG(ERROR) << "Closing log file failed";
159 }
160 }
161 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700162 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700163}
164
Austin Schuha36c8902019-12-30 18:07:15 -0800165void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700166 if (ran_out_of_space_) {
167 // We don't want any later data to be written after space becomes available,
168 // so refuse to write anything more once we've dropped data because we ran
169 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700170 if (encoder_) {
171 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
172 encoder_->Clear(encoder_->queue().size());
173 } else {
174 VLOG(1) << "No queue to ignore";
175 }
176 return;
177 }
178
179 const auto queue = encoder_->queue();
180 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700181 return;
182 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700183
Austin Schuha36c8902019-12-30 18:07:15 -0800184 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700185 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
186 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800187 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700188 for (size_t i = 0; i < iovec_size; ++i) {
189 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
190 iovec_[i].iov_len = queue[i].size();
191 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800192 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700193
194 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800195 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700196 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700197 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700198
199 encoder_->Clear(iovec_size);
200
201 UpdateStatsForWrite(end - start, written, iovec_size);
202}
203
Brian Silverman0465fcf2020-09-24 00:29:18 -0700204void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
205 size_t write_size) {
206 if (write_return == -1 && errno == ENOSPC) {
207 ran_out_of_space_ = true;
208 return;
209 }
210 PCHECK(write_return >= 0) << ": write failed";
211 if (write_return < static_cast<ssize_t>(write_size)) {
212 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
213 // never seems to happen in any other case. If we ever want to log to a
214 // socket, this will happen more often. However, until we get there, we'll
215 // just assume it means we ran out of space.
216 ran_out_of_space_ = true;
217 return;
218 }
219}
220
Brian Silvermanf51499a2020-09-21 12:49:08 -0700221void DetachedBufferWriter::UpdateStatsForWrite(
222 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
223 if (duration > max_write_time_) {
224 max_write_time_ = duration;
225 max_write_time_bytes_ = written;
226 max_write_time_messages_ = iovec_size;
227 }
228 total_write_time_ += duration;
229 ++total_write_count_;
230 total_write_messages_ += iovec_size;
231 total_write_bytes_ += written;
232}
233
Austin Schuhbd06ae42021-03-31 22:48:21 -0700234void DetachedBufferWriter::FlushAtThreshold(
235 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700236 if (ran_out_of_space_) {
237 // We don't want any later data to be written after space becomes available,
238 // so refuse to write anything more once we've dropped data because we ran
239 // out of space.
240 if (encoder_) {
241 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
242 encoder_->Clear(encoder_->queue().size());
243 } else {
244 VLOG(1) << "No queue to ignore";
245 }
246 return;
247 }
248
Austin Schuhbd06ae42021-03-31 22:48:21 -0700249 // We don't want to flush the first time through. Otherwise we will flush as
250 // the log file header might be compressing, defeating any parallelism and
251 // queueing there.
252 if (last_flush_time_ == aos::monotonic_clock::min_time) {
253 last_flush_time_ = now;
254 }
255
Brian Silvermanf51499a2020-09-21 12:49:08 -0700256 // Flush if we are at the max number of iovs per writev, because there's no
257 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700258 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700259 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700260 encoder_->queue_size() >= IOV_MAX ||
261 now > last_flush_time_ +
262 chrono::duration_cast<chrono::nanoseconds>(
263 chrono::duration<double>(FLAGS_flush_period))) {
264 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700265 Flush();
266 }
Austin Schuha36c8902019-12-30 18:07:15 -0800267}
268
269flatbuffers::Offset<MessageHeader> PackMessage(
270 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
271 int channel_index, LogType log_type) {
272 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
273
274 switch (log_type) {
275 case LogType::kLogMessage:
276 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800277 case LogType::kLogRemoteMessage:
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700278 data_offset = fbb->CreateVector(
279 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800280 break;
281
282 case LogType::kLogDeliveryTimeOnly:
283 break;
284 }
285
286 MessageHeader::Builder message_header_builder(*fbb);
287 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800288
289 switch (log_type) {
290 case LogType::kLogRemoteMessage:
291 message_header_builder.add_queue_index(context.remote_queue_index);
292 message_header_builder.add_monotonic_sent_time(
293 context.monotonic_remote_time.time_since_epoch().count());
294 message_header_builder.add_realtime_sent_time(
295 context.realtime_remote_time.time_since_epoch().count());
296 break;
297
298 case LogType::kLogMessage:
299 case LogType::kLogMessageAndDeliveryTime:
300 case LogType::kLogDeliveryTimeOnly:
301 message_header_builder.add_queue_index(context.queue_index);
302 message_header_builder.add_monotonic_sent_time(
303 context.monotonic_event_time.time_since_epoch().count());
304 message_header_builder.add_realtime_sent_time(
305 context.realtime_event_time.time_since_epoch().count());
306 break;
307 }
Austin Schuha36c8902019-12-30 18:07:15 -0800308
309 switch (log_type) {
310 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800311 case LogType::kLogRemoteMessage:
Austin Schuha36c8902019-12-30 18:07:15 -0800312 message_header_builder.add_data(data_offset);
313 break;
314
315 case LogType::kLogMessageAndDeliveryTime:
316 message_header_builder.add_data(data_offset);
317 [[fallthrough]];
318
319 case LogType::kLogDeliveryTimeOnly:
320 message_header_builder.add_monotonic_remote_time(
321 context.monotonic_remote_time.time_since_epoch().count());
322 message_header_builder.add_realtime_remote_time(
323 context.realtime_remote_time.time_since_epoch().count());
324 message_header_builder.add_remote_queue_index(context.remote_queue_index);
325 break;
326 }
327
328 return message_header_builder.Finish();
329}
330
Brian Silvermanf51499a2020-09-21 12:49:08 -0700331SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700332 decoder_ = std::make_unique<DummyDecoder>(filename);
333
334 static constexpr std::string_view kXz = ".xz";
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700335 if (filename.substr(filename.size() - kXz.size()) == kXz) {
336#if ENABLE_LZMA
Tyler Chatow2015bc62021-08-04 21:15:09 -0700337 decoder_ = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700338#else
339 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
340#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700341 }
Austin Schuh05b70472020-01-01 17:11:17 -0800342}
343
Austin Schuhcf5f6442021-07-06 10:43:28 -0700344absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800345 // Make sure we have enough for the size.
346 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
347 if (!ReadBlock()) {
348 return absl::Span<const uint8_t>();
349 }
350 }
351
352 // Now make sure we have enough for the message.
353 const size_t data_size =
354 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
355 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800356 if (data_size == sizeof(flatbuffers::uoffset_t)) {
357 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
358 LOG(ERROR) << " Rest of log file is "
359 << absl::BytesToHexString(std::string_view(
360 reinterpret_cast<const char *>(data_.data() +
361 consumed_data_),
362 data_.size() - consumed_data_));
363 return absl::Span<const uint8_t>();
364 }
Austin Schuh05b70472020-01-01 17:11:17 -0800365 while (data_.size() < consumed_data_ + data_size) {
366 if (!ReadBlock()) {
367 return absl::Span<const uint8_t>();
368 }
369 }
370
371 // And return it, consuming the data.
372 const uint8_t *data_ptr = data_.data() + consumed_data_;
373
Austin Schuh05b70472020-01-01 17:11:17 -0800374 return absl::Span<const uint8_t>(data_ptr, data_size);
375}
376
Austin Schuhcf5f6442021-07-06 10:43:28 -0700377void SpanReader::ConsumeMessage() {
378 consumed_data_ +=
379 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
380 sizeof(flatbuffers::uoffset_t);
381}
382
383absl::Span<const uint8_t> SpanReader::ReadMessage() {
384 absl::Span<const uint8_t> result = PeekMessage();
385 if (result != absl::Span<const uint8_t>()) {
386 ConsumeMessage();
387 }
388 return result;
389}
390
Austin Schuh05b70472020-01-01 17:11:17 -0800391bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700392 // This is the amount of data we grab at a time. Doing larger chunks minimizes
393 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800394 constexpr size_t kReadSize = 256 * 1024;
395
396 // Strip off any unused data at the front.
397 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700398 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800399 consumed_data_ = 0;
400 }
401
402 const size_t starting_size = data_.size();
403
404 // This should automatically grow the backing store. It won't shrink if we
405 // get a small chunk later. This reduces allocations when we want to append
406 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700407 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800408
Brian Silvermanf51499a2020-09-21 12:49:08 -0700409 const size_t count =
410 decoder_->Read(data_.begin() + starting_size, data_.end());
411 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800412 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800413 return false;
414 }
Austin Schuh05b70472020-01-01 17:11:17 -0800415
416 return true;
417}
418
Austin Schuhadd6eb32020-11-09 21:24:26 -0800419std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700420 SpanReader *span_reader) {
421 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800422
423 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800424 if (config_data == absl::Span<const uint8_t>()) {
425 return std::nullopt;
426 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800427
Austin Schuh5212cad2020-09-09 23:12:09 -0700428 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700429 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800430 if (!result.Verify()) {
431 return std::nullopt;
432 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700433
434 if (FLAGS_workaround_double_headers) {
435 while (true) {
436 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
437 if (maybe_header_data == absl::Span<const uint8_t>()) {
438 break;
439 }
440
441 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
442 maybe_header_data);
443 if (maybe_header.Verify()) {
444 LOG(WARNING) << "Found duplicate LogFileHeader in "
445 << span_reader->filename();
446 ResizeableBuffer header_data_copy;
447 header_data_copy.resize(maybe_header_data.size());
448 memcpy(header_data_copy.data(), maybe_header_data.begin(),
449 header_data_copy.size());
450 result = SizePrefixedFlatbufferVector<LogFileHeader>(
451 std::move(header_data_copy));
452
453 span_reader->ConsumeMessage();
454 } else {
455 break;
456 }
457 }
458 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800459 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800460}
461
Austin Schuh0e8db662021-07-06 10:43:47 -0700462std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
463 std::string_view filename) {
464 SpanReader span_reader(filename);
465 return ReadHeader(&span_reader);
466}
467
Austin Schuhadd6eb32020-11-09 21:24:26 -0800468std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800469 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700470 SpanReader span_reader(filename);
471 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
472 for (size_t i = 0; i < n + 1; ++i) {
473 data_span = span_reader.ReadMessage();
474
475 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800476 if (data_span == absl::Span<const uint8_t>()) {
477 return std::nullopt;
478 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700479 }
480
Brian Silverman354697a2020-09-22 21:06:32 -0700481 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700482 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800483 if (!result.Verify()) {
484 return std::nullopt;
485 }
486 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700487}
488
Austin Schuh05b70472020-01-01 17:11:17 -0800489MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700490 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800491 raw_log_file_header_(
492 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -0700493 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
494 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800495
496 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700497 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800498
Austin Schuh0e8db662021-07-06 10:43:47 -0700499 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800500
Austin Schuh5b728b72021-06-16 14:57:15 -0700501 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
502
Austin Schuhcde938c2020-02-02 17:30:07 -0800503 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800504 FLAGS_max_out_of_order > 0
505 ? chrono::duration_cast<chrono::nanoseconds>(
506 chrono::duration<double>(FLAGS_max_out_of_order))
507 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800508
509 VLOG(1) << "Opened " << filename << " as node "
510 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800511}
512
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700513std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800514 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
515 if (msg_data == absl::Span<const uint8_t>()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700516 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -0800517 }
518
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700519 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
520 CHECK(msg.Verify()) << ": Corrupted message from " << filename();
Austin Schuh05b70472020-01-01 17:11:17 -0800521
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700522 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -0700523
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700524 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800525
526 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700527 VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
528 return result;
529}
530
531std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
532 const MessageHeader &message) {
533 const size_t data_size = message.has_data() ? message.data()->size() : 0;
534
535 UnpackedMessageHeader *const unpacked_message =
536 reinterpret_cast<UnpackedMessageHeader *>(
537 malloc(sizeof(UnpackedMessageHeader) + data_size +
538 kChannelDataAlignment - 1));
539
540 CHECK(message.has_channel_index());
541 CHECK(message.has_monotonic_sent_time());
542
543 absl::Span<uint8_t> span;
544 if (data_size > 0) {
545 span =
546 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
547 &unpacked_message->actual_data[0], data_size)),
548 data_size);
549 }
550
551 std::optional<std::chrono::nanoseconds> monotonic_remote_time;
552 if (message.has_monotonic_remote_time()) {
553 monotonic_remote_time =
554 std::chrono::nanoseconds(message.monotonic_remote_time());
555 }
556 std::optional<realtime_clock::time_point> realtime_remote_time;
557 if (message.has_realtime_remote_time()) {
558 realtime_remote_time = realtime_clock::time_point(
559 chrono::nanoseconds(message.realtime_remote_time()));
560 }
561
562 std::optional<uint32_t> remote_queue_index;
563 if (message.has_remote_queue_index()) {
564 remote_queue_index = message.remote_queue_index();
565 }
566
567 new (unpacked_message) UnpackedMessageHeader{
568 .channel_index = message.channel_index(),
569 .monotonic_sent_time = monotonic_clock::time_point(
570 chrono::nanoseconds(message.monotonic_sent_time())),
571 .realtime_sent_time = realtime_clock::time_point(
572 chrono::nanoseconds(message.realtime_sent_time())),
573 .queue_index = message.queue_index(),
574 .monotonic_remote_time = monotonic_remote_time,
575 .realtime_remote_time = realtime_remote_time,
576 .remote_queue_index = remote_queue_index,
577 .monotonic_timestamp_time = monotonic_clock::time_point(
578 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
579 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
580 .span = span};
581
582 if (data_size > 0) {
583 memcpy(span.data(), message.data()->data(), data_size);
584 }
585
586 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
587 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -0800588}
589
Austin Schuhc41603c2020-10-11 16:17:37 -0700590PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700591 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700592 if (parts_.parts.size() >= 2) {
593 next_message_reader_.emplace(parts_.parts[1]);
594 }
Austin Schuh48507722021-07-17 17:29:24 -0700595 ComputeBootCounts();
596}
597
598void PartsMessageReader::ComputeBootCounts() {
599 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
600 std::nullopt);
601
602 // We have 3 vintages of log files with different amounts of information.
603 if (log_file_header()->has_boot_uuids()) {
604 // The new hotness with the boots explicitly listed out. We can use the log
605 // file header to compute the boot count of all relevant nodes.
606 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
607 size_t node_index = 0;
608 for (const flatbuffers::String *boot_uuid :
609 *log_file_header()->boot_uuids()) {
610 CHECK(parts_.boots);
611 if (boot_uuid->size() != 0) {
612 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
613 if (it != parts_.boots->boot_count_map.end()) {
614 boot_counts_[node_index] = it->second;
615 }
616 } else if (parts().boots->boots[node_index].size() == 1u) {
617 boot_counts_[node_index] = 0;
618 }
619 ++node_index;
620 }
621 } else {
622 // Older multi-node logs which are guarenteed to have UUIDs logged, or
623 // single node log files with boot UUIDs in the header. We only know how to
624 // order certain boots in certain circumstances.
625 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
626 for (size_t node_index = 0; node_index < boot_counts_.size();
627 ++node_index) {
628 CHECK(parts_.boots);
629 if (parts().boots->boots[node_index].size() == 1u) {
630 boot_counts_[node_index] = 0;
631 }
632 }
633 } else {
634 // Really old single node logs without any UUIDs. They can't reboot.
635 CHECK_EQ(boot_counts_.size(), 1u);
636 boot_counts_[0] = 0u;
637 }
638 }
639}
Austin Schuhc41603c2020-10-11 16:17:37 -0700640
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700641std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -0700642 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700643 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700644 message_reader_.ReadMessage();
645 if (message) {
646 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700647 const monotonic_clock::time_point monotonic_sent_time =
648 message->monotonic_sent_time;
649
650 // TODO(austin): Does this work with startup? Might need to use the
651 // start time.
652 // TODO(austin): Does this work with startup when we don't know the
653 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800654 if (monotonic_sent_time >
655 parts_.monotonic_start_time + max_out_of_order_duration()) {
656 after_start_ = true;
657 }
658 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800659 CHECK_GE(monotonic_sent_time,
660 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800661 << ": Max out of order of " << max_out_of_order_duration().count()
662 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800663 << parts_.monotonic_start_time << " currently reading "
664 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800665 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700666 return message;
667 }
668 NextLog();
669 }
Austin Schuh32f68492020-11-08 21:45:51 -0800670 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700671 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -0700672}
673
674void PartsMessageReader::NextLog() {
675 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700676 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -0700677 done_ = true;
678 return;
679 }
Brian Silvermanfee16972021-09-14 12:06:38 -0700680 CHECK(next_message_reader_);
681 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -0700682 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -0700683 if (next_part_index_ + 1 < parts_.parts.size()) {
684 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
685 } else {
686 next_message_reader_.reset();
687 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700688 ++next_part_index_;
689}
690
Austin Schuh1be0ce42020-11-29 22:43:26 -0800691bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700692 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700693
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700694 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800695 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700696 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800697 return false;
698 }
699
700 if (this->channel_index < m2.channel_index) {
701 return true;
702 } else if (this->channel_index > m2.channel_index) {
703 return false;
704 }
705
706 return this->queue_index < m2.queue_index;
707}
708
709bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800710bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700711 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700712
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700713 return timestamp.time == m2.timestamp.time &&
714 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800715}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800716
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700717std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
718 os << "{.channel_index=" << m.channel_index
719 << ", .monotonic_sent_time=" << m.monotonic_sent_time
720 << ", .realtime_sent_time=" << m.realtime_sent_time
721 << ", .queue_index=" << m.queue_index;
722 if (m.monotonic_remote_time) {
723 os << ", .monotonic_remote_time=" << m.monotonic_remote_time->count();
724 }
725 os << ", .realtime_remote_time=";
726 PrintOptionalOrNull(&os, m.realtime_remote_time);
727 os << ", .remote_queue_index=";
728 PrintOptionalOrNull(&os, m.remote_queue_index);
729 if (m.has_monotonic_timestamp_time) {
730 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
731 }
732 return os;
733}
734
Austin Schuh1be0ce42020-11-29 22:43:26 -0800735std::ostream &operator<<(std::ostream &os, const Message &m) {
736 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700737 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700738 if (m.data != nullptr) {
739 os << ", .data=" << m;
Austin Schuhd2f96102020-12-01 20:27:29 -0800740 }
741 os << "}";
742 return os;
743}
744
745std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
746 os << "{.channel_index=" << m.channel_index
747 << ", .queue_index=" << m.queue_index
748 << ", .monotonic_event_time=" << m.monotonic_event_time
749 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -0700750 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800751 os << ", .remote_queue_index=" << m.remote_queue_index;
752 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700753 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800754 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
755 }
756 if (m.realtime_remote_time != realtime_clock::min_time) {
757 os << ", .realtime_remote_time=" << m.realtime_remote_time;
758 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700759 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800760 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
761 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700762 if (m.data != nullptr) {
763 os << ", .data=" << *m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800764 }
765 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800766 return os;
767}
768
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800769LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700770 : parts_message_reader_(log_parts),
771 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
772}
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800773
774Message *LogPartsSorter::Front() {
775 // Queue up data until enough data has been queued that the front message is
776 // sorted enough to be safe to pop. This may do nothing, so we should make
777 // sure the nothing path is checked quickly.
778 if (sorted_until() != monotonic_clock::max_time) {
779 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -0700780 if (!messages_.empty() &&
781 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -0800782 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800783 break;
784 }
785
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700786 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800787 parts_message_reader_.ReadMessage();
788 // No data left, sorted forever, work through what is left.
789 if (!m) {
790 sorted_until_ = monotonic_clock::max_time;
791 break;
792 }
793
Austin Schuh48507722021-07-17 17:29:24 -0700794 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700795 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -0700796 monotonic_timestamp_boot = parts().logger_boot_count;
797 }
798 size_t monotonic_remote_boot = 0xffffff;
799
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700800 if (m->monotonic_remote_time.has_value()) {
milind-ua50344f2021-08-25 18:22:20 -0700801 const Node *node = parts().config->nodes()->Get(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700802 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700803
Austin Schuh48507722021-07-17 17:29:24 -0700804 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700805 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700806 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
807 << ", with index "
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700808 << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -0700809 monotonic_remote_boot = *boot;
810 }
811
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700812 messages_.insert(
813 Message{.channel_index = m->channel_index,
814 .queue_index = BootQueueIndex{.boot = parts().boot_count,
815 .index = m->queue_index},
816 .timestamp = BootTimestamp{.boot = parts().boot_count,
817 .time = m->monotonic_sent_time},
818 .monotonic_remote_boot = monotonic_remote_boot,
819 .monotonic_timestamp_boot = monotonic_timestamp_boot,
820 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800821
822 // Now, update sorted_until_ to match the new message.
823 if (parts_message_reader_.newest_timestamp() >
824 monotonic_clock::min_time +
825 parts_message_reader_.max_out_of_order_duration()) {
826 sorted_until_ = parts_message_reader_.newest_timestamp() -
827 parts_message_reader_.max_out_of_order_duration();
828 } else {
829 sorted_until_ = monotonic_clock::min_time;
830 }
831 }
832 }
833
834 // Now that we have enough data queued, return a pointer to the oldest piece
835 // of data if it exists.
836 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -0800837 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800838 return nullptr;
839 }
840
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700841 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -0800842 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700843 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800844 return &(*messages_.begin());
845}
846
847void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
848
849std::string LogPartsSorter::DebugString() const {
850 std::stringstream ss;
851 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800852 int count = 0;
853 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800854 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800855 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
856 ss << m << "\n";
857 } else if (no_dots) {
858 ss << "...\n";
859 no_dots = false;
860 }
861 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800862 }
863 ss << "] <- " << parts_message_reader_.filename();
864 return ss.str();
865}
866
Austin Schuhd2f96102020-12-01 20:27:29 -0800867NodeMerger::NodeMerger(std::vector<LogParts> parts) {
868 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -0700869 // Enforce that we are sorting things only from a single node from a single
870 // boot.
871 const std::string_view part0_node = parts[0].node;
872 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -0800873 for (size_t i = 1; i < parts.size(); ++i) {
874 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -0700875 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
876 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -0800877 }
Austin Schuh715adc12021-06-29 22:07:39 -0700878
879 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
880
Austin Schuhd2f96102020-12-01 20:27:29 -0800881 for (LogParts &part : parts) {
882 parts_sorters_.emplace_back(std::move(part));
883 }
884
Austin Schuhd2f96102020-12-01 20:27:29 -0800885 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -0700886 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800887 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700888 // We want to capture the earliest meaningful start time here. The start
889 // time defaults to min_time when there's no meaningful value to report, so
890 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -0700891 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
892 bool accept = false;
893 // We want to prioritize start times from the logger node. Really, we
894 // want to prioritize start times with a valid realtime_clock time. So,
895 // if we have a start time without a RT clock, prefer a start time with a
896 // RT clock, even it if is later.
897 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
898 // We've got a good one. See if the current start time has a good RT
899 // clock, or if we should use this one instead.
900 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
901 accept = true;
902 } else if (realtime_start_time_ == realtime_clock::min_time) {
903 // The previous start time doesn't have a good RT time, so it is very
904 // likely the start time from a remote part file. We just found a
905 // better start time with a real RT time, so switch to that instead.
906 accept = true;
907 }
908 } else if (realtime_start_time_ == realtime_clock::min_time) {
909 // We don't have a RT time, so take the oldest.
910 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
911 accept = true;
912 }
913 }
914
915 if (accept) {
916 monotonic_start_time_ = parts_sorter.monotonic_start_time();
917 realtime_start_time_ = parts_sorter.realtime_start_time();
918 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800919 }
920 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700921
922 // If there was no meaningful start time reported, just use min_time.
923 if (monotonic_start_time_ == monotonic_clock::max_time) {
924 monotonic_start_time_ = monotonic_clock::min_time;
925 realtime_start_time_ = realtime_clock::min_time;
926 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800927}
Austin Schuh8f52ed52020-11-30 23:12:39 -0800928
Austin Schuh0ca51f32020-12-25 21:51:45 -0800929std::vector<const LogParts *> NodeMerger::Parts() const {
930 std::vector<const LogParts *> p;
931 p.reserve(parts_sorters_.size());
932 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
933 p.emplace_back(&parts_sorter.parts());
934 }
935 return p;
936}
937
Austin Schuh8f52ed52020-11-30 23:12:39 -0800938Message *NodeMerger::Front() {
939 // Return the current Front if we have one, otherwise go compute one.
940 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -0800941 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700942 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -0800943 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800944 }
945
946 // Otherwise, do a simple search for the oldest message, deduplicating any
947 // duplicates.
948 Message *oldest = nullptr;
949 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800950 for (LogPartsSorter &parts_sorter : parts_sorters_) {
951 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -0800952 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800953 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800954 continue;
955 }
956 if (oldest == nullptr || *m < *oldest) {
957 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -0800958 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800959 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700960 // Found a duplicate. If there is a choice, we want the one which has
961 // the timestamp time.
962 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800963 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700964 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800965 current_->PopFront();
966 current_ = &parts_sorter;
967 oldest = m;
968 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700969 CHECK_EQ(m->data->monotonic_timestamp_time,
970 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -0800971 parts_sorter.PopFront();
972 }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800973 }
974
975 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -0800976 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800977 }
978
Austin Schuhb000de62020-12-03 22:00:40 -0800979 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700980 CHECK_GE(oldest->timestamp.time, last_message_time_);
981 last_message_time_ = oldest->timestamp.time;
Austin Schuhb000de62020-12-03 22:00:40 -0800982 } else {
983 last_message_time_ = monotonic_clock::max_time;
984 }
985
Austin Schuh8f52ed52020-11-30 23:12:39 -0800986 // Return the oldest message found. This will be nullptr if nothing was
987 // found, indicating there is nothing left.
988 return oldest;
989}
990
991void NodeMerger::PopFront() {
992 CHECK(current_ != nullptr) << "Popping before calling Front()";
993 current_->PopFront();
994 current_ = nullptr;
995}
996
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700997BootMerger::BootMerger(std::vector<LogParts> files) {
998 std::vector<std::vector<LogParts>> boots;
999
1000 // Now, we need to split things out by boot.
1001 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001002 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001003 if (boot_count + 1 > boots.size()) {
1004 boots.resize(boot_count + 1);
1005 }
1006 boots[boot_count].emplace_back(std::move(files[i]));
1007 }
1008
1009 node_mergers_.reserve(boots.size());
1010 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001011 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001012 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001013 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001014 }
1015 node_mergers_.emplace_back(
1016 std::make_unique<NodeMerger>(std::move(boots[i])));
1017 }
1018}
1019
1020Message *BootMerger::Front() {
1021 Message *result = node_mergers_[index_]->Front();
1022
1023 if (result != nullptr) {
1024 return result;
1025 }
1026
1027 if (index_ + 1u == node_mergers_.size()) {
1028 // At the end of the last node merger, just return.
1029 return nullptr;
1030 } else {
1031 ++index_;
1032 return Front();
1033 }
1034}
1035
1036void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1037
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001038std::vector<const LogParts *> BootMerger::Parts() const {
1039 std::vector<const LogParts *> results;
1040 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1041 std::vector<const LogParts *> node_parts = node_merger->Parts();
1042
1043 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1044 std::make_move_iterator(node_parts.end()));
1045 }
1046
1047 return results;
1048}
1049
Austin Schuhd2f96102020-12-01 20:27:29 -08001050TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001051 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001052 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001053 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001054 if (!configuration_) {
1055 configuration_ = part->config;
1056 } else {
1057 CHECK_EQ(configuration_.get(), part->config.get());
1058 }
1059 }
1060 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001061 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1062 // pretty simple.
1063 if (configuration::MultiNode(config)) {
1064 nodes_data_.resize(config->nodes()->size());
1065 const Node *my_node = config->nodes()->Get(node());
1066 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1067 const Node *node = config->nodes()->Get(node_index);
1068 NodeData *node_data = &nodes_data_[node_index];
1069 node_data->channels.resize(config->channels()->size());
1070 // We should save the channel if it is delivered to the node represented
1071 // by the NodeData, but not sent by that node. That combo means it is
1072 // forwarded.
1073 size_t channel_index = 0;
1074 node_data->any_delivered = false;
1075 for (const Channel *channel : *config->channels()) {
1076 node_data->channels[channel_index].delivered =
1077 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001078 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1079 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001080 node_data->any_delivered = node_data->any_delivered ||
1081 node_data->channels[channel_index].delivered;
1082 ++channel_index;
1083 }
1084 }
1085
1086 for (const Channel *channel : *config->channels()) {
1087 source_node_.emplace_back(configuration::GetNodeIndex(
1088 config, channel->source_node()->string_view()));
1089 }
1090 }
1091}
1092
1093void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001094 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001095 CHECK_NE(timestamp_mapper->node(), node());
1096 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1097
1098 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001099 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001100 // we could needlessly save data.
1101 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001102 VLOG(1) << "Registering on node " << node() << " for peer node "
1103 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001104 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1105
1106 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001107
1108 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001109 }
1110}
1111
Austin Schuh79b30942021-01-24 22:32:21 -08001112void TimestampMapper::QueueMessage(Message *m) {
1113 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001114 .channel_index = m->channel_index,
1115 .queue_index = m->queue_index,
1116 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001117 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001118 .remote_queue_index = BootQueueIndex::Invalid(),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001119 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -08001120 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001121 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -08001122 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001123}
1124
1125TimestampedMessage *TimestampMapper::Front() {
1126 // No need to fetch anything new. A previous message still exists.
1127 switch (first_message_) {
1128 case FirstMessage::kNeedsUpdate:
1129 break;
1130 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001131 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001132 case FirstMessage::kNullptr:
1133 return nullptr;
1134 }
1135
Austin Schuh79b30942021-01-24 22:32:21 -08001136 if (matched_messages_.empty()) {
1137 if (!QueueMatched()) {
1138 first_message_ = FirstMessage::kNullptr;
1139 return nullptr;
1140 }
1141 }
1142 first_message_ = FirstMessage::kInMessage;
1143 return &matched_messages_.front();
1144}
1145
1146bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08001147 if (nodes_data_.empty()) {
1148 // Simple path. We are single node, so there are no timestamps to match!
1149 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001150 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001151 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -08001152 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001153 }
Austin Schuh79b30942021-01-24 22:32:21 -08001154 // Enqueue this message into matched_messages_ so we have a place to
1155 // associate remote timestamps, and return it.
1156 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08001157
Austin Schuh79b30942021-01-24 22:32:21 -08001158 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1159 last_message_time_ = matched_messages_.back().monotonic_event_time;
1160
1161 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001162 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08001163 timestamp_callback_(&matched_messages_.back());
1164 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001165 }
1166
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001167 // We need to only add messages to the list so they get processed for
1168 // messages which are delivered. Reuse the flow below which uses messages_
1169 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08001170 if (messages_.empty()) {
1171 if (!Queue()) {
1172 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -08001173 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001174 }
1175
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001176 // Now that it has been added (and cannibalized), forget about it
1177 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001178 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001179 }
1180
1181 Message *m = &(messages_.front());
1182
1183 if (source_node_[m->channel_index] == node()) {
1184 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08001185 QueueMessage(m);
1186 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1187 last_message_time_ = matched_messages_.back().monotonic_event_time;
1188 messages_.pop_front();
1189 timestamp_callback_(&matched_messages_.back());
1190 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001191 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001192 // Got a timestamp, find the matching remote data, match it, and return
1193 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08001194 Message data = MatchingMessageFor(*m);
1195
1196 // Return the data from the remote. The local message only has timestamp
1197 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08001198 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001199 .channel_index = m->channel_index,
1200 .queue_index = m->queue_index,
1201 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001202 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001203 .remote_queue_index =
1204 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001205 .index = m->data->remote_queue_index.value()},
1206 .monotonic_remote_time = {m->monotonic_remote_boot,
1207 monotonic_clock::time_point(
1208 m->data->monotonic_remote_time.value())},
1209 .realtime_remote_time = m->data->realtime_remote_time.value(),
1210 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
1211 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08001212 .data = std::move(data.data)});
1213 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1214 last_message_time_ = matched_messages_.back().monotonic_event_time;
1215 // Since messages_ holds the data, drop it.
1216 messages_.pop_front();
1217 timestamp_callback_(&matched_messages_.back());
1218 return true;
1219 }
1220}
1221
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001222void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001223 while (last_message_time_ <= queue_time) {
1224 if (!QueueMatched()) {
1225 return;
1226 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001227 }
1228}
1229
Austin Schuhe639ea12021-01-25 13:00:22 -08001230void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001231 // Note: queueing for time doesn't really work well across boots. So we
1232 // just assume that if you are using this, you only care about the current
1233 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001234 //
1235 // TODO(austin): Is that the right concept?
1236 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001237 // Make sure we have something queued first. This makes the end time
1238 // calculation simpler, and is typically what folks want regardless.
1239 if (matched_messages_.empty()) {
1240 if (!QueueMatched()) {
1241 return;
1242 }
1243 }
1244
1245 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001246 std::max(monotonic_start_time(
1247 matched_messages_.front().monotonic_event_time.boot),
1248 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001249 time_estimation_buffer;
1250
1251 // Place sorted messages on the list until we have
1252 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1253 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001254 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001255 if (!QueueMatched()) {
1256 return;
1257 }
1258 }
1259}
1260
Austin Schuhd2f96102020-12-01 20:27:29 -08001261void TimestampMapper::PopFront() {
1262 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
1263 first_message_ = FirstMessage::kNeedsUpdate;
1264
Austin Schuh79b30942021-01-24 22:32:21 -08001265 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001266}
1267
1268Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001269 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001270 CHECK_NOTNULL(message.data);
1271 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001272 const BootQueueIndex remote_queue_index =
1273 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001274 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001275
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001276 CHECK(message.data->monotonic_remote_time.has_value());
1277 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001278
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001279 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001280 .boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001281 .time = monotonic_clock::time_point(
1282 message.data->monotonic_remote_time.value())};
1283 const realtime_clock::time_point realtime_remote_time =
1284 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001285
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001286 TimestampMapper *peer =
1287 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001288
1289 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001290 // asked to pull a timestamp from a peer which doesn't exist, return an
1291 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001292 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001293 // TODO(austin): Make sure the tests hit all these paths with a boot count
1294 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001295 return Message{.channel_index = message.channel_index,
1296 .queue_index = remote_queue_index,
1297 .timestamp = monotonic_remote_time,
1298 .monotonic_remote_boot = 0xffffff,
1299 .monotonic_timestamp_boot = 0xffffff,
1300 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001301 }
1302
1303 // The queue which will have the matching data, if available.
1304 std::deque<Message> *data_queue =
1305 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1306
Austin Schuh79b30942021-01-24 22:32:21 -08001307 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001308
1309 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001310 return Message{.channel_index = message.channel_index,
1311 .queue_index = remote_queue_index,
1312 .timestamp = monotonic_remote_time,
1313 .monotonic_remote_boot = 0xffffff,
1314 .monotonic_timestamp_boot = 0xffffff,
1315 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 }
1317
Austin Schuhd2f96102020-12-01 20:27:29 -08001318 if (remote_queue_index < data_queue->front().queue_index ||
1319 remote_queue_index > data_queue->back().queue_index) {
1320 return Message{
1321 .channel_index = message.channel_index,
1322 .queue_index = remote_queue_index,
1323 .timestamp = monotonic_remote_time,
Austin Schuh48507722021-07-17 17:29:24 -07001324 .monotonic_remote_boot = 0xffffff,
1325 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001326 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001327 }
1328
Austin Schuh993ccb52020-12-12 15:59:32 -08001329 // The algorithm below is constant time with some assumptions. We need there
1330 // to be no missing messages in the data stream. This also assumes a queue
1331 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001332 if (data_queue->back().queue_index.boot ==
1333 data_queue->front().queue_index.boot &&
1334 (data_queue->back().queue_index.index -
1335 data_queue->front().queue_index.index + 1u ==
1336 data_queue->size())) {
1337 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001338 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001339 //
1340 // TODO(austin): Move if not reliable.
1341 Message result = (*data_queue)[remote_queue_index.index -
1342 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001343
1344 CHECK_EQ(result.timestamp, monotonic_remote_time)
1345 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001346 CHECK_EQ(result.data->realtime_sent_time,
Austin Schuh993ccb52020-12-12 15:59:32 -08001347 realtime_remote_time)
1348 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1349 // Now drop the data off the front. We have deduplicated timestamps, so we
1350 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001351 data_queue->erase(
1352 data_queue->begin(),
1353 data_queue->begin() +
1354 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001355 return result;
1356 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07001357 // TODO(austin): Binary search.
1358 auto it = std::find_if(
1359 data_queue->begin(), data_queue->end(),
1360 [remote_queue_index,
1361 remote_boot = monotonic_remote_time.boot](const Message &m) {
1362 return m.queue_index == remote_queue_index &&
1363 m.timestamp.boot == remote_boot;
1364 });
Austin Schuh993ccb52020-12-12 15:59:32 -08001365 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001366 return Message{.channel_index = message.channel_index,
1367 .queue_index = remote_queue_index,
1368 .timestamp = monotonic_remote_time,
1369 .monotonic_remote_boot = 0xffffff,
1370 .monotonic_timestamp_boot = 0xffffff,
1371 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08001372 }
1373
1374 Message result = std::move(*it);
1375
1376 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001377 << ": Queue index matches, but timestamp doesn't. Please "
1378 "investigate!";
1379 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
1380 << ": Queue index matches, but timestamp doesn't. Please "
1381 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08001382
Austin Schuh58646e22021-08-23 23:51:46 -07001383 // TODO(austin): We still go in order, so we can erase from the beginning to
1384 // our iterator minus 1. That'll keep 1 in the queue.
Austin Schuh993ccb52020-12-12 15:59:32 -08001385 data_queue->erase(it);
1386
1387 return result;
1388 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001389}
1390
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001391void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001392 if (queued_until_ > t) {
1393 return;
1394 }
1395 while (true) {
1396 if (!messages_.empty() && messages_.back().timestamp > t) {
1397 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1398 return;
1399 }
1400
1401 if (!Queue()) {
1402 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001403 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001404 return;
1405 }
1406
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001407 // Now that it has been added (and cannibalized), forget about it
1408 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001409 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001410 }
1411}
1412
1413bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001414 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001415 if (m == nullptr) {
1416 return false;
1417 }
1418 for (NodeData &node_data : nodes_data_) {
1419 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001420 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001421 if (node_data.channels[m->channel_index].delivered) {
1422 // TODO(austin): This copies the data... Probably not worth stressing
1423 // about yet.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001424 // TODO(austin): Bound how big this can get. We tend not to send
1425 // massive data, so we can probably ignore this for a bit.
Austin Schuhd2f96102020-12-01 20:27:29 -08001426 node_data.channels[m->channel_index].messages.emplace_back(*m);
1427 }
1428 }
1429
1430 messages_.emplace_back(std::move(*m));
1431 return true;
1432}
1433
1434std::string TimestampMapper::DebugString() const {
1435 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07001436 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08001437 for (const Message &message : messages_) {
1438 ss << " " << message << "\n";
1439 }
1440 ss << "] queued_until " << queued_until_;
1441 for (const NodeData &ns : nodes_data_) {
1442 if (ns.peer == nullptr) continue;
1443 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1444 size_t channel_index = 0;
1445 for (const NodeData::ChannelData &channel_data :
1446 ns.peer->nodes_data_[node()].channels) {
1447 if (channel_data.messages.empty()) {
1448 continue;
1449 }
Austin Schuhb000de62020-12-03 22:00:40 -08001450
Austin Schuhd2f96102020-12-01 20:27:29 -08001451 ss << " channel " << channel_index << " [\n";
1452 for (const Message &m : channel_data.messages) {
1453 ss << " " << m << "\n";
1454 }
1455 ss << " ]\n";
1456 ++channel_index;
1457 }
1458 ss << "] queued_until " << ns.peer->queued_until_;
1459 }
1460 return ss.str();
1461}
1462
Austin Schuhee711052020-08-24 16:06:09 -07001463std::string MaybeNodeName(const Node *node) {
1464 if (node != nullptr) {
1465 return node->name()->str() + " ";
1466 }
1467 return "";
1468}
1469
Brian Silvermanf51499a2020-09-21 12:49:08 -07001470} // namespace aos::logger