blob: ecdf5d64709d4a4954562f748124709ae236971a [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
31
Austin Schuh7fbf5a72020-09-21 16:28:13 -070032DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080033 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070034DEFINE_double(
35 flush_period, 5.0,
36 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080037
Austin Schuha040c3f2021-02-13 16:09:07 -080038DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080039 max_network_delay, 1.0,
40 "Max time to assume a message takes to cross the network before we are "
41 "willing to drop it from our buffers and assume it didn't make it. "
42 "Increasing this number can increase memory usage depending on the packet "
43 "loss of your network or if the timestamps aren't logged for a message.");
44
45DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080046 max_out_of_order, -1,
47 "If set, this overrides the max out of order duration for a log file.");
48
Austin Schuh0e8db662021-07-06 10:43:47 -070049DEFINE_bool(workaround_double_headers, true,
50 "Some old log files have two headers at the beginning. Use the "
51 "last header as the actual header.");
52
Brian Smarttea913d42021-12-10 15:02:38 -080053DEFINE_bool(crash_on_corrupt_message, true,
54 "When true, MessageReader will crash the first time a message "
55 "with corrupted format is found. When false, the crash will be "
56 "suppressed, and any remaining readable messages will be "
57 "evaluated to present verified vs corrupted stats.");
58
59DEFINE_bool(ignore_corrupt_messages, false,
60 "When true, and crash_on_corrupt_message is false, then any "
61 "corrupt message found by MessageReader be silently ignored, "
62 "providing access to all uncorrupted messages in a logfile.");
63
Brian Silvermanf51499a2020-09-21 12:49:08 -070064namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070065namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080066
Austin Schuh05b70472020-01-01 17:11:17 -080067namespace chrono = std::chrono;
68
// Streams the value held by |t| to |os|, or the literal text "null" when |t|
// holds no value.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << t.value();
}
77} // namespace
78
Brian Silvermanf51499a2020-09-21 12:49:08 -070079DetachedBufferWriter::DetachedBufferWriter(
80 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
81 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070082 if (!util::MkdirPIfSpace(filename, 0777)) {
83 ran_out_of_space_ = true;
84 } else {
85 fd_ = open(std::string(filename).c_str(),
86 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
87 if (fd_ == -1 && errno == ENOSPC) {
88 ran_out_of_space_ = true;
89 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070090 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
91 << " for writing";
92 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070093 }
94 }
Austin Schuha36c8902019-12-30 18:07:15 -080095}
96
97DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070098 Close();
99 if (ran_out_of_space_) {
100 CHECK(acknowledge_ran_out_of_space_)
101 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700102 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103}
104
Brian Silvermand90905f2020-09-23 14:42:56 -0700105DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700106 *this = std::move(other);
107}
108
Brian Silverman87ac0402020-09-17 14:47:01 -0700109// When other is destroyed "soon" (which it should be because we're getting an
110// rvalue reference to it), it will flush etc all the data we have queued up
111// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700112DetachedBufferWriter &DetachedBufferWriter::operator=(
113 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700114 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700117 std::swap(ran_out_of_space_, other.ran_out_of_space_);
118 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700119 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120 std::swap(max_write_time_, other.max_write_time_);
121 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
122 std::swap(max_write_time_messages_, other.max_write_time_messages_);
123 std::swap(total_write_time_, other.total_write_time_);
124 std::swap(total_write_count_, other.total_write_count_);
125 std::swap(total_write_messages_, other.total_write_messages_);
126 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700127 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800128}
129
Brian Silvermanf51499a2020-09-21 12:49:08 -0700130void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700131 if (ran_out_of_space_) {
132 // We don't want any later data to be written after space becomes
133 // available, so refuse to write anything more once we've dropped data
134 // because we ran out of space.
135 VLOG(1) << "Ignoring span: " << span.size();
136 return;
137 }
138
Austin Schuhbd06ae42021-03-31 22:48:21 -0700139 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700140 if (encoder_->may_bypass() && span.size() > 4096u) {
141 // Over this threshold, we'll assume it's cheaper to add an extra
142 // syscall to write the data immediately instead of copying it to
143 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800144
Brian Silvermanf51499a2020-09-21 12:49:08 -0700145 // First, flush everything.
146 while (encoder_->queue_size() > 0u) {
147 Flush();
148 }
Austin Schuhde031b72020-01-10 19:34:41 -0800149
Brian Silvermanf51499a2020-09-21 12:49:08 -0700150 // Then, write it directly.
151 const auto start = aos::monotonic_clock::now();
152 const ssize_t written = write(fd_, span.data(), span.size());
153 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700154 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700156 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700157 } else {
158 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700159 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800160 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700161
Austin Schuhbd06ae42021-03-31 22:48:21 -0700162 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800163}
164
Brian Silverman0465fcf2020-09-24 00:29:18 -0700165void DetachedBufferWriter::Close() {
166 if (fd_ == -1) {
167 return;
168 }
169 encoder_->Finish();
170 while (encoder_->queue_size() > 0) {
171 Flush();
172 }
173 if (close(fd_) == -1) {
174 if (errno == ENOSPC) {
175 ran_out_of_space_ = true;
176 } else {
177 PLOG(ERROR) << "Closing log file failed";
178 }
179 }
180 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700181 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700182}
183
Austin Schuha36c8902019-12-30 18:07:15 -0800184void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700185 if (ran_out_of_space_) {
186 // We don't want any later data to be written after space becomes available,
187 // so refuse to write anything more once we've dropped data because we ran
188 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700189 if (encoder_) {
190 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
191 encoder_->Clear(encoder_->queue().size());
192 } else {
193 VLOG(1) << "No queue to ignore";
194 }
195 return;
196 }
197
198 const auto queue = encoder_->queue();
199 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700200 return;
201 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700202
Austin Schuha36c8902019-12-30 18:07:15 -0800203 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700204 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
205 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800206 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700207 for (size_t i = 0; i < iovec_size; ++i) {
208 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
209 iovec_[i].iov_len = queue[i].size();
210 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800211 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700212
213 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800214 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700215 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700216 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700217
218 encoder_->Clear(iovec_size);
219
220 UpdateStatsForWrite(end - start, written, iovec_size);
221}
222
Brian Silverman0465fcf2020-09-24 00:29:18 -0700223void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
224 size_t write_size) {
225 if (write_return == -1 && errno == ENOSPC) {
226 ran_out_of_space_ = true;
227 return;
228 }
229 PCHECK(write_return >= 0) << ": write failed";
230 if (write_return < static_cast<ssize_t>(write_size)) {
231 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
232 // never seems to happen in any other case. If we ever want to log to a
233 // socket, this will happen more often. However, until we get there, we'll
234 // just assume it means we ran out of space.
235 ran_out_of_space_ = true;
236 return;
237 }
238}
239
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240void DetachedBufferWriter::UpdateStatsForWrite(
241 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
242 if (duration > max_write_time_) {
243 max_write_time_ = duration;
244 max_write_time_bytes_ = written;
245 max_write_time_messages_ = iovec_size;
246 }
247 total_write_time_ += duration;
248 ++total_write_count_;
249 total_write_messages_ += iovec_size;
250 total_write_bytes_ += written;
251}
252
Austin Schuhbd06ae42021-03-31 22:48:21 -0700253void DetachedBufferWriter::FlushAtThreshold(
254 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700255 if (ran_out_of_space_) {
256 // We don't want any later data to be written after space becomes available,
257 // so refuse to write anything more once we've dropped data because we ran
258 // out of space.
259 if (encoder_) {
260 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
261 encoder_->Clear(encoder_->queue().size());
262 } else {
263 VLOG(1) << "No queue to ignore";
264 }
265 return;
266 }
267
Austin Schuhbd06ae42021-03-31 22:48:21 -0700268 // We don't want to flush the first time through. Otherwise we will flush as
269 // the log file header might be compressing, defeating any parallelism and
270 // queueing there.
271 if (last_flush_time_ == aos::monotonic_clock::min_time) {
272 last_flush_time_ = now;
273 }
274
Brian Silvermanf51499a2020-09-21 12:49:08 -0700275 // Flush if we are at the max number of iovs per writev, because there's no
276 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700277 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700278 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700279 encoder_->queue_size() >= IOV_MAX ||
280 now > last_flush_time_ +
281 chrono::duration_cast<chrono::nanoseconds>(
282 chrono::duration<double>(FLAGS_flush_period))) {
283 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700284 Flush();
285 }
Austin Schuha36c8902019-12-30 18:07:15 -0800286}
287
288flatbuffers::Offset<MessageHeader> PackMessage(
289 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
290 int channel_index, LogType log_type) {
291 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
292
293 switch (log_type) {
294 case LogType::kLogMessage:
295 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800296 case LogType::kLogRemoteMessage:
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700297 data_offset = fbb->CreateVector(
298 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800299 break;
300
301 case LogType::kLogDeliveryTimeOnly:
302 break;
303 }
304
305 MessageHeader::Builder message_header_builder(*fbb);
306 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800307
308 switch (log_type) {
309 case LogType::kLogRemoteMessage:
310 message_header_builder.add_queue_index(context.remote_queue_index);
311 message_header_builder.add_monotonic_sent_time(
312 context.monotonic_remote_time.time_since_epoch().count());
313 message_header_builder.add_realtime_sent_time(
314 context.realtime_remote_time.time_since_epoch().count());
315 break;
316
317 case LogType::kLogMessage:
318 case LogType::kLogMessageAndDeliveryTime:
319 case LogType::kLogDeliveryTimeOnly:
320 message_header_builder.add_queue_index(context.queue_index);
321 message_header_builder.add_monotonic_sent_time(
322 context.monotonic_event_time.time_since_epoch().count());
323 message_header_builder.add_realtime_sent_time(
324 context.realtime_event_time.time_since_epoch().count());
325 break;
326 }
Austin Schuha36c8902019-12-30 18:07:15 -0800327
328 switch (log_type) {
329 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800330 case LogType::kLogRemoteMessage:
Austin Schuha36c8902019-12-30 18:07:15 -0800331 message_header_builder.add_data(data_offset);
332 break;
333
334 case LogType::kLogMessageAndDeliveryTime:
335 message_header_builder.add_data(data_offset);
336 [[fallthrough]];
337
338 case LogType::kLogDeliveryTimeOnly:
339 message_header_builder.add_monotonic_remote_time(
340 context.monotonic_remote_time.time_since_epoch().count());
341 message_header_builder.add_realtime_remote_time(
342 context.realtime_remote_time.time_since_epoch().count());
343 message_header_builder.add_remote_queue_index(context.remote_queue_index);
344 break;
345 }
346
347 return message_header_builder.Finish();
348}
349
Austin Schuhcd368422021-11-22 21:23:29 -0800350SpanReader::SpanReader(std::string_view filename, bool quiet)
351 : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700352 decoder_ = std::make_unique<DummyDecoder>(filename);
353
354 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700355 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700356 if (filename.substr(filename.size() - kXz.size()) == kXz) {
357#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -0800358 decoder_ =
359 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700360#else
Austin Schuhcd368422021-11-22 21:23:29 -0800361 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700362 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
363#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700364 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
365 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700366 }
Austin Schuh05b70472020-01-01 17:11:17 -0800367}
368
Austin Schuhcf5f6442021-07-06 10:43:28 -0700369absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800370 // Make sure we have enough for the size.
371 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
372 if (!ReadBlock()) {
373 return absl::Span<const uint8_t>();
374 }
375 }
376
377 // Now make sure we have enough for the message.
378 const size_t data_size =
379 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
380 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800381 if (data_size == sizeof(flatbuffers::uoffset_t)) {
382 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
383 LOG(ERROR) << " Rest of log file is "
384 << absl::BytesToHexString(std::string_view(
385 reinterpret_cast<const char *>(data_.data() +
386 consumed_data_),
387 data_.size() - consumed_data_));
388 return absl::Span<const uint8_t>();
389 }
Austin Schuh05b70472020-01-01 17:11:17 -0800390 while (data_.size() < consumed_data_ + data_size) {
391 if (!ReadBlock()) {
392 return absl::Span<const uint8_t>();
393 }
394 }
395
396 // And return it, consuming the data.
397 const uint8_t *data_ptr = data_.data() + consumed_data_;
398
Austin Schuh05b70472020-01-01 17:11:17 -0800399 return absl::Span<const uint8_t>(data_ptr, data_size);
400}
401
Austin Schuhcf5f6442021-07-06 10:43:28 -0700402void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -0800403 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -0700404 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
405 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -0800406 consumed_data_ += consumed_size;
407 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700408}
409
410absl::Span<const uint8_t> SpanReader::ReadMessage() {
411 absl::Span<const uint8_t> result = PeekMessage();
412 if (result != absl::Span<const uint8_t>()) {
413 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -0800414 } else {
415 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700416 }
417 return result;
418}
419
Austin Schuh05b70472020-01-01 17:11:17 -0800420bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700421 // This is the amount of data we grab at a time. Doing larger chunks minimizes
422 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800423 constexpr size_t kReadSize = 256 * 1024;
424
425 // Strip off any unused data at the front.
426 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700427 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800428 consumed_data_ = 0;
429 }
430
431 const size_t starting_size = data_.size();
432
433 // This should automatically grow the backing store. It won't shrink if we
434 // get a small chunk later. This reduces allocations when we want to append
435 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700436 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800437
Brian Silvermanf51499a2020-09-21 12:49:08 -0700438 const size_t count =
439 decoder_->Read(data_.begin() + starting_size, data_.end());
440 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800441 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800442 return false;
443 }
Austin Schuh05b70472020-01-01 17:11:17 -0800444
Brian Smarttea913d42021-12-10 15:02:38 -0800445 total_read_ += count;
446
Austin Schuh05b70472020-01-01 17:11:17 -0800447 return true;
448}
449
Austin Schuhadd6eb32020-11-09 21:24:26 -0800450std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700451 SpanReader *span_reader) {
452 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800453
454 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800455 if (config_data == absl::Span<const uint8_t>()) {
456 return std::nullopt;
457 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800458
Austin Schuh5212cad2020-09-09 23:12:09 -0700459 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700460 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800461 if (!result.Verify()) {
462 return std::nullopt;
463 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700464
465 if (FLAGS_workaround_double_headers) {
466 while (true) {
467 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
468 if (maybe_header_data == absl::Span<const uint8_t>()) {
469 break;
470 }
471
472 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
473 maybe_header_data);
474 if (maybe_header.Verify()) {
475 LOG(WARNING) << "Found duplicate LogFileHeader in "
476 << span_reader->filename();
477 ResizeableBuffer header_data_copy;
478 header_data_copy.resize(maybe_header_data.size());
479 memcpy(header_data_copy.data(), maybe_header_data.begin(),
480 header_data_copy.size());
481 result = SizePrefixedFlatbufferVector<LogFileHeader>(
482 std::move(header_data_copy));
483
484 span_reader->ConsumeMessage();
485 } else {
486 break;
487 }
488 }
489 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800490 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800491}
492
Austin Schuh0e8db662021-07-06 10:43:47 -0700493std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
494 std::string_view filename) {
495 SpanReader span_reader(filename);
496 return ReadHeader(&span_reader);
497}
498
Austin Schuhadd6eb32020-11-09 21:24:26 -0800499std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800500 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700501 SpanReader span_reader(filename);
502 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
503 for (size_t i = 0; i < n + 1; ++i) {
504 data_span = span_reader.ReadMessage();
505
506 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800507 if (data_span == absl::Span<const uint8_t>()) {
508 return std::nullopt;
509 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700510 }
511
Brian Silverman354697a2020-09-22 21:06:32 -0700512 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700513 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800514 if (!result.Verify()) {
515 return std::nullopt;
516 }
517 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700518}
519
Austin Schuh05b70472020-01-01 17:11:17 -0800520MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700521 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800522 raw_log_file_header_(
523 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -0800524 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
525 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
526
Austin Schuh0e8db662021-07-06 10:43:47 -0700527 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
528 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800529
530 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700531 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800532
Austin Schuh0e8db662021-07-06 10:43:47 -0700533 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800534
Austin Schuh5b728b72021-06-16 14:57:15 -0700535 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
536
Brian Smarttea913d42021-12-10 15:02:38 -0800537 total_verified_before_ = span_reader_.TotalConsumed();
538
Austin Schuhcde938c2020-02-02 17:30:07 -0800539 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800540 FLAGS_max_out_of_order > 0
541 ? chrono::duration_cast<chrono::nanoseconds>(
542 chrono::duration<double>(FLAGS_max_out_of_order))
543 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800544
545 VLOG(1) << "Opened " << filename << " as node "
546 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800547}
548
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700549std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800550 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
551 if (msg_data == absl::Span<const uint8_t>()) {
Brian Smarttea913d42021-12-10 15:02:38 -0800552 if (is_corrupted()) {
553 LOG(ERROR) << "Total corrupted volumes: before = "
554 << total_verified_before_
555 << " | corrupted = " << total_corrupted_
556 << " | during = " << total_verified_during_
557 << " | after = " << total_verified_after_ << std::endl;
558 }
559
560 if (span_reader_.IsIncomplete()) {
561 LOG(ERROR) << "Unable to access some messages in " << filename()
562 << " : " << span_reader_.TotalRead() << " bytes read, "
563 << span_reader_.TotalConsumed() << " bytes usable."
564 << std::endl;
565 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700566 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -0800567 }
568
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700569 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -0800570
571 if (crash_on_corrupt_message_flag_) {
572 CHECK(msg.Verify()) << "Corrupted message at offset "
573 << total_verified_before_
574 << " found within " << filename()
575 << "; set --nocrash_on_corrupt_message to see summary;"
576 << " also set --ignore_corrupt_messages to process"
577 << " anyway";
578
579 } else if (!msg.Verify()) {
580 LOG(ERROR) << "Corrupted message at offset "
581 << total_verified_before_
582 << " from " << filename() << std::endl;
583
584 total_corrupted_ += msg_data.size();
585
586 while (true) {
587 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
588
589 if (msg_data == absl::Span<const uint8_t>()) {
590 if (!ignore_corrupt_messages_flag_) {
591 LOG(ERROR) << "Total corrupted volumes: before = "
592 << total_verified_before_
593 << " | corrupted = " << total_corrupted_
594 << " | during = " << total_verified_during_
595 << " | after = " << total_verified_after_ << std::endl;
596
597 if (span_reader_.IsIncomplete()) {
598 LOG(ERROR) << "Unable to access some messages in " << filename()
599 << " : " << span_reader_.TotalRead() << " bytes read, "
600 << span_reader_.TotalConsumed() << " bytes usable."
601 << std::endl;
602 }
603 return nullptr;
604 }
605 break;
606 }
607
608 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
609
610 if (!next_msg.Verify()) {
611 total_corrupted_ += msg_data.size();
612 total_verified_during_ += total_verified_after_;
613 total_verified_after_ = 0;
614
615 } else {
616 total_verified_after_ += msg_data.size();
617 if (ignore_corrupt_messages_flag_) {
618 msg = next_msg;
619 break;
620 }
621 }
622 }
623 }
624
625 if (is_corrupted()) {
626 total_verified_after_ += msg_data.size();
627 } else {
628 total_verified_before_ += msg_data.size();
629 }
Austin Schuh05b70472020-01-01 17:11:17 -0800630
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700631 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -0700632
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700633 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800634
635 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -0800636
637 if (VLOG_IS_ON(3)) {
638 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
639 } else if (VLOG_IS_ON(2)) {
640 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
641 msg_copy.mutable_message()->clear_data();
642 VLOG(2) << "Read from " << filename() << " data "
643 << FlatbufferToJson(msg_copy);
644 }
645
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700646 return result;
647}
648
649std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
650 const MessageHeader &message) {
651 const size_t data_size = message.has_data() ? message.data()->size() : 0;
652
653 UnpackedMessageHeader *const unpacked_message =
654 reinterpret_cast<UnpackedMessageHeader *>(
655 malloc(sizeof(UnpackedMessageHeader) + data_size +
656 kChannelDataAlignment - 1));
657
658 CHECK(message.has_channel_index());
659 CHECK(message.has_monotonic_sent_time());
660
661 absl::Span<uint8_t> span;
662 if (data_size > 0) {
663 span =
664 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
665 &unpacked_message->actual_data[0], data_size)),
666 data_size);
667 }
668
Austin Schuh826e6ce2021-11-18 20:33:10 -0800669 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700670 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800671 monotonic_remote_time = aos::monotonic_clock::time_point(
672 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700673 }
674 std::optional<realtime_clock::time_point> realtime_remote_time;
675 if (message.has_realtime_remote_time()) {
676 realtime_remote_time = realtime_clock::time_point(
677 chrono::nanoseconds(message.realtime_remote_time()));
678 }
679
680 std::optional<uint32_t> remote_queue_index;
681 if (message.has_remote_queue_index()) {
682 remote_queue_index = message.remote_queue_index();
683 }
684
685 new (unpacked_message) UnpackedMessageHeader{
686 .channel_index = message.channel_index(),
687 .monotonic_sent_time = monotonic_clock::time_point(
688 chrono::nanoseconds(message.monotonic_sent_time())),
689 .realtime_sent_time = realtime_clock::time_point(
690 chrono::nanoseconds(message.realtime_sent_time())),
691 .queue_index = message.queue_index(),
692 .monotonic_remote_time = monotonic_remote_time,
693 .realtime_remote_time = realtime_remote_time,
694 .remote_queue_index = remote_queue_index,
695 .monotonic_timestamp_time = monotonic_clock::time_point(
696 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
697 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
698 .span = span};
699
700 if (data_size > 0) {
701 memcpy(span.data(), message.data()->data(), data_size);
702 }
703
704 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
705 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -0800706}
707
Austin Schuhc41603c2020-10-11 16:17:37 -0700708PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700709 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700710 if (parts_.parts.size() >= 2) {
711 next_message_reader_.emplace(parts_.parts[1]);
712 }
Austin Schuh48507722021-07-17 17:29:24 -0700713 ComputeBootCounts();
714}
715
716void PartsMessageReader::ComputeBootCounts() {
717 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
718 std::nullopt);
719
720 // We have 3 vintages of log files with different amounts of information.
721 if (log_file_header()->has_boot_uuids()) {
722 // The new hotness with the boots explicitly listed out. We can use the log
723 // file header to compute the boot count of all relevant nodes.
724 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
725 size_t node_index = 0;
726 for (const flatbuffers::String *boot_uuid :
727 *log_file_header()->boot_uuids()) {
728 CHECK(parts_.boots);
729 if (boot_uuid->size() != 0) {
730 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
731 if (it != parts_.boots->boot_count_map.end()) {
732 boot_counts_[node_index] = it->second;
733 }
734 } else if (parts().boots->boots[node_index].size() == 1u) {
735 boot_counts_[node_index] = 0;
736 }
737 ++node_index;
738 }
739 } else {
740 // Older multi-node logs which are guarenteed to have UUIDs logged, or
741 // single node log files with boot UUIDs in the header. We only know how to
742 // order certain boots in certain circumstances.
743 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
744 for (size_t node_index = 0; node_index < boot_counts_.size();
745 ++node_index) {
746 CHECK(parts_.boots);
747 if (parts().boots->boots[node_index].size() == 1u) {
748 boot_counts_[node_index] = 0;
749 }
750 }
751 } else {
752 // Really old single node logs without any UUIDs. They can't reboot.
753 CHECK_EQ(boot_counts_.size(), 1u);
754 boot_counts_[0] = 0u;
755 }
756 }
757}
Austin Schuhc41603c2020-10-11 16:17:37 -0700758
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700759std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -0700760 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700761 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700762 message_reader_.ReadMessage();
763 if (message) {
764 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700765 const monotonic_clock::time_point monotonic_sent_time =
766 message->monotonic_sent_time;
767
768 // TODO(austin): Does this work with startup? Might need to use the
769 // start time.
770 // TODO(austin): Does this work with startup when we don't know the
771 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800772 if (monotonic_sent_time >
773 parts_.monotonic_start_time + max_out_of_order_duration()) {
774 after_start_ = true;
775 }
776 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800777 CHECK_GE(monotonic_sent_time,
778 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800779 << ": Max out of order of " << max_out_of_order_duration().count()
780 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800781 << parts_.monotonic_start_time << " currently reading "
782 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800783 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700784 return message;
785 }
786 NextLog();
787 }
Austin Schuh32f68492020-11-08 21:45:51 -0800788 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700789 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -0700790}
791
792void PartsMessageReader::NextLog() {
793 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700794 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -0700795 done_ = true;
796 return;
797 }
Brian Silvermanfee16972021-09-14 12:06:38 -0700798 CHECK(next_message_reader_);
799 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -0700800 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -0700801 if (next_part_index_ + 1 < parts_.parts.size()) {
802 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
803 } else {
804 next_message_reader_.reset();
805 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700806 ++next_part_index_;
807}
808
Austin Schuh1be0ce42020-11-29 22:43:26 -0800809bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700810 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700811
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700812 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800813 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700814 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800815 return false;
816 }
817
818 if (this->channel_index < m2.channel_index) {
819 return true;
820 } else if (this->channel_index > m2.channel_index) {
821 return false;
822 }
823
824 return this->queue_index < m2.queue_index;
825}
826
827bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800828bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700829 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700830
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700831 return timestamp.time == m2.timestamp.time &&
832 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800833}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800834
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700835std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
836 os << "{.channel_index=" << m.channel_index
837 << ", .monotonic_sent_time=" << m.monotonic_sent_time
838 << ", .realtime_sent_time=" << m.realtime_sent_time
839 << ", .queue_index=" << m.queue_index;
840 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800841 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700842 }
843 os << ", .realtime_remote_time=";
844 PrintOptionalOrNull(&os, m.realtime_remote_time);
845 os << ", .remote_queue_index=";
846 PrintOptionalOrNull(&os, m.remote_queue_index);
847 if (m.has_monotonic_timestamp_time) {
848 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
849 }
850 return os;
851}
852
Austin Schuh1be0ce42020-11-29 22:43:26 -0800853std::ostream &operator<<(std::ostream &os, const Message &m) {
854 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700855 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700856 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800857 if (m.data->remote_queue_index.has_value()) {
858 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
859 }
860 if (m.data->monotonic_remote_time.has_value()) {
861 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
862 }
Austin Schuhfb1b3292021-11-16 21:20:15 -0800863 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800864 }
865 os << "}";
866 return os;
867}
868
869std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
870 os << "{.channel_index=" << m.channel_index
871 << ", .queue_index=" << m.queue_index
872 << ", .monotonic_event_time=" << m.monotonic_event_time
873 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -0700874 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800875 os << ", .remote_queue_index=" << m.remote_queue_index;
876 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700877 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800878 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
879 }
880 if (m.realtime_remote_time != realtime_clock::min_time) {
881 os << ", .realtime_remote_time=" << m.realtime_remote_time;
882 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700883 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800884 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
885 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700886 if (m.data != nullptr) {
887 os << ", .data=" << *m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800888 }
889 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800890 return os;
891}
892
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800893LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700894 : parts_message_reader_(log_parts),
895 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
896}
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800897
898Message *LogPartsSorter::Front() {
899 // Queue up data until enough data has been queued that the front message is
900 // sorted enough to be safe to pop. This may do nothing, so we should make
901 // sure the nothing path is checked quickly.
902 if (sorted_until() != monotonic_clock::max_time) {
903 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -0700904 if (!messages_.empty() &&
905 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -0800906 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800907 break;
908 }
909
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700910 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800911 parts_message_reader_.ReadMessage();
912 // No data left, sorted forever, work through what is left.
913 if (!m) {
914 sorted_until_ = monotonic_clock::max_time;
915 break;
916 }
917
Austin Schuh48507722021-07-17 17:29:24 -0700918 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700919 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -0700920 monotonic_timestamp_boot = parts().logger_boot_count;
921 }
922 size_t monotonic_remote_boot = 0xffffff;
923
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700924 if (m->monotonic_remote_time.has_value()) {
milind-ua50344f2021-08-25 18:22:20 -0700925 const Node *node = parts().config->nodes()->Get(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700926 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700927
Austin Schuh48507722021-07-17 17:29:24 -0700928 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700929 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -0700930 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
931 << ", with index "
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700932 << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -0700933 monotonic_remote_boot = *boot;
934 }
935
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700936 messages_.insert(
937 Message{.channel_index = m->channel_index,
938 .queue_index = BootQueueIndex{.boot = parts().boot_count,
939 .index = m->queue_index},
940 .timestamp = BootTimestamp{.boot = parts().boot_count,
941 .time = m->monotonic_sent_time},
942 .monotonic_remote_boot = monotonic_remote_boot,
943 .monotonic_timestamp_boot = monotonic_timestamp_boot,
944 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800945
946 // Now, update sorted_until_ to match the new message.
947 if (parts_message_reader_.newest_timestamp() >
948 monotonic_clock::min_time +
949 parts_message_reader_.max_out_of_order_duration()) {
950 sorted_until_ = parts_message_reader_.newest_timestamp() -
951 parts_message_reader_.max_out_of_order_duration();
952 } else {
953 sorted_until_ = monotonic_clock::min_time;
954 }
955 }
956 }
957
958 // Now that we have enough data queued, return a pointer to the oldest piece
959 // of data if it exists.
960 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -0800961 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800962 return nullptr;
963 }
964
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700965 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -0800966 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700967 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800968 return &(*messages_.begin());
969}
970
971void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
972
973std::string LogPartsSorter::DebugString() const {
974 std::stringstream ss;
975 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800976 int count = 0;
977 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800978 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800979 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
980 ss << m << "\n";
981 } else if (no_dots) {
982 ss << "...\n";
983 no_dots = false;
984 }
985 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800986 }
987 ss << "] <- " << parts_message_reader_.filename();
988 return ss.str();
989}
990
Austin Schuhd2f96102020-12-01 20:27:29 -0800991NodeMerger::NodeMerger(std::vector<LogParts> parts) {
992 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -0700993 // Enforce that we are sorting things only from a single node from a single
994 // boot.
995 const std::string_view part0_node = parts[0].node;
996 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -0800997 for (size_t i = 1; i < parts.size(); ++i) {
998 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -0700999 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1000 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 }
Austin Schuh715adc12021-06-29 22:07:39 -07001002
1003 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1004
Austin Schuhd2f96102020-12-01 20:27:29 -08001005 for (LogParts &part : parts) {
1006 parts_sorters_.emplace_back(std::move(part));
1007 }
1008
Austin Schuhd2f96102020-12-01 20:27:29 -08001009 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001010 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001012 // We want to capture the earliest meaningful start time here. The start
1013 // time defaults to min_time when there's no meaningful value to report, so
1014 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001015 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1016 bool accept = false;
1017 // We want to prioritize start times from the logger node. Really, we
1018 // want to prioritize start times with a valid realtime_clock time. So,
1019 // if we have a start time without a RT clock, prefer a start time with a
1020 // RT clock, even it if is later.
1021 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1022 // We've got a good one. See if the current start time has a good RT
1023 // clock, or if we should use this one instead.
1024 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1025 accept = true;
1026 } else if (realtime_start_time_ == realtime_clock::min_time) {
1027 // The previous start time doesn't have a good RT time, so it is very
1028 // likely the start time from a remote part file. We just found a
1029 // better start time with a real RT time, so switch to that instead.
1030 accept = true;
1031 }
1032 } else if (realtime_start_time_ == realtime_clock::min_time) {
1033 // We don't have a RT time, so take the oldest.
1034 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1035 accept = true;
1036 }
1037 }
1038
1039 if (accept) {
1040 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1041 realtime_start_time_ = parts_sorter.realtime_start_time();
1042 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001043 }
1044 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001045
1046 // If there was no meaningful start time reported, just use min_time.
1047 if (monotonic_start_time_ == monotonic_clock::max_time) {
1048 monotonic_start_time_ = monotonic_clock::min_time;
1049 realtime_start_time_ = realtime_clock::min_time;
1050 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001051}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001052
Austin Schuh0ca51f32020-12-25 21:51:45 -08001053std::vector<const LogParts *> NodeMerger::Parts() const {
1054 std::vector<const LogParts *> p;
1055 p.reserve(parts_sorters_.size());
1056 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1057 p.emplace_back(&parts_sorter.parts());
1058 }
1059 return p;
1060}
1061
Austin Schuh8f52ed52020-11-30 23:12:39 -08001062Message *NodeMerger::Front() {
1063 // Return the current Front if we have one, otherwise go compute one.
1064 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001065 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001066 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001067 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001068 }
1069
1070 // Otherwise, do a simple search for the oldest message, deduplicating any
1071 // duplicates.
1072 Message *oldest = nullptr;
1073 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001074 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1075 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001076 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001077 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001078 continue;
1079 }
1080 if (oldest == nullptr || *m < *oldest) {
1081 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001082 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001083 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001084 // Found a duplicate. If there is a choice, we want the one which has
1085 // the timestamp time.
1086 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001087 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001088 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001089 current_->PopFront();
1090 current_ = &parts_sorter;
1091 oldest = m;
1092 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001093 CHECK_EQ(m->data->monotonic_timestamp_time,
1094 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001095 parts_sorter.PopFront();
1096 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001097 }
1098
1099 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001100 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001101 }
1102
Austin Schuhb000de62020-12-03 22:00:40 -08001103 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001104 CHECK_GE(oldest->timestamp.time, last_message_time_);
1105 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001106 monotonic_oldest_time_ =
1107 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001108 } else {
1109 last_message_time_ = monotonic_clock::max_time;
1110 }
1111
Austin Schuh8f52ed52020-11-30 23:12:39 -08001112 // Return the oldest message found. This will be nullptr if nothing was
1113 // found, indicating there is nothing left.
1114 return oldest;
1115}
1116
1117void NodeMerger::PopFront() {
1118 CHECK(current_ != nullptr) << "Popping before calling Front()";
1119 current_->PopFront();
1120 current_ = nullptr;
1121}
1122
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001123BootMerger::BootMerger(std::vector<LogParts> files) {
1124 std::vector<std::vector<LogParts>> boots;
1125
1126 // Now, we need to split things out by boot.
1127 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001128 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001129 if (boot_count + 1 > boots.size()) {
1130 boots.resize(boot_count + 1);
1131 }
1132 boots[boot_count].emplace_back(std::move(files[i]));
1133 }
1134
1135 node_mergers_.reserve(boots.size());
1136 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001137 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001138 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001139 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001140 }
1141 node_mergers_.emplace_back(
1142 std::make_unique<NodeMerger>(std::move(boots[i])));
1143 }
1144}
1145
1146Message *BootMerger::Front() {
1147 Message *result = node_mergers_[index_]->Front();
1148
1149 if (result != nullptr) {
1150 return result;
1151 }
1152
1153 if (index_ + 1u == node_mergers_.size()) {
1154 // At the end of the last node merger, just return.
1155 return nullptr;
1156 } else {
1157 ++index_;
1158 return Front();
1159 }
1160}
1161
1162void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1163
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001164std::vector<const LogParts *> BootMerger::Parts() const {
1165 std::vector<const LogParts *> results;
1166 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1167 std::vector<const LogParts *> node_parts = node_merger->Parts();
1168
1169 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1170 std::make_move_iterator(node_parts.end()));
1171 }
1172
1173 return results;
1174}
1175
Austin Schuhd2f96102020-12-01 20:27:29 -08001176TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001177 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001178 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001179 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001180 if (!configuration_) {
1181 configuration_ = part->config;
1182 } else {
1183 CHECK_EQ(configuration_.get(), part->config.get());
1184 }
1185 }
1186 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001187 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1188 // pretty simple.
1189 if (configuration::MultiNode(config)) {
1190 nodes_data_.resize(config->nodes()->size());
1191 const Node *my_node = config->nodes()->Get(node());
1192 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1193 const Node *node = config->nodes()->Get(node_index);
1194 NodeData *node_data = &nodes_data_[node_index];
1195 node_data->channels.resize(config->channels()->size());
1196 // We should save the channel if it is delivered to the node represented
1197 // by the NodeData, but not sent by that node. That combo means it is
1198 // forwarded.
1199 size_t channel_index = 0;
1200 node_data->any_delivered = false;
1201 for (const Channel *channel : *config->channels()) {
1202 node_data->channels[channel_index].delivered =
1203 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001204 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1205 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001206 node_data->any_delivered = node_data->any_delivered ||
1207 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08001208 if (node_data->channels[channel_index].delivered) {
1209 const Connection *connection =
1210 configuration::ConnectionToNode(channel, node);
1211 node_data->channels[channel_index].time_to_live =
1212 chrono::nanoseconds(connection->time_to_live());
1213 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001214 ++channel_index;
1215 }
1216 }
1217
1218 for (const Channel *channel : *config->channels()) {
1219 source_node_.emplace_back(configuration::GetNodeIndex(
1220 config, channel->source_node()->string_view()));
1221 }
1222 }
1223}
1224
1225void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001226 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001227 CHECK_NE(timestamp_mapper->node(), node());
1228 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1229
1230 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001231 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001232 // we could needlessly save data.
1233 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001234 VLOG(1) << "Registering on node " << node() << " for peer node "
1235 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001236 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1237
1238 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001239
1240 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001241 }
1242}
1243
Austin Schuh79b30942021-01-24 22:32:21 -08001244void TimestampMapper::QueueMessage(Message *m) {
1245 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001246 .channel_index = m->channel_index,
1247 .queue_index = m->queue_index,
1248 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001249 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001250 .remote_queue_index = BootQueueIndex::Invalid(),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001251 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -08001252 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001253 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -08001254 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001255}
1256
1257TimestampedMessage *TimestampMapper::Front() {
1258 // No need to fetch anything new. A previous message still exists.
1259 switch (first_message_) {
1260 case FirstMessage::kNeedsUpdate:
1261 break;
1262 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001263 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001264 case FirstMessage::kNullptr:
1265 return nullptr;
1266 }
1267
Austin Schuh79b30942021-01-24 22:32:21 -08001268 if (matched_messages_.empty()) {
1269 if (!QueueMatched()) {
1270 first_message_ = FirstMessage::kNullptr;
1271 return nullptr;
1272 }
1273 }
1274 first_message_ = FirstMessage::kInMessage;
1275 return &matched_messages_.front();
1276}
1277
1278bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08001279 if (nodes_data_.empty()) {
1280 // Simple path. We are single node, so there are no timestamps to match!
1281 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001282 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001283 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -08001284 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001285 }
Austin Schuh79b30942021-01-24 22:32:21 -08001286 // Enqueue this message into matched_messages_ so we have a place to
1287 // associate remote timestamps, and return it.
1288 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08001289
Austin Schuh79b30942021-01-24 22:32:21 -08001290 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1291 last_message_time_ = matched_messages_.back().monotonic_event_time;
1292
1293 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001294 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08001295 timestamp_callback_(&matched_messages_.back());
1296 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001297 }
1298
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001299 // We need to only add messages to the list so they get processed for
1300 // messages which are delivered. Reuse the flow below which uses messages_
1301 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08001302 if (messages_.empty()) {
1303 if (!Queue()) {
1304 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -08001305 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001306 }
1307
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001308 // Now that it has been added (and cannibalized), forget about it
1309 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001310 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001311 }
1312
1313 Message *m = &(messages_.front());
1314
1315 if (source_node_[m->channel_index] == node()) {
1316 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08001317 QueueMessage(m);
1318 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1319 last_message_time_ = matched_messages_.back().monotonic_event_time;
1320 messages_.pop_front();
1321 timestamp_callback_(&matched_messages_.back());
1322 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001323 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001324 // Got a timestamp, find the matching remote data, match it, and return
1325 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08001326 Message data = MatchingMessageFor(*m);
1327
1328 // Return the data from the remote. The local message only has timestamp
1329 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08001330 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001331 .channel_index = m->channel_index,
1332 .queue_index = m->queue_index,
1333 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001334 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001335 .remote_queue_index =
1336 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001337 .index = m->data->remote_queue_index.value()},
1338 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001339 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001340 .realtime_remote_time = m->data->realtime_remote_time.value(),
1341 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
1342 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08001343 .data = std::move(data.data)});
1344 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1345 last_message_time_ = matched_messages_.back().monotonic_event_time;
1346 // Since messages_ holds the data, drop it.
1347 messages_.pop_front();
1348 timestamp_callback_(&matched_messages_.back());
1349 return true;
1350 }
1351}
1352
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001353void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001354 while (last_message_time_ <= queue_time) {
1355 if (!QueueMatched()) {
1356 return;
1357 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001358 }
1359}
1360
Austin Schuhe639ea12021-01-25 13:00:22 -08001361void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001362 // Note: queueing for time doesn't really work well across boots. So we
1363 // just assume that if you are using this, you only care about the current
1364 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001365 //
1366 // TODO(austin): Is that the right concept?
1367 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001368 // Make sure we have something queued first. This makes the end time
1369 // calculation simpler, and is typically what folks want regardless.
1370 if (matched_messages_.empty()) {
1371 if (!QueueMatched()) {
1372 return;
1373 }
1374 }
1375
1376 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001377 std::max(monotonic_start_time(
1378 matched_messages_.front().monotonic_event_time.boot),
1379 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001380 time_estimation_buffer;
1381
1382 // Place sorted messages on the list until we have
1383 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1384 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001385 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001386 if (!QueueMatched()) {
1387 return;
1388 }
1389 }
1390}
1391
Austin Schuhd2f96102020-12-01 20:27:29 -08001392void TimestampMapper::PopFront() {
1393 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08001394 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001395 first_message_ = FirstMessage::kNeedsUpdate;
1396
Austin Schuh79b30942021-01-24 22:32:21 -08001397 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001398}
1399
1400Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001401 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001402 CHECK_NOTNULL(message.data);
1403 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001404 const BootQueueIndex remote_queue_index =
1405 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001406 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001407
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001408 CHECK(message.data->monotonic_remote_time.has_value());
1409 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001410
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001411 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001412 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001413 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001414 const realtime_clock::time_point realtime_remote_time =
1415 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001416
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001417 TimestampMapper *peer =
1418 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001419
1420 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001421 // asked to pull a timestamp from a peer which doesn't exist, return an
1422 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001423 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001424 // TODO(austin): Make sure the tests hit all these paths with a boot count
1425 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001426 return Message{.channel_index = message.channel_index,
1427 .queue_index = remote_queue_index,
1428 .timestamp = monotonic_remote_time,
1429 .monotonic_remote_boot = 0xffffff,
1430 .monotonic_timestamp_boot = 0xffffff,
1431 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001432 }
1433
1434 // The queue which will have the matching data, if available.
1435 std::deque<Message> *data_queue =
1436 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1437
Austin Schuh79b30942021-01-24 22:32:21 -08001438 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001439
1440 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001441 return Message{.channel_index = message.channel_index,
1442 .queue_index = remote_queue_index,
1443 .timestamp = monotonic_remote_time,
1444 .monotonic_remote_boot = 0xffffff,
1445 .monotonic_timestamp_boot = 0xffffff,
1446 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001447 }
1448
Austin Schuhd2f96102020-12-01 20:27:29 -08001449 if (remote_queue_index < data_queue->front().queue_index ||
1450 remote_queue_index > data_queue->back().queue_index) {
1451 return Message{
1452 .channel_index = message.channel_index,
1453 .queue_index = remote_queue_index,
1454 .timestamp = monotonic_remote_time,
Austin Schuh48507722021-07-17 17:29:24 -07001455 .monotonic_remote_boot = 0xffffff,
1456 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001457 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001458 }
1459
Austin Schuh993ccb52020-12-12 15:59:32 -08001460 // The algorithm below is constant time with some assumptions. We need there
1461 // to be no missing messages in the data stream. This also assumes a queue
1462 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001463 if (data_queue->back().queue_index.boot ==
1464 data_queue->front().queue_index.boot &&
1465 (data_queue->back().queue_index.index -
1466 data_queue->front().queue_index.index + 1u ==
1467 data_queue->size())) {
1468 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001469 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001470 //
1471 // TODO(austin): Move if not reliable.
1472 Message result = (*data_queue)[remote_queue_index.index -
1473 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001474
1475 CHECK_EQ(result.timestamp, monotonic_remote_time)
1476 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08001477 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08001478 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1479 // Now drop the data off the front. We have deduplicated timestamps, so we
1480 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001481 data_queue->erase(
1482 data_queue->begin(),
1483 data_queue->begin() +
1484 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001485 return result;
1486 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07001487 // TODO(austin): Binary search.
1488 auto it = std::find_if(
1489 data_queue->begin(), data_queue->end(),
1490 [remote_queue_index,
1491 remote_boot = monotonic_remote_time.boot](const Message &m) {
1492 return m.queue_index == remote_queue_index &&
1493 m.timestamp.boot == remote_boot;
1494 });
Austin Schuh993ccb52020-12-12 15:59:32 -08001495 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001496 return Message{.channel_index = message.channel_index,
1497 .queue_index = remote_queue_index,
1498 .timestamp = monotonic_remote_time,
1499 .monotonic_remote_boot = 0xffffff,
1500 .monotonic_timestamp_boot = 0xffffff,
1501 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08001502 }
1503
1504 Message result = std::move(*it);
1505
1506 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001507 << ": Queue index matches, but timestamp doesn't. Please "
1508 "investigate!";
1509 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
1510 << ": Queue index matches, but timestamp doesn't. Please "
1511 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08001512
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08001513 // Erase everything up to this message. We want to keep 1 message in the
1514 // queue so we can handle reliable messages forwarded across boots.
1515 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08001516
1517 return result;
1518 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001519}
1520
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001521void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001522 if (queued_until_ > t) {
1523 return;
1524 }
1525 while (true) {
1526 if (!messages_.empty() && messages_.back().timestamp > t) {
1527 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1528 return;
1529 }
1530
1531 if (!Queue()) {
1532 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001533 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001534 return;
1535 }
1536
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001537 // Now that it has been added (and cannibalized), forget about it
1538 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001539 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001540 }
1541}
1542
1543bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001544 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001545 if (m == nullptr) {
1546 return false;
1547 }
1548 for (NodeData &node_data : nodes_data_) {
1549 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001550 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001551 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08001552 // If we have data but no timestamps (logs where the timestamps didn't get
1553 // logged are classic), we can grow this indefinitely. We don't need to
1554 // keep anything that is older than the last message returned.
1555
1556 // We have the time on the source node.
1557 // We care to wait until we have the time on the destination node.
1558 std::deque<Message> &messages =
1559 node_data.channels[m->channel_index].messages;
1560 // Max delay over the network is the TTL, so let's take the queue time and
1561 // add TTL to it. Don't forget any messages which are reliable until
1562 // someone can come up with a good reason to forget those too.
1563 if (node_data.channels[m->channel_index].time_to_live >
1564 chrono::nanoseconds(0)) {
1565 // We need to make *some* assumptions about network delay for this to
1566 // work. We want to only look at the RX side. This means we need to
1567 // track the last time a message was popped from any channel from the
1568 // node sending this message, and compare that to the max time we expect
1569 // that a message will take to be delivered across the network. This
1570 // assumes that messages are popped in time order as a proxy for
1571 // measuring the distributed time at this layer.
1572 //
1573 // Leave at least 1 message in here so we can handle reboots and
1574 // messages getting sent twice.
1575 while (messages.size() > 1u &&
1576 messages.begin()->timestamp +
1577 node_data.channels[m->channel_index].time_to_live +
1578 chrono::duration_cast<chrono::nanoseconds>(
1579 chrono::duration<double>(FLAGS_max_network_delay)) <
1580 last_popped_message_time_) {
1581 messages.pop_front();
1582 }
1583 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001584 node_data.channels[m->channel_index].messages.emplace_back(*m);
1585 }
1586 }
1587
1588 messages_.emplace_back(std::move(*m));
1589 return true;
1590}
1591
1592std::string TimestampMapper::DebugString() const {
1593 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07001594 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08001595 for (const Message &message : messages_) {
1596 ss << " " << message << "\n";
1597 }
1598 ss << "] queued_until " << queued_until_;
1599 for (const NodeData &ns : nodes_data_) {
1600 if (ns.peer == nullptr) continue;
1601 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1602 size_t channel_index = 0;
1603 for (const NodeData::ChannelData &channel_data :
1604 ns.peer->nodes_data_[node()].channels) {
1605 if (channel_data.messages.empty()) {
1606 continue;
1607 }
Austin Schuhb000de62020-12-03 22:00:40 -08001608
Austin Schuhd2f96102020-12-01 20:27:29 -08001609 ss << " channel " << channel_index << " [\n";
1610 for (const Message &m : channel_data.messages) {
1611 ss << " " << m << "\n";
1612 }
1613 ss << " ]\n";
1614 ++channel_index;
1615 }
1616 ss << "] queued_until " << ns.peer->queued_until_;
1617 }
1618 return ss.str();
1619}
1620
Austin Schuhee711052020-08-24 16:06:09 -07001621std::string MaybeNodeName(const Node *node) {
1622 if (node != nullptr) {
1623 return node->name()->str() + " ";
1624 }
1625 return "";
1626}
1627
Brian Silvermanf51499a2020-09-21 12:49:08 -07001628} // namespace aos::logger