#include "aos/events/logging/logfile_utils.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>

#include <algorithm>
#include <climits>

#include "absl/strings/escaping.h"
#include "aos/configuration.h"
#include "aos/flatbuffer_merge.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"

#if defined(__x86_64__)
#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
#elif defined(__aarch64__)
#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
#else
#define ENABLE_LZMA 0
#endif

#if ENABLE_LZMA
#include "aos/events/logging/lzma_encoder.h"
#endif

DEFINE_int32(flush_size, 128000,
             "Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
    flush_period, 5.0,
    "Max time to let data sit in the queue before flushing in seconds.");

DEFINE_double(
    max_out_of_order, -1,
    "If set, this overrides the max out of order duration for a log file.");

DEFINE_bool(workaround_double_headers, true,
            "Some old log files have two headers at the beginning. Use the "
            "last header as the actual header.");

namespace aos::logger {

namespace chrono = std::chrono;

DetachedBufferWriter::DetachedBufferWriter(
    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
    : filename_(filename), encoder_(std::move(encoder)) {
  if (!util::MkdirPIfSpace(filename, 0777)) {
    ran_out_of_space_ = true;
  } else {
    fd_ = open(std::string(filename).c_str(),
               O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
    if (fd_ == -1 && errno == ENOSPC) {
      ran_out_of_space_ = true;
    } else {
      PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
      VLOG(1) << "Opened " << filename << " for writing";
    }
  }
}

DetachedBufferWriter::~DetachedBufferWriter() {
  Close();
  if (ran_out_of_space_) {
    CHECK(acknowledge_ran_out_of_space_)
        << ": Unacknowledged out of disk space, log file was not completed";
  }
}

DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
  *this = std::move(other);
}

// When other is destroyed "soon" (which it should be because we're getting an
// rvalue reference to it), it will flush etc all the data we have queued up
// (because that data will then be its data).
DetachedBufferWriter &DetachedBufferWriter::operator=(
    DetachedBufferWriter &&other) {
  std::swap(filename_, other.filename_);
  std::swap(encoder_, other.encoder_);
  std::swap(fd_, other.fd_);
  std::swap(ran_out_of_space_, other.ran_out_of_space_);
  std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
  std::swap(iovec_, other.iovec_);
  std::swap(max_write_time_, other.max_write_time_);
  std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
  std::swap(max_write_time_messages_, other.max_write_time_messages_);
  std::swap(total_write_time_, other.total_write_time_);
  std::swap(total_write_count_, other.total_write_count_);
  std::swap(total_write_messages_, other.total_write_messages_);
  std::swap(total_write_bytes_, other.total_write_bytes_);
  return *this;
}

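// Rough usage sketch of this class (illustrative only; DummyEncoder and the
// path below are assumptions, not taken from this file):
//
//   DetachedBufferWriter writer("/tmp/log.part0.bfbs",
//                               std::make_unique<DummyEncoder>());
//   writer.QueueSpan(span);  // span is an absl::Span<const uint8_t>.
//   writer.Close();          // Or let the destructor flush and close.
//
// QueueSpan either hands the span to the encoder, or, for large spans when the
// encoder allows bypassing, flushes the queue and write()s the span directly.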
void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes
    // available, so refuse to write anything more once we've dropped data
    // because we ran out of space.
    VLOG(1) << "Ignoring span: " << span.size();
    return;
  }

  aos::monotonic_clock::time_point now;
  if (encoder_->may_bypass() && span.size() > 4096u) {
    // Over this threshold, we'll assume it's cheaper to add an extra
    // syscall to write the data immediately instead of copying it to
    // enqueue.

    // First, flush everything.
    while (encoder_->queue_size() > 0u) {
      Flush();
    }

    // Then, write it directly.
    const auto start = aos::monotonic_clock::now();
    const ssize_t written = write(fd_, span.data(), span.size());
    const auto end = aos::monotonic_clock::now();
    HandleWriteReturn(written, span.size());
    UpdateStatsForWrite(end - start, written, 1);
    now = end;
  } else {
    encoder_->Encode(CopySpanAsDetachedBuffer(span));
    now = aos::monotonic_clock::now();
  }

  FlushAtThreshold(now);
}

void DetachedBufferWriter::Close() {
  if (fd_ == -1) {
    return;
  }
  encoder_->Finish();
  while (encoder_->queue_size() > 0) {
    Flush();
  }
  if (close(fd_) == -1) {
    if (errno == ENOSPC) {
      ran_out_of_space_ = true;
    } else {
      PLOG(ERROR) << "Closing log file failed";
    }
  }
  fd_ = -1;
  VLOG(1) << "Closed " << filename_;
}

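// Writes as much of the encoder's queue as possible with one writev() call.
// At most IOV_MAX buffers go out per call; anything beyond that stays queued
// for a later Flush().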
void DetachedBufferWriter::Flush() {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes available,
    // so refuse to write anything more once we've dropped data because we ran
    // out of space.
    if (encoder_) {
      VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
      encoder_->Clear(encoder_->queue().size());
    } else {
      VLOG(1) << "No queue to ignore";
    }
    return;
  }

  const auto queue = encoder_->queue();
  if (queue.empty()) {
    return;
  }

  iovec_.clear();
  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
  iovec_.resize(iovec_size);
  size_t counted_size = 0;
  for (size_t i = 0; i < iovec_size; ++i) {
    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
    iovec_[i].iov_len = queue[i].size();
    counted_size += iovec_[i].iov_len;
  }

  const auto start = aos::monotonic_clock::now();
  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
  const auto end = aos::monotonic_clock::now();
  HandleWriteReturn(written, counted_size);

  encoder_->Clear(iovec_size);

  UpdateStatsForWrite(end - start, written, iovec_size);
}

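// Interprets the result of a write()/writev() call. ENOSPC and short writes
// are both treated as running out of disk space; any other failure is fatal.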
void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
                                             size_t write_size) {
  if (write_return == -1 && errno == ENOSPC) {
    ran_out_of_space_ = true;
    return;
  }
  PCHECK(write_return >= 0) << ": write failed";
  if (write_return < static_cast<ssize_t>(write_size)) {
    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
    // never seems to happen in any other case. If we ever want to log to a
    // socket, this will happen more often. However, until we get there, we'll
    // just assume it means we ran out of space.
    ran_out_of_space_ = true;
    return;
  }
}

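// Records per-write statistics (worst-case and cumulative write time, bytes,
// and buffer counts) so callers can inspect writer performance later.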
void DetachedBufferWriter::UpdateStatsForWrite(
    aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
  if (duration > max_write_time_) {
    max_write_time_ = duration;
    max_write_time_bytes_ = written;
    max_write_time_messages_ = iovec_size;
  }
  total_write_time_ += duration;
  ++total_write_count_;
  total_write_messages_ += iovec_size;
  total_write_bytes_ += written;
}

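// Flush policy, as implemented below: flush whenever more than --flush_size
// bytes are queued, whenever IOV_MAX buffers are queued, or whenever
// --flush_period seconds have elapsed since the last flush, whichever comes
// first.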
void DetachedBufferWriter::FlushAtThreshold(
    aos::monotonic_clock::time_point now) {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes available,
    // so refuse to write anything more once we've dropped data because we ran
    // out of space.
    if (encoder_) {
      VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
      encoder_->Clear(encoder_->queue().size());
    } else {
      VLOG(1) << "No queue to ignore";
    }
    return;
  }

  // We don't want to flush the first time through. Otherwise we will flush
  // while the log file header might still be compressing, defeating any
  // parallelism and queueing there.
  if (last_flush_time_ == aos::monotonic_clock::min_time) {
    last_flush_time_ = now;
  }

  // Flush if we are at the max number of iovs per writev, because there's no
  // point queueing up any more data in memory. Also flush once we have enough
  // data queued up or if it has been long enough.
  while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
         encoder_->queue_size() >= IOV_MAX ||
         now > last_flush_time_ +
                   chrono::duration_cast<chrono::nanoseconds>(
                       chrono::duration<double>(FLAGS_flush_period))) {
    last_flush_time_ = now;
    Flush();
  }
}

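// Which MessageHeader fields get populated depends on the LogType, mirroring
// the switch statements below:
//   kLogMessage:                data + local send times.
//   kLogDeliveryTimeOnly:       local send times + remote times, no data.
//   kLogMessageAndDeliveryTime: data + local send times + remote times.
//   kLogRemoteMessage:          data, with the remote times as the send times.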
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  switch (log_type) {
    case LogType::kLogRemoteMessage:
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      message_header_builder.add_data(data_offset);
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_data(data_offset);
      [[fallthrough]];

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;
  }

  return message_header_builder.Finish();
}

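// SpanReader reads length-prefixed flatbuffers out of a single log file,
// decompressing through ThreadedLzmaDecoder when the filename ends in ".xz"
// (and LZMA support is compiled in) and passing raw bytes through DummyDecoder
// otherwise.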
SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
  decoder_ = std::make_unique<DummyDecoder>(filename);

  static constexpr std::string_view kXz = ".xz";
  if (filename.substr(filename.size() - kXz.size()) == kXz) {
#if ENABLE_LZMA
    decoder_ = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_));
#else
    LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
#endif
  }
}

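// Each message on disk is a size-prefixed flatbuffer: a 4-byte uoffset_t
// length followed by that many bytes of MessageHeader. PeekMessage returns a
// span covering the prefix and the message body without consuming it;
// ConsumeMessage advances past it.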
absl::Span<const uint8_t> SpanReader::PeekMessage() {
  // Make sure we have enough for the size.
  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // Now make sure we have enough for the message.
  const size_t data_size =
      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
      sizeof(flatbuffers::uoffset_t);
  if (data_size == sizeof(flatbuffers::uoffset_t)) {
    LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
    LOG(ERROR) << " Rest of log file is "
               << absl::BytesToHexString(std::string_view(
                      reinterpret_cast<const char *>(data_.data() +
                                                     consumed_data_),
                      data_.size() - consumed_data_));
    return absl::Span<const uint8_t>();
  }
  while (data_.size() < consumed_data_ + data_size) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // And return it, without consuming the data.
  const uint8_t *data_ptr = data_.data() + consumed_data_;

  return absl::Span<const uint8_t>(data_ptr, data_size);
}

void SpanReader::ConsumeMessage() {
  consumed_data_ +=
      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
      sizeof(flatbuffers::uoffset_t);
}

absl::Span<const uint8_t> SpanReader::ReadMessage() {
  absl::Span<const uint8_t> result = PeekMessage();
  if (result != absl::Span<const uint8_t>()) {
    ConsumeMessage();
  }
  return result;
}

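// Pulls the next chunk of bytes from the decoder into data_, first discarding
// anything already consumed. Returns false once the decoder has no more data.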
bool SpanReader::ReadBlock() {
  // This is the amount of data we grab at a time. Doing larger chunks minimizes
  // syscalls and helps decompressors batch things more efficiently.
  constexpr size_t kReadSize = 256 * 1024;

  // Strip off any unused data at the front.
  if (consumed_data_ != 0) {
    data_.erase_front(consumed_data_);
    consumed_data_ = 0;
  }

  const size_t starting_size = data_.size();

  // This should automatically grow the backing store. It won't shrink if we
  // get a small chunk later. This reduces allocations when we want to append
  // more data.
  data_.resize(starting_size + kReadSize);

  const size_t count =
      decoder_->Read(data_.begin() + starting_size, data_.end());
  data_.resize(starting_size + count);
  if (count == 0) {
    return false;
  }

  return true;
}

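// Reads the LogFileHeader from the front of the stream. With
// --workaround_double_headers enabled, any additional valid headers
// immediately following it are consumed too and the last one wins, to cope
// with old log files that wrote the header twice.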
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
    SpanReader *span_reader) {
  absl::Span<const uint8_t> config_data = span_reader->ReadMessage();

  // Make sure something was read.
  if (config_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  // And copy the config so we have it forever, removing the size prefix.
  SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
  if (!result.Verify()) {
    return std::nullopt;
  }

  if (FLAGS_workaround_double_headers) {
    while (true) {
      absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
      if (maybe_header_data == absl::Span<const uint8_t>()) {
        break;
      }

      aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
          maybe_header_data);
      if (maybe_header.Verify()) {
        LOG(WARNING) << "Found duplicate LogFileHeader in "
                     << span_reader->filename();
        ResizeableBuffer header_data_copy;
        header_data_copy.resize(maybe_header_data.size());
        memcpy(header_data_copy.data(), maybe_header_data.begin(),
               header_data_copy.size());
        result = SizePrefixedFlatbufferVector<LogFileHeader>(
            std::move(header_data_copy));

        span_reader->ConsumeMessage();
      } else {
        break;
      }
    }
  }
  return result;
}

std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
    std::string_view filename) {
  SpanReader span_reader(filename);
  return ReadHeader(&span_reader);
}

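// Reads message n (0-based, not counting the log file header) from filename,
// or nullopt if the file ends or the message fails verification.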
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
    std::string_view filename, size_t n) {
  SpanReader span_reader(filename);
  absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
  for (size_t i = 0; i < n + 1; ++i) {
    data_span = span_reader.ReadMessage();

    // Make sure something was read.
    if (data_span == absl::Span<const uint8_t>()) {
      return std::nullopt;
    }
  }

  // And copy the message so we have it forever, removing the size prefix.
  SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
  if (!result.Verify()) {
    return std::nullopt;
  }
  return result;
}

MessageReader::MessageReader(std::string_view filename)
    : span_reader_(filename),
      raw_log_file_header_(
          SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
  std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
      raw_log_file_header = ReadHeader(&span_reader_);

  // Make sure something was read.
  CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;

  raw_log_file_header_ = std::move(*raw_log_file_header);

  CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";

  max_out_of_order_duration_ =
      FLAGS_max_out_of_order > 0
          ? chrono::duration_cast<chrono::nanoseconds>(
                chrono::duration<double>(FLAGS_max_out_of_order))
          : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());

  VLOG(1) << "Opened " << filename << " as node "
          << FlatbufferToJson(log_file_header()->node());
}

std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
MessageReader::ReadMessage() {
  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
  if (msg_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);

  CHECK(result.Verify()) << ": Corrupted message from " << filename();

  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
      chrono::nanoseconds(result.message().monotonic_sent_time()));

  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
  return std::move(result);
}

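// PartsMessageReader reads messages sequentially across all the files in one
// LogParts, opening the next part when the current one runs out, and checks
// that messages never arrive more than max_out_of_order_duration() out of
// order once past the start of the log.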
PartsMessageReader::PartsMessageReader(LogParts log_parts)
    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
  ComputeBootCounts();
}

void PartsMessageReader::ComputeBootCounts() {
  boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
                      std::nullopt);

  // We have 3 vintages of log files with different amounts of information.
  if (log_file_header()->has_boot_uuids()) {
    // The new hotness with the boots explicitly listed out. We can use the log
    // file header to compute the boot count of all relevant nodes.
    CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
    size_t node_index = 0;
    for (const flatbuffers::String *boot_uuid :
         *log_file_header()->boot_uuids()) {
      CHECK(parts_.boots);
      if (boot_uuid->size() != 0) {
        auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
        if (it != parts_.boots->boot_count_map.end()) {
          boot_counts_[node_index] = it->second;
        }
      } else if (parts().boots->boots[node_index].size() == 1u) {
        boot_counts_[node_index] = 0;
      }
      ++node_index;
    }
  } else {
    // Older multi-node logs which are guaranteed to have UUIDs logged, or
    // single-node log files with boot UUIDs in the header. We only know how to
    // order certain boots in certain circumstances.
    if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
      for (size_t node_index = 0; node_index < boot_counts_.size();
           ++node_index) {
        CHECK(parts_.boots);
        if (parts().boots->boots[node_index].size() == 1u) {
          boot_counts_[node_index] = 0;
        }
      }
    } else {
      // Really old single-node logs without any UUIDs. They can't reboot.
      CHECK_EQ(boot_counts_.size(), 1u);
      boot_counts_[0] = 0u;
    }
  }
}

std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
PartsMessageReader::ReadMessage() {
  while (!done_) {
    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
        message_reader_.ReadMessage();
    if (message) {
      newest_timestamp_ = message_reader_.newest_timestamp();
      const monotonic_clock::time_point monotonic_sent_time(
          chrono::nanoseconds(message->message().monotonic_sent_time()));
      // TODO(austin): Does this work with startup? Might need to use the start
      // time.
      // TODO(austin): Does this work with startup when we don't know the remote
      // start time too? Look at one of those logs to compare.
      if (monotonic_sent_time >
          parts_.monotonic_start_time + max_out_of_order_duration()) {
        after_start_ = true;
      }
      if (after_start_) {
        CHECK_GE(monotonic_sent_time,
                 newest_timestamp_ - max_out_of_order_duration())
            << ": Max out of order of " << max_out_of_order_duration().count()
            << "ns exceeded. " << parts_ << ", start time is "
            << parts_.monotonic_start_time << " currently reading "
            << filename();
      }
      return message;
    }
    NextLog();
  }
  newest_timestamp_ = monotonic_clock::max_time;
  return std::nullopt;
}

void PartsMessageReader::NextLog() {
  if (next_part_index_ == parts_.parts.size()) {
    done_ = true;
    return;
  }
  message_reader_ = MessageReader(parts_.parts[next_part_index_]);
  ComputeBootCounts();
  ++next_part_index_;
}

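// Messages are ordered by (timestamp, channel_index, queue_index). Comparing
// messages from different boots is a programming error and CHECK-fails.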
bool Message::operator<(const Message &m2) const {
  CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);

  if (this->timestamp.time < m2.timestamp.time) {
    return true;
  } else if (this->timestamp.time > m2.timestamp.time) {
    return false;
  }

  if (this->channel_index < m2.channel_index) {
    return true;
  } else if (this->channel_index > m2.channel_index) {
    return false;
  }

  return this->queue_index < m2.queue_index;
}

bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
  CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);

  return timestamp.time == m2.timestamp.time &&
         channel_index == m2.channel_index && queue_index == m2.queue_index;
}

std::ostream &operator<<(std::ostream &os, const Message &m) {
  os << "{.channel_index=" << m.channel_index
     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
  if (m.data.Verify()) {
    os << ", .data="
       << aos::FlatbufferToJson(m.data,
                                {.multi_line = false, .max_vector_size = 1});
  }
  os << "}";
  return os;
}

std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
  os << "{.channel_index=" << m.channel_index
     << ", .queue_index=" << m.queue_index
     << ", .monotonic_event_time=" << m.monotonic_event_time
     << ", .realtime_event_time=" << m.realtime_event_time;
  if (m.remote_queue_index != 0xffffffff) {
    os << ", .remote_queue_index=" << m.remote_queue_index;
  }
  if (m.monotonic_remote_time != BootTimestamp::min_time()) {
    os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
  }
  if (m.realtime_remote_time != realtime_clock::min_time) {
    os << ", .realtime_remote_time=" << m.realtime_remote_time;
  }
  if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
  }
  if (m.data.Verify()) {
    os << ", .data="
       << aos::FlatbufferToJson(m.data,
                                {.multi_line = false, .max_vector_size = 1});
  }
  os << "}";
  return os;
}

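// LogPartsSorter buffers messages from a PartsMessageReader until the front of
// the buffer is older than the reader's max-out-of-order window, at which
// point it is safe to hand out in sorted order.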
LogPartsSorter::LogPartsSorter(LogParts log_parts)
    : parts_message_reader_(log_parts),
      source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
}

Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      size_t monotonic_timestamp_boot = 0;
      if (m.value().message().has_monotonic_timestamp_time()) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      size_t monotonic_remote_boot = 0xffffff;

      if (m.value().message().has_monotonic_remote_time()) {
        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[m->message().channel_index()]);
        CHECK(boot) << ": Failed to find boot for node "
                    << source_node_index_[m->message().channel_index()];
        monotonic_remote_boot = *boot;
      }

      messages_.insert(Message{
          .channel_index = m.value().message().channel_index(),
          .queue_index = m.value().message().queue_index(),
          .timestamp =
              BootTimestamp{
                  .boot = parts().boot_count,
                  .time = monotonic_clock::time_point(std::chrono::nanoseconds(
                      m.value().message().monotonic_sent_time()))},
          .monotonic_remote_boot = monotonic_remote_boot,
          .monotonic_timestamp_boot = monotonic_timestamp_boot,
          .data = std::move(m.value())});

      // Now, update sorted_until_ to match the new message.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}

void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }

std::string LogPartsSorter::DebugString() const {
  std::stringstream ss;
  ss << "messages: [\n";
  int count = 0;
  bool no_dots = true;
  for (const Message &m : messages_) {
    if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
      ss << m << "\n";
    } else if (no_dots) {
      ss << "...\n";
      no_dots = false;
    }
    ++count;
  }
  ss << "] <- " << parts_message_reader_.filename();
  return ss.str();
}

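// NodeMerger does a k-way merge across the sorted streams of every part from a
// single node and boot, dropping duplicate messages that were logged into more
// than one part (preferring the copy that carries a timestamp time).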
NodeMerger::NodeMerger(std::vector<LogParts> parts) {
  CHECK_GE(parts.size(), 1u);
  // Enforce that we are sorting things only from a single node from a single
  // boot.
  const std::string_view part0_node = parts[0].node;
  const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
  for (size_t i = 1; i < parts.size(); ++i) {
    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
    CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
        << ": Can't merge different boots.";
  }

  node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);

  for (LogParts &part : parts) {
    parts_sorters_.emplace_back(std::move(part));
  }

  monotonic_start_time_ = monotonic_clock::max_time;
  realtime_start_time_ = realtime_clock::max_time;
  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
    if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
      monotonic_start_time_ = parts_sorter.monotonic_start_time();
      realtime_start_time_ = parts_sorter.realtime_start_time();
    }
  }
}

std::vector<const LogParts *> NodeMerger::Parts() const {
  std::vector<const LogParts *> p;
  p.reserve(parts_sorters_.size());
  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
    p.emplace_back(&parts_sorter.parts());
  }
  return p;
}

Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has the
      // timestamp time.
      if (!m->data.message().has_monotonic_timestamp_time()) {
        parts_sorter.PopFront();
      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
                 oldest->data.message().monotonic_timestamp_time());
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  if (oldest) {
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found. This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}

void NodeMerger::PopFront() {
  CHECK(current_ != nullptr) << "Popping before calling Front()";
  current_->PopFront();
  current_ = nullptr;
}

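// BootMerger chains one NodeMerger per boot and drains them in boot order.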
BootMerger::BootMerger(std::vector<LogParts> files) {
  std::vector<std::vector<LogParts>> boots;

  // Now, we need to split things out by boot.
  for (size_t i = 0; i < files.size(); ++i) {
    const size_t boot_count = files[i].boot_count;
    if (boot_count + 1 > boots.size()) {
      boots.resize(boot_count + 1);
    }
    boots[boot_count].emplace_back(std::move(files[i]));
  }

  node_mergers_.reserve(boots.size());
  for (size_t i = 0; i < boots.size(); ++i) {
    VLOG(2) << "Boot " << i;
    for (auto &p : boots[i]) {
      VLOG(2) << "Part " << p;
    }
    node_mergers_.emplace_back(
        std::make_unique<NodeMerger>(std::move(boots[i])));
  }
}

Message *BootMerger::Front() {
  Message *result = node_mergers_[index_]->Front();

  if (result != nullptr) {
    return result;
  }

  if (index_ + 1u == node_mergers_.size()) {
    // At the end of the last node merger, just return.
    return nullptr;
  } else {
    ++index_;
    return Front();
  }
}

void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }

std::vector<const LogParts *> BootMerger::Parts() const {
  std::vector<const LogParts *> results;
  for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
    std::vector<const LogParts *> node_parts = node_merger->Parts();

    results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
                   std::make_move_iterator(node_parts.end()));
  }

  return results;
}

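// TimestampMapper converts the merged stream of data and timestamp messages
// into TimestampedMessages. For a message forwarded from another node, the
// locally logged timestamp is matched up with the actual data queued by the
// sending node's TimestampMapper, which must be registered via AddPeer().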
TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
    : boot_merger_(std::move(parts)),
      timestamp_callback_([](TimestampedMessage *) {}) {
  for (const LogParts *part : boot_merger_.Parts()) {
    if (!configuration_) {
      configuration_ = part->config;
    } else {
      CHECK_EQ(configuration_.get(), part->config.get());
    }
  }
  const Configuration *config = configuration_.get();
  // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
  // pretty simple.
  if (configuration::MultiNode(config)) {
    nodes_data_.resize(config->nodes()->size());
    const Node *my_node = config->nodes()->Get(node());
    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
      const Node *node = config->nodes()->Get(node_index);
      NodeData *node_data = &nodes_data_[node_index];
      node_data->channels.resize(config->channels()->size());
      // We should save the channel if it is delivered to the node represented
      // by the NodeData, but not sent by that node. That combo means it is
      // forwarded.
      size_t channel_index = 0;
      node_data->any_delivered = false;
      for (const Channel *channel : *config->channels()) {
        node_data->channels[channel_index].delivered =
            configuration::ChannelIsReadableOnNode(channel, node) &&
            configuration::ChannelIsSendableOnNode(channel, my_node) &&
            (my_node != node);
        node_data->any_delivered = node_data->any_delivered ||
                                   node_data->channels[channel_index].delivered;
        ++channel_index;
      }
    }

    for (const Channel *channel : *config->channels()) {
      source_node_.emplace_back(configuration::GetNodeIndex(
          config, channel->source_node()->string_view()));
    }
  }
}

void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
  CHECK(configuration::MultiNode(configuration()));
  CHECK_NE(timestamp_mapper->node(), node());
  CHECK_LT(timestamp_mapper->node(), nodes_data_.size());

  NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
  // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
  // we could needlessly save data.
  if (node_data->any_delivered) {
    VLOG(1) << "Registering on node " << node() << " for peer node "
            << timestamp_mapper->node();
    CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);

    timestamp_mapper->nodes_data_[node()].peer = this;

    node_data->save_for_peer = true;
  }
}

void TimestampMapper::QueueMessage(Message *m) {
  matched_messages_.emplace_back(TimestampedMessage{
      .channel_index = m->channel_index,
      .queue_index = m->queue_index,
      .monotonic_event_time = m->timestamp,
      .realtime_event_time = aos::realtime_clock::time_point(
          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
      .remote_queue_index = 0xffffffff,
      .monotonic_remote_time = BootTimestamp::min_time(),
      .realtime_remote_time = realtime_clock::min_time,
      .monotonic_timestamp_time = BootTimestamp::min_time(),
      .data = std::move(m->data)});
}

TimestampedMessage *TimestampMapper::Front() {
  // No need to fetch anything new. A previous message still exists.
  switch (first_message_) {
    case FirstMessage::kNeedsUpdate:
      break;
    case FirstMessage::kInMessage:
      return &matched_messages_.front();
    case FirstMessage::kNullptr:
      return nullptr;
  }

  if (matched_messages_.empty()) {
    if (!QueueMatched()) {
      first_message_ = FirstMessage::kNullptr;
      return nullptr;
    }
  }
  first_message_ = FirstMessage::kInMessage;
  return &matched_messages_.front();
}

bool TimestampMapper::QueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path. We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    Message *m = boot_merger_.Front();
    if (!m) {
      return false;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(m);

    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are a thin wrapper around the boot merger. Call it directly.
    boot_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }

  // We need to only add messages to the list so they get processed for messages
  // which are delivered. Reuse the flow below which uses messages_ by just
  // adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return false;
    }

    // Now that it has been added (and cannibalized), forget about it upstream.
    boot_merger_.PopFront();
  }

  Message *m = &(messages_.front());

  if (source_node_[m->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(m);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return it.
    Message data = MatchingMessageFor(*m);

    // Return the data from the remote. The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = m->channel_index,
        .queue_index = m->queue_index,
        .monotonic_event_time = m->timestamp,
        .realtime_event_time = aos::realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
        .remote_queue_index = m->data.message().remote_queue_index(),
        .monotonic_remote_time =
            {m->monotonic_remote_boot,
             monotonic_clock::time_point(std::chrono::nanoseconds(
                 m->data.message().monotonic_remote_time()))},
        .realtime_remote_time = realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
        .monotonic_timestamp_time =
            {m->monotonic_timestamp_boot,
             monotonic_clock::time_point(std::chrono::nanoseconds(
                 m->data.message().monotonic_timestamp_time()))},
        .data = std::move(data.data)});
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }
}

void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
  while (last_message_time_ <= queue_time) {
    if (!QueueMatched()) {
      return;
    }
  }
}

void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
  // Note: queueing for time doesn't really work well across boots. So we just
  // assume that if you are using this, you only care about the current boot.
  //
  // TODO(austin): Is that the right concept?
  //
  // Make sure we have something queued first. This makes the end time
  // calculation simpler, and is typically what folks want regardless.
  if (matched_messages_.empty()) {
    if (!QueueMatched()) {
      return;
    }
  }

  const aos::monotonic_clock::time_point end_queue_time =
      std::max(monotonic_start_time(
                   matched_messages_.front().monotonic_event_time.boot),
               matched_messages_.front().monotonic_event_time.time) +
      time_estimation_buffer;

  // Place sorted messages on the list until we have
  // --time_estimation_buffer_seconds seconds queued up (but queue at least
  // until the log starts).
  while (end_queue_time >= last_message_time_.time) {
    if (!QueueMatched()) {
      return;
    }
  }
}

void TimestampMapper::PopFront() {
  CHECK(first_message_ != FirstMessage::kNeedsUpdate);
  first_message_ = FirstMessage::kNeedsUpdate;

  matched_messages_.pop_front();
}

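// Finds the remote data message corresponding to a logged timestamp. The fast
// path assumes the peer's queue has no gaps and indexes directly into it;
// otherwise it falls back to a linear search. If the data is missing, an
// empty Message with the same queue index and remote timestamp is returned.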
Message TimestampMapper::MatchingMessageFor(const Message &message) {
  // Figure out what queue index we are looking for.
  CHECK(message.data.message().has_remote_queue_index());
  const uint32_t remote_queue_index =
      message.data.message().remote_queue_index();

  CHECK(message.data.message().has_monotonic_remote_time());
  CHECK(message.data.message().has_realtime_remote_time());

  const BootTimestamp monotonic_remote_time{
      .boot = message.monotonic_remote_boot,
      .time = monotonic_clock::time_point(std::chrono::nanoseconds(
          message.data.message().monotonic_remote_time()))};
  const realtime_clock::time_point realtime_remote_time(
      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));

  TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;

  // We only register the peers which we have data for. So, if we are being
  // asked to pull a timestamp from a peer which doesn't exist, return an empty
  // message.
  if (peer == nullptr) {
    // TODO(austin): Make sure the tests hit all these paths with a boot count
    // of 1...
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The queue which will have the matching data, if available.
  std::deque<Message> *data_queue =
      &peer->nodes_data_[node()].channels[message.channel_index].messages;

  peer->QueueUnmatchedUntil(monotonic_remote_time);

  if (data_queue->empty()) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  if (remote_queue_index < data_queue->front().queue_index ||
      remote_queue_index > data_queue->back().queue_index) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The algorithm below is constant time with some assumptions. We need there
  // to be no missing messages in the data stream. This also assumes a queue
  // hasn't wrapped. That is conservative, but should let us get started.
  if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
      data_queue->size()) {
    // Pull the data out and confirm that the timestamps match as expected.
    Message result = std::move(
        (*data_queue)[remote_queue_index - data_queue->front().queue_index]);

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    // Now drop the data off the front. We have deduplicated timestamps, so we
    // are done. And all the data is in order.
    data_queue->erase(data_queue->begin(),
                      data_queue->begin() + (1 + remote_queue_index -
                                             data_queue->front().queue_index));
    return result;
  } else {
    auto it = std::find_if(data_queue->begin(), data_queue->end(),
                           [remote_queue_index](const Message &m) {
                             return m.queue_index == remote_queue_index;
                           });
    if (it == data_queue->end()) {
      return Message{
          .channel_index = message.channel_index,
          .queue_index = remote_queue_index,
          .timestamp = monotonic_remote_time,
          .monotonic_remote_boot = 0xffffff,
          .monotonic_timestamp_boot = 0xffffff,
          .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
    }

    Message result = std::move(*it);

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";

    data_queue->erase(it);

    return result;
  }
}

void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
  if (queued_until_ > t) {
    return;
  }
  while (true) {
    if (!messages_.empty() && messages_.back().timestamp > t) {
      queued_until_ = std::max(queued_until_, messages_.back().timestamp);
      return;
    }

    if (!Queue()) {
      // Found nothing to add, we are out of data!
      queued_until_ = BootTimestamp::max_time();
      return;
    }

    // Now that it has been added (and cannibalized), forget about it upstream.
    boot_merger_.PopFront();
  }
}

bool TimestampMapper::Queue() {
  Message *m = boot_merger_.Front();
  if (m == nullptr) {
    return false;
  }
  for (NodeData &node_data : nodes_data_) {
    if (!node_data.any_delivered) continue;
    if (!node_data.save_for_peer) continue;
    if (node_data.channels[m->channel_index].delivered) {
      // TODO(austin): This copies the data... Probably not worth stressing
      // about yet.
      // TODO(austin): Bound how big this can get. We tend not to send massive
      // data, so we can probably ignore this for a bit.
      node_data.channels[m->channel_index].messages.emplace_back(*m);
    }
  }

  messages_.emplace_back(std::move(*m));
  return true;
}

std::string TimestampMapper::DebugString() const {
  std::stringstream ss;
  ss << "node " << node() << " [\n";
  for (const Message &message : messages_) {
    ss << " " << message << "\n";
  }
  ss << "] queued_until " << queued_until_;
  for (const NodeData &ns : nodes_data_) {
    if (ns.peer == nullptr) continue;
    ss << "\nnode " << ns.peer->node() << " remote_data [\n";
    size_t channel_index = 0;
    for (const NodeData::ChannelData &channel_data :
         ns.peer->nodes_data_[node()].channels) {
      if (channel_data.messages.empty()) {
        continue;
      }

      ss << " channel " << channel_index << " [\n";
      for (const Message &m : channel_data.messages) {
        ss << " " << m << "\n";
      }
      ss << " ]\n";
      ++channel_index;
    }
    ss << "] queued_until " << ns.peer->queued_until_;
  }
  return ss.str();
}

std::string MaybeNodeName(const Node *node) {
  if (node != nullptr) {
    return node->name()->str() + " ";
  }
  return "";
}

}  // namespace aos::logger