blob: 31fdc83cf85289b257c4e21d1d0c2e046c6b7eca [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
Austin Schuhfa895892020-01-07 20:07:41 -080013#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080014#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080015#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080016#include "gflags/gflags.h"
17#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080018
// ENABLE_LZMA is 1 when xz support is compiled in and 0 otherwise.  It is only
// enabled on x86_64 and aarch64, and is turned off for MemorySanitizer builds.
//
// __has_feature is not available on every compiler, so it is only evaluated
// inside a defined(__has_feature) guard.  Compilers without it are treated as
// not running MemorySanitizer.
#if defined(__x86_64__) || defined(__aarch64__)
#if defined(__has_feature)
#if __has_feature(memory_sanitizer)
#define ENABLE_LZMA 0
#else
#define ENABLE_LZMA 1
#endif
#else
#define ENABLE_LZMA 1
#endif
#else
#define ENABLE_LZMA 0
#endif
26
27#if ENABLE_LZMA
28#include "aos/events/logging/lzma_encoder.h"
29#endif
30
Austin Schuh7fbf5a72020-09-21 16:28:13 -070031DEFINE_int32(flush_size, 128000,
Austin Schuha36c8902019-12-30 18:07:15 -080032 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070033DEFINE_double(
34 flush_period, 5.0,
35 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080036
Austin Schuha040c3f2021-02-13 16:09:07 -080037DEFINE_double(
38 max_out_of_order, -1,
39 "If set, this overrides the max out of order duration for a log file.");
40
Austin Schuh0e8db662021-07-06 10:43:47 -070041DEFINE_bool(workaround_double_headers, true,
42 "Some old log files have two headers at the beginning. Use the "
43 "last header as the actual header.");
44
Brian Silvermanf51499a2020-09-21 12:49:08 -070045namespace aos::logger {
Austin Schuha36c8902019-12-30 18:07:15 -080046
Austin Schuh05b70472020-01-01 17:11:17 -080047namespace chrono = std::chrono;
48
Brian Silvermanf51499a2020-09-21 12:49:08 -070049DetachedBufferWriter::DetachedBufferWriter(
50 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
51 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070052 if (!util::MkdirPIfSpace(filename, 0777)) {
53 ran_out_of_space_ = true;
54 } else {
55 fd_ = open(std::string(filename).c_str(),
56 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
57 if (fd_ == -1 && errno == ENOSPC) {
58 ran_out_of_space_ = true;
59 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070060 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
61 << " for writing";
62 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070063 }
64 }
Austin Schuha36c8902019-12-30 18:07:15 -080065}
66
67DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070068 Close();
69 if (ran_out_of_space_) {
70 CHECK(acknowledge_ran_out_of_space_)
71 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070072 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070073}
74
Brian Silvermand90905f2020-09-23 14:42:56 -070075DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070076 *this = std::move(other);
77}
78
Brian Silverman87ac0402020-09-17 14:47:01 -070079// When other is destroyed "soon" (which it should be because we're getting an
80// rvalue reference to it), it will flush etc all the data we have queued up
81// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -070082DetachedBufferWriter &DetachedBufferWriter::operator=(
83 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070084 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070085 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070086 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -070087 std::swap(ran_out_of_space_, other.ran_out_of_space_);
88 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070089 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070090 std::swap(max_write_time_, other.max_write_time_);
91 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
92 std::swap(max_write_time_messages_, other.max_write_time_messages_);
93 std::swap(total_write_time_, other.total_write_time_);
94 std::swap(total_write_count_, other.total_write_count_);
95 std::swap(total_write_messages_, other.total_write_messages_);
96 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070097 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -080098}
99
Brian Silvermanf51499a2020-09-21 12:49:08 -0700100void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700101 if (ran_out_of_space_) {
102 // We don't want any later data to be written after space becomes
103 // available, so refuse to write anything more once we've dropped data
104 // because we ran out of space.
105 VLOG(1) << "Ignoring span: " << span.size();
106 return;
107 }
108
Austin Schuhbd06ae42021-03-31 22:48:21 -0700109 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700110 if (encoder_->may_bypass() && span.size() > 4096u) {
111 // Over this threshold, we'll assume it's cheaper to add an extra
112 // syscall to write the data immediately instead of copying it to
113 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800114
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 // First, flush everything.
116 while (encoder_->queue_size() > 0u) {
117 Flush();
118 }
Austin Schuhde031b72020-01-10 19:34:41 -0800119
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120 // Then, write it directly.
121 const auto start = aos::monotonic_clock::now();
122 const ssize_t written = write(fd_, span.data(), span.size());
123 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700124 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700125 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700126 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700127 } else {
128 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700129 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800130 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700131
Austin Schuhbd06ae42021-03-31 22:48:21 -0700132 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800133}
134
Brian Silverman0465fcf2020-09-24 00:29:18 -0700135void DetachedBufferWriter::Close() {
136 if (fd_ == -1) {
137 return;
138 }
139 encoder_->Finish();
140 while (encoder_->queue_size() > 0) {
141 Flush();
142 }
143 if (close(fd_) == -1) {
144 if (errno == ENOSPC) {
145 ran_out_of_space_ = true;
146 } else {
147 PLOG(ERROR) << "Closing log file failed";
148 }
149 }
150 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700151 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700152}
153
Austin Schuha36c8902019-12-30 18:07:15 -0800154void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700155 if (ran_out_of_space_) {
156 // We don't want any later data to be written after space becomes available,
157 // so refuse to write anything more once we've dropped data because we ran
158 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700159 if (encoder_) {
160 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
161 encoder_->Clear(encoder_->queue().size());
162 } else {
163 VLOG(1) << "No queue to ignore";
164 }
165 return;
166 }
167
168 const auto queue = encoder_->queue();
169 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700170 return;
171 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700172
Austin Schuha36c8902019-12-30 18:07:15 -0800173 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700174 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
175 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800176 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700177 for (size_t i = 0; i < iovec_size; ++i) {
178 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
179 iovec_[i].iov_len = queue[i].size();
180 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800181 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700182
183 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800184 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700185 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700186 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700187
188 encoder_->Clear(iovec_size);
189
190 UpdateStatsForWrite(end - start, written, iovec_size);
191}
192
Brian Silverman0465fcf2020-09-24 00:29:18 -0700193void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
194 size_t write_size) {
195 if (write_return == -1 && errno == ENOSPC) {
196 ran_out_of_space_ = true;
197 return;
198 }
199 PCHECK(write_return >= 0) << ": write failed";
200 if (write_return < static_cast<ssize_t>(write_size)) {
201 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
202 // never seems to happen in any other case. If we ever want to log to a
203 // socket, this will happen more often. However, until we get there, we'll
204 // just assume it means we ran out of space.
205 ran_out_of_space_ = true;
206 return;
207 }
208}
209
Brian Silvermanf51499a2020-09-21 12:49:08 -0700210void DetachedBufferWriter::UpdateStatsForWrite(
211 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
212 if (duration > max_write_time_) {
213 max_write_time_ = duration;
214 max_write_time_bytes_ = written;
215 max_write_time_messages_ = iovec_size;
216 }
217 total_write_time_ += duration;
218 ++total_write_count_;
219 total_write_messages_ += iovec_size;
220 total_write_bytes_ += written;
221}
222
Austin Schuhbd06ae42021-03-31 22:48:21 -0700223void DetachedBufferWriter::FlushAtThreshold(
224 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700225 if (ran_out_of_space_) {
226 // We don't want any later data to be written after space becomes available,
227 // so refuse to write anything more once we've dropped data because we ran
228 // out of space.
229 if (encoder_) {
230 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
231 encoder_->Clear(encoder_->queue().size());
232 } else {
233 VLOG(1) << "No queue to ignore";
234 }
235 return;
236 }
237
Austin Schuhbd06ae42021-03-31 22:48:21 -0700238 // We don't want to flush the first time through. Otherwise we will flush as
239 // the log file header might be compressing, defeating any parallelism and
240 // queueing there.
241 if (last_flush_time_ == aos::monotonic_clock::min_time) {
242 last_flush_time_ = now;
243 }
244
Brian Silvermanf51499a2020-09-21 12:49:08 -0700245 // Flush if we are at the max number of iovs per writev, because there's no
246 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700247 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700248 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700249 encoder_->queue_size() >= IOV_MAX ||
250 now > last_flush_time_ +
251 chrono::duration_cast<chrono::nanoseconds>(
252 chrono::duration<double>(FLAGS_flush_period))) {
253 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700254 Flush();
255 }
Austin Schuha36c8902019-12-30 18:07:15 -0800256}
257
258flatbuffers::Offset<MessageHeader> PackMessage(
259 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
260 int channel_index, LogType log_type) {
261 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
262
263 switch (log_type) {
264 case LogType::kLogMessage:
265 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800266 case LogType::kLogRemoteMessage:
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700267 data_offset = fbb->CreateVector(
268 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800269 break;
270
271 case LogType::kLogDeliveryTimeOnly:
272 break;
273 }
274
275 MessageHeader::Builder message_header_builder(*fbb);
276 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800277
278 switch (log_type) {
279 case LogType::kLogRemoteMessage:
280 message_header_builder.add_queue_index(context.remote_queue_index);
281 message_header_builder.add_monotonic_sent_time(
282 context.monotonic_remote_time.time_since_epoch().count());
283 message_header_builder.add_realtime_sent_time(
284 context.realtime_remote_time.time_since_epoch().count());
285 break;
286
287 case LogType::kLogMessage:
288 case LogType::kLogMessageAndDeliveryTime:
289 case LogType::kLogDeliveryTimeOnly:
290 message_header_builder.add_queue_index(context.queue_index);
291 message_header_builder.add_monotonic_sent_time(
292 context.monotonic_event_time.time_since_epoch().count());
293 message_header_builder.add_realtime_sent_time(
294 context.realtime_event_time.time_since_epoch().count());
295 break;
296 }
Austin Schuha36c8902019-12-30 18:07:15 -0800297
298 switch (log_type) {
299 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800300 case LogType::kLogRemoteMessage:
Austin Schuha36c8902019-12-30 18:07:15 -0800301 message_header_builder.add_data(data_offset);
302 break;
303
304 case LogType::kLogMessageAndDeliveryTime:
305 message_header_builder.add_data(data_offset);
306 [[fallthrough]];
307
308 case LogType::kLogDeliveryTimeOnly:
309 message_header_builder.add_monotonic_remote_time(
310 context.monotonic_remote_time.time_since_epoch().count());
311 message_header_builder.add_realtime_remote_time(
312 context.realtime_remote_time.time_since_epoch().count());
313 message_header_builder.add_remote_queue_index(context.remote_queue_index);
314 break;
315 }
316
317 return message_header_builder.Finish();
318}
319
Brian Silvermanf51499a2020-09-21 12:49:08 -0700320SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700321 decoder_ = std::make_unique<DummyDecoder>(filename);
322
323 static constexpr std::string_view kXz = ".xz";
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700324 if (filename.substr(filename.size() - kXz.size()) == kXz) {
325#if ENABLE_LZMA
Tyler Chatow2015bc62021-08-04 21:15:09 -0700326 decoder_ = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700327#else
328 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
329#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700330 }
Austin Schuh05b70472020-01-01 17:11:17 -0800331}
332
Austin Schuhcf5f6442021-07-06 10:43:28 -0700333absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800334 // Make sure we have enough for the size.
335 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
336 if (!ReadBlock()) {
337 return absl::Span<const uint8_t>();
338 }
339 }
340
341 // Now make sure we have enough for the message.
342 const size_t data_size =
343 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
344 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800345 if (data_size == sizeof(flatbuffers::uoffset_t)) {
346 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
347 LOG(ERROR) << " Rest of log file is "
348 << absl::BytesToHexString(std::string_view(
349 reinterpret_cast<const char *>(data_.data() +
350 consumed_data_),
351 data_.size() - consumed_data_));
352 return absl::Span<const uint8_t>();
353 }
Austin Schuh05b70472020-01-01 17:11:17 -0800354 while (data_.size() < consumed_data_ + data_size) {
355 if (!ReadBlock()) {
356 return absl::Span<const uint8_t>();
357 }
358 }
359
360 // And return it, consuming the data.
361 const uint8_t *data_ptr = data_.data() + consumed_data_;
362
Austin Schuh05b70472020-01-01 17:11:17 -0800363 return absl::Span<const uint8_t>(data_ptr, data_size);
364}
365
Austin Schuhcf5f6442021-07-06 10:43:28 -0700366void SpanReader::ConsumeMessage() {
367 consumed_data_ +=
368 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
369 sizeof(flatbuffers::uoffset_t);
370}
371
372absl::Span<const uint8_t> SpanReader::ReadMessage() {
373 absl::Span<const uint8_t> result = PeekMessage();
374 if (result != absl::Span<const uint8_t>()) {
375 ConsumeMessage();
376 }
377 return result;
378}
379
Austin Schuh05b70472020-01-01 17:11:17 -0800380bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700381 // This is the amount of data we grab at a time. Doing larger chunks minimizes
382 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800383 constexpr size_t kReadSize = 256 * 1024;
384
385 // Strip off any unused data at the front.
386 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700387 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800388 consumed_data_ = 0;
389 }
390
391 const size_t starting_size = data_.size();
392
393 // This should automatically grow the backing store. It won't shrink if we
394 // get a small chunk later. This reduces allocations when we want to append
395 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700396 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800397
Brian Silvermanf51499a2020-09-21 12:49:08 -0700398 const size_t count =
399 decoder_->Read(data_.begin() + starting_size, data_.end());
400 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800401 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800402 return false;
403 }
Austin Schuh05b70472020-01-01 17:11:17 -0800404
405 return true;
406}
407
Austin Schuhadd6eb32020-11-09 21:24:26 -0800408std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700409 SpanReader *span_reader) {
410 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800411
412 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800413 if (config_data == absl::Span<const uint8_t>()) {
414 return std::nullopt;
415 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800416
Austin Schuh5212cad2020-09-09 23:12:09 -0700417 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700418 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800419 if (!result.Verify()) {
420 return std::nullopt;
421 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700422
423 if (FLAGS_workaround_double_headers) {
424 while (true) {
425 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
426 if (maybe_header_data == absl::Span<const uint8_t>()) {
427 break;
428 }
429
430 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
431 maybe_header_data);
432 if (maybe_header.Verify()) {
433 LOG(WARNING) << "Found duplicate LogFileHeader in "
434 << span_reader->filename();
435 ResizeableBuffer header_data_copy;
436 header_data_copy.resize(maybe_header_data.size());
437 memcpy(header_data_copy.data(), maybe_header_data.begin(),
438 header_data_copy.size());
439 result = SizePrefixedFlatbufferVector<LogFileHeader>(
440 std::move(header_data_copy));
441
442 span_reader->ConsumeMessage();
443 } else {
444 break;
445 }
446 }
447 }
Austin Schuhe09beb12020-12-11 20:04:27 -0800448 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800449}
450
Austin Schuh0e8db662021-07-06 10:43:47 -0700451std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
452 std::string_view filename) {
453 SpanReader span_reader(filename);
454 return ReadHeader(&span_reader);
455}
456
Austin Schuhadd6eb32020-11-09 21:24:26 -0800457std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800458 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700459 SpanReader span_reader(filename);
460 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
461 for (size_t i = 0; i < n + 1; ++i) {
462 data_span = span_reader.ReadMessage();
463
464 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800465 if (data_span == absl::Span<const uint8_t>()) {
466 return std::nullopt;
467 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700468 }
469
Brian Silverman354697a2020-09-22 21:06:32 -0700470 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700471 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -0800472 if (!result.Verify()) {
473 return std::nullopt;
474 }
475 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700476}
477
Austin Schuh05b70472020-01-01 17:11:17 -0800478MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700479 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800480 raw_log_file_header_(
481 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -0700482 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
483 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -0800484
485 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -0700486 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800487
Austin Schuh0e8db662021-07-06 10:43:47 -0700488 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -0800489
Austin Schuh5b728b72021-06-16 14:57:15 -0700490 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
491
Austin Schuhcde938c2020-02-02 17:30:07 -0800492 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800493 FLAGS_max_out_of_order > 0
494 ? chrono::duration_cast<chrono::nanoseconds>(
495 chrono::duration<double>(FLAGS_max_out_of_order))
496 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800497
498 VLOG(1) << "Opened " << filename << " as node "
499 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800500}
501
Austin Schuhadd6eb32020-11-09 21:24:26 -0800502std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
503MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800504 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
505 if (msg_data == absl::Span<const uint8_t>()) {
506 return std::nullopt;
507 }
508
Austin Schuhb929c4e2021-07-12 15:32:53 -0700509 SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
Austin Schuh05b70472020-01-01 17:11:17 -0800510
Austin Schuh0e8db662021-07-06 10:43:47 -0700511 CHECK(result.Verify()) << ": Corrupted message from " << filename();
512
Austin Schuh05b70472020-01-01 17:11:17 -0800513 const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
514 chrono::nanoseconds(result.message().monotonic_sent_time()));
515
516 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuh8bd96322020-02-13 21:18:22 -0800517 VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800518 return std::move(result);
Austin Schuh05b70472020-01-01 17:11:17 -0800519}
520
Austin Schuhc41603c2020-10-11 16:17:37 -0700521PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700522 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700523 if (parts_.parts.size() >= 2) {
524 next_message_reader_.emplace(parts_.parts[1]);
525 }
Austin Schuh48507722021-07-17 17:29:24 -0700526 ComputeBootCounts();
527}
528
529void PartsMessageReader::ComputeBootCounts() {
530 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
531 std::nullopt);
532
533 // We have 3 vintages of log files with different amounts of information.
534 if (log_file_header()->has_boot_uuids()) {
535 // The new hotness with the boots explicitly listed out. We can use the log
536 // file header to compute the boot count of all relevant nodes.
537 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
538 size_t node_index = 0;
539 for (const flatbuffers::String *boot_uuid :
540 *log_file_header()->boot_uuids()) {
541 CHECK(parts_.boots);
542 if (boot_uuid->size() != 0) {
543 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
544 if (it != parts_.boots->boot_count_map.end()) {
545 boot_counts_[node_index] = it->second;
546 }
547 } else if (parts().boots->boots[node_index].size() == 1u) {
548 boot_counts_[node_index] = 0;
549 }
550 ++node_index;
551 }
552 } else {
553 // Older multi-node logs which are guarenteed to have UUIDs logged, or
554 // single node log files with boot UUIDs in the header. We only know how to
555 // order certain boots in certain circumstances.
556 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
557 for (size_t node_index = 0; node_index < boot_counts_.size();
558 ++node_index) {
559 CHECK(parts_.boots);
560 if (parts().boots->boots[node_index].size() == 1u) {
561 boot_counts_[node_index] = 0;
562 }
563 }
564 } else {
565 // Really old single node logs without any UUIDs. They can't reboot.
566 CHECK_EQ(boot_counts_.size(), 1u);
567 boot_counts_[0] = 0u;
568 }
569 }
570}
Austin Schuhc41603c2020-10-11 16:17:37 -0700571
Austin Schuhadd6eb32020-11-09 21:24:26 -0800572std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
Austin Schuhc41603c2020-10-11 16:17:37 -0700573PartsMessageReader::ReadMessage() {
574 while (!done_) {
Austin Schuhadd6eb32020-11-09 21:24:26 -0800575 std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700576 message_reader_.ReadMessage();
577 if (message) {
578 newest_timestamp_ = message_reader_.newest_timestamp();
Austin Schuh32f68492020-11-08 21:45:51 -0800579 const monotonic_clock::time_point monotonic_sent_time(
580 chrono::nanoseconds(message->message().monotonic_sent_time()));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800581 // TODO(austin): Does this work with startup? Might need to use the start
582 // time.
583 // TODO(austin): Does this work with startup when we don't know the remote
584 // start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800585 if (monotonic_sent_time >
586 parts_.monotonic_start_time + max_out_of_order_duration()) {
587 after_start_ = true;
588 }
589 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800590 CHECK_GE(monotonic_sent_time,
591 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800592 << ": Max out of order of " << max_out_of_order_duration().count()
593 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800594 << parts_.monotonic_start_time << " currently reading "
595 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800596 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700597 return message;
598 }
599 NextLog();
600 }
Austin Schuh32f68492020-11-08 21:45:51 -0800601 newest_timestamp_ = monotonic_clock::max_time;
Austin Schuhc41603c2020-10-11 16:17:37 -0700602 return std::nullopt;
603}
604
605void PartsMessageReader::NextLog() {
606 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -0700607 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -0700608 done_ = true;
609 return;
610 }
Brian Silvermanfee16972021-09-14 12:06:38 -0700611 CHECK(next_message_reader_);
612 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -0700613 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -0700614 if (next_part_index_ + 1 < parts_.parts.size()) {
615 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
616 } else {
617 next_message_reader_.reset();
618 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700619 ++next_part_index_;
620}
621
Austin Schuh1be0ce42020-11-29 22:43:26 -0800622bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700623 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700624
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700625 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800626 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700627 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800628 return false;
629 }
630
631 if (this->channel_index < m2.channel_index) {
632 return true;
633 } else if (this->channel_index > m2.channel_index) {
634 return false;
635 }
636
637 return this->queue_index < m2.queue_index;
638}
639
640bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800641bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700642 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700643
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700644 return timestamp.time == m2.timestamp.time &&
645 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800646}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800647
648std::ostream &operator<<(std::ostream &os, const Message &m) {
649 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700650 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Austin Schuhd2f96102020-12-01 20:27:29 -0800651 if (m.data.Verify()) {
652 os << ", .data="
653 << aos::FlatbufferToJson(m.data,
654 {.multi_line = false, .max_vector_size = 1});
655 }
656 os << "}";
657 return os;
658}
659
660std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
661 os << "{.channel_index=" << m.channel_index
662 << ", .queue_index=" << m.queue_index
663 << ", .monotonic_event_time=" << m.monotonic_event_time
664 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -0700665 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800666 os << ", .remote_queue_index=" << m.remote_queue_index;
667 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700668 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800669 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
670 }
671 if (m.realtime_remote_time != realtime_clock::min_time) {
672 os << ", .realtime_remote_time=" << m.realtime_remote_time;
673 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700674 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800675 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
676 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800677 if (m.data.Verify()) {
678 os << ", .data="
679 << aos::FlatbufferToJson(m.data,
680 {.multi_line = false, .max_vector_size = 1});
681 }
682 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800683 return os;
684}
685
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800686LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -0700687 : parts_message_reader_(log_parts),
688 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
689}
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800690
// Returns a pointer to the oldest queued message, or nullptr when no messages
// are queued.  Before returning, reads messages from the parts reader until
// the oldest queued message is older than sorted_until() (or the reader runs
// out of data).
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop.  This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      // Stop reading once the oldest queued message is strictly older than
      // the sorted-until point, and that point has reached the start time.
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // The timestamp boot is only filled in from the logger boot count when
      // the message actually carries a monotonic_timestamp_time.
      size_t monotonic_timestamp_boot = 0;
      if (m.value().message().has_monotonic_timestamp_time()) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      // NOTE(review): 0xffffff looks like a placeholder for "no remote boot";
      // it is overwritten below whenever the message has a remote time.
      size_t monotonic_remote_boot = 0xffffff;

      if (m.value().message().has_monotonic_remote_time()) {
        // Only used to name the node in the failure message below.
        const Node *node = parts().config->nodes()->Get(
            source_node_index_[m->message().channel_index()]);

        // Look up the boot count of the channel's source node; it is fatal
        // for it to be missing.
        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[m->message().channel_index()]);
        CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                    << ", with index "
                    << source_node_index_[m->message().channel_index()];
        monotonic_remote_boot = *boot;
      }

      // All header fields are read out of m before m is moved into .data by
      // the final initializer.
      messages_.insert(Message{
          .channel_index = m.value().message().channel_index(),
          .queue_index =
              BootQueueIndex{.boot = parts().boot_count,
                             .index = m.value().message().queue_index()},
          .timestamp =
              BootTimestamp{
                  .boot = parts().boot_count,
                  .time = monotonic_clock::time_point(std::chrono::nanoseconds(
                      m.value().message().monotonic_sent_time()))},
          .monotonic_remote_boot = monotonic_remote_boot,
          .monotonic_timestamp_boot = monotonic_timestamp_boot,
          .data = std::move(m.value())});

      // Now, update sorted_until_ to match the new message.
      // The comparison against min_time + max_out_of_order_duration keeps the
      // subtraction below from going past monotonic_clock::min_time.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // The front message must never be older than the previously returned one.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}
767
768void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
769
770std::string LogPartsSorter::DebugString() const {
771 std::stringstream ss;
772 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800773 int count = 0;
774 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800775 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800776 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
777 ss << m << "\n";
778 } else if (no_dots) {
779 ss << "...\n";
780 no_dots = false;
781 }
782 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800783 }
784 ss << "] <- " << parts_message_reader_.filename();
785 return ss.str();
786}
787
Austin Schuhd2f96102020-12-01 20:27:29 -0800788NodeMerger::NodeMerger(std::vector<LogParts> parts) {
789 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -0700790 // Enforce that we are sorting things only from a single node from a single
791 // boot.
792 const std::string_view part0_node = parts[0].node;
793 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -0800794 for (size_t i = 1; i < parts.size(); ++i) {
795 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -0700796 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
797 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -0800798 }
Austin Schuh715adc12021-06-29 22:07:39 -0700799
800 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
801
Austin Schuhd2f96102020-12-01 20:27:29 -0800802 for (LogParts &part : parts) {
803 parts_sorters_.emplace_back(std::move(part));
804 }
805
Austin Schuhd2f96102020-12-01 20:27:29 -0800806 monotonic_start_time_ = monotonic_clock::max_time;
807 realtime_start_time_ = realtime_clock::max_time;
808 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700809 // We want to capture the earliest meaningful start time here. The start
810 // time defaults to min_time when there's no meaningful value to report, so
811 // let's ignore those.
812 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time &&
813 parts_sorter.monotonic_start_time() < monotonic_start_time_) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800814 monotonic_start_time_ = parts_sorter.monotonic_start_time();
815 realtime_start_time_ = parts_sorter.realtime_start_time();
816 }
817 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -0700818
819 // If there was no meaningful start time reported, just use min_time.
820 if (monotonic_start_time_ == monotonic_clock::max_time) {
821 monotonic_start_time_ = monotonic_clock::min_time;
822 realtime_start_time_ = realtime_clock::min_time;
823 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800824}
Austin Schuh8f52ed52020-11-30 23:12:39 -0800825
Austin Schuh0ca51f32020-12-25 21:51:45 -0800826std::vector<const LogParts *> NodeMerger::Parts() const {
827 std::vector<const LogParts *> p;
828 p.reserve(parts_sorters_.size());
829 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
830 p.emplace_back(&parts_sorter.parts());
831 }
832 return p;
833}
834
// Returns the oldest message across all parts sorters, or nullptr when every
// sorter is empty.  The sorter holding the returned message is remembered in
// current_ so that repeated calls (until PopFront) return the same message.
// Duplicate messages found at the front of other sorters are popped.
Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      // This sorter has nothing queued, but it still bounds sorted_until_.
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate.  If there is a choice, we want the one which has the
      // timestamp time.
      if (!m->data.message().has_monotonic_timestamp_time()) {
        // The new one has no timestamp time; drop it and keep the current
        // oldest.
        parts_sorter.PopFront();
      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
        // Only the new one has a timestamp time; drop the previous oldest and
        // switch over to this sorter.
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        // Both have a timestamp time; they must agree, then drop the new one.
        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
                 oldest->data.message().monotonic_timestamp_time());
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  // Messages must come out in non-decreasing time order.  When nothing is
  // left, last_message_time_ is pinned to max_time.
  if (oldest) {
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found.  This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}
887
888void NodeMerger::PopFront() {
889 CHECK(current_ != nullptr) << "Popping before calling Front()";
890 current_->PopFront();
891 current_ = nullptr;
892}
893
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700894BootMerger::BootMerger(std::vector<LogParts> files) {
895 std::vector<std::vector<LogParts>> boots;
896
897 // Now, we need to split things out by boot.
898 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700899 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700900 if (boot_count + 1 > boots.size()) {
901 boots.resize(boot_count + 1);
902 }
903 boots[boot_count].emplace_back(std::move(files[i]));
904 }
905
906 node_mergers_.reserve(boots.size());
907 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -0700908 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700909 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -0700910 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700911 }
912 node_mergers_.emplace_back(
913 std::make_unique<NodeMerger>(std::move(boots[i])));
914 }
915}
916
917Message *BootMerger::Front() {
918 Message *result = node_mergers_[index_]->Front();
919
920 if (result != nullptr) {
921 return result;
922 }
923
924 if (index_ + 1u == node_mergers_.size()) {
925 // At the end of the last node merger, just return.
926 return nullptr;
927 } else {
928 ++index_;
929 return Front();
930 }
931}
932
933void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
934
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700935std::vector<const LogParts *> BootMerger::Parts() const {
936 std::vector<const LogParts *> results;
937 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
938 std::vector<const LogParts *> node_parts = node_merger->Parts();
939
940 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
941 std::make_move_iterator(node_parts.end()));
942 }
943
944 return results;
945}
946
Austin Schuhd2f96102020-12-01 20:27:29 -0800947TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700948 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -0800949 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700950 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800951 if (!configuration_) {
952 configuration_ = part->config;
953 } else {
954 CHECK_EQ(configuration_.get(), part->config.get());
955 }
956 }
957 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -0800958 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
959 // pretty simple.
960 if (configuration::MultiNode(config)) {
961 nodes_data_.resize(config->nodes()->size());
962 const Node *my_node = config->nodes()->Get(node());
963 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
964 const Node *node = config->nodes()->Get(node_index);
965 NodeData *node_data = &nodes_data_[node_index];
966 node_data->channels.resize(config->channels()->size());
967 // We should save the channel if it is delivered to the node represented
968 // by the NodeData, but not sent by that node. That combo means it is
969 // forwarded.
970 size_t channel_index = 0;
971 node_data->any_delivered = false;
972 for (const Channel *channel : *config->channels()) {
973 node_data->channels[channel_index].delivered =
974 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -0800975 configuration::ChannelIsSendableOnNode(channel, my_node) &&
976 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -0800977 node_data->any_delivered = node_data->any_delivered ||
978 node_data->channels[channel_index].delivered;
979 ++channel_index;
980 }
981 }
982
983 for (const Channel *channel : *config->channels()) {
984 source_node_.emplace_back(configuration::GetNodeIndex(
985 config, channel->source_node()->string_view()));
986 }
987 }
988}
989
990void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800991 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -0800992 CHECK_NE(timestamp_mapper->node(), node());
993 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
994
995 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
996 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
997 // we could needlessly save data.
998 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -0800999 VLOG(1) << "Registering on node " << node() << " for peer node "
1000 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001001 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1002
1003 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001004
1005 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001006 }
1007}
1008
Austin Schuh79b30942021-01-24 22:32:21 -08001009void TimestampMapper::QueueMessage(Message *m) {
1010 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001011 .channel_index = m->channel_index,
1012 .queue_index = m->queue_index,
1013 .monotonic_event_time = m->timestamp,
1014 .realtime_event_time = aos::realtime_clock::time_point(
1015 std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
Austin Schuh58646e22021-08-23 23:51:46 -07001016 .remote_queue_index = BootQueueIndex::Invalid(),
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001017 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -08001018 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001019 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -08001020 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001021}
1022
1023TimestampedMessage *TimestampMapper::Front() {
1024 // No need to fetch anything new. A previous message still exists.
1025 switch (first_message_) {
1026 case FirstMessage::kNeedsUpdate:
1027 break;
1028 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001029 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001030 case FirstMessage::kNullptr:
1031 return nullptr;
1032 }
1033
Austin Schuh79b30942021-01-24 22:32:21 -08001034 if (matched_messages_.empty()) {
1035 if (!QueueMatched()) {
1036 first_message_ = FirstMessage::kNullptr;
1037 return nullptr;
1038 }
1039 }
1040 first_message_ = FirstMessage::kInMessage;
1041 return &matched_messages_.front();
1042}
1043
// Appends exactly one message to matched_messages_ and invokes
// timestamp_callback_ on it.  Returns false (queueing nothing) when there is
// no more data to read.
bool TimestampMapper::QueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path.  We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    Message *m = boot_merger_.Front();
    if (!m) {
      return false;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(m);

    // Matched messages must come out in non-decreasing time order.
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are thin wrapper around node_merger.  Call it directly.
    boot_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }

  // We need to only add messages to the list so they get processed for messages
  // which are delivered.  Reuse the flow below which uses messages_ by just
  // adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return false;
    }

    // Now that it has been added (and cannibalized), forget about it upstream.
    boot_merger_.PopFront();
  }

  Message *m = &(messages_.front());

  if (source_node_[m->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(m);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return it.
    Message data = MatchingMessageFor(*m);

    // Return the data from the remote.  The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    // The event-side fields come from the local message m; only the payload
    // comes from the matched remote message.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = m->channel_index,
        .queue_index = m->queue_index,
        .monotonic_event_time = m->timestamp,
        .realtime_event_time = aos::realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
        .remote_queue_index =
            BootQueueIndex{.boot = m->monotonic_remote_boot,
                           .index = m->data.message().remote_queue_index()},
        .monotonic_remote_time =
            {m->monotonic_remote_boot,
             monotonic_clock::time_point(std::chrono::nanoseconds(
                 m->data.message().monotonic_remote_time()))},
        .realtime_remote_time = realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
        .monotonic_timestamp_time =
            {m->monotonic_timestamp_boot,
             monotonic_clock::time_point(std::chrono::nanoseconds(
                 m->data.message().monotonic_timestamp_time()))},
        .data = std::move(data.data)});
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }
}
1122
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001123void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001124 while (last_message_time_ <= queue_time) {
1125 if (!QueueMatched()) {
1126 return;
1127 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001128 }
1129}
1130
Austin Schuhe639ea12021-01-25 13:00:22 -08001131void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001132 // Note: queueing for time doesn't really work well across boots. So we just
1133 // assume that if you are using this, you only care about the current boot.
1134 //
1135 // TODO(austin): Is that the right concept?
1136 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001137 // Make sure we have something queued first. This makes the end time
1138 // calculation simpler, and is typically what folks want regardless.
1139 if (matched_messages_.empty()) {
1140 if (!QueueMatched()) {
1141 return;
1142 }
1143 }
1144
1145 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001146 std::max(monotonic_start_time(
1147 matched_messages_.front().monotonic_event_time.boot),
1148 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001149 time_estimation_buffer;
1150
1151 // Place sorted messages on the list until we have
1152 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1153 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001154 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001155 if (!QueueMatched()) {
1156 return;
1157 }
1158 }
1159}
1160
Austin Schuhd2f96102020-12-01 20:27:29 -08001161void TimestampMapper::PopFront() {
1162 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
1163 first_message_ = FirstMessage::kNeedsUpdate;
1164
Austin Schuh79b30942021-01-24 22:32:21 -08001165 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001166}
1167
// Finds the remote data message which the timestamp `message` refers to.
//
// `message` must carry a remote queue index plus monotonic and realtime
// remote times.  The matching data is looked up in the queue the peer for the
// channel's source node keeps for this node.  When no peer is registered, or
// no matching data is found, the returned Message has the remote queue index
// and remote timestamp filled in but an empty data buffer.
Message TimestampMapper::MatchingMessageFor(const Message &message) {
  // Figure out what queue index we are looking for.
  CHECK(message.data.message().has_remote_queue_index());
  const BootQueueIndex remote_queue_index =
      BootQueueIndex{.boot = message.monotonic_remote_boot,
                     .index = message.data.message().remote_queue_index()};

  CHECK(message.data.message().has_monotonic_remote_time());
  CHECK(message.data.message().has_realtime_remote_time());

  // The remote timestamps are used below to validate whatever data is found.
  const BootTimestamp monotonic_remote_time{
      .boot = message.monotonic_remote_boot,
      .time = monotonic_clock::time_point(std::chrono::nanoseconds(
          message.data.message().monotonic_remote_time()))};
  const realtime_clock::time_point realtime_remote_time(
      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));

  TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;

  // We only register the peers which we have data for.  So, if we are being
  // asked to pull a timestamp from a peer which doesn't exist, return an empty
  // message.
  if (peer == nullptr) {
    // TODO(austin): Make sure the tests hit all these paths with a boot count
    // of 1...
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The queue which will have the matching data, if available.
  std::deque<Message> *data_queue =
      &peer->nodes_data_[node()].channels[message.channel_index].messages;

  // Ask the peer to read far enough that the data should have been queued.
  peer->QueueUnmatchedUntil(monotonic_remote_time);

  // Nothing is queued for this channel, so there is nothing to match.
  if (data_queue->empty()) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The requested index lies outside of the range of queued data.
  if (remote_queue_index < data_queue->front().queue_index ||
      remote_queue_index > data_queue->back().queue_index) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .monotonic_remote_boot = 0xffffff,
        .monotonic_timestamp_boot = 0xffffff,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The algorithm below is constant time with some assumptions.  We need there
  // to be no missing messages in the data stream.  This also assumes a queue
  // hasn't wrapped.  That is conservative, but should let us get started.
  //
  // The fast path is taken when the whole queue is from one boot and its
  // queue indices are contiguous, so the offset from the front's index can be
  // used directly as the position in the deque.
  if (data_queue->back().queue_index.boot ==
          data_queue->front().queue_index.boot &&
      (data_queue->back().queue_index.index -
           data_queue->front().queue_index.index + 1u ==
       data_queue->size())) {
    CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
    // Pull the data out and confirm that the timestamps match as expected.
    //
    // TODO(austin): Move if not reliable.
    // Note: this copies the message; the original stays in the queue.
    Message result = (*data_queue)[remote_queue_index.index -
                                   data_queue->front().queue_index.index];

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
    // Now drop the data off the front.  We have deduplicated timestamps, so we
    // are done.  And all the data is in order.
    // The erased range ends just before the matched entry, so the matched
    // entry itself remains at the front of the queue.
    data_queue->erase(
        data_queue->begin(),
        data_queue->begin() +
            (remote_queue_index.index - data_queue->front().queue_index.index));
    return result;
  } else {
    // Slow path: linear search for the entry with the requested queue index
    // and boot.
    // TODO(austin): Binary search.
    auto it = std::find_if(
        data_queue->begin(), data_queue->end(),
        [remote_queue_index,
         remote_boot = monotonic_remote_time.boot](const Message &m) {
          return m.queue_index == remote_queue_index &&
                 m.timestamp.boot == remote_boot;
        });
    if (it == data_queue->end()) {
      return Message{
          .channel_index = message.channel_index,
          .queue_index = remote_queue_index,
          .timestamp = monotonic_remote_time,
          .monotonic_remote_boot = 0xffffff,
          .monotonic_timestamp_boot = 0xffffff,
          .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
    }

    // Unlike the fast path, the match is moved out of the queue here.
    Message result = std::move(*it);

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't.  Please investigate!";

    // TODO(austin): We still go in order, so we can erase from the beginning to
    // our iterator minus 1.  That'll keep 1 in the queue.
    data_queue->erase(it);

    return result;
  }
}
1292
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001293void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001294 if (queued_until_ > t) {
1295 return;
1296 }
1297 while (true) {
1298 if (!messages_.empty() && messages_.back().timestamp > t) {
1299 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1300 return;
1301 }
1302
1303 if (!Queue()) {
1304 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001305 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001306 return;
1307 }
1308
1309 // Now that it has been added (and cannibalized), forget about it upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001310 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001311 }
1312}
1313
1314bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001315 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001316 if (m == nullptr) {
1317 return false;
1318 }
1319 for (NodeData &node_data : nodes_data_) {
1320 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07001321 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08001322 if (node_data.channels[m->channel_index].delivered) {
1323 // TODO(austin): This copies the data... Probably not worth stressing
1324 // about yet.
1325 // TODO(austin): Bound how big this can get. We tend not to send massive
1326 // data, so we can probably ignore this for a bit.
1327 node_data.channels[m->channel_index].messages.emplace_back(*m);
1328 }
1329 }
1330
1331 messages_.emplace_back(std::move(*m));
1332 return true;
1333}
1334
1335std::string TimestampMapper::DebugString() const {
1336 std::stringstream ss;
1337 ss << "node " << node() << " [\n";
1338 for (const Message &message : messages_) {
1339 ss << " " << message << "\n";
1340 }
1341 ss << "] queued_until " << queued_until_;
1342 for (const NodeData &ns : nodes_data_) {
1343 if (ns.peer == nullptr) continue;
1344 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1345 size_t channel_index = 0;
1346 for (const NodeData::ChannelData &channel_data :
1347 ns.peer->nodes_data_[node()].channels) {
1348 if (channel_data.messages.empty()) {
1349 continue;
1350 }
Austin Schuhb000de62020-12-03 22:00:40 -08001351
Austin Schuhd2f96102020-12-01 20:27:29 -08001352 ss << " channel " << channel_index << " [\n";
1353 for (const Message &m : channel_data.messages) {
1354 ss << " " << m << "\n";
1355 }
1356 ss << " ]\n";
1357 ++channel_index;
1358 }
1359 ss << "] queued_until " << ns.peer->queued_until_;
1360 }
1361 return ss.str();
1362}
1363
Austin Schuhee711052020-08-24 16:06:09 -07001364std::string MaybeNodeName(const Node *node) {
1365 if (node != nullptr) {
1366 return node->name()->str() + " ";
1367 }
1368 return "";
1369}
1370
Brian Silvermanf51499a2020-09-21 12:49:08 -07001371} // namespace aos::logger