blob: 59b106e5f348e2ce32389c28b176adb1a1394182 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
// LZMA (xz) support is only compiled in on x86_64 and aarch64, and is turned
// off when building with MemorySanitizer.
//
// __has_feature() is a Clang extension (GCC only gained it in version 14).
// Using it unguarded inside an #if is a preprocessor error on compilers that
// lack it, so probe for it with defined() first and only then invoke it, in a
// separate nested #if.
#if defined(__has_feature)
#if __has_feature(memory_sanitizer)
#define AOS_LOGFILE_UTILS_HAS_MSAN 1
#endif
#endif

#if (defined(__x86_64__) || defined(__aarch64__)) && \
    !defined(AOS_LOGFILE_UTILS_HAS_MSAN)
#define ENABLE_LZMA 1
#else
#define ENABLE_LZMA 0
#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
31
Austin Schuh7fbf5a72020-09-21 16:28:13 -070032DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080033 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070034DEFINE_double(
35 flush_period, 5.0,
36 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080037
Austin Schuha040c3f2021-02-13 16:09:07 -080038DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080039 max_network_delay, 1.0,
40 "Max time to assume a message takes to cross the network before we are "
41 "willing to drop it from our buffers and assume it didn't make it. "
42 "Increasing this number can increase memory usage depending on the packet "
43 "loss of your network or if the timestamps aren't logged for a message.");
44
45DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080046 max_out_of_order, -1,
47 "If set, this overrides the max out of order duration for a log file.");
48
Austin Schuh0e8db662021-07-06 10:43:47 -070049DEFINE_bool(workaround_double_headers, true,
50 "Some old log files have two headers at the beginning. Use the "
51 "last header as the actual header.");
52
Brian Silvermanf51499a2020-09-21 12:49:08 -070053namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Writes the value held by |t| to |os|, or the literal text "null" when |t|
// holds nothing.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << *t;
}

}  // namespace
67
Brian Silvermanf51499a2020-09-21 12:49:08 -070068DetachedBufferWriter::DetachedBufferWriter(
69 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
70 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070071 if (!util::MkdirPIfSpace(filename, 0777)) {
72 ran_out_of_space_ = true;
73 } else {
74 fd_ = open(std::string(filename).c_str(),
75 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
76 if (fd_ == -1 && errno == ENOSPC) {
77 ran_out_of_space_ = true;
78 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070079 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
80 << " for writing";
81 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070082 }
83 }
Austin Schuha36c8902019-12-30 18:07:15 -080084}
85
86DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070087 Close();
88 if (ran_out_of_space_) {
89 CHECK(acknowledge_ran_out_of_space_)
90 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070091 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070092}
93
Brian Silvermand90905f2020-09-23 14:42:56 -070094DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070095 *this = std::move(other);
96}
97
Brian Silverman87ac0402020-09-17 14:47:01 -070098// When other is destroyed "soon" (which it should be because we're getting an
99// rvalue reference to it), it will flush etc all the data we have queued up
100// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700101DetachedBufferWriter &DetachedBufferWriter::operator=(
102 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700104 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700106 std::swap(ran_out_of_space_, other.ran_out_of_space_);
107 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700108 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700109 std::swap(max_write_time_, other.max_write_time_);
110 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
111 std::swap(max_write_time_messages_, other.max_write_time_messages_);
112 std::swap(total_write_time_, other.total_write_time_);
113 std::swap(total_write_count_, other.total_write_count_);
114 std::swap(total_write_messages_, other.total_write_messages_);
115 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800117}
118
Brian Silvermanf51499a2020-09-21 12:49:08 -0700119void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700120 if (ran_out_of_space_) {
121 // We don't want any later data to be written after space becomes
122 // available, so refuse to write anything more once we've dropped data
123 // because we ran out of space.
124 VLOG(1) << "Ignoring span: " << span.size();
125 return;
126 }
127
Austin Schuhbd06ae42021-03-31 22:48:21 -0700128 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700129 if (encoder_->may_bypass() && span.size() > 4096u) {
130 // Over this threshold, we'll assume it's cheaper to add an extra
131 // syscall to write the data immediately instead of copying it to
132 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800133
Brian Silvermanf51499a2020-09-21 12:49:08 -0700134 // First, flush everything.
135 while (encoder_->queue_size() > 0u) {
136 Flush();
137 }
Austin Schuhde031b72020-01-10 19:34:41 -0800138
Brian Silvermanf51499a2020-09-21 12:49:08 -0700139 // Then, write it directly.
140 const auto start = aos::monotonic_clock::now();
141 const ssize_t written = write(fd_, span.data(), span.size());
142 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700143 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700144 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700145 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700146 } else {
147 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700148 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800149 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700150
Austin Schuhbd06ae42021-03-31 22:48:21 -0700151 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800152}
153
Brian Silverman0465fcf2020-09-24 00:29:18 -0700154void DetachedBufferWriter::Close() {
155 if (fd_ == -1) {
156 return;
157 }
158 encoder_->Finish();
159 while (encoder_->queue_size() > 0) {
160 Flush();
161 }
162 if (close(fd_) == -1) {
163 if (errno == ENOSPC) {
164 ran_out_of_space_ = true;
165 } else {
166 PLOG(ERROR) << "Closing log file failed";
167 }
168 }
169 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700170 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700171}
172
Austin Schuha36c8902019-12-30 18:07:15 -0800173void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700174 if (ran_out_of_space_) {
175 // We don't want any later data to be written after space becomes available,
176 // so refuse to write anything more once we've dropped data because we ran
177 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700178 if (encoder_) {
179 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
180 encoder_->Clear(encoder_->queue().size());
181 } else {
182 VLOG(1) << "No queue to ignore";
183 }
184 return;
185 }
186
187 const auto queue = encoder_->queue();
188 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700189 return;
190 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700191
Austin Schuha36c8902019-12-30 18:07:15 -0800192 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700193 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
194 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800195 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700196 for (size_t i = 0; i < iovec_size; ++i) {
197 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
198 iovec_[i].iov_len = queue[i].size();
199 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800200 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700201
202 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800203 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700204 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700205 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700206
207 encoder_->Clear(iovec_size);
208
209 UpdateStatsForWrite(end - start, written, iovec_size);
210}
211
Brian Silverman0465fcf2020-09-24 00:29:18 -0700212void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
213 size_t write_size) {
214 if (write_return == -1 && errno == ENOSPC) {
215 ran_out_of_space_ = true;
216 return;
217 }
218 PCHECK(write_return >= 0) << ": write failed";
219 if (write_return < static_cast<ssize_t>(write_size)) {
220 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
221 // never seems to happen in any other case. If we ever want to log to a
222 // socket, this will happen more often. However, until we get there, we'll
223 // just assume it means we ran out of space.
224 ran_out_of_space_ = true;
225 return;
226 }
227}
228
Brian Silvermanf51499a2020-09-21 12:49:08 -0700229void DetachedBufferWriter::UpdateStatsForWrite(
230 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
231 if (duration > max_write_time_) {
232 max_write_time_ = duration;
233 max_write_time_bytes_ = written;
234 max_write_time_messages_ = iovec_size;
235 }
236 total_write_time_ += duration;
237 ++total_write_count_;
238 total_write_messages_ += iovec_size;
239 total_write_bytes_ += written;
240}
241
Austin Schuhbd06ae42021-03-31 22:48:21 -0700242void DetachedBufferWriter::FlushAtThreshold(
243 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700244 if (ran_out_of_space_) {
245 // We don't want any later data to be written after space becomes available,
246 // so refuse to write anything more once we've dropped data because we ran
247 // out of space.
248 if (encoder_) {
249 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
250 encoder_->Clear(encoder_->queue().size());
251 } else {
252 VLOG(1) << "No queue to ignore";
253 }
254 return;
255 }
256
Austin Schuhbd06ae42021-03-31 22:48:21 -0700257 // We don't want to flush the first time through. Otherwise we will flush as
258 // the log file header might be compressing, defeating any parallelism and
259 // queueing there.
260 if (last_flush_time_ == aos::monotonic_clock::min_time) {
261 last_flush_time_ = now;
262 }
263
Brian Silvermanf51499a2020-09-21 12:49:08 -0700264 // Flush if we are at the max number of iovs per writev, because there's no
265 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700266 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700267 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700268 encoder_->queue_size() >= IOV_MAX ||
269 now > last_flush_time_ +
270 chrono::duration_cast<chrono::nanoseconds>(
271 chrono::duration<double>(FLAGS_flush_period))) {
272 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700273 Flush();
274 }
Austin Schuha36c8902019-12-30 18:07:15 -0800275}
276
277flatbuffers::Offset<MessageHeader> PackMessage(
278 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
279 int channel_index, LogType log_type) {
280 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
281
282 switch (log_type) {
283 case LogType::kLogMessage:
284 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800285 case LogType::kLogRemoteMessage:
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700286 data_offset = fbb->CreateVector(
287 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800288 break;
289
290 case LogType::kLogDeliveryTimeOnly:
291 break;
292 }
293
294 MessageHeader::Builder message_header_builder(*fbb);
295 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800296
297 switch (log_type) {
298 case LogType::kLogRemoteMessage:
299 message_header_builder.add_queue_index(context.remote_queue_index);
300 message_header_builder.add_monotonic_sent_time(
301 context.monotonic_remote_time.time_since_epoch().count());
302 message_header_builder.add_realtime_sent_time(
303 context.realtime_remote_time.time_since_epoch().count());
304 break;
305
306 case LogType::kLogMessage:
307 case LogType::kLogMessageAndDeliveryTime:
308 case LogType::kLogDeliveryTimeOnly:
309 message_header_builder.add_queue_index(context.queue_index);
310 message_header_builder.add_monotonic_sent_time(
311 context.monotonic_event_time.time_since_epoch().count());
312 message_header_builder.add_realtime_sent_time(
313 context.realtime_event_time.time_since_epoch().count());
314 break;
315 }
Austin Schuha36c8902019-12-30 18:07:15 -0800316
317 switch (log_type) {
318 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800319 case LogType::kLogRemoteMessage:
Austin Schuha36c8902019-12-30 18:07:15 -0800320 message_header_builder.add_data(data_offset);
321 break;
322
323 case LogType::kLogMessageAndDeliveryTime:
324 message_header_builder.add_data(data_offset);
325 [[fallthrough]];
326
327 case LogType::kLogDeliveryTimeOnly:
328 message_header_builder.add_monotonic_remote_time(
329 context.monotonic_remote_time.time_since_epoch().count());
330 message_header_builder.add_realtime_remote_time(
331 context.realtime_remote_time.time_since_epoch().count());
332 message_header_builder.add_remote_queue_index(context.remote_queue_index);
333 break;
334 }
335
336 return message_header_builder.Finish();
337}
338
Austin Schuhcd368422021-11-22 21:23:29 -0800339SpanReader::SpanReader(std::string_view filename, bool quiet)
340 : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700341 decoder_ = std::make_unique<DummyDecoder>(filename);
342
343 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700344 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700345 if (filename.substr(filename.size() - kXz.size()) == kXz) {
346#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -0800347 decoder_ =
348 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700349#else
Austin Schuhcd368422021-11-22 21:23:29 -0800350 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700351 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
352#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700353 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
354 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700355 }
Austin Schuh05b70472020-01-01 17:11:17 -0800356}
357
Austin Schuhcf5f6442021-07-06 10:43:28 -0700358absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800359 // Make sure we have enough for the size.
360 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
361 if (!ReadBlock()) {
362 return absl::Span<const uint8_t>();
363 }
364 }
365
366 // Now make sure we have enough for the message.
367 const size_t data_size =
368 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
369 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800370 if (data_size == sizeof(flatbuffers::uoffset_t)) {
371 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
372 LOG(ERROR) << " Rest of log file is "
373 << absl::BytesToHexString(std::string_view(
374 reinterpret_cast<const char *>(data_.data() +
375 consumed_data_),
376 data_.size() - consumed_data_));
377 return absl::Span<const uint8_t>();
378 }
Austin Schuh05b70472020-01-01 17:11:17 -0800379 while (data_.size() < consumed_data_ + data_size) {
380 if (!ReadBlock()) {
381 return absl::Span<const uint8_t>();
382 }
383 }
384
385 // And return it, consuming the data.
386 const uint8_t *data_ptr = data_.data() + consumed_data_;
387
Austin Schuh05b70472020-01-01 17:11:17 -0800388 return absl::Span<const uint8_t>(data_ptr, data_size);
389}
390
Austin Schuhcf5f6442021-07-06 10:43:28 -0700391void SpanReader::ConsumeMessage() {
392 consumed_data_ +=
393 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
394 sizeof(flatbuffers::uoffset_t);
395}
396
397absl::Span<const uint8_t> SpanReader::ReadMessage() {
398 absl::Span<const uint8_t> result = PeekMessage();
399 if (result != absl::Span<const uint8_t>()) {
400 ConsumeMessage();
401 }
402 return result;
403}
404
Austin Schuh05b70472020-01-01 17:11:17 -0800405bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700406 // This is the amount of data we grab at a time. Doing larger chunks minimizes
407 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800408 constexpr size_t kReadSize = 256 * 1024;
409
410 // Strip off any unused data at the front.
411 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700412 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800413 consumed_data_ = 0;
414 }
415
416 const size_t starting_size = data_.size();
417
418 // This should automatically grow the backing store. It won't shrink if we
419 // get a small chunk later. This reduces allocations when we want to append
420 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700421 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800422
Brian Silvermanf51499a2020-09-21 12:49:08 -0700423 const size_t count =
424 decoder_->Read(data_.begin() + starting_size, data_.end());
425 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800426 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800427 return false;
428 }
Austin Schuh05b70472020-01-01 17:11:17 -0800429
430 return true;
431}
432
Austin Schuhadd6eb32020-11-09 21:24:26 -0800433std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700434 SpanReader *span_reader) {
435 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800436
437 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800438 if (config_data == absl::Span<const uint8_t>()) {
439 return std::nullopt;
440 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800441
Austin Schuh5212cad2020-09-09 23:12:09 -0700442 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700443 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800444 if (!result.Verify()) {
445 return std::nullopt;
446 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700447
448 if (FLAGS_workaround_double_headers) {
449 while (true) {
450 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
451 if (maybe_header_data == absl::Span<const uint8_t>()) {
452 break;
453 }
454
455 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
456 maybe_header_data);
457 if (maybe_header.Verify()) {
458 LOG(WARNING) << "Found duplicate LogFileHeader in "
459 << span_reader->filename();
460 ResizeableBuffer header_data_copy;
461 header_data_copy.resize(maybe_header_data.size());
462 memcpy(header_data_copy.data(), maybe_header_data.begin(),
463 header_data_copy.size());
464 result = SizePrefixedFlatbufferVector<LogFileHeader>(
465 std::move(header_data_copy));
466
467 span_reader->ConsumeMessage();
468 } else {
469 break;
470 }
471 }
472 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800473 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800474}
475
Austin Schuh0e8db662021-07-06 10:43:47 -0700476std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
477 std::string_view filename) {
478 SpanReader span_reader(filename);
479 return ReadHeader(&span_reader);
480}
481
Austin Schuhadd6eb32020-11-09 21:24:26 -0800482std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800483 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700484 SpanReader span_reader(filename);
485 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
486 for (size_t i = 0; i < n + 1; ++i) {
487 data_span = span_reader.ReadMessage();
488
489 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800490 if (data_span == absl::Span<const uint8_t>()) {
491 return std::nullopt;
492 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700493 }
494
Brian Silverman354697a2020-09-22 21:06:32 -0700495 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700496 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800497 if (!result.Verify()) {
498 return std::nullopt;
499 }
500 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700501}
502
Austin Schuh05b70472020-01-01 17:11:17 -0800503MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700504 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800505 raw_log_file_header_(
506 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -0700507 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
508 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800509
510 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700511 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800512
Austin Schuh0e8db662021-07-06 10:43:47 -0700513 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800514
Austin Schuh5b728b72021-06-16 14:57:15 -0700515 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
516
Austin Schuhcde938c2020-02-02 17:30:07 -0800517 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800518 FLAGS_max_out_of_order > 0
519 ? chrono::duration_cast<chrono::nanoseconds>(
520 chrono::duration<double>(FLAGS_max_out_of_order))
521 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800522
523 VLOG(1) << "Opened " << filename << " as node "
524 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800525}
526
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700527std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800528 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
529 if (msg_data == absl::Span<const uint8_t>()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700530 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -0800531 }
532
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700533 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
534 CHECK(msg.Verify()) << ": Corrupted message from " << filename();
Austin Schuh05b70472020-01-01 17:11:17 -0800535
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700536 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -0700537
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700538 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -0800539
540 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -0800541
542 if (VLOG_IS_ON(3)) {
543 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
544 } else if (VLOG_IS_ON(2)) {
545 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
546 msg_copy.mutable_message()->clear_data();
547 VLOG(2) << "Read from " << filename() << " data "
548 << FlatbufferToJson(msg_copy);
549 }
550
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700551 return result;
552}
553
554std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
555 const MessageHeader &message) {
556 const size_t data_size = message.has_data() ? message.data()->size() : 0;
557
558 UnpackedMessageHeader *const unpacked_message =
559 reinterpret_cast<UnpackedMessageHeader *>(
560 malloc(sizeof(UnpackedMessageHeader) + data_size +
561 kChannelDataAlignment - 1));
562
563 CHECK(message.has_channel_index());
564 CHECK(message.has_monotonic_sent_time());
565
566 absl::Span<uint8_t> span;
567 if (data_size > 0) {
568 span =
569 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
570 &unpacked_message->actual_data[0], data_size)),
571 data_size);
572 }
573
Austin Schuh826e6ce2021-11-18 20:33:10 -0800574 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700575 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800576 monotonic_remote_time = aos::monotonic_clock::time_point(
577 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700578 }
579 std::optional<realtime_clock::time_point> realtime_remote_time;
580 if (message.has_realtime_remote_time()) {
581 realtime_remote_time = realtime_clock::time_point(
582 chrono::nanoseconds(message.realtime_remote_time()));
583 }
584
585 std::optional<uint32_t> remote_queue_index;
586 if (message.has_remote_queue_index()) {
587 remote_queue_index = message.remote_queue_index();
588 }
589
590 new (unpacked_message) UnpackedMessageHeader{
591 .channel_index = message.channel_index(),
592 .monotonic_sent_time = monotonic_clock::time_point(
593 chrono::nanoseconds(message.monotonic_sent_time())),
594 .realtime_sent_time = realtime_clock::time_point(
595 chrono::nanoseconds(message.realtime_sent_time())),
596 .queue_index = message.queue_index(),
597 .monotonic_remote_time = monotonic_remote_time,
598 .realtime_remote_time = realtime_remote_time,
599 .remote_queue_index = remote_queue_index,
600 .monotonic_timestamp_time = monotonic_clock::time_point(
601 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
602 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
603 .span = span};
604
605 if (data_size > 0) {
606 memcpy(span.data(), message.data()->data(), data_size);
607 }
608
609 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
610 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -0800611}
612
Austin Schuhc41603c2020-10-11 16:17:37 -0700613PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700614 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700615 if (parts_.parts.size() >= 2) {
616 next_message_reader_.emplace(parts_.parts[1]);
617 }
Austin Schuh48507722021-07-17 17:29:24 -0700618 ComputeBootCounts();
619}
620
621void PartsMessageReader::ComputeBootCounts() {
622 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
623 std::nullopt);
624
625 // We have 3 vintages of log files with different amounts of information.
626 if (log_file_header()->has_boot_uuids()) {
627 // The new hotness with the boots explicitly listed out. We can use the log
628 // file header to compute the boot count of all relevant nodes.
629 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
630 size_t node_index = 0;
631 for (const flatbuffers::String *boot_uuid :
632 *log_file_header()->boot_uuids()) {
633 CHECK(parts_.boots);
634 if (boot_uuid->size() != 0) {
635 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
636 if (it != parts_.boots->boot_count_map.end()) {
637 boot_counts_[node_index] = it->second;
638 }
639 } else if (parts().boots->boots[node_index].size() == 1u) {
640 boot_counts_[node_index] = 0;
641 }
642 ++node_index;
643 }
644 } else {
645 // Older multi-node logs which are guarenteed to have UUIDs logged, or
646 // single node log files with boot UUIDs in the header. We only know how to
647 // order certain boots in certain circumstances.
648 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
649 for (size_t node_index = 0; node_index < boot_counts_.size();
650 ++node_index) {
651 CHECK(parts_.boots);
652 if (parts().boots->boots[node_index].size() == 1u) {
653 boot_counts_[node_index] = 0;
654 }
655 }
656 } else {
657 // Really old single node logs without any UUIDs. They can't reboot.
658 CHECK_EQ(boot_counts_.size(), 1u);
659 boot_counts_[0] = 0u;
660 }
661 }
662}
Austin Schuhc41603c2020-10-11 16:17:37 -0700663
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700664std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -0700665 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700666 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700667 message_reader_.ReadMessage();
668 if (message) {
669 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700670 const monotonic_clock::time_point monotonic_sent_time =
671 message->monotonic_sent_time;
672
673 // TODO(austin): Does this work with startup? Might need to use the
674 // start time.
675 // TODO(austin): Does this work with startup when we don't know the
676 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800677 if (monotonic_sent_time >
678 parts_.monotonic_start_time + max_out_of_order_duration()) {
679 after_start_ = true;
680 }
681 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800682 CHECK_GE(monotonic_sent_time,
683 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800684 << ": Max out of order of " << max_out_of_order_duration().count()
685 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800686 << parts_.monotonic_start_time << " currently reading "
687 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800688 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700689 return message;
690 }
691 NextLog();
692 }
Austin Schuh32f68492020-11-08 21:45:51 -0800693 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700694 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -0700695}
696
697void PartsMessageReader::NextLog() {
698 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700699 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -0700700 done_ = true;
701 return;
702 }
Brian Silvermanfee16972021-09-14 12:06:38 -0700703 CHECK(next_message_reader_);
704 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -0700705 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -0700706 if (next_part_index_ + 1 < parts_.parts.size()) {
707 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
708 } else {
709 next_message_reader_.reset();
710 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700711 ++next_part_index_;
712}
713
Austin Schuh1be0ce42020-11-29 22:43:26 -0800714bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700715 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700716
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700717 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800718 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700719 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800720 return false;
721 }
722
723 if (this->channel_index < m2.channel_index) {
724 return true;
725 } else if (this->channel_index > m2.channel_index) {
726 return false;
727 }
728
729 return this->queue_index < m2.queue_index;
730}
731
732bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800733bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700734 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700735
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700736 return timestamp.time == m2.timestamp.time &&
737 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800738}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800739
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700740std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
741 os << "{.channel_index=" << m.channel_index
742 << ", .monotonic_sent_time=" << m.monotonic_sent_time
743 << ", .realtime_sent_time=" << m.realtime_sent_time
744 << ", .queue_index=" << m.queue_index;
745 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800746 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700747 }
748 os << ", .realtime_remote_time=";
749 PrintOptionalOrNull(&os, m.realtime_remote_time);
750 os << ", .remote_queue_index=";
751 PrintOptionalOrNull(&os, m.remote_queue_index);
752 if (m.has_monotonic_timestamp_time) {
753 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
754 }
755 return os;
756}
757
Austin Schuh1be0ce42020-11-29 22:43:26 -0800758std::ostream &operator<<(std::ostream &os, const Message &m) {
759 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700760 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700761 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -0800762 if (m.data->remote_queue_index.has_value()) {
763 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
764 }
765 if (m.data->monotonic_remote_time.has_value()) {
766 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
767 }
Austin Schuhfb1b3292021-11-16 21:20:15 -0800768 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800769 }
770 os << "}";
771 return os;
772}
773
774std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
775 os << "{.channel_index=" << m.channel_index
776 << ", .queue_index=" << m.queue_index
777 << ", .monotonic_event_time=" << m.monotonic_event_time
778 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -0700779 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800780 os << ", .remote_queue_index=" << m.remote_queue_index;
781 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700782 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800783 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
784 }
785 if (m.realtime_remote_time != realtime_clock::min_time) {
786 os << ", .realtime_remote_time=" << m.realtime_remote_time;
787 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700788 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800789 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
790 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700791 if (m.data != nullptr) {
792 os << ", .data=" << *m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -0800793 }
794 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800795 return os;
796}
797
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800798LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700799 : parts_message_reader_(log_parts),
800 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
801}
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800802
// Returns the oldest queued message once it is safe to hand out, or nullptr
// once all parts have been read and nothing is left queued.
//
// Messages are pulled from parts_message_reader_ into messages_ until the
// front message is older than sorted_until() and sorted_until() has reached
// monotonic_start_time(), or until the reader runs dry (which sets
// sorted_until_ to max_time). sorted_until_ is the reader's newest timestamp
// minus max_out_of_order_duration(): PartsMessageReader::ReadMessage CHECKs
// that later messages are not older than that once past the start.
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::shared_ptr<UnpackedMessageHeader> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // The timestamp time is recorded on the logger node, so it uses the
      // logger's boot count when present.
      size_t monotonic_timestamp_boot = 0;
      if (m->has_monotonic_timestamp_time) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      // 0xffffff is a sentinel meaning "no remote boot": it is kept only when
      // the message carries no monotonic_remote_time.
      size_t monotonic_remote_boot = 0xffffff;

      if (m->monotonic_remote_time.has_value()) {
        // node is only used to make the CHECK failure message readable.
        const Node *node = parts().config->nodes()->Get(
            source_node_index_[m->channel_index]);

        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[m->channel_index]);
        CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                    << ", with index "
                    << source_node_index_[m->channel_index];
        monotonic_remote_boot = *boot;
      }

      // Ownership of the header moves into the queued Message.
      messages_.insert(
          Message{.channel_index = m->channel_index,
                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
                                                .index = m->queue_index},
                  .timestamp = BootTimestamp{.boot = parts().boot_count,
                                             .time = m->monotonic_sent_time},
                  .monotonic_remote_boot = monotonic_remote_boot,
                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
                  .data = std::move(m)});

      // Now, update sorted_until_ to match the new message.
      // The comparison against min_time + duration keeps the subtraction
      // below from producing a time earlier than min_time.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Messages must be handed out in non-decreasing time order.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}
875
876void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
877
878std::string LogPartsSorter::DebugString() const {
879 std::stringstream ss;
880 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800881 int count = 0;
882 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800883 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800884 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
885 ss << m << "\n";
886 } else if (no_dots) {
887 ss << "...\n";
888 no_dots = false;
889 }
890 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800891 }
892 ss << "] <- " << parts_message_reader_.filename();
893 return ss.str();
894}
895
// Merges the log parts of a single node from a single boot. All parts must
// share parts[0]'s node and source boot UUID (CHECKed). Each part gets its own
// LogPartsSorter. The merger's start time is picked from the sorters,
// preferring ones that report a real realtime start time.
NodeMerger::NodeMerger(std::vector<LogParts> parts) {
  CHECK_GE(parts.size(), 1u);
  // Enforce that we are sorting things only from a single node from a single
  // boot.
  // These views point into parts[0]; they must not be used after the parts
  // are moved into parts_sorters_ below.
  const std::string_view part0_node = parts[0].node;
  const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
  for (size_t i = 1; i < parts.size(); ++i) {
    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
    CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
        << ": Can't merge different boots.";
  }

  node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);

  for (LogParts &part : parts) {
    parts_sorters_.emplace_back(std::move(part));
  }

  // max_time acts as "nothing chosen yet"; it is replaced by min_time below if
  // no sorter reports a meaningful start time.
  monotonic_start_time_ = monotonic_clock::max_time;
  realtime_start_time_ = realtime_clock::min_time;
  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
    // We want to capture the earliest meaningful start time here. The start
    // time defaults to min_time when there's no meaningful value to report, so
    // let's ignore those.
    if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
      bool accept = false;
      // We want to prioritize start times from the logger node. Really, we
      // want to prioritize start times with a valid realtime_clock time. So,
      // if we have a start time without a RT clock, prefer a start time with a
      // RT clock, even if it is later.
      if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
        // We've got a good one. See if the current start time has a good RT
        // clock, or if we should use this one instead.
        if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
          accept = true;
        } else if (realtime_start_time_ == realtime_clock::min_time) {
          // The previous start time doesn't have a good RT time, so it is very
          // likely the start time from a remote part file. We just found a
          // better start time with a real RT time, so switch to that instead.
          accept = true;
        }
      } else if (realtime_start_time_ == realtime_clock::min_time) {
        // We don't have a RT time, so take the oldest.
        if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
          accept = true;
        }
      }

      if (accept) {
        monotonic_start_time_ = parts_sorter.monotonic_start_time();
        realtime_start_time_ = parts_sorter.realtime_start_time();
      }
    }
  }

  // If there was no meaningful start time reported, just use min_time.
  if (monotonic_start_time_ == monotonic_clock::max_time) {
    monotonic_start_time_ = monotonic_clock::min_time;
    realtime_start_time_ = realtime_clock::min_time;
  }
}
Austin Schuh8f52ed52020-11-30 23:12:39 -0800957
Austin Schuh0ca51f32020-12-25 21:51:45 -0800958std::vector<const LogParts *> NodeMerger::Parts() const {
959 std::vector<const LogParts *> p;
960 p.reserve(parts_sorters_.size());
961 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
962 p.emplace_back(&parts_sorter.parts());
963 }
964 return p;
965}
966
// Returns the oldest message across all of this node's sorters, or nullptr
// when every sorter is empty. The sorter that produced the result is cached
// in current_ until PopFront(), so repeated calls are cheap. While searching,
// identical messages (per Message::operator==) from different sorters are
// deduplicated, keeping the copy that has a monotonic_timestamp_time. Also
// recomputes sorted_until_ as the minimum over all sorters.
Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    // Note: operator< and operator== CHECK that both messages share a boot.
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has
      // the timestamp time.
      // NOTE(review): after popping a duplicate here, the sorter's next
      // message is not compared against oldest in this pass.
      if (!m->data->has_monotonic_timestamp_time) {
        parts_sorter.PopFront();
      } else if (!oldest->data->has_monotonic_timestamp_time) {
        // Popping current_ invalidates the old `oldest`, which is replaced by
        // m immediately below.
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        CHECK_EQ(m->data->monotonic_timestamp_time,
                 oldest->data->monotonic_timestamp_time);
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  if (oldest) {
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
    monotonic_oldest_time_ =
        std::min(monotonic_oldest_time_, oldest->timestamp.time);
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found. This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}
1021
1022void NodeMerger::PopFront() {
1023 CHECK(current_ != nullptr) << "Popping before calling Front()";
1024 current_->PopFront();
1025 current_ = nullptr;
1026}
1027
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001028BootMerger::BootMerger(std::vector<LogParts> files) {
1029 std::vector<std::vector<LogParts>> boots;
1030
1031 // Now, we need to split things out by boot.
1032 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001033 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001034 if (boot_count + 1 > boots.size()) {
1035 boots.resize(boot_count + 1);
1036 }
1037 boots[boot_count].emplace_back(std::move(files[i]));
1038 }
1039
1040 node_mergers_.reserve(boots.size());
1041 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001042 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001043 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001044 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001045 }
1046 node_mergers_.emplace_back(
1047 std::make_unique<NodeMerger>(std::move(boots[i])));
1048 }
1049}
1050
1051Message *BootMerger::Front() {
1052 Message *result = node_mergers_[index_]->Front();
1053
1054 if (result != nullptr) {
1055 return result;
1056 }
1057
1058 if (index_ + 1u == node_mergers_.size()) {
1059 // At the end of the last node merger, just return.
1060 return nullptr;
1061 } else {
1062 ++index_;
1063 return Front();
1064 }
1065}
1066
1067void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1068
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001069std::vector<const LogParts *> BootMerger::Parts() const {
1070 std::vector<const LogParts *> results;
1071 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1072 std::vector<const LogParts *> node_parts = node_merger->Parts();
1073
1074 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1075 std::make_move_iterator(node_parts.end()));
1076 }
1077
1078 return results;
1079}
1080
// Builds a mapper over all boots of one node's log parts. Every part must
// share the same configuration object (CHECKed by pointer). For multi-node
// configs, it precomputes per remote node and per channel whether data is
// forwarded to that node (and the connection's time_to_live), plus each
// channel's source node index. Single-node configs leave nodes_data_ empty.
TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
    : boot_merger_(std::move(parts)),
      timestamp_callback_([](TimestampedMessage *) {}) {
  for (const LogParts *part : boot_merger_.Parts()) {
    if (!configuration_) {
      configuration_ = part->config;
    } else {
      CHECK_EQ(configuration_.get(), part->config.get());
    }
  }
  const Configuration *config = configuration_.get();
  // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
  // pretty simple.
  if (configuration::MultiNode(config)) {
    nodes_data_.resize(config->nodes()->size());
    const Node *my_node = config->nodes()->Get(node());
    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
      const Node *node = config->nodes()->Get(node_index);
      NodeData *node_data = &nodes_data_[node_index];
      node_data->channels.resize(config->channels()->size());
      // We should save the channel if it is delivered to the node represented
      // by the NodeData, but not sent by that node. That combo means it is
      // forwarded.
      // Concretely: readable on `node`, sendable on this mapper's node
      // (my_node), and `node` is not my_node.
      size_t channel_index = 0;
      node_data->any_delivered = false;
      for (const Channel *channel : *config->channels()) {
        node_data->channels[channel_index].delivered =
            configuration::ChannelIsReadableOnNode(channel, node) &&
            configuration::ChannelIsSendableOnNode(channel, my_node) &&
            (my_node != node);
        node_data->any_delivered = node_data->any_delivered ||
                                   node_data->channels[channel_index].delivered;
        if (node_data->channels[channel_index].delivered) {
          // NOTE(review): assumes ConnectionToNode returns non-null whenever
          // delivered is true — TODO confirm.
          const Connection *connection =
              configuration::ConnectionToNode(channel, node);
          node_data->channels[channel_index].time_to_live =
              chrono::nanoseconds(connection->time_to_live());
        }
        ++channel_index;
      }
    }

    // source_node_[channel_index] is the node index that sends that channel.
    for (const Channel *channel : *config->channels()) {
      source_node_.emplace_back(configuration::GetNodeIndex(
          config, channel->source_node()->string_view()));
    }
  }
}
1129
1130void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001131 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001132 CHECK_NE(timestamp_mapper->node(), node());
1133 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1134
1135 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001136 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001137 // we could needlessly save data.
1138 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001139 VLOG(1) << "Registering on node " << node() << " for peer node "
1140 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001141 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1142
1143 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001144
1145 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001146 }
1147}
1148
Austin Schuh79b30942021-01-24 22:32:21 -08001149void TimestampMapper::QueueMessage(Message *m) {
1150 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001151 .channel_index = m->channel_index,
1152 .queue_index = m->queue_index,
1153 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001154 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001155 .remote_queue_index = BootQueueIndex::Invalid(),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001156 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -08001157 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001158 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -08001159 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001160}
1161
1162TimestampedMessage *TimestampMapper::Front() {
1163 // No need to fetch anything new. A previous message still exists.
1164 switch (first_message_) {
1165 case FirstMessage::kNeedsUpdate:
1166 break;
1167 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001168 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001169 case FirstMessage::kNullptr:
1170 return nullptr;
1171 }
1172
Austin Schuh79b30942021-01-24 22:32:21 -08001173 if (matched_messages_.empty()) {
1174 if (!QueueMatched()) {
1175 first_message_ = FirstMessage::kNullptr;
1176 return nullptr;
1177 }
1178 }
1179 first_message_ = FirstMessage::kInMessage;
1180 return &matched_messages_.front();
1181}
1182
// Appends one more message to matched_messages_ and invokes
// timestamp_callback_ on it. Returns false when there is no more data.
//
// Single-node logs forward boot_merger_'s messages directly. For multi-node
// logs, messages sent by this node are forwarded with invalid remote fields.
// Messages from other nodes are paired with the matching remote data via
// MatchingMessageFor(), and the remote timestamps are filled in. Every queued
// message is CHECKed to be no older than the previous one.
bool TimestampMapper::QueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path. We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    Message *m = boot_merger_.Front();
    if (!m) {
      return false;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(m);

    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are a thin wrapper around boot_merger_. Call it directly.
    boot_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }

  // We need to only add messages to the list so they get processed for
  // messages which are delivered. Reuse the flow below which uses messages_
  // by just adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return false;
    }

    // Now that it has been added (and cannibalized), forget about it
    // upstream.
    boot_merger_.PopFront();
  }

  Message *m = &(messages_.front());

  if (source_node_[m->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(m);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return
    // it.
    Message data = MatchingMessageFor(*m);

    // Return the data from the remote. The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    // The .value() calls throw if the local timestamp message lacks remote
    // fields; MatchingMessageFor CHECKs their presence first.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = m->channel_index,
        .queue_index = m->queue_index,
        .monotonic_event_time = m->timestamp,
        .realtime_event_time = m->data->realtime_sent_time,
        .remote_queue_index =
            BootQueueIndex{.boot = m->monotonic_remote_boot,
                           .index = m->data->remote_queue_index.value()},
        .monotonic_remote_time = {m->monotonic_remote_boot,
                                  m->data->monotonic_remote_time.value()},
        .realtime_remote_time = m->data->realtime_remote_time.value(),
        .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
                                     m->data->monotonic_timestamp_time},
        .data = std::move(data.data)});
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }
}
1257
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001258void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001259 while (last_message_time_ <= queue_time) {
1260 if (!QueueMatched()) {
1261 return;
1262 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001263 }
1264}
1265
Austin Schuhe639ea12021-01-25 13:00:22 -08001266void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001267 // Note: queueing for time doesn't really work well across boots. So we
1268 // just assume that if you are using this, you only care about the current
1269 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001270 //
1271 // TODO(austin): Is that the right concept?
1272 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001273 // Make sure we have something queued first. This makes the end time
1274 // calculation simpler, and is typically what folks want regardless.
1275 if (matched_messages_.empty()) {
1276 if (!QueueMatched()) {
1277 return;
1278 }
1279 }
1280
1281 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001282 std::max(monotonic_start_time(
1283 matched_messages_.front().monotonic_event_time.boot),
1284 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001285 time_estimation_buffer;
1286
1287 // Place sorted messages on the list until we have
1288 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1289 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001290 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001291 if (!QueueMatched()) {
1292 return;
1293 }
1294 }
1295}
1296
Austin Schuhd2f96102020-12-01 20:27:29 -08001297void TimestampMapper::PopFront() {
1298 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08001299 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001300 first_message_ = FirstMessage::kNeedsUpdate;
1301
Austin Schuh79b30942021-01-24 22:32:21 -08001302 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001303}
1304
1305Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001306 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001307 CHECK_NOTNULL(message.data);
1308 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001309 const BootQueueIndex remote_queue_index =
1310 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001311 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001312
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001313 CHECK(message.data->monotonic_remote_time.has_value());
1314 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001315
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001316 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001317 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001318 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001319 const realtime_clock::time_point realtime_remote_time =
1320 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001321
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001322 TimestampMapper *peer =
1323 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001324
1325 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001326 // asked to pull a timestamp from a peer which doesn't exist, return an
1327 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001328 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001329 // TODO(austin): Make sure the tests hit all these paths with a boot count
1330 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001331 return Message{.channel_index = message.channel_index,
1332 .queue_index = remote_queue_index,
1333 .timestamp = monotonic_remote_time,
1334 .monotonic_remote_boot = 0xffffff,
1335 .monotonic_timestamp_boot = 0xffffff,
1336 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001337 }
1338
1339 // The queue which will have the matching data, if available.
1340 std::deque<Message> *data_queue =
1341 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1342
Austin Schuh79b30942021-01-24 22:32:21 -08001343 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001344
1345 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001346 return Message{.channel_index = message.channel_index,
1347 .queue_index = remote_queue_index,
1348 .timestamp = monotonic_remote_time,
1349 .monotonic_remote_boot = 0xffffff,
1350 .monotonic_timestamp_boot = 0xffffff,
1351 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001352 }
1353
Austin Schuhd2f96102020-12-01 20:27:29 -08001354 if (remote_queue_index < data_queue->front().queue_index ||
1355 remote_queue_index > data_queue->back().queue_index) {
1356 return Message{
1357 .channel_index = message.channel_index,
1358 .queue_index = remote_queue_index,
1359 .timestamp = monotonic_remote_time,
Austin Schuh48507722021-07-17 17:29:24 -07001360 .monotonic_remote_boot = 0xffffff,
1361 .monotonic_timestamp_boot = 0xffffff,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001362 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001363 }
1364
Austin Schuh993ccb52020-12-12 15:59:32 -08001365 // The algorithm below is constant time with some assumptions. We need there
1366 // to be no missing messages in the data stream. This also assumes a queue
1367 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001368 if (data_queue->back().queue_index.boot ==
1369 data_queue->front().queue_index.boot &&
1370 (data_queue->back().queue_index.index -
1371 data_queue->front().queue_index.index + 1u ==
1372 data_queue->size())) {
1373 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001374 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001375 //
1376 // TODO(austin): Move if not reliable.
1377 Message result = (*data_queue)[remote_queue_index.index -
1378 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001379
1380 CHECK_EQ(result.timestamp, monotonic_remote_time)
1381 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08001382 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08001383 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1384 // Now drop the data off the front. We have deduplicated timestamps, so we
1385 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001386 data_queue->erase(
1387 data_queue->begin(),
1388 data_queue->begin() +
1389 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001390 return result;
1391 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07001392 // TODO(austin): Binary search.
1393 auto it = std::find_if(
1394 data_queue->begin(), data_queue->end(),
1395 [remote_queue_index,
1396 remote_boot = monotonic_remote_time.boot](const Message &m) {
1397 return m.queue_index == remote_queue_index &&
1398 m.timestamp.boot == remote_boot;
1399 });
Austin Schuh993ccb52020-12-12 15:59:32 -08001400 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001401 return Message{.channel_index = message.channel_index,
1402 .queue_index = remote_queue_index,
1403 .timestamp = monotonic_remote_time,
1404 .monotonic_remote_boot = 0xffffff,
1405 .monotonic_timestamp_boot = 0xffffff,
1406 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08001407 }
1408
1409 Message result = std::move(*it);
1410
1411 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001412 << ": Queue index matches, but timestamp doesn't. Please "
1413 "investigate!";
1414 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
1415 << ": Queue index matches, but timestamp doesn't. Please "
1416 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08001417
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08001418 // Erase everything up to this message. We want to keep 1 message in the
1419 // queue so we can handle reliable messages forwarded across boots.
1420 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08001421
1422 return result;
1423 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001424}
1425
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001426void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001427 if (queued_until_ > t) {
1428 return;
1429 }
1430 while (true) {
1431 if (!messages_.empty() && messages_.back().timestamp > t) {
1432 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1433 return;
1434 }
1435
1436 if (!Queue()) {
1437 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001438 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001439 return;
1440 }
1441
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001442 // Now that it has been added (and cannibalized), forget about it
1443 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001444 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001445 }
1446}
1447
1448bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001449 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001450 if (m == nullptr) {
1451 return false;
1452 }
1453 for (NodeData &node_data : nodes_data_) {
1454 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001455 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001456 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08001457 // If we have data but no timestamps (logs where the timestamps didn't get
1458 // logged are classic), we can grow this indefinitely. We don't need to
1459 // keep anything that is older than the last message returned.
1460
1461 // We have the time on the source node.
1462 // We care to wait until we have the time on the destination node.
1463 std::deque<Message> &messages =
1464 node_data.channels[m->channel_index].messages;
1465 // Max delay over the network is the TTL, so let's take the queue time and
1466 // add TTL to it. Don't forget any messages which are reliable until
1467 // someone can come up with a good reason to forget those too.
1468 if (node_data.channels[m->channel_index].time_to_live >
1469 chrono::nanoseconds(0)) {
1470 // We need to make *some* assumptions about network delay for this to
1471 // work. We want to only look at the RX side. This means we need to
1472 // track the last time a message was popped from any channel from the
1473 // node sending this message, and compare that to the max time we expect
1474 // that a message will take to be delivered across the network. This
1475 // assumes that messages are popped in time order as a proxy for
1476 // measuring the distributed time at this layer.
1477 //
1478 // Leave at least 1 message in here so we can handle reboots and
1479 // messages getting sent twice.
1480 while (messages.size() > 1u &&
1481 messages.begin()->timestamp +
1482 node_data.channels[m->channel_index].time_to_live +
1483 chrono::duration_cast<chrono::nanoseconds>(
1484 chrono::duration<double>(FLAGS_max_network_delay)) <
1485 last_popped_message_time_) {
1486 messages.pop_front();
1487 }
1488 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001489 node_data.channels[m->channel_index].messages.emplace_back(*m);
1490 }
1491 }
1492
1493 messages_.emplace_back(std::move(*m));
1494 return true;
1495}
1496
1497std::string TimestampMapper::DebugString() const {
1498 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07001499 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08001500 for (const Message &message : messages_) {
1501 ss << " " << message << "\n";
1502 }
1503 ss << "] queued_until " << queued_until_;
1504 for (const NodeData &ns : nodes_data_) {
1505 if (ns.peer == nullptr) continue;
1506 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1507 size_t channel_index = 0;
1508 for (const NodeData::ChannelData &channel_data :
1509 ns.peer->nodes_data_[node()].channels) {
1510 if (channel_data.messages.empty()) {
1511 continue;
1512 }
Austin Schuhb000de62020-12-03 22:00:40 -08001513
Austin Schuhd2f96102020-12-01 20:27:29 -08001514 ss << " channel " << channel_index << " [\n";
1515 for (const Message &m : channel_data.messages) {
1516 ss << " " << m << "\n";
1517 }
1518 ss << " ]\n";
1519 ++channel_index;
1520 }
1521 ss << "] queued_until " << ns.peer->queued_until_;
1522 }
1523 return ss.str();
1524}
1525
Austin Schuhee711052020-08-24 16:06:09 -07001526std::string MaybeNodeName(const Node *node) {
1527 if (node != nullptr) {
1528 return node->name()->str() + " ";
1529 }
1530 return "";
1531}
1532
Brian Silvermanf51499a2020-09-21 12:49:08 -07001533} // namespace aos::logger