blob: 05ee4e0946f7e67123350965fb6b77c446bdc944 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
4#include <limits.h>
5#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8
9#include <vector>
10
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
Austin Schuha36c8902019-12-30 18:07:15 -080013#include "aos/events/logging/logger_generated.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
20DEFINE_int32(flush_size, 1000000,
21 "Number of outstanding bytes to allow before flushing to disk.");
22
23namespace aos {
24namespace logger {
25
Austin Schuh05b70472020-01-01 17:11:17 -080026namespace chrono = std::chrono;
27
Austin Schuha36c8902019-12-30 18:07:15 -080028DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
Austin Schuh6f3babe2020-01-26 20:34:50 -080029 : filename_(filename) {
30 util::MkdirP(filename, 0777);
31 fd_ = open(std::string(filename).c_str(),
32 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
33 VLOG(1) << "Opened " << filename << " for writing";
34 PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
Austin Schuha36c8902019-12-30 18:07:15 -080035}
36
// Flushes any queued buffers to disk and closes the file descriptor.  A close
// failure is only logged (not fatal) since there is nothing useful to do
// about it in a destructor.
DetachedBufferWriter::~DetachedBufferWriter() {
  Flush();
  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
  VLOG(1) << "Closed " << filename_;
}
42
// Move construction is implemented in terms of move assignment so the swap
// logic lives in one place.
// NOTE(review): this relies on the freshly-constructed object's member
// defaults (fd_, queues) being safe to Flush()/swap — confirm the header
// gives fd_ a sane default initializer.
DetachedBufferWriter::DetachedBufferWriter(
    DetachedBufferWriter &&other) {
  *this = std::move(other);
}
47
// Move assignment by swapping: our pending data is flushed first, then all
// state is exchanged.  The moved-from object's destructor will flush and
// close whatever file it ends up holding, so nothing is leaked.
DetachedBufferWriter &DetachedBufferWriter::operator=(
    DetachedBufferWriter &&other) {
  // Flush our own queue before the swap so the state we hand over is clean.
  Flush();
  std::swap(filename_, other.filename_);
  std::swap(fd_, other.fd_);
  std::swap(queued_size_, other.queued_size_);
  std::swap(written_size_, other.written_size_);
  std::swap(queue_, other.queue_);
  std::swap(iovec_, other.iovec_);
  return *this;
}
59
60void DetachedBufferWriter::QueueSizedFlatbuffer(
61 flatbuffers::FlatBufferBuilder *fbb) {
62 QueueSizedFlatbuffer(fbb->Release());
63}
64
Austin Schuhde031b72020-01-10 19:34:41 -080065void DetachedBufferWriter::WriteSizedFlatbuffer(
66 absl::Span<const uint8_t> span) {
67 // Cheat aggressively... Write out the queued up data, and then write this
68 // data once without buffering. It is hard to make a DetachedBuffer out of
69 // this data, and we don't want to worry about lifetimes.
70 Flush();
71 iovec_.clear();
72 iovec_.reserve(1);
73
74 struct iovec n;
75 n.iov_base = const_cast<uint8_t *>(span.data());
76 n.iov_len = span.size();
77 iovec_.emplace_back(n);
78
79 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
80
81 PCHECK(written == static_cast<ssize_t>(n.iov_len))
82 << ": Wrote " << written << " expected " << n.iov_len;
Brian Silverman98360e22020-04-28 16:51:20 -070083 written_size_ += written;
Austin Schuhde031b72020-01-10 19:34:41 -080084}
85
Austin Schuha36c8902019-12-30 18:07:15 -080086void DetachedBufferWriter::QueueSizedFlatbuffer(
87 flatbuffers::DetachedBuffer &&buffer) {
88 queued_size_ += buffer.size();
89 queue_.emplace_back(std::move(buffer));
90
91 // Flush if we are at the max number of iovs per writev, or have written
92 // enough data. Otherwise writev will fail with an invalid argument.
93 if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
94 queue_.size() == IOV_MAX) {
95 Flush();
96 }
97}
98
99void DetachedBufferWriter::Flush() {
100 if (queue_.size() == 0u) {
101 return;
102 }
103 iovec_.clear();
104 iovec_.reserve(queue_.size());
105 size_t counted_size = 0;
106 for (size_t i = 0; i < queue_.size(); ++i) {
107 struct iovec n;
108 n.iov_base = queue_[i].data();
109 n.iov_len = queue_[i].size();
110 counted_size += n.iov_len;
111 iovec_.emplace_back(std::move(n));
112 }
113 CHECK_EQ(counted_size, queued_size_);
114 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
115
116 PCHECK(written == static_cast<ssize_t>(queued_size_))
117 << ": Wrote " << written << " expected " << queued_size_;
Brian Silverman98360e22020-04-28 16:51:20 -0700118 written_size_ += written;
Austin Schuha36c8902019-12-30 18:07:15 -0800119
120 queued_size_ = 0;
121 queue_.clear();
122 // TODO(austin): Handle partial writes in some way other than crashing...
123}
124
// Serializes the message in |context| into |fbb| as a MessageHeader table.
// Which fields are populated depends on |log_type|:
//   kLogMessage             — local data + local send times.
//   kLogMessageAndDeliveryTime — local data + local times + remote times.
//   kLogRemoteMessage       — data + the *remote* node's queue index/times
//                             recorded as the sent times.
//   kLogDeliveryTimeOnly    — no payload, just delivery timestamps.
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  // Copy the payload into the flatbuffer for the log types which record data.
  // This must happen before the Builder is constructed since nested
  // allocations are not allowed while a table is being built.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  // For remote messages the "sent" fields carry the remote node's queue index
  // and clocks; everything else uses the local event times.
  switch (log_type) {
    case LogType::kLogRemoteMessage:
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  // Attach the payload, and for delivery-time log types also the remote
  // timestamps.  Note the deliberate fallthrough: kLogMessageAndDeliveryTime
  // gets both the data and the remote time fields.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      message_header_builder.add_data(data_offset);
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_data(data_offset);
      [[fallthrough]];

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;
  }

  return message_header_builder.Finish();
}
186
// Opens |filename| read-only; failure to open is fatal since there is no
// useful way to continue without the log file.
SpanReader::SpanReader(std::string_view filename)
    : filename_(filename),
      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
  PCHECK(fd_ != -1) << ": Failed to open " << filename;
}
192
// Returns a span over the next size-prefixed flatbuffer in the file
// (including its length prefix), reading more blocks from disk as needed.
// Returns an empty span at end of file or on a corrupted/truncated tail.
// The span aliases the internal buffer and is invalidated by the next call.
absl::Span<const uint8_t> SpanReader::ReadMessage() {
  // Make sure we have enough for the size.
  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // Now make sure we have enough for the message.
  const size_t data_size =
      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
      sizeof(flatbuffers::uoffset_t);
  // A zero-length payload means the prefix itself is garbage — most likely a
  // log that was cut off mid-write.  Dump the remainder for debugging.
  if (data_size == sizeof(flatbuffers::uoffset_t)) {
    LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
    LOG(ERROR) << " Rest of log file is "
               << absl::BytesToHexString(std::string_view(
                      reinterpret_cast<const char *>(data_.data() +
                                                     consumed_data_),
                      data_.size() - consumed_data_));
    return absl::Span<const uint8_t>();
  }
  // Keep pulling blocks until the whole message is resident.
  while (data_.size() < consumed_data_ + data_size) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // And return it, consuming the data.
  const uint8_t *data_ptr = data_.data() + consumed_data_;

  consumed_data_ += data_size;

  return absl::Span<const uint8_t>(data_ptr, data_size);
}
227
228bool SpanReader::MessageAvailable() {
229 // Are we big enough to read the size?
230 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
231 return false;
232 }
233
234 // Then, are we big enough to read the full message?
235 const size_t data_size =
236 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
237 sizeof(flatbuffers::uoffset_t);
238 if (data_.size() < consumed_data_ + data_size) {
239 return false;
240 }
241
242 return true;
243}
244
245bool SpanReader::ReadBlock() {
246 if (end_of_file_) {
247 return false;
248 }
249
250 // Appends 256k. This is enough that the read call is efficient. We don't
251 // want to spend too much time reading small chunks because the syscalls for
252 // that will be expensive.
253 constexpr size_t kReadSize = 256 * 1024;
254
255 // Strip off any unused data at the front.
256 if (consumed_data_ != 0) {
257 data_.erase(data_.begin(), data_.begin() + consumed_data_);
258 consumed_data_ = 0;
259 }
260
261 const size_t starting_size = data_.size();
262
263 // This should automatically grow the backing store. It won't shrink if we
264 // get a small chunk later. This reduces allocations when we want to append
265 // more data.
266 data_.resize(data_.size() + kReadSize);
267
268 ssize_t count = read(fd_, &data_[starting_size], kReadSize);
269 data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
270 if (count == 0) {
271 end_of_file_ = true;
272 return false;
273 }
274 PCHECK(count > 0);
275
276 return true;
277}
278
Austin Schuh6f3babe2020-01-26 20:34:50 -0800279FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
280 SpanReader span_reader(filename);
281 // Make sure we have enough to read the size.
282 absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
283
284 // Make sure something was read.
Austin Schuh97789fc2020-08-01 14:42:45 -0700285 CHECK(config_data != absl::Span<const uint8_t>())
286 << ": Failed to read header from: " << filename;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800287
288 // And copy the config so we have it forever.
289 std::vector<uint8_t> data(
290 config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
291 return FlatbufferVector<LogFileHeader>(std::move(data));
292}
293
// Opens |filename| and immediately parses its LogFileHeader, caching an
// owning copy plus the max-out-of-order window used by callers to decide how
// far ahead to queue.
MessageReader::MessageReader(std::string_view filename)
    : span_reader_(filename),
      raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
  // Make sure we have enough to read the size.
  absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();

  // Make sure something was read.
  CHECK(header_data != absl::Span<const uint8_t>())
      << ": Failed to read header from: " << filename;

  // And copy the header data so we have it forever.
  std::vector<uint8_t> header_data_copy(
      header_data.begin() + sizeof(flatbuffers::uoffset_t), header_data.end());
  raw_log_file_header_ =
      FlatbufferVector<LogFileHeader>(std::move(header_data_copy));

  // Cache the reordering window promised by the writer of this log.
  max_out_of_order_duration_ =
      chrono::nanoseconds(log_file_header()->max_out_of_order_duration());

  VLOG(1) << "Opened " << filename << " as node "
          << FlatbufferToJson(log_file_header()->node());
}
316
// Reads the next MessageHeader from the log, returning std::nullopt at end
// of file.  Also advances newest_timestamp_ so callers can see how far into
// the log we have read.
std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
  if (msg_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  // Copy past the size prefix into an owning vector.
  FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
      msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};

  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
      chrono::nanoseconds(result.message().monotonic_sent_time()));

  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
  // std::move is needed here: it is a conversion into the optional, not a
  // pessimizing move of the return value.
  return std::move(result);
}
333
// Opens the first part file, then validates that every later part file's
// header matches it (after patching in a late-arriving start time), and
// finally builds the per-channel queue structures and the
// channels_to_write_ dispatch table.
SplitMessageReader::SplitMessageReader(
    const std::vector<std::string> &filenames)
    : filenames_(filenames),
      log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
  CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";

  // Grab any log file header. They should all match (and we will check as we
  // open more of them).
  log_file_header_ = message_reader_->raw_log_file_header();

  // Pre-validate the remaining part files so a mismatch fails fast here
  // instead of mid-replay.
  for (size_t i = 1; i < filenames_.size(); ++i) {
    MessageReader message_reader(filenames_[i]);

    const monotonic_clock::time_point new_monotonic_start_time(
        chrono::nanoseconds(
            message_reader.log_file_header()->monotonic_start_time()));
    const realtime_clock::time_point new_realtime_start_time(
        chrono::nanoseconds(
            message_reader.log_file_header()->realtime_start_time()));

    // There are 2 types of part files. Part files from before time estimation
    // has started, and part files after. We don't declare a log file "started"
    // until time estimation is up. And once a log file starts, it should never
    // stop again, and should remain constant.
    // To compare both types of headers, we mutate our saved copy of the header
    // to match the next chunk by updating time if we detect a stopped ->
    // started transition.
    if (monotonic_start_time() == monotonic_clock::min_time) {
      CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
      // We should only be missing the monotonic start time when logging data
      // for remote nodes. We don't have a good way to deteremine the remote
      // realtime offset, so it shouldn't be filled out.
      // TODO(austin): If we have a good way, feel free to fill it out. It
      // probably won't be better than we could do in post though with the same
      // data.
      CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
      if (new_monotonic_start_time != monotonic_clock::min_time) {
        // If we finally found our start time, update the header. Do this once
        // because it should never change again.
        log_file_header_.mutable_message()->mutate_monotonic_start_time(
            new_monotonic_start_time.time_since_epoch().count());
        log_file_header_.mutable_message()->mutate_realtime_start_time(
            new_realtime_start_time.time_since_epoch().count());
      }
    }

    // Now compare that the headers match.
    CHECK(CompareFlatBuffer(message_reader.raw_log_file_header(),
                            log_file_header_))
        << ": Header is different between log file chunks " << filenames_[0]
        << " and " << filenames_[i] << ", this is not supported.";
  }

  // Setup per channel state.
  channels_.resize(configuration()->channels()->size());
  for (ChannelData &channel_data : channels_) {
    channel_data.data.split_reader = this;
    // Build up the timestamp list.
    if (configuration::MultiNode(configuration())) {
      channel_data.timestamps.resize(configuration()->nodes()->size());
      for (MessageHeaderQueue &queue : channel_data.timestamps) {
        queue.timestamps = true;
        queue.split_reader = this;
      }
    }
  }

  // Build up channels_to_write_ as an optimization to make it fast to figure
  // out which datastructure to place any new data from a channel on.
  for (const Channel *channel : *configuration()->channels()) {
    // This is the main case. We will only see data on this node.
    if (configuration::ChannelIsSendableOnNode(channel, node())) {
      channels_to_write_.emplace_back(
          &channels_[channels_to_write_.size()].data);
    } else
      // If we can't send, but can receive, we should be able to see
      // timestamps here.
      if (configuration::ChannelIsReadableOnNode(channel, node())) {
      channels_to_write_.emplace_back(
          &(channels_[channels_to_write_.size()]
                .timestamps[configuration::GetNodeIndex(configuration(),
                                                        node())]));
    } else {
      channels_to_write_.emplace_back(nullptr);
    }
  }
}
421
Austin Schuh6f3babe2020-01-26 20:34:50 -0800422bool SplitMessageReader::NextLogFile() {
Austin Schuhfa895892020-01-07 20:07:41 -0800423 if (next_filename_index_ == filenames_.size()) {
424 return false;
425 }
426 message_reader_ =
427 std::make_unique<MessageReader>(filenames_[next_filename_index_]);
428
429 // We can't support the config diverging between two log file headers. See if
430 // they are the same.
431 if (next_filename_index_ != 0) {
Austin Schuh97789fc2020-08-01 14:42:45 -0700432 CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
433 log_file_header_))
Austin Schuhfa895892020-01-07 20:07:41 -0800434 << ": Header is different between log file chunks "
435 << filenames_[next_filename_index_] << " and "
436 << filenames_[next_filename_index_ - 1] << ", this is not supported.";
437 }
438
439 ++next_filename_index_;
440 return true;
441}
442
// Reads messages into the per-channel queues until we are
// max_out_of_order_duration past |last_dequeued_time| (and at least one
// message landed in a live queue).  Returns false once every part file is
// exhausted.  Note the single level of recursion on the very first call to
// bootstrap queueing up to the log start time.
bool SplitMessageReader::QueueMessages(
    monotonic_clock::time_point last_dequeued_time) {
  // TODO(austin): Once we are happy that everything works, read a 256kb chunk
  // to reduce the need to re-heap down below.

  // Special case no more data. Otherwise we blow up on the CHECK statement
  // confirming that we have enough data queued.
  if (at_end_) {
    return false;
  }

  // If this isn't the first time around, confirm that we had enough data queued
  // to follow the contract.
  if (time_to_queue_ != monotonic_clock::min_time) {
    CHECK_LE(last_dequeued_time,
             newest_timestamp() - max_out_of_order_duration())
        << " node " << FlatbufferToJson(node()) << " on " << this;

    // Bail if there is enough data already queued.
    if (last_dequeued_time < time_to_queue_) {
      VLOG(1) << MaybeNodeName(target_node_) << "All up to date on " << this
              << ", dequeued " << last_dequeued_time << " queue time "
              << time_to_queue_;
      return true;
    }
  } else {
    // Startup takes a special dance. We want to queue up until the start time,
    // but we then want to find the next message to read. The conservative
    // answer is to immediately trigger a second requeue to get things moving.
    time_to_queue_ = monotonic_start_time();
    QueueMessages(time_to_queue_);
  }

  // If we are asked to queue, queue for at least max_out_of_order_duration past
  // the last known time in the log file (ie the newest timestep read). As long
  // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
  // are safe. And since we pop in order, that works.
  //
  // Special case the start of the log file. There should be at most 1 message
  // from each channel at the start of the log file. So always force the start
  // of the log file to just be read.
  time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
  VLOG(1) << MaybeNodeName(target_node_) << "Queueing, going until "
          << time_to_queue_ << " " << filename();

  bool was_emplaced = false;
  while (true) {
    // Stop if we have enough.
    if (newest_timestamp() > time_to_queue_ + max_out_of_order_duration() &&
        was_emplaced) {
      VLOG(1) << MaybeNodeName(target_node_) << "Done queueing on " << this
              << ", queued to " << newest_timestamp() << " with requeue time "
              << time_to_queue_;
      return true;
    }

    if (std::optional<FlatbufferVector<MessageHeader>> msg =
            message_reader_->ReadMessage()) {
      const MessageHeader &header = msg.value().message();

      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
          chrono::nanoseconds(header.monotonic_sent_time()));

      // At -v2 log the full message; at -v1 log it with the payload stripped
      // to keep the output readable.
      if (VLOG_IS_ON(2)) {
        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this
                  << " " << filename() << " ttq: " << time_to_queue_ << " now "
                  << newest_timestamp() << " start time "
                  << monotonic_start_time() << " " << FlatbufferToJson(&header);
      } else if (VLOG_IS_ON(1)) {
        FlatbufferVector<MessageHeader> copy = msg.value();
        copy.mutable_message()->clear_data();
        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
                  << filename() << " ttq: " << time_to_queue_ << " now "
                  << newest_timestamp() << " start time "
                  << monotonic_start_time() << " " << FlatbufferToJson(copy);
      }

      const int channel_index = header.channel_index();
      // emplace_back returns false when nobody is listening to this channel;
      // only messages somebody will consume advance newest_timestamp_.
      was_emplaced = channels_to_write_[channel_index]->emplace_back(
          std::move(msg.value()));
      if (was_emplaced) {
        newest_timestamp_ = std::max(newest_timestamp_, timestamp);
      }
    } else {
      if (!NextLogFile()) {
        VLOG(1) << MaybeNodeName(target_node_) << "No more files, last was "
                << filenames_.back();
        at_end_ = true;
        // Tell every listening merger that no more data is coming so they can
        // drain what is already queued.
        for (MessageHeaderQueue *queue : channels_to_write_) {
          if (queue == nullptr || queue->timestamp_merger == nullptr) {
            continue;
          }
          queue->timestamp_merger->NoticeAtEnd();
        }
        return false;
      }
    }
  }
}
542
// Registers |timestamp_merger| as the consumer for |channel_index| when this
// log is replayed from |target_node|'s point of view.  Selects which queue
// (data, timestamps, or none) feeds the merger; channels with no relevant
// queue are dropped instead of queued.
void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
                                            int channel_index,
                                            const Node *target_node) {
  // Resolve the caller's node pointer against our own configuration so
  // pointer equality with node() below is meaningful.
  const Node *reinterpreted_target_node =
      configuration::GetNodeOrDie(configuration(), target_node);
  target_node_ = reinterpreted_target_node;

  const Channel *const channel =
      configuration()->channels()->Get(channel_index);

  VLOG(1) << " Configuring merger " << this << " for channel " << channel_index
          << " "
          << configuration::CleanedChannelToString(
                 configuration()->channels()->Get(channel_index));

  MessageHeaderQueue *message_header_queue = nullptr;

  // Figure out if this log file is from our point of view, or the other node's
  // point of view.
  if (node() == reinterpreted_target_node) {
    VLOG(1) << " Replaying as logged node " << filename();

    if (configuration::ChannelIsSendableOnNode(channel, node())) {
      VLOG(1) << " Data on node";
      message_header_queue = &(channels_[channel_index].data);
    } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
      VLOG(1) << " Timestamps on node";
      message_header_queue =
          &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
              configuration(), node())]);
    } else {
      VLOG(1) << " Dropping";
    }
  } else {
    VLOG(1) << " Replaying as other node " << filename();
    // We are replaying from another node's point of view. The only interesting
    // data is data that is sent from our node and received on theirs.
    if (configuration::ChannelIsReadableOnNode(channel,
                                               reinterpreted_target_node) &&
        configuration::ChannelIsSendableOnNode(channel, node())) {
      VLOG(1) << " Readable on target node";
      // Data from another node.
      message_header_queue = &(channels_[channel_index].data);
    } else {
      VLOG(1) << " Dropping";
      // This is either not sendable on the other node, or is a timestamp and
      // therefore not interesting.
    }
  }

  // If we found one, write it down. This will be nullptr when there is nothing
  // relevant on this channel on this node for the target node. In that case,
  // we want to drop the message instead of queueing it.
  if (message_header_queue != nullptr) {
    message_header_queue->timestamp_merger = timestamp_merger;
  }
}
600
// Pops the oldest data message for |channel_index| and returns its
// (monotonic time, queue index, message) tuple.  Requeues more data
// afterwards so the contract of always having max_out_of_order_duration
// buffered past the dequeue point keeps holding.
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel_index) {
  CHECK_GT(channels_[channel_index].data.size(), 0u);
  // Capture the timestamp before PopFront invalidates front().
  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
      timestamp = channels_[channel_index].data.front_timestamp();
  FlatbufferVector<MessageHeader> front =
      std::move(channels_[channel_index].data.front());
  channels_[channel_index].data.PopFront();

  VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
          << std::get<0>(timestamp) << " for "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel_index))
          << " (" << channel_index << ")";

  // Refill the queues now that something was consumed.
  QueueMessages(std::get<0>(timestamp));

  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                         std::move(front));
}
622
// Same as PopOldest, but for the timestamp queue of |channel| delivered to
// |node_index| instead of the data queue.
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
  CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
  // Capture the timestamp before PopFront invalidates front().
  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
      timestamp = channels_[channel].timestamps[node_index].front_timestamp();
  FlatbufferVector<MessageHeader> front =
      std::move(channels_[channel].timestamps[node_index].front());
  channels_[channel].timestamps[node_index].PopFront();

  VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
          << std::get<0>(timestamp) << " for "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel))
          << " on "
          << configuration()->nodes()->Get(node_index)->name()->string_view()
          << " (" << node_index << ")";

  // Refill the queues now that something was consumed.
  QueueMessages(std::get<0>(timestamp));

  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                         std::move(front));
}
646
// Appends |msg| to this queue.  Returns false (dropping the message) when no
// merger is subscribed.  When the queue transitions from empty to non-empty,
// the merger is notified so it can re-heap.
bool SplitMessageReader::MessageHeaderQueue::emplace_back(
    FlatbufferVector<MessageHeader> &&msg) {
  CHECK(split_reader != nullptr);

  // If there is no timestamp merger for this queue, nobody is listening. Drop
  // the message. This happens when a log file from another node is replayed,
  // and the timestamp mergers down stream just don't care.
  if (timestamp_merger == nullptr) {
    return false;
  }

  // A timestamp queue must never contain payloads and vice versa.
  CHECK(timestamps != msg.message().has_data())
      << ": Got timestamps and data mixed up on a node. "
      << FlatbufferToJson(msg);

  data_.emplace_back(std::move(msg));

  if (data_.size() == 1u) {
    // Yup, new data. Notify.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
    } else {
      timestamp_merger->Update(split_reader, front_timestamp());
    }
  }

  return true;
}
675
// Removes the front element and notifies the merger of the new front (or,
// when the queue went empty, pokes it with a min_time sentinel so its heap
// still gets updated).
void SplitMessageReader::MessageHeaderQueue::PopFront() {
  data_.pop_front();
  if (data_.size() != 0u) {
    // Yup, new data.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
    } else {
      timestamp_merger->Update(split_reader, front_timestamp());
    }
  } else {
    // Poke anyways to update the heap.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(
          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
    } else {
      timestamp_merger->Update(
          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
    }
  }
}
696
697namespace {
698
Austin Schuh6f3babe2020-01-26 20:34:50 -0800699bool SplitMessageReaderHeapCompare(
700 const std::tuple<monotonic_clock::time_point, uint32_t,
701 SplitMessageReader *>
702 first,
703 const std::tuple<monotonic_clock::time_point, uint32_t,
704 SplitMessageReader *>
705 second) {
706 if (std::get<0>(first) > std::get<0>(second)) {
707 return true;
708 } else if (std::get<0>(first) == std::get<0>(second)) {
709 if (std::get<1>(first) > std::get<1>(second)) {
710 return true;
711 } else if (std::get<1>(first) == std::get<1>(second)) {
712 return std::get<2>(first) > std::get<2>(second);
713 } else {
714 return false;
715 }
716 } else {
717 return false;
718 }
719}
720
Austin Schuh05b70472020-01-01 17:11:17 -0800721bool ChannelHeapCompare(
722 const std::pair<monotonic_clock::time_point, int> first,
723 const std::pair<monotonic_clock::time_point, int> second) {
724 if (first.first > second.first) {
725 return true;
726 } else if (first.first == second.first) {
727 return first.second > second.second;
728 } else {
729 return false;
730 }
731}
732
733} // namespace
734
// Wires this merger up to every SplitMessageReader which can produce entries
// for channel_index targeted at target_node, and determines whether remote
// delivery timestamps need to be merged for this channel.
TimestampMerger::TimestampMerger(
    const Configuration *configuration,
    std::vector<SplitMessageReader *> split_message_readers, int channel_index,
    const Node *target_node, ChannelMerger *channel_merger)
    : configuration_(configuration),
      split_message_readers_(std::move(split_message_readers)),
      channel_index_(channel_index),
      // -1 marks a single-node configuration, where there is no node index.
      node_index_(configuration::MultiNode(configuration)
                      ? configuration::GetNodeIndex(configuration, target_node)
                      : -1),
      channel_merger_(channel_merger) {
  // Tell the readers we care so they know who to notify when new entries for
  // this channel show up.
  VLOG(1) << "Configuring channel " << channel_index << " target node "
          << FlatbufferToJson(target_node);
  for (SplitMessageReader *reader : split_message_readers_) {
    reader->SetTimestampMerger(this, channel_index, target_node);
  }

  // And then determine if we need to track timestamps: a channel which is
  // readable on the target node but not sendable from it is presumably
  // forwarded there, so delivery timestamps exist to be merged.
  const Channel *channel = configuration->channels()->Get(channel_index);
  if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
      configuration::ChannelIsReadableOnNode(channel, target_node)) {
    has_timestamps_ = true;
  }
}
760
761void TimestampMerger::PushMessageHeap(
Austin Schuhcde938c2020-02-02 17:30:07 -0800762 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
763 timestamp,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800764 SplitMessageReader *split_message_reader) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700765 if (split_message_reader != nullptr) {
766 DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
767 [split_message_reader](
768 const std::tuple<monotonic_clock::time_point,
769 uint32_t, SplitMessageReader *>
770 x) {
771 return std::get<2>(x) == split_message_reader;
772 }) == message_heap_.end())
773 << ": Pushing message when it is already in the heap.";
Austin Schuh6f3babe2020-01-26 20:34:50 -0800774
Austin Schuh2f8fd752020-09-01 22:38:28 -0700775 message_heap_.push_back(std::make_tuple(
776 std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800777
Austin Schuh2f8fd752020-09-01 22:38:28 -0700778 std::push_heap(message_heap_.begin(), message_heap_.end(),
779 &SplitMessageReaderHeapCompare);
780 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800781
782 // If we are just a data merger, don't wait for timestamps.
783 if (!has_timestamps_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700784 if (!message_heap_.empty()) {
785 channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
786 pushed_ = true;
787 } else {
788 // Remove ourselves if we are empty.
789 channel_merger_->Update(monotonic_clock::min_time, channel_index_);
790 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800791 }
792}
793
Austin Schuhcde938c2020-02-02 17:30:07 -0800794std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
795TimestampMerger::oldest_message() const {
796 CHECK_GT(message_heap_.size(), 0u);
797 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
798 oldest_message_reader = message_heap_.front();
799 return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
800}
801
802std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
803TimestampMerger::oldest_timestamp() const {
804 CHECK_GT(timestamp_heap_.size(), 0u);
805 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
806 oldest_message_reader = timestamp_heap_.front();
807 return std::get<2>(oldest_message_reader)
808 ->oldest_message(channel_index_, node_index_);
809}
810
Austin Schuh6f3babe2020-01-26 20:34:50 -0800811void TimestampMerger::PushTimestampHeap(
Austin Schuhcde938c2020-02-02 17:30:07 -0800812 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
813 timestamp,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800814 SplitMessageReader *split_message_reader) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700815 if (split_message_reader != nullptr) {
816 DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
817 [split_message_reader](
818 const std::tuple<monotonic_clock::time_point,
819 uint32_t, SplitMessageReader *>
820 x) {
821 return std::get<2>(x) == split_message_reader;
822 }) == timestamp_heap_.end())
823 << ": Pushing timestamp when it is already in the heap.";
Austin Schuh6f3babe2020-01-26 20:34:50 -0800824
Austin Schuh2f8fd752020-09-01 22:38:28 -0700825 timestamp_heap_.push_back(std::make_tuple(
826 std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800827
Austin Schuh2f8fd752020-09-01 22:38:28 -0700828 std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
829 SplitMessageReaderHeapCompare);
830 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800831
832 // If we are a timestamp merger, don't wait for data. Missing data will be
833 // caught at read time.
834 if (has_timestamps_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700835 if (!timestamp_heap_.empty()) {
836 channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
837 pushed_ = true;
838 } else {
839 // Remove ourselves if we are empty.
840 channel_merger_->Update(monotonic_clock::min_time, channel_index_);
841 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800842 }
843}
844
// Pops the oldest data message for this channel off the message heap and
// returns it.  Multiple readers may hold copies of the same message (e.g.
// overlapping log files — presumably; confirm against the logger's splitting
// scheme), so after popping, any entries with an identical (time, queue_index)
// are popped too and CHECKed to be byte-identical.
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
TimestampMerger::PopMessageHeap() {
  // Pop the oldest message reader pointer off the heap.
  CHECK_GT(message_heap_.size(), 0u);
  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
      oldest_message_reader = message_heap_.front();

  std::pop_heap(message_heap_.begin(), message_heap_.end(),
                &SplitMessageReaderHeapCompare);
  message_heap_.pop_back();

  // Pop the oldest message.  This re-pushes any messages from the reader to
  // the message heap (so the reader regains a heap entry if it has more).
  std::tuple<monotonic_clock::time_point, uint32_t,
             FlatbufferVector<MessageHeader>>
      oldest_message =
          std::get<2>(oldest_message_reader)->PopOldest(channel_index_);

  // Confirm that the time and queue_index we have recorded in the heap match
  // what the reader actually handed back.
  CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
  CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));

  // Now, keep reading until we have found all duplicates.
  while (!message_heap_.empty()) {
    // See if the next entry is a duplicate of the one we just popped.
    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
        next_oldest_message_reader = message_heap_.front();

    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
        next_oldest_message_time = std::get<2>(next_oldest_message_reader)
                                       ->oldest_message(channel_index_);

    if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
        std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
      // Pop the message reader pointer.
      std::pop_heap(message_heap_.begin(), message_heap_.end(),
                    &SplitMessageReaderHeapCompare);
      message_heap_.pop_back();

      // Pop the next oldest message.  This re-pushes any messages from the
      // reader.
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          next_oldest_message = std::get<2>(next_oldest_message_reader)
                                    ->PopOldest(channel_index_);

      // And make sure the message matches in its entirety: identical
      // (time, queue_index) must mean identical bytes.
      CHECK(std::get<2>(oldest_message).span() ==
            std::get<2>(next_oldest_message).span())
          << ": Data at the same timestamp doesn't match.";
    } else {
      break;
    }
  }

  return oldest_message;
}
903
// Pops the oldest delivery timestamp for this channel off the timestamp heap
// and returns it.  As with PopMessageHeap, duplicate (time, queue_index)
// entries from other readers are drained and CHECKed to be byte-identical.
// Only valid in multi-node configurations (node_index_ != -1).
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
TimestampMerger::PopTimestampHeap() {
  // Pop the oldest timestamp reader pointer off the heap.
  CHECK_GT(timestamp_heap_.size(), 0u);

  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
      oldest_timestamp_reader = timestamp_heap_.front();

  std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                &SplitMessageReaderHeapCompare);
  timestamp_heap_.pop_back();

  // Timestamps only exist when messages are forwarded between nodes.
  CHECK(node_index_ != -1) << ": Timestamps in a single node environment";

  // Pop the oldest timestamp.  This re-pushes any timestamps from the reader
  // to the timestamp heap.
  std::tuple<monotonic_clock::time_point, uint32_t,
             FlatbufferVector<MessageHeader>>
      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
                             ->PopOldestTimestamp(channel_index_, node_index_);

  // Confirm that the time we recorded in the heap matches what the reader
  // actually handed back.
  CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
  CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));

  // Now, keep reading until we have found all duplicates.
  while (!timestamp_heap_.empty()) {
    // See if the next entry is a duplicate of the one we just popped.
    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
        next_oldest_timestamp_reader = timestamp_heap_.front();

    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
        next_oldest_timestamp_time =
            std::get<2>(next_oldest_timestamp_reader)
                ->oldest_message(channel_index_, node_index_);

    if (std::get<0>(next_oldest_timestamp_time) ==
            std::get<0>(oldest_timestamp) &&
        std::get<1>(next_oldest_timestamp_time) ==
            std::get<1>(oldest_timestamp)) {
      // Pop the timestamp reader pointer.
      std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                    &SplitMessageReaderHeapCompare);
      timestamp_heap_.pop_back();

      // Pop the next oldest timestamp.  This re-pushes any messages from the
      // reader.
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          next_oldest_timestamp =
              std::get<2>(next_oldest_timestamp_reader)
                  ->PopOldestTimestamp(channel_index_, node_index_);

      // And make sure the contents match in their entirety; dump both as JSON
      // and hex on failure to make the mismatch debuggable.
      CHECK(std::get<2>(oldest_timestamp).span() ==
            std::get<2>(next_oldest_timestamp).span())
          << ": Data at the same timestamp doesn't match, "
          << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
          << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
          << absl::BytesToHexString(std::string_view(
                 reinterpret_cast<const char *>(
                     std::get<2>(oldest_timestamp).span().data()),
                 std::get<2>(oldest_timestamp).span().size()))
          << " vs "
          << absl::BytesToHexString(std::string_view(
                 reinterpret_cast<const char *>(
                     std::get<2>(next_oldest_timestamp).span().data()),
                 std::get<2>(next_oldest_timestamp).span().size()));

    } else {
      break;
    }
  }

  return oldest_timestamp;
}
981
// Pops the next message for this channel, together with the times it should
// be delivered at.  For timestamp channels, the oldest delivery timestamp is
// popped first and then the data heap is scanned for the matching message:
// older unmatched data is skipped (it was never delivered), and if the data
// is missing entirely the timestamp is returned without data so the caller
// can decide what to do.  For plain data channels, the oldest message is
// returned directly.
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
  if (has_timestamps_) {
    VLOG(1) << "Looking for matching timestamp for "
            << configuration::StrippedChannelToString(
                   configuration_->channels()->Get(channel_index_))
            << " (" << channel_index_ << ") "
            << " at " << std::get<0>(oldest_timestamp());

    // Read the timestamps.
    std::tuple<monotonic_clock::time_point, uint32_t,
               FlatbufferVector<MessageHeader>>
        oldest_timestamp = PopTimestampHeap();

    // The event times come from the timestamp message: when the message was
    // actually handed to the application on this node.
    TimestampMerger::DeliveryTimestamp timestamp;
    timestamp.monotonic_event_time =
        monotonic_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
    timestamp.realtime_event_time =
        realtime_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().realtime_sent_time()));

    // Consistency check against what the heap recorded.
    CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
    CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
             std::get<1>(oldest_timestamp));

    // The remote time keys the search for the matching data message below.
    monotonic_clock::time_point remote_timestamp_monotonic_time(
        chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().monotonic_remote_time()));

    // See if we have any data.  If not, pass the problem up the chain by
    // returning the timestamp without a matching data message.
    if (message_heap_.empty()) {
      LOG(WARNING) << MaybeNodeName(configuration_->nodes()->Get(node_index_))
                   << "No data to match timestamp on "
                   << configuration::CleanedChannelToString(
                          configuration_->channels()->Get(channel_index_))
                   << " (" << channel_index_ << ")";
      return std::make_tuple(timestamp,
                             std::move(std::get<2>(oldest_timestamp)));
    }

    while (true) {
      {
        // Ok, now try grabbing data until we find one which matches.
        std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
            oldest_message_ref = oldest_message();

        // Time at which the message was sent (this message is written from
        // the sending node's perspective).
        monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
            std::get<2>(oldest_message_ref)->monotonic_sent_time()));

        if (remote_monotonic_time < remote_timestamp_monotonic_time) {
          // Data older than the timestamp: it was logged but never delivered,
          // so discard it and keep looking.
          LOG(WARNING) << configuration_->nodes()
                              ->Get(node_index_)
                              ->name()
                              ->string_view()
                       << " Undelivered message, skipping.  Remote time is "
                       << remote_monotonic_time << " timestamp is "
                       << remote_timestamp_monotonic_time << " on channel "
                       << configuration::StrippedChannelToString(
                              configuration_->channels()->Get(channel_index_))
                       << " (" << channel_index_ << ")";
          PopMessageHeap();
          continue;
        } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
          // Data newer than the timestamp: the matching message is missing.
          // Return the timestamp without data.
          LOG(WARNING) << configuration_->nodes()
                              ->Get(node_index_)
                              ->name()
                              ->string_view()
                       << " Data not found.  Remote time should be "
                       << remote_timestamp_monotonic_time
                       << ", message time is " << remote_monotonic_time
                       << " on channel "
                       << configuration::StrippedChannelToString(
                              configuration_->channels()->Get(channel_index_))
                       << " (" << channel_index_ << ")"
                       << (VLOG_IS_ON(1) ? DebugString() : "");
          return std::make_tuple(timestamp,
                                 std::move(std::get<2>(oldest_timestamp)));
        }

        timestamp.monotonic_remote_time = remote_monotonic_time;
      }

      VLOG(1) << "Found matching data "
              << configuration::StrippedChannelToString(
                     configuration_->channels()->Get(channel_index_))
              << " (" << channel_index_ << ")";
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          oldest_message = PopMessageHeap();

      // Fill in the remote-side fields from the data message itself.
      timestamp.realtime_remote_time =
          realtime_clock::time_point(chrono::nanoseconds(
              std::get<2>(oldest_message).message().realtime_sent_time()));
      timestamp.remote_queue_index =
          std::get<2>(oldest_message).message().queue_index();

      CHECK_EQ(timestamp.monotonic_remote_time,
               remote_timestamp_monotonic_time);

      // The timestamp's remote_queue_index must agree with the data message's
      // queue index, or the match is bogus.
      CHECK_EQ(timestamp.remote_queue_index,
               std::get<2>(oldest_timestamp).message().remote_queue_index())
          << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
          << " data "
          << FlatbufferToJson(&std::get<2>(oldest_message).message());

      return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
    }
  } else {
    // No timestamps on this channel: the data message itself carries the
    // delivery times.
    std::tuple<monotonic_clock::time_point, uint32_t,
               FlatbufferVector<MessageHeader>>
        oldest_message = PopMessageHeap();

    TimestampMerger::DeliveryTimestamp timestamp;
    timestamp.monotonic_event_time =
        monotonic_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_message).message().monotonic_sent_time()));
    timestamp.realtime_event_time =
        realtime_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_message).message().realtime_sent_time()));
    // Sentinel: no remote queue index for a non-forwarded message.
    timestamp.remote_queue_index = 0xffffffff;

    CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
    CHECK_EQ(std::get<1>(oldest_message),
             std::get<2>(oldest_message).message().queue_index());

    return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
  }
}
1114
Austin Schuh8bd96322020-02-13 21:18:22 -08001115void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
1116
Austin Schuh6f3babe2020-01-26 20:34:50 -08001117namespace {
1118std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
1119 const std::vector<std::vector<std::string>> &filenames) {
1120 CHECK_GT(filenames.size(), 0u);
1121 // Build up all the SplitMessageReaders.
1122 std::vector<std::unique_ptr<SplitMessageReader>> result;
1123 for (const std::vector<std::string> &filenames : filenames) {
1124 result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
1125 }
1126 return result;
1127}
1128} // namespace
1129
// Builds a SplitMessageReader per log and adopts the first reader's header as
// this merger's log file header.  All logs must share an identical
// configuration; the node list is cached from it.
ChannelMerger::ChannelMerger(
    const std::vector<std::vector<std::string>> &filenames)
    : split_message_readers_(MakeSplitMessageReaders(filenames)),
      log_file_header_(split_message_readers_[0]->raw_log_file_header()) {
  // Confirm that the configuration matches for every log file.  Mixed
  // configurations cannot be merged.
  for (const std::unique_ptr<SplitMessageReader> &reader :
       split_message_readers_) {
    CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
                            reader->log_file_header()->configuration()))
        << ": Replaying log files with different configurations isn't "
           "supported";
  }

  // Cache the list of nodes present in the (shared) configuration.
  nodes_ = configuration::GetNodes(configuration());
}
1146
// Points this merger at target_node: finds a log file header recorded for
// that node (taking the earliest start time across matching logs), builds a
// TimestampMerger per channel, and primes every reader.  Returns false if no
// log file was recorded for the node.
bool ChannelMerger::SetNode(const Node *target_node) {
  // Collect raw pointers for the TimestampMergers to share.
  std::vector<SplitMessageReader *> split_message_readers;
  for (const std::unique_ptr<SplitMessageReader> &reader :
       split_message_readers_) {
    split_message_readers.emplace_back(reader.get());
  }

  // Go find a log_file_header for this node.
  {
    bool found_node = false;

    for (const std::unique_ptr<SplitMessageReader> &reader :
         split_message_readers_) {
      // In order to identify which logfile(s) map to the target node, do a
      // logical comparison of the nodes, by confirming that we are either in a
      // single-node setup (where the nodes will both be nullptr) or that the
      // node names match (but the other node fields--e.g., hostname lists--may
      // not).
      const bool both_null =
          reader->node() == nullptr && target_node == nullptr;
      const bool both_have_name =
          (reader->node() != nullptr) && (target_node != nullptr) &&
          (reader->node()->has_name() && target_node->has_name());
      const bool node_names_identical =
          both_have_name &&
          (reader->node()->name()->string_view() ==
           target_node->name()->string_view());
      if (both_null || node_names_identical) {
        if (!found_node) {
          // First match wins the header wholesale.
          found_node = true;
          log_file_header_ = CopyFlatBuffer(reader->log_file_header());
          VLOG(1) << "Found log file " << reader->filename() << " with node "
                  << FlatbufferToJson(reader->node()) << " start_time "
                  << monotonic_start_time();
        } else {
          // Find the earliest start time.  That way, if we get a full log file
          // directly from the node, and a partial later, we start with the
          // full.  Update our header to match that.
          const monotonic_clock::time_point new_monotonic_start_time(
              chrono::nanoseconds(
                  reader->log_file_header()->monotonic_start_time()));
          const realtime_clock::time_point new_realtime_start_time(
              chrono::nanoseconds(
                  reader->log_file_header()->realtime_start_time()));

          // min_time means "no start time recorded"; prefer any real start
          // time over it, and the earliest real start time otherwise.
          if (monotonic_start_time() == monotonic_clock::min_time ||
              (new_monotonic_start_time != monotonic_clock::min_time &&
               new_monotonic_start_time < monotonic_start_time())) {
            log_file_header_.mutable_message()->mutate_monotonic_start_time(
                new_monotonic_start_time.time_since_epoch().count());
            log_file_header_.mutable_message()->mutate_realtime_start_time(
                new_realtime_start_time.time_since_epoch().count());
            VLOG(1) << "Updated log file " << reader->filename()
                    << " with node " << FlatbufferToJson(reader->node())
                    << " start_time " << new_monotonic_start_time;
          }
        }
      }
    }

    if (!found_node) {
      LOG(WARNING) << "Failed to find log file for node "
                   << FlatbufferToJson(target_node);
      return false;
    }
  }

  // Build up all the timestamp mergers.  This connects up all the
  // SplitMessageReaders.
  timestamp_mergers_.reserve(configuration()->channels()->size());
  for (size_t channel_index = 0;
       channel_index < configuration()->channels()->size(); ++channel_index) {
    timestamp_mergers_.emplace_back(
        configuration(), split_message_readers, channel_index,
        configuration::GetNode(configuration(), target_node), this);
  }

  // And prime everything: queue each reader starting from its own start time.
  for (std::unique_ptr<SplitMessageReader> &split_message_reader :
       split_message_readers_) {
    split_message_reader->QueueMessages(
        split_message_reader->monotonic_start_time());
  }

  node_ = configuration::GetNodeOrDie(configuration(), target_node);
  return true;
}
1234
Austin Schuh858c9f32020-08-31 16:56:12 -07001235monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
Brian Silverman8a32ce62020-08-12 12:02:38 -07001236 if (channel_heap_.empty()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001237 return monotonic_clock::max_time;
1238 }
1239 return channel_heap_.front().first;
1240}
1241
// Records (or refreshes) channel_index's oldest-message time on the channel
// heap, and in parallel on the timestamp heap for channels which track
// timestamps.  A min_time timestamp removes the channel from the heaps.
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
                                    int channel_index) {
  // Pop and recreate the heap if it has already been pushed.  And since we are
  // pushing again, we don't need to clear pushed.
  if (timestamp_mergers_[channel_index].pushed()) {
    const auto channel_iterator = std::find_if(
        channel_heap_.begin(), channel_heap_.end(),
        [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
          return x.second == channel_index;
        });
    DCHECK(channel_iterator != channel_heap_.end());
    if (std::get<0>(*channel_iterator) == timestamp) {
      // It's already in the heap, in the correct spot, so nothing
      // more for us to do here.
      return;
    }
    // Erase the stale entry and restore the heap invariant.
    channel_heap_.erase(channel_iterator);
    std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                   ChannelHeapCompare);

    if (timestamp_mergers_[channel_index].has_timestamps()) {
      const auto timestamp_iterator = std::find_if(
          timestamp_heap_.begin(), timestamp_heap_.end(),
          [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
            return x.second == channel_index;
          });
      DCHECK(timestamp_iterator != timestamp_heap_.end());
      if (std::get<0>(*timestamp_iterator) == timestamp) {
        // NOTE(review): returning here leaves the channel_heap_ entry erased
        // above without re-pushing it; this relies on the two heaps always
        // carrying identical entries so this branch never diverges — confirm.
        return;
      }
      timestamp_heap_.erase(timestamp_iterator);
      std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                     ChannelHeapCompare);
    }
  }

  // min_time means "nothing queued": leave the channel out of the heaps and
  // clear its pushed flag so the next real update re-inserts it.
  if (timestamp == monotonic_clock::min_time) {
    timestamp_mergers_[channel_index].set_pushed(false);
    return;
  }

  channel_heap_.push_back(std::make_pair(timestamp, channel_index));

  // The default sort puts the newest message first.  Use a custom comparator
  // to put the oldest message first.
  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
                 ChannelHeapCompare);

  // Mirror the entry on the timestamp heap for timestamp-tracking channels.
  if (timestamp_mergers_[channel_index].has_timestamps()) {
    timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                   ChannelHeapCompare);
  }
}
1298
Austin Schuh2f8fd752020-09-01 22:38:28 -07001299void ChannelMerger::VerifyHeaps() {
1300 {
1301 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
1302 channel_heap_;
1303 std::make_heap(channel_heap.begin(), channel_heap.end(),
1304 &ChannelHeapCompare);
1305
1306 for (size_t i = 0; i < channel_heap_.size(); ++i) {
1307 CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
1308 CHECK_EQ(std::get<0>(channel_heap[i]),
1309 timestamp_mergers_[std::get<1>(channel_heap[i])]
1310 .channel_merger_time());
1311 }
1312 }
1313 {
1314 std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap =
1315 timestamp_heap_;
1316 std::make_heap(timestamp_heap.begin(), timestamp_heap.end(),
1317 &ChannelHeapCompare);
1318
1319 for (size_t i = 0; i < timestamp_heap_.size(); ++i) {
1320 CHECK(timestamp_heap_[i] == timestamp_heap[i]) << ": Heaps diverged...";
1321 }
1322 }
1323}
1324
// Pops the globally-oldest message across all channels: takes the front of
// the channel heap, delegates to that channel's TimestampMerger, and CHECKs
// that messages leave in non-decreasing time order.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
  CHECK_GT(channel_heap_.size(), 0u);
  std::pair<monotonic_clock::time_point, int> oldest_channel_data =
      channel_heap_.front();
  int channel_index = oldest_channel_data.second;
  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
                &ChannelHeapCompare);
  channel_heap_.pop_back();

  // The channel is no longer represented on the heap; let the merger know so
  // the next update re-pushes it.
  timestamp_mergers_[channel_index].set_pushed(false);

  TimestampMerger *merger = &timestamp_mergers_[channel_index];

  // Timestamp-tracking channels keep a mirrored entry on the timestamp heap;
  // pop it too and verify the heaps agree.
  if (merger->has_timestamps()) {
    CHECK_GT(timestamp_heap_.size(), 0u);
    std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
        timestamp_heap_.front();
    CHECK(oldest_timestamp_data == oldest_channel_data)
        << ": Timestamp heap out of sync.";
    std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                  &ChannelHeapCompare);
    timestamp_heap_.pop_back();
  }

  // Merger handles any queueing needed from here.
  std::tuple<TimestampMerger::DeliveryTimestamp,
             FlatbufferVector<MessageHeader>>
      message = merger->PopOldest();
  DCHECK_EQ(std::get<0>(message).monotonic_event_time,
            oldest_channel_data.first)
      << ": channel_heap_ was corrupted for " << channel_index << ": "
      << DebugString();

  // Enforce global monotonicity across everything this merger pops.
  CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
      << ": " << MaybeNodeName(log_file_header()->node())
      << "Messages came off the queue out of order. " << DebugString();
  last_popped_time_ = std::get<0>(message).monotonic_event_time;

  VLOG(1) << "Popped " << last_popped_time_ << " "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel_index))
          << " (" << channel_index << ")";

  return std::make_tuple(std::get<0>(message), channel_index,
                         std::move(std::get<1>(message)));
}
1373
Austin Schuhcde938c2020-02-02 17:30:07 -08001374std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
1375 std::stringstream ss;
1376 for (size_t i = 0; i < data_.size(); ++i) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001377 if (i < 5 || i + 5 > data_.size()) {
1378 if (timestamps) {
1379 ss << " msg: ";
1380 } else {
1381 ss << " timestamp: ";
1382 }
1383 ss << monotonic_clock::time_point(
1384 chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
Austin Schuhcde938c2020-02-02 17:30:07 -08001385 << " ("
Austin Schuh2f8fd752020-09-01 22:38:28 -07001386 << realtime_clock::time_point(
1387 chrono::nanoseconds(data_[i].message().realtime_sent_time()))
1388 << ") " << data_[i].message().queue_index();
1389 if (timestamps) {
1390 ss << " <- remote "
1391 << monotonic_clock::time_point(chrono::nanoseconds(
1392 data_[i].message().monotonic_remote_time()))
1393 << " ("
1394 << realtime_clock::time_point(chrono::nanoseconds(
1395 data_[i].message().realtime_remote_time()))
1396 << ")";
1397 }
1398 ss << "\n";
1399 } else if (i == 5) {
1400 ss << " ...\n";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001401 }
Austin Schuhcde938c2020-02-02 17:30:07 -08001402 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001403
Austin Schuhcde938c2020-02-02 17:30:07 -08001404 return ss.str();
1405}
Austin Schuh6f3babe2020-01-26 20:34:50 -08001406
Austin Schuhcde938c2020-02-02 17:30:07 -08001407std::string SplitMessageReader::DebugString(int channel) const {
1408 std::stringstream ss;
1409 ss << "[\n";
1410 ss << channels_[channel].data.DebugString();
1411 ss << " ]";
1412 return ss.str();
1413}
Austin Schuh6f3babe2020-01-26 20:34:50 -08001414
Austin Schuhcde938c2020-02-02 17:30:07 -08001415std::string SplitMessageReader::DebugString(int channel, int node_index) const {
1416 std::stringstream ss;
1417 ss << "[\n";
1418 ss << channels_[channel].timestamps[node_index].DebugString();
1419 ss << " ]";
1420 return ss.str();
1421}
1422
1423std::string TimestampMerger::DebugString() const {
1424 std::stringstream ss;
1425
1426 if (timestamp_heap_.size() > 0) {
1427 ss << " timestamp_heap {\n";
1428 std::vector<
1429 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
1430 timestamp_heap = timestamp_heap_;
1431 while (timestamp_heap.size() > 0u) {
1432 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
1433 oldest_timestamp_reader = timestamp_heap.front();
1434
1435 ss << " " << std::get<2>(oldest_timestamp_reader) << " "
1436 << std::get<0>(oldest_timestamp_reader) << " queue_index ("
1437 << std::get<1>(oldest_timestamp_reader) << ") ttq "
1438 << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
1439 << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
1440 << std::get<2>(oldest_timestamp_reader)
1441 ->DebugString(channel_index_, node_index_)
1442 << "\n";
1443
1444 std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
1445 &SplitMessageReaderHeapCompare);
1446 timestamp_heap.pop_back();
1447 }
1448 ss << " }\n";
1449 }
1450
1451 ss << " message_heap {\n";
1452 {
1453 std::vector<
1454 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
1455 message_heap = message_heap_;
Brian Silverman8a32ce62020-08-12 12:02:38 -07001456 while (!message_heap.empty()) {
Austin Schuhcde938c2020-02-02 17:30:07 -08001457 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
1458 oldest_message_reader = message_heap.front();
1459
1460 ss << " " << std::get<2>(oldest_message_reader) << " "
1461 << std::get<0>(oldest_message_reader) << " queue_index ("
1462 << std::get<1>(oldest_message_reader) << ") ttq "
1463 << std::get<2>(oldest_message_reader)->time_to_queue() << " "
1464 << std::get<2>(oldest_message_reader)->filename() << " -> "
1465 << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
1466 << "\n";
1467
1468 std::pop_heap(message_heap.begin(), message_heap.end(),
1469 &SplitMessageReaderHeapCompare);
1470 message_heap.pop_back();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001471 }
Austin Schuh05b70472020-01-01 17:11:17 -08001472 }
Austin Schuhcde938c2020-02-02 17:30:07 -08001473 ss << " }";
1474
1475 return ss.str();
1476}
1477
1478std::string ChannelMerger::DebugString() const {
1479 std::stringstream ss;
1480 ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
1481 << "\n";
1482 ss << "channel_heap {\n";
1483 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
1484 channel_heap_;
Brian Silverman8a32ce62020-08-12 12:02:38 -07001485 while (!channel_heap.empty()) {
Austin Schuhcde938c2020-02-02 17:30:07 -08001486 std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
1487 ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
1488 << configuration::CleanedChannelToString(
1489 configuration()->channels()->Get(std::get<1>(channel)))
1490 << "\n";
1491
1492 ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
1493
1494 std::pop_heap(channel_heap.begin(), channel_heap.end(),
1495 &ChannelHeapCompare);
1496 channel_heap.pop_back();
1497 }
1498 ss << "}";
1499
1500 return ss.str();
Austin Schuh05b70472020-01-01 17:11:17 -08001501}
1502
Austin Schuhee711052020-08-24 16:06:09 -07001503std::string MaybeNodeName(const Node *node) {
1504 if (node != nullptr) {
1505 return node->name()->str() + " ";
1506 }
1507 return "";
1508}
1509
Austin Schuha36c8902019-12-30 18:07:15 -08001510} // namespace logger
1511} // namespace aos