blob: 938ede7fe940713b9f8c2e5a15bb30ae3e1603ca [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
4#include <limits.h>
5#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
8
9#include <vector>
10
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
Austin Schuha36c8902019-12-30 18:07:15 -080013#include "aos/events/logging/logger_generated.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
// Number of bytes to buffer in memory before flushing them to disk with a
// single writev() call.  Larger values mean fewer syscalls but more data at
// risk on a crash.
DEFINE_int32(flush_size, 1000000,
             "Number of outstanding bytes to allow before flushing to disk.");

namespace aos {
namespace logger {

namespace chrono = std::chrono;
27
Austin Schuha36c8902019-12-30 18:07:15 -080028DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
Austin Schuh6f3babe2020-01-26 20:34:50 -080029 : filename_(filename) {
30 util::MkdirP(filename, 0777);
31 fd_ = open(std::string(filename).c_str(),
32 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
33 VLOG(1) << "Opened " << filename << " for writing";
34 PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
Austin Schuha36c8902019-12-30 18:07:15 -080035}
36
DetachedBufferWriter::~DetachedBufferWriter() {
  // Push any bytes still queued in memory out to disk before closing.
  Flush();
  // A close() failure is logged but not fatal; Flush() already handed the
  // data to the kernel.
  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
  VLOG(1) << "Closed " << filename_;
}
42
// Move construction delegates to move assignment: this object starts in the
// default state and swaps with |other|.
DetachedBufferWriter::DetachedBufferWriter(
    DetachedBufferWriter &&other) {
  *this = std::move(other);
}
47
DetachedBufferWriter &DetachedBufferWriter::operator=(
    DetachedBufferWriter &&other) {
  // Flush our own queued data first so it is not lost when we take over
  // |other|'s state.
  Flush();
  // Swap (rather than move) every member so |other| remains valid and its
  // destructor will flush and close whatever file we previously owned.
  std::swap(filename_, other.filename_);
  std::swap(fd_, other.fd_);
  std::swap(queued_size_, other.queued_size_);
  std::swap(written_size_, other.written_size_);
  std::swap(queue_, other.queue_);
  std::swap(iovec_, other.iovec_);
  return *this;
}
59
// Convenience overload: takes ownership of the builder's finished buffer and
// queues it for writing.
void DetachedBufferWriter::QueueSizedFlatbuffer(
    flatbuffers::FlatBufferBuilder *fbb) {
  QueueSizedFlatbuffer(fbb->Release());
}
64
Austin Schuhde031b72020-01-10 19:34:41 -080065void DetachedBufferWriter::WriteSizedFlatbuffer(
66 absl::Span<const uint8_t> span) {
67 // Cheat aggressively... Write out the queued up data, and then write this
68 // data once without buffering. It is hard to make a DetachedBuffer out of
69 // this data, and we don't want to worry about lifetimes.
70 Flush();
71 iovec_.clear();
72 iovec_.reserve(1);
73
74 struct iovec n;
75 n.iov_base = const_cast<uint8_t *>(span.data());
76 n.iov_len = span.size();
77 iovec_.emplace_back(n);
78
79 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
80
81 PCHECK(written == static_cast<ssize_t>(n.iov_len))
82 << ": Wrote " << written << " expected " << n.iov_len;
Brian Silverman98360e22020-04-28 16:51:20 -070083 written_size_ += written;
Austin Schuhde031b72020-01-10 19:34:41 -080084}
85
Austin Schuha36c8902019-12-30 18:07:15 -080086void DetachedBufferWriter::QueueSizedFlatbuffer(
87 flatbuffers::DetachedBuffer &&buffer) {
88 queued_size_ += buffer.size();
89 queue_.emplace_back(std::move(buffer));
90
91 // Flush if we are at the max number of iovs per writev, or have written
92 // enough data. Otherwise writev will fail with an invalid argument.
93 if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
94 queue_.size() == IOV_MAX) {
95 Flush();
96 }
97}
98
// Writes every queued buffer to disk in a single writev() call and empties
// the queue.  A short write is fatal (see TODO below).
void DetachedBufferWriter::Flush() {
  if (queue_.size() == 0u) {
    return;
  }
  // Build one iovec entry per queued buffer so the whole queue goes out in
  // one syscall.
  iovec_.clear();
  iovec_.reserve(queue_.size());
  size_t counted_size = 0;
  for (size_t i = 0; i < queue_.size(); ++i) {
    struct iovec n;
    n.iov_base = queue_[i].data();
    n.iov_len = queue_[i].size();
    counted_size += n.iov_len;
    iovec_.emplace_back(std::move(n));
  }
  // Sanity check: the running total must match the buffers actually queued.
  CHECK_EQ(counted_size, queued_size_);
  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());

  PCHECK(written == static_cast<ssize_t>(queued_size_))
      << ": Wrote " << written << " expected " << queued_size_;
  written_size_ += written;

  queued_size_ = 0;
  queue_.clear();
  // TODO(austin): Handle partial writes in some way other than crashing...
}
124
// Serializes one message (and/or its delivery timestamps, depending on
// |log_type|) from |context| into a MessageHeader table in |fbb|.
// Note the flatbuffer build-order constraint: the data vector must be
// created before the MessageHeader builder is started.
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  // Copy the payload in first for every log type that records data.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  // Which queue index / sent time to record depends on whose point of view
  // this log entry represents: the remote sender's, or the local node's.
  switch (log_type) {
    case LogType::kLogRemoteMessage:
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  // Attach the payload and, for delivery-time log types, the remote
  // timestamps describing when/where the message originated.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      message_header_builder.add_data(data_offset);
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_data(data_offset);
      [[fallthrough]];

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;
  }

  return message_header_builder.Finish();
}
186
// Opens |filename| read-only; failure to open is fatal.
SpanReader::SpanReader(std::string_view filename)
    : filename_(filename),
      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
  PCHECK(fd_ != -1) << ": Failed to open " << filename;
}
192
// Returns the next size-prefixed flatbuffer (prefix included) as a span into
// the internal buffer, or an empty span at end-of-file / on a corrupt
// zero-length prefix.  The span is only valid until the next Read* call.
absl::Span<const uint8_t> SpanReader::ReadMessage() {
  // Make sure we have enough for the size.
  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // Now make sure we have enough for the message.
  const size_t data_size =
      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
      sizeof(flatbuffers::uoffset_t);
  // A zero-length message means the prefix itself is garbage -- treat the
  // rest of the file as corrupt and stop, dumping it for debugging.
  if (data_size == sizeof(flatbuffers::uoffset_t)) {
    LOG(ERROR) << "Size of data is zero.  Log file end is corrupted, skipping.";
    LOG(ERROR) << "  Rest of log file is "
               << absl::BytesToHexString(std::string_view(
                      reinterpret_cast<const char *>(data_.data() +
                                                     consumed_data_),
                      data_.size() - consumed_data_));
    return absl::Span<const uint8_t>();
  }
  // Keep reading blocks until the whole message is buffered.
  while (data_.size() < consumed_data_ + data_size) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // And return it, consuming the data.
  const uint8_t *data_ptr = data_.data() + consumed_data_;

  consumed_data_ += data_size;

  return absl::Span<const uint8_t>(data_ptr, data_size);
}
227
228bool SpanReader::MessageAvailable() {
229 // Are we big enough to read the size?
230 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
231 return false;
232 }
233
234 // Then, are we big enough to read the full message?
235 const size_t data_size =
236 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
237 sizeof(flatbuffers::uoffset_t);
238 if (data_.size() < consumed_data_ + data_size) {
239 return false;
240 }
241
242 return true;
243}
244
// Reads another chunk from the file into data_, first compacting away the
// already-consumed prefix.  Returns false at end-of-file; a read error is
// fatal.
bool SpanReader::ReadBlock() {
  if (end_of_file_) {
    return false;
  }

  // Appends 256k.  This is enough that the read call is efficient.  We don't
  // want to spend too much time reading small chunks because the syscalls for
  // that will be expensive.
  constexpr size_t kReadSize = 256 * 1024;

  // Strip off any unused data at the front.
  if (consumed_data_ != 0) {
    data_.erase(data_.begin(), data_.begin() + consumed_data_);
    consumed_data_ = 0;
  }

  const size_t starting_size = data_.size();

  // This should automatically grow the backing store.  It won't shrink if we
  // get a small chunk later.  This reduces allocations when we want to append
  // more data.
  data_.resize(data_.size() + kReadSize);

  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
  // Trim back down to what was actually read (clamped so a -1 error does not
  // corrupt the size before the PCHECK below fires).
  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
  if (count == 0) {
    end_of_file_ = true;
    return false;
  }
  PCHECK(count > 0);

  return true;
}
278
Austin Schuh6f3babe2020-01-26 20:34:50 -0800279FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
280 SpanReader span_reader(filename);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800281 absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
282
283 // Make sure something was read.
Austin Schuh97789fc2020-08-01 14:42:45 -0700284 CHECK(config_data != absl::Span<const uint8_t>())
285 << ": Failed to read header from: " << filename;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800286
Austin Schuh5212cad2020-09-09 23:12:09 -0700287 // And copy the config so we have it forever, removing the size prefix.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800288 std::vector<uint8_t> data(
289 config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
290 return FlatbufferVector<LogFileHeader>(std::move(data));
291}
292
Austin Schuh5212cad2020-09-09 23:12:09 -0700293FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
294 size_t n) {
295 SpanReader span_reader(filename);
296 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
297 for (size_t i = 0; i < n + 1; ++i) {
298 data_span = span_reader.ReadMessage();
299
300 // Make sure something was read.
301 CHECK(data_span != absl::Span<const uint8_t>())
302 << ": Failed to read data from: " << filename;
303 }
304
305 // And copy the data so we have it forever.
306 std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
307 data_span.end());
308 return FlatbufferVector<MessageHeader>(std::move(data));
309}
310
// Opens |filename| and reads+caches its LogFileHeader, which every later
// ReadMessage() call implicitly relies on (e.g. max_out_of_order_duration).
MessageReader::MessageReader(std::string_view filename)
    : span_reader_(filename),
      raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
  // Make sure we have enough to read the size.
  absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();

  // Make sure something was read.
  CHECK(header_data != absl::Span<const uint8_t>())
      << ": Failed to read header from: " << filename;

  // And copy the header data so we have it forever.
  std::vector<uint8_t> header_data_copy(
      header_data.begin() + sizeof(flatbuffers::uoffset_t), header_data.end());
  raw_log_file_header_ =
      FlatbufferVector<LogFileHeader>(std::move(header_data_copy));

  // Cache the reordering bound; used by callers to know how far ahead to
  // queue.
  max_out_of_order_duration_ =
      chrono::nanoseconds(log_file_header()->max_out_of_order_duration());

  VLOG(1) << "Opened " << filename << " as node "
          << FlatbufferToJson(log_file_header()->node());
}
333
// Returns the next MessageHeader in the file as an owning copy, or nullopt at
// end-of-file.  Also tracks the newest timestamp seen so far.
std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
  if (msg_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  // Copy out of the reader's buffer, dropping the size prefix, so the result
  // owns its storage.
  FlatbufferVector<MessageHeader> result{std::vector<uint8_t>(
      msg_data.begin() + sizeof(flatbuffers::uoffset_t), msg_data.end())};

  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
      chrono::nanoseconds(result.message().monotonic_sent_time()));

  // Messages may be mildly out of order, so track the max, not the last.
  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
  return std::move(result);
}
350
// Opens the first part file and validates that every other part in
// |filenames| carries a compatible header (same logger/parts UUIDs, same
// config), reconciling the fields that are allowed to differ (start times
// before/after time estimation, parts_index).  Then sets up the per-channel
// queues used during replay.
SplitMessageReader::SplitMessageReader(
    const std::vector<std::string> &filenames)
    : filenames_(filenames),
      log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
  CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";

  // Grab any log file header. They should all match (and we will check as we
  // open more of them).
  log_file_header_ = message_reader_->raw_log_file_header();

  for (size_t i = 1; i < filenames_.size(); ++i) {
    MessageReader message_reader(filenames_[i]);

    const monotonic_clock::time_point new_monotonic_start_time(
        chrono::nanoseconds(
            message_reader.log_file_header()->monotonic_start_time()));
    const realtime_clock::time_point new_realtime_start_time(
        chrono::nanoseconds(
            message_reader.log_file_header()->realtime_start_time()));

    // There are 2 types of part files. Part files from before time estimation
    // has started, and part files after. We don't declare a log file "started"
    // until time estimation is up. And once a log file starts, it should never
    // stop again, and should remain constant.
    // To compare both types of headers, we mutate our saved copy of the header
    // to match the next chunk by updating time if we detect a stopped ->
    // started transition.
    if (monotonic_start_time() == monotonic_clock::min_time) {
      CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
      // We should only be missing the monotonic start time when logging data
      // for remote nodes. We don't have a good way to deteremine the remote
      // realtime offset, so it shouldn't be filled out.
      // TODO(austin): If we have a good way, feel free to fill it out. It
      // probably won't be better than we could do in post though with the same
      // data.
      CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
      if (new_monotonic_start_time != monotonic_clock::min_time) {
        // If we finally found our start time, update the header. Do this once
        // because it should never change again.
        log_file_header_.mutable_message()->mutate_monotonic_start_time(
            new_monotonic_start_time.time_since_epoch().count());
        log_file_header_.mutable_message()->mutate_realtime_start_time(
            new_realtime_start_time.time_since_epoch().count());
      }
    }

    // We don't have a good way to set the realtime start time on remote nodes.
    // Confirm it remains consistent.
    CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
             message_reader.log_file_header()->has_realtime_start_time());

    // Parts index will *not* match unless we set them to match. We only want
    // to accept the start time and parts mismatching, so set them.
    log_file_header_.mutable_message()->mutate_parts_index(
        message_reader.log_file_header()->parts_index());

    // Now compare that the headers match.
    if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
                           log_file_header_)) {
      // Emit the most specific mismatch we can identify before dying.
      if (message_reader.log_file_header()->has_logger_uuid() &&
          log_file_header_.message().has_logger_uuid() &&
          message_reader.log_file_header()->logger_uuid()->string_view() !=
              log_file_header_.message().logger_uuid()->string_view()) {
        LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
                   << filenames_[0] << " and " << filenames_[i]
                   << ", this is not supported.";
      }
      if (message_reader.log_file_header()->has_parts_uuid() &&
          log_file_header_.message().has_parts_uuid() &&
          message_reader.log_file_header()->parts_uuid()->string_view() !=
              log_file_header_.message().parts_uuid()->string_view()) {
        LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
                   << filenames_[0] << " and " << filenames_[i]
                   << ", this is not supported.";
      }

      LOG(FATAL) << "Header is different between log file chunks "
                 << filenames_[0] << " and " << filenames_[i]
                 << ", this is not supported.";
    }
  }
  // Put the parts index back to the first log file chunk.
  log_file_header_.mutable_message()->mutate_parts_index(
      message_reader_->log_file_header()->parts_index());

  // Setup per channel state.
  channels_.resize(configuration()->channels()->size());
  for (ChannelData &channel_data : channels_) {
    channel_data.data.split_reader = this;
    // Build up the timestamp list.
    if (configuration::MultiNode(configuration())) {
      channel_data.timestamps.resize(configuration()->nodes()->size());
      for (MessageHeaderQueue &queue : channel_data.timestamps) {
        queue.timestamps = true;
        queue.split_reader = this;
      }
    }
  }

  // Build up channels_to_write_ as an optimization to make it fast to figure
  // out which datastructure to place any new data from a channel on.
  for (const Channel *channel : *configuration()->channels()) {
    // This is the main case. We will only see data on this node.
    if (configuration::ChannelIsSendableOnNode(channel, node())) {
      channels_to_write_.emplace_back(
          &channels_[channels_to_write_.size()].data);
    } else
      // If we can't send, but can receive, we should be able to see
      // timestamps here.
      if (configuration::ChannelIsReadableOnNode(channel, node())) {
      channels_to_write_.emplace_back(
          &(channels_[channels_to_write_.size()]
                .timestamps[configuration::GetNodeIndex(configuration(),
                                                        node())]));
    } else {
      // Not relevant for this node at all; messages get dropped.
      channels_to_write_.emplace_back(nullptr);
    }
  }
}
470
// Advances message_reader_ to the next part file, verifying its header
// matches the saved one.  Returns false when all parts are exhausted.
bool SplitMessageReader::NextLogFile() {
  if (next_filename_index_ == filenames_.size()) {
    return false;
  }
  message_reader_ =
      std::make_unique<MessageReader>(filenames_[next_filename_index_]);

  // We can't support the config diverging between two log file headers. See if
  // they are the same.
  if (next_filename_index_ != 0) {
    // In order for the headers to identically compare, they need to have the
    // same parts_index. Rewrite the saved header with the new parts_index,
    // compare, and then restore.
    const int32_t original_parts_index =
        log_file_header_.message().parts_index();
    log_file_header_.mutable_message()->mutate_parts_index(
        message_reader_->log_file_header()->parts_index());

    CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
                            log_file_header_))
        << ": Header is different between log file chunks "
        << filenames_[next_filename_index_] << " and "
        << filenames_[next_filename_index_ - 1] << ", this is not supported.";

    // Restore the saved header to its original parts_index.
    log_file_header_.mutable_message()->mutate_parts_index(
        original_parts_index);
  }

  ++next_filename_index_;
  return true;
}
502
// Queues messages into the per-channel queues until we are at least
// max_out_of_order_duration past |last_dequeued_time|, reading across part
// file boundaries as needed.  Returns false once the log is fully consumed.
// Note: recurses once at startup to prime the queues up to the start time.
bool SplitMessageReader::QueueMessages(
    monotonic_clock::time_point last_dequeued_time) {
  // TODO(austin): Once we are happy that everything works, read a 256kb chunk
  // to reduce the need to re-heap down below.

  // Special case no more data. Otherwise we blow up on the CHECK statement
  // confirming that we have enough data queued.
  if (at_end_) {
    return false;
  }

  // If this isn't the first time around, confirm that we had enough data queued
  // to follow the contract.
  if (time_to_queue_ != monotonic_clock::min_time) {
    CHECK_LE(last_dequeued_time,
             newest_timestamp() - max_out_of_order_duration())
        << " node " << FlatbufferToJson(node()) << " on " << this;

    // Bail if there is enough data already queued.
    if (last_dequeued_time < time_to_queue_) {
      VLOG(1) << MaybeNodeName(target_node_) << "All up to date on " << this
              << ", dequeued " << last_dequeued_time << " queue time "
              << time_to_queue_;
      return true;
    }
  } else {
    // Startup takes a special dance. We want to queue up until the start time,
    // but we then want to find the next message to read. The conservative
    // answer is to immediately trigger a second requeue to get things moving.
    time_to_queue_ = monotonic_start_time();
    QueueMessages(time_to_queue_);
  }

  // If we are asked to queue, queue for at least max_out_of_order_duration past
  // the last known time in the log file (ie the newest timestep read). As long
  // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
  // are safe. And since we pop in order, that works.
  //
  // Special case the start of the log file. There should be at most 1 message
  // from each channel at the start of the log file. So always force the start
  // of the log file to just be read.
  time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
  VLOG(1) << MaybeNodeName(target_node_) << "Queueing, going until "
          << time_to_queue_ << " " << filename();

  bool was_emplaced = false;
  while (true) {
    // Stop if we have enough.
    if (newest_timestamp() > time_to_queue_ + max_out_of_order_duration() &&
        was_emplaced) {
      VLOG(1) << MaybeNodeName(target_node_) << "Done queueing on " << this
              << ", queued to " << newest_timestamp() << " with requeue time "
              << time_to_queue_;
      return true;
    }

    if (std::optional<FlatbufferVector<MessageHeader>> msg =
            message_reader_->ReadMessage()) {
      const MessageHeader &header = msg.value().message();

      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
          chrono::nanoseconds(header.monotonic_sent_time()));

      // Verbose logging: at -v2 dump the whole message; at -v1 dump a copy
      // with the (potentially huge) data field cleared.
      if (VLOG_IS_ON(2)) {
        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this
                  << " " << filename() << " ttq: " << time_to_queue_ << " now "
                  << newest_timestamp() << " start time "
                  << monotonic_start_time() << " " << FlatbufferToJson(&header);
      } else if (VLOG_IS_ON(1)) {
        FlatbufferVector<MessageHeader> copy = msg.value();
        copy.mutable_message()->clear_data();
        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
                  << filename() << " ttq: " << time_to_queue_ << " now "
                  << newest_timestamp() << " start time "
                  << monotonic_start_time() << " " << FlatbufferToJson(copy);
      }

      const int channel_index = header.channel_index();
      // emplace_back returns false when nobody is listening on this channel;
      // such messages don't advance the newest timestamp.
      was_emplaced = channels_to_write_[channel_index]->emplace_back(
          std::move(msg.value()));
      if (was_emplaced) {
        newest_timestamp_ = std::max(newest_timestamp_, timestamp);
      }
    } else {
      if (!NextLogFile()) {
        VLOG(1) << MaybeNodeName(target_node_) << "No more files, last was "
                << filenames_.back();
        at_end_ = true;
        // Tell every interested merger that no more data is coming.
        for (MessageHeaderQueue *queue : channels_to_write_) {
          if (queue == nullptr || queue->timestamp_merger == nullptr) {
            continue;
          }
          queue->timestamp_merger->NoticeAtEnd();
        }
        return false;
      }
    }
  }
}
602
// Wires |timestamp_merger| up to the queue for |channel_index| that is
// relevant when replaying from |target_node|'s point of view.  Channels that
// are irrelevant for that node get no merger, and their messages are dropped
// at queue time.
void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
                                            int channel_index,
                                            const Node *target_node) {
  const Node *reinterpreted_target_node =
      configuration::GetNodeOrDie(configuration(), target_node);
  target_node_ = reinterpreted_target_node;

  const Channel *const channel =
      configuration()->channels()->Get(channel_index);

  VLOG(1) << "  Configuring merger " << this << " for channel " << channel_index
          << " "
          << configuration::CleanedChannelToString(
                 configuration()->channels()->Get(channel_index));

  MessageHeaderQueue *message_header_queue = nullptr;

  // Figure out if this log file is from our point of view, or the other node's
  // point of view.
  if (node() == reinterpreted_target_node) {
    VLOG(1) << "    Replaying as logged node " << filename();

    if (configuration::ChannelIsSendableOnNode(channel, node())) {
      // Locally-sent channel: replay the data queue.
      VLOG(1) << "      Data on node";
      message_header_queue = &(channels_[channel_index].data);
    } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
      // Remotely-sent but locally received: replay our delivery timestamps.
      VLOG(1) << "      Timestamps on node";
      message_header_queue =
          &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
              configuration(), node())]);
    } else {
      VLOG(1) << "      Dropping";
    }
  } else {
    VLOG(1) << "    Replaying as other node " << filename();
    // We are replaying from another node's point of view. The only interesting
    // data is data that is sent from our node and received on theirs.
    if (configuration::ChannelIsReadableOnNode(channel,
                                               reinterpreted_target_node) &&
        configuration::ChannelIsSendableOnNode(channel, node())) {
      VLOG(1) << "      Readable on target node";
      // Data from another node.
      message_header_queue = &(channels_[channel_index].data);
    } else {
      VLOG(1) << "      Dropping";
      // This is either not sendable on the other node, or is a timestamp and
      // therefore not interesting.
    }
  }

  // If we found one, write it down. This will be nullptr when there is nothing
  // relevant on this channel on this node for the target node. In that case,
  // we want to drop the message instead of queueing it.
  if (message_header_queue != nullptr) {
    message_header_queue->timestamp_merger = timestamp_merger;
  }
}
660
// Pops the oldest data message on |channel_index|, requeueing enough of the
// log to honor the max_out_of_order_duration contract before returning.
// Returns (monotonic sent time, queue index, message).
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel_index) {
  CHECK_GT(channels_[channel_index].data.size(), 0u);
  // Capture the timestamp before popping; it drives the requeue below.
  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
      timestamp = channels_[channel_index].data.front_timestamp();
  FlatbufferVector<MessageHeader> front =
      std::move(channels_[channel_index].data.front());
  channels_[channel_index].data.PopFront();

  VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
          << std::get<0>(timestamp) << " for "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel_index))
          << " (" << channel_index << ")";

  // Refill the queues past the time we just consumed.
  QueueMessages(std::get<0>(timestamp));

  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                         std::move(front));
}
682
// Pops the oldest timestamp message for |channel| as delivered to
// |node_index|, requeueing the log as needed.  Returns (monotonic sent time,
// queue index, message), mirroring PopOldest().
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
  CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
  // Capture the timestamp before popping; it drives the requeue below.
  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
      timestamp = channels_[channel].timestamps[node_index].front_timestamp();
  FlatbufferVector<MessageHeader> front =
      std::move(channels_[channel].timestamps[node_index].front());
  channels_[channel].timestamps[node_index].PopFront();

  VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
          << std::get<0>(timestamp) << " for "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel))
          << " on "
          << configuration()->nodes()->Get(node_index)->name()->string_view()
          << " (" << node_index << ")";

  // Refill the queues past the time we just consumed.
  QueueMessages(std::get<0>(timestamp));

  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
                         std::move(front));
}
706
// Appends a message header to this queue.  Returns false (dropping the
// message) when no TimestampMerger is subscribed.  When the queue transitions
// from empty to non-empty, notifies the merger so it picks up the new front.
bool SplitMessageReader::MessageHeaderQueue::emplace_back(
    FlatbufferVector<MessageHeader> &&msg) {
  CHECK(split_reader != nullptr);

  // If there is no timestamp merger for this queue, nobody is listening.  Drop
  // the message.  This happens when a log file from another node is replayed,
  // and the timestamp mergers down stream just don't care.
  if (timestamp_merger == nullptr) {
    return false;
  }

  // A timestamp queue (timestamps == true) must hold data-less headers, and a
  // data queue must hold headers with data; anything else is a mixed-up log.
  CHECK(timestamps != msg.message().has_data())
      << ": Got timestamps and data mixed up on a node. "
      << FlatbufferToJson(msg);

  data_.emplace_back(std::move(msg));

  if (data_.size() == 1u) {
    // Yup, new data.  Notify the merger which call to use depending on
    // whether this queue carries timestamps or data.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
    } else {
      timestamp_merger->Update(split_reader, front_timestamp());
    }
  }

  return true;
}
735
// Drops the front entry and re-notifies the merger: with the new front when
// messages remain, or with a (min_time, 0, nullptr) sentinel when the queue
// drained so the merger can re-sort its heap and drop this source.
void SplitMessageReader::MessageHeaderQueue::PopFront() {
  data_.pop_front();
  if (data_.size() != 0u) {
    // Yup, new data.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
    } else {
      timestamp_merger->Update(split_reader, front_timestamp());
    }
  } else {
    // Poke anyways to update the heap.  The nullptr reader and min_time tell
    // the merger nothing new is available from this queue.
    if (timestamps) {
      timestamp_merger->UpdateTimestamp(
          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
    } else {
      timestamp_merger->Update(
          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
    }
  }
}
756
757namespace {
758
Austin Schuh6f3babe2020-01-26 20:34:50 -0800759bool SplitMessageReaderHeapCompare(
760 const std::tuple<monotonic_clock::time_point, uint32_t,
761 SplitMessageReader *>
762 first,
763 const std::tuple<monotonic_clock::time_point, uint32_t,
764 SplitMessageReader *>
765 second) {
766 if (std::get<0>(first) > std::get<0>(second)) {
767 return true;
768 } else if (std::get<0>(first) == std::get<0>(second)) {
769 if (std::get<1>(first) > std::get<1>(second)) {
770 return true;
771 } else if (std::get<1>(first) == std::get<1>(second)) {
772 return std::get<2>(first) > std::get<2>(second);
773 } else {
774 return false;
775 }
776 } else {
777 return false;
778 }
779}
780
Austin Schuh05b70472020-01-01 17:11:17 -0800781bool ChannelHeapCompare(
782 const std::pair<monotonic_clock::time_point, int> first,
783 const std::pair<monotonic_clock::time_point, int> second) {
784 if (first.first > second.first) {
785 return true;
786 } else if (first.first == second.first) {
787 return first.second > second.second;
788 } else {
789 return false;
790 }
791}
792
793} // namespace
794
Austin Schuh6f3babe2020-01-26 20:34:50 -0800795TimestampMerger::TimestampMerger(
796 const Configuration *configuration,
797 std::vector<SplitMessageReader *> split_message_readers, int channel_index,
798 const Node *target_node, ChannelMerger *channel_merger)
799 : configuration_(configuration),
800 split_message_readers_(std::move(split_message_readers)),
801 channel_index_(channel_index),
802 node_index_(configuration::MultiNode(configuration)
803 ? configuration::GetNodeIndex(configuration, target_node)
804 : -1),
805 channel_merger_(channel_merger) {
806 // Tell the readers we care so they know who to notify.
Austin Schuhcde938c2020-02-02 17:30:07 -0800807 VLOG(1) << "Configuring channel " << channel_index << " target node "
808 << FlatbufferToJson(target_node);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800809 for (SplitMessageReader *reader : split_message_readers_) {
810 reader->SetTimestampMerger(this, channel_index, target_node);
811 }
812
813 // And then determine if we need to track timestamps.
814 const Channel *channel = configuration->channels()->Get(channel_index);
815 if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
816 configuration::ChannelIsReadableOnNode(channel, target_node)) {
817 has_timestamps_ = true;
818 }
819}
820
// Records that |split_message_reader| now has a data message at |timestamp|
// on this channel, then (for pure data mergers) reports the new oldest time
// to the ChannelMerger.
//
// A nullptr |split_message_reader| pushes nothing but still refreshes the
// ChannelMerger — including reporting min_time to remove this channel from
// the channel heap when our own heap has drained.
void TimestampMerger::PushMessageHeap(
    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
        timestamp,
    SplitMessageReader *split_message_reader) {
  if (split_message_reader != nullptr) {
    // Each reader may contribute at most one entry to the heap; a duplicate
    // would corrupt the accounting when entries are popped.
    DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
                        [split_message_reader](
                            const std::tuple<monotonic_clock::time_point,
                                             uint32_t, SplitMessageReader *>
                                x) {
                          return std::get<2>(x) == split_message_reader;
                        }) == message_heap_.end())
        << ": Pushing message when it is already in the heap.";

    message_heap_.push_back(std::make_tuple(
        std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));

    std::push_heap(message_heap_.begin(), message_heap_.end(),
                   &SplitMessageReaderHeapCompare);
  }

  // If we are just a data merger, don't wait for timestamps.
  if (!has_timestamps_) {
    if (!message_heap_.empty()) {
      channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
      pushed_ = true;
    } else {
      // Remove ourselves if we are empty.
      channel_merger_->Update(monotonic_clock::min_time, channel_index_);
    }
  }
}
853
Austin Schuhcde938c2020-02-02 17:30:07 -0800854std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
855TimestampMerger::oldest_message() const {
856 CHECK_GT(message_heap_.size(), 0u);
857 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
858 oldest_message_reader = message_heap_.front();
859 return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
860}
861
862std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
863TimestampMerger::oldest_timestamp() const {
864 CHECK_GT(timestamp_heap_.size(), 0u);
865 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
866 oldest_message_reader = timestamp_heap_.front();
867 return std::get<2>(oldest_message_reader)
868 ->oldest_message(channel_index_, node_index_);
869}
870
Austin Schuh6f3babe2020-01-26 20:34:50 -0800871void TimestampMerger::PushTimestampHeap(
Austin Schuhcde938c2020-02-02 17:30:07 -0800872 std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
873 timestamp,
Austin Schuh6f3babe2020-01-26 20:34:50 -0800874 SplitMessageReader *split_message_reader) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700875 if (split_message_reader != nullptr) {
876 DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
877 [split_message_reader](
878 const std::tuple<monotonic_clock::time_point,
879 uint32_t, SplitMessageReader *>
880 x) {
881 return std::get<2>(x) == split_message_reader;
882 }) == timestamp_heap_.end())
883 << ": Pushing timestamp when it is already in the heap.";
Austin Schuh6f3babe2020-01-26 20:34:50 -0800884
Austin Schuh2f8fd752020-09-01 22:38:28 -0700885 timestamp_heap_.push_back(std::make_tuple(
886 std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800887
Austin Schuh2f8fd752020-09-01 22:38:28 -0700888 std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
889 SplitMessageReaderHeapCompare);
890 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800891
892 // If we are a timestamp merger, don't wait for data. Missing data will be
893 // caught at read time.
894 if (has_timestamps_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700895 if (!timestamp_heap_.empty()) {
896 channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
897 pushed_ = true;
898 } else {
899 // Remove ourselves if we are empty.
900 channel_merger_->Update(monotonic_clock::min_time, channel_index_);
901 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800902 }
903}
904
// Pops the oldest data message for this channel, discarding any identical
// copies queued by other readers at the same (time, queue_index).  Each pop
// re-queues messages from the affected reader, which may push it back onto
// the heap.
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
TimestampMerger::PopMessageHeap() {
  // Pop the oldest message reader pointer off the heap.
  CHECK_GT(message_heap_.size(), 0u);
  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
      oldest_message_reader = message_heap_.front();

  std::pop_heap(message_heap_.begin(), message_heap_.end(),
                &SplitMessageReaderHeapCompare);
  message_heap_.pop_back();

  // Pop the oldest message.  This re-pushes any messages from the reader to
  // the message heap.
  std::tuple<monotonic_clock::time_point, uint32_t,
             FlatbufferVector<MessageHeader>>
      oldest_message =
          std::get<2>(oldest_message_reader)->PopOldest(channel_index_);

  // Confirm that the time and queue_index we have recorded matches.
  CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
  CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));

  // Now, keep reading until we have found all duplicates.
  while (!message_heap_.empty()) {
    // See if it is a duplicate.
    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
        next_oldest_message_reader = message_heap_.front();

    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
        next_oldest_message_time = std::get<2>(next_oldest_message_reader)
                                       ->oldest_message(channel_index_);

    if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
        std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
      // Same (time, queue_index) => it must be the same message recorded
      // twice.  Pop the message reader pointer.
      std::pop_heap(message_heap_.begin(), message_heap_.end(),
                    &SplitMessageReaderHeapCompare);
      message_heap_.pop_back();

      // Pop the next oldest message.  This re-pushes any messages from the
      // reader.
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          next_oldest_message = std::get<2>(next_oldest_message_reader)
                                    ->PopOldest(channel_index_);

      // And make sure the message matches in it's entirety.
      CHECK(std::get<2>(oldest_message).span() ==
            std::get<2>(next_oldest_message).span())
          << ": Data at the same timestamp doesn't match.";
    } else {
      break;
    }
  }

  return oldest_message;
}
963
// Pops the oldest delivery timestamp for this channel, discarding identical
// copies queued by other readers at the same (time, queue_index).  Only valid
// in a multi-node configuration (node_index_ must be set).
std::tuple<monotonic_clock::time_point, uint32_t,
           FlatbufferVector<MessageHeader>>
TimestampMerger::PopTimestampHeap() {
  // Pop the oldest message reader pointer off the heap.
  CHECK_GT(timestamp_heap_.size(), 0u);

  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
      oldest_timestamp_reader = timestamp_heap_.front();

  std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                &SplitMessageReaderHeapCompare);
  timestamp_heap_.pop_back();

  // NOTE(review): CHECK_NE(node_index_, -1) would print the values on
  // failure; behavior is otherwise identical.
  CHECK(node_index_ != -1) << ": Timestamps in a single node environment";

  // Pop the oldest message.  This re-pushes any timestamps from the reader to
  // the timestamp heap.
  std::tuple<monotonic_clock::time_point, uint32_t,
             FlatbufferVector<MessageHeader>>
      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
                             ->PopOldestTimestamp(channel_index_, node_index_);

  // Confirm that the time we have recorded matches.
  CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
  CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));

  // Now, keep reading until we have found all duplicates.
  while (!timestamp_heap_.empty()) {
    // See if it is a duplicate.
    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
        next_oldest_timestamp_reader = timestamp_heap_.front();

    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
        next_oldest_timestamp_time =
            std::get<2>(next_oldest_timestamp_reader)
                ->oldest_message(channel_index_, node_index_);

    if (std::get<0>(next_oldest_timestamp_time) ==
            std::get<0>(oldest_timestamp) &&
        std::get<1>(next_oldest_timestamp_time) ==
            std::get<1>(oldest_timestamp)) {
      // Pop the timestamp reader pointer.
      std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                    &SplitMessageReaderHeapCompare);
      timestamp_heap_.pop_back();

      // Pop the next oldest timestamp.  This re-pushes any messages from the
      // reader.
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          next_oldest_timestamp =
              std::get<2>(next_oldest_timestamp_reader)
                  ->PopOldestTimestamp(channel_index_, node_index_);

      // And make sure the contents matches in it's entirety.  Dump both
      // flatbuffers (JSON and raw hex) on mismatch to make debugging
      // divergent logs possible.
      CHECK(std::get<2>(oldest_timestamp).span() ==
            std::get<2>(next_oldest_timestamp).span())
          << ": Data at the same timestamp doesn't match, "
          << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
          << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
          << absl::BytesToHexString(std::string_view(
                 reinterpret_cast<const char *>(
                     std::get<2>(oldest_timestamp).span().data()),
                 std::get<2>(oldest_timestamp).span().size()))
          << " vs "
          << absl::BytesToHexString(std::string_view(
                 reinterpret_cast<const char *>(
                     std::get<2>(next_oldest_timestamp).span().data()),
                 std::get<2>(next_oldest_timestamp).span().size()));

    } else {
      break;
    }
  }

  return oldest_timestamp;
}
1041
// Returns the next message to deliver on this channel, with its delivery
// timestamps.  For forwarded channels (has_timestamps_), the oldest delivery
// timestamp is popped first and then the data heap is searched for the
// matching message: data older than the timestamp is skipped (never
// delivered), and missing data is tolerated by returning the timestamp's own
// flatbuffer.  For local channels, the oldest data message is returned
// directly with remote_queue_index set to the 0xffffffff sentinel.
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
  if (has_timestamps_) {
    VLOG(1) << "Looking for matching timestamp for "
            << configuration::StrippedChannelToString(
                   configuration_->channels()->Get(channel_index_))
            << " (" << channel_index_ << ") "
            << " at " << std::get<0>(oldest_timestamp());

    // Read the timestamps.
    std::tuple<monotonic_clock::time_point, uint32_t,
               FlatbufferVector<MessageHeader>>
        oldest_timestamp = PopTimestampHeap();

    // Event times come from the timestamp message (the receiving node's
    // clocks).
    TimestampMerger::DeliveryTimestamp timestamp;
    timestamp.monotonic_event_time =
        monotonic_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
    timestamp.realtime_event_time =
        realtime_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().realtime_sent_time()));

    // Consistency check.
    CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
    CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
             std::get<1>(oldest_timestamp));

    // Time the original message was sent, on the sending node's clock; used
    // below to pair the timestamp with the right data message.
    monotonic_clock::time_point remote_timestamp_monotonic_time(
        chrono::nanoseconds(
            std::get<2>(oldest_timestamp).message().monotonic_remote_time()));

    // See if we have any data. If not, pass the problem up the chain.
    if (message_heap_.empty()) {
      LOG(WARNING) << MaybeNodeName(configuration_->nodes()->Get(node_index_))
                   << "No data to match timestamp on "
                   << configuration::CleanedChannelToString(
                          configuration_->channels()->Get(channel_index_))
                   << " (" << channel_index_ << ")";
      return std::make_tuple(timestamp,
                             std::move(std::get<2>(oldest_timestamp)));
    }

    while (true) {
      {
        // Ok, now try grabbing data until we find one which matches.
        std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
            oldest_message_ref = oldest_message();

        // Time at which the message was sent (this message is written from the
        // sending node's perspective.
        monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
            std::get<2>(oldest_message_ref)->monotonic_sent_time()));

        if (remote_monotonic_time < remote_timestamp_monotonic_time) {
          // Data is older than the timestamp: it was never delivered, so
          // drop it and keep looking.
          LOG(WARNING) << configuration_->nodes()
                              ->Get(node_index_)
                              ->name()
                              ->string_view()
                       << " Undelivered message, skipping. Remote time is "
                       << remote_monotonic_time << " timestamp is "
                       << remote_timestamp_monotonic_time << " on channel "
                       << configuration::StrippedChannelToString(
                              configuration_->channels()->Get(channel_index_))
                       << " (" << channel_index_ << ")";
          PopMessageHeap();
          continue;
        } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
          // Data is newer than the timestamp: the matching message is missing
          // from the logs.  Return the timestamp flatbuffer instead.
          LOG(WARNING) << configuration_->nodes()
                              ->Get(node_index_)
                              ->name()
                              ->string_view()
                       << " Data not found. Remote time should be "
                       << remote_timestamp_monotonic_time
                       << ", message time is " << remote_monotonic_time
                       << " on channel "
                       << configuration::StrippedChannelToString(
                              configuration_->channels()->Get(channel_index_))
                       << " (" << channel_index_ << ")"
                       << (VLOG_IS_ON(1) ? DebugString() : "");
          return std::make_tuple(timestamp,
                                 std::move(std::get<2>(oldest_timestamp)));
        }

        timestamp.monotonic_remote_time = remote_monotonic_time;
      }

      VLOG(1) << "Found matching data "
              << configuration::StrippedChannelToString(
                     configuration_->channels()->Get(channel_index_))
              << " (" << channel_index_ << ")";
      std::tuple<monotonic_clock::time_point, uint32_t,
                 FlatbufferVector<MessageHeader>>
          oldest_message = PopMessageHeap();

      // Fill in the remote-side times/index from the data message itself.
      timestamp.realtime_remote_time =
          realtime_clock::time_point(chrono::nanoseconds(
              std::get<2>(oldest_message).message().realtime_sent_time()));
      timestamp.remote_queue_index =
          std::get<2>(oldest_message).message().queue_index();

      CHECK_EQ(timestamp.monotonic_remote_time,
               remote_timestamp_monotonic_time);

      CHECK_EQ(timestamp.remote_queue_index,
               std::get<2>(oldest_timestamp).message().remote_queue_index())
          << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
          << " data "
          << FlatbufferToJson(&std::get<2>(oldest_message).message());

      return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
    }
  } else {
    // Local channel: no timestamps to match, just deliver the oldest data.
    std::tuple<monotonic_clock::time_point, uint32_t,
               FlatbufferVector<MessageHeader>>
        oldest_message = PopMessageHeap();

    TimestampMerger::DeliveryTimestamp timestamp;
    timestamp.monotonic_event_time =
        monotonic_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_message).message().monotonic_sent_time()));
    timestamp.realtime_event_time =
        realtime_clock::time_point(chrono::nanoseconds(
            std::get<2>(oldest_message).message().realtime_sent_time()));
    // Sentinel: no remote side for a local message.
    timestamp.remote_queue_index = 0xffffffff;

    CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
    CHECK_EQ(std::get<1>(oldest_message),
             std::get<2>(oldest_message).message().queue_index());

    return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
  }
}
1174
Austin Schuh8bd96322020-02-13 21:18:22 -08001175void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
1176
Austin Schuh6f3babe2020-01-26 20:34:50 -08001177namespace {
1178std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
1179 const std::vector<std::vector<std::string>> &filenames) {
1180 CHECK_GT(filenames.size(), 0u);
1181 // Build up all the SplitMessageReaders.
1182 std::vector<std::unique_ptr<SplitMessageReader>> result;
1183 for (const std::vector<std::string> &filenames : filenames) {
1184 result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
1185 }
1186 return result;
1187}
1188} // namespace
1189
1190ChannelMerger::ChannelMerger(
1191 const std::vector<std::vector<std::string>> &filenames)
1192 : split_message_readers_(MakeSplitMessageReaders(filenames)),
Austin Schuh97789fc2020-08-01 14:42:45 -07001193 log_file_header_(split_message_readers_[0]->raw_log_file_header()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001194 // Now, confirm that the configuration matches for each and pick a start time.
1195 // Also return the list of possible nodes.
1196 for (const std::unique_ptr<SplitMessageReader> &reader :
1197 split_message_readers_) {
1198 CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
1199 reader->log_file_header()->configuration()))
1200 << ": Replaying log files with different configurations isn't "
1201 "supported";
1202 }
1203
1204 nodes_ = configuration::GetNodes(configuration());
1205}
1206
// Points the merger at |target_node|: adopts the matching log's header
// (preferring the earliest valid start time when several logs match), builds
// one TimestampMerger per channel, and primes all readers.  Returns false if
// no log file was recorded for that node.
bool ChannelMerger::SetNode(const Node *target_node) {
  // Raw pointer list handed to each TimestampMerger below.
  std::vector<SplitMessageReader *> split_message_readers;
  for (const std::unique_ptr<SplitMessageReader> &reader :
       split_message_readers_) {
    split_message_readers.emplace_back(reader.get());
  }

  // Go find a log_file_header for this node.
  {
    bool found_node = false;

    for (const std::unique_ptr<SplitMessageReader> &reader :
         split_message_readers_) {
      // In order to identify which logfile(s) map to the target node, do a
      // logical comparison of the nodes, by confirming that we are either in a
      // single-node setup (where the nodes will both be nullptr) or that the
      // node names match (but the other node fields--e.g., hostname lists--may
      // not).
      const bool both_null =
          reader->node() == nullptr && target_node == nullptr;
      const bool both_have_name =
          (reader->node() != nullptr) && (target_node != nullptr) &&
          (reader->node()->has_name() && target_node->has_name());
      const bool node_names_identical =
          both_have_name &&
          (reader->node()->name()->string_view() ==
           target_node->name()->string_view());
      if (both_null || node_names_identical) {
        if (!found_node) {
          // First matching log: adopt its header wholesale.
          found_node = true;
          log_file_header_ = CopyFlatBuffer(reader->log_file_header());
          VLOG(1) << "Found log file " << reader->filename() << " with node "
                  << FlatbufferToJson(reader->node()) << " start_time "
                  << monotonic_start_time();
        } else {
          // Find the earliest start time. That way, if we get a full log file
          // directly from the node, and a partial later, we start with the
          // full. Update our header to match that.
          const monotonic_clock::time_point new_monotonic_start_time(
              chrono::nanoseconds(
                  reader->log_file_header()->monotonic_start_time()));
          const realtime_clock::time_point new_realtime_start_time(
              chrono::nanoseconds(
                  reader->log_file_header()->realtime_start_time()));

          // min_time means "no start time recorded"; never prefer it over a
          // real start time.
          if (monotonic_start_time() == monotonic_clock::min_time ||
              (new_monotonic_start_time != monotonic_clock::min_time &&
               new_monotonic_start_time < monotonic_start_time())) {
            log_file_header_.mutable_message()->mutate_monotonic_start_time(
                new_monotonic_start_time.time_since_epoch().count());
            log_file_header_.mutable_message()->mutate_realtime_start_time(
                new_realtime_start_time.time_since_epoch().count());
            VLOG(1) << "Updated log file " << reader->filename()
                    << " with node " << FlatbufferToJson(reader->node())
                    << " start_time " << new_monotonic_start_time;
          }
        }
      }
    }

    if (!found_node) {
      LOG(WARNING) << "Failed to find log file for node "
                   << FlatbufferToJson(target_node);
      return false;
    }
  }

  // Build up all the timestamp mergers. This connects up all the
  // SplitMessageReaders.
  timestamp_mergers_.reserve(configuration()->channels()->size());
  for (size_t channel_index = 0;
       channel_index < configuration()->channels()->size(); ++channel_index) {
    timestamp_mergers_.emplace_back(
        configuration(), split_message_readers, channel_index,
        configuration::GetNode(configuration(), target_node), this);
  }

  // And prime everything.
  for (std::unique_ptr<SplitMessageReader> &split_message_reader :
       split_message_readers_) {
    split_message_reader->QueueMessages(
        split_message_reader->monotonic_start_time());
  }

  node_ = configuration::GetNodeOrDie(configuration(), target_node);
  return true;
}
1294
Austin Schuh858c9f32020-08-31 16:56:12 -07001295monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
Brian Silverman8a32ce62020-08-12 12:02:38 -07001296 if (channel_heap_.empty()) {
Austin Schuh6f3babe2020-01-26 20:34:50 -08001297 return monotonic_clock::max_time;
1298 }
1299 return channel_heap_.front().first;
1300}
1301
// (Re)inserts |channel_index| into the channel heap (and, for channels with
// timestamps, the parallel timestamp heap) keyed by |timestamp|.  If the
// channel is already present at the same time, this is a no-op; if present at
// a different time, the stale entry is removed first.  A min_time timestamp
// removes the channel instead of re-inserting it.
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
                                    int channel_index) {
  // Pop and recreate the heap if it has already been pushed. And since we are
  // pushing again, we don't need to clear pushed.
  if (timestamp_mergers_[channel_index].pushed()) {
    const auto channel_iterator = std::find_if(
        channel_heap_.begin(), channel_heap_.end(),
        [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
          return x.second == channel_index;
        });
    DCHECK(channel_iterator != channel_heap_.end());
    if (std::get<0>(*channel_iterator) == timestamp) {
      // It's already in the heap, in the correct spot, so nothing
      // more for us to do here.
      return;
    }
    channel_heap_.erase(channel_iterator);
    // Erasing an arbitrary element invalidates the heap property; rebuild.
    std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                   ChannelHeapCompare);

    if (timestamp_mergers_[channel_index].has_timestamps()) {
      // Keep the timestamp heap in lock-step with the channel heap.
      const auto timestamp_iterator = std::find_if(
          timestamp_heap_.begin(), timestamp_heap_.end(),
          [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
            return x.second == channel_index;
          });
      DCHECK(timestamp_iterator != timestamp_heap_.end());
      if (std::get<0>(*timestamp_iterator) == timestamp) {
        // It's already in the heap, in the correct spot, so nothing
        // more for us to do here.
        return;
      }
      timestamp_heap_.erase(timestamp_iterator);
      std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                     ChannelHeapCompare);
    }
  }

  // min_time means the channel has no queued messages; leave it out of the
  // heap and clear its pushed state.
  if (timestamp == monotonic_clock::min_time) {
    timestamp_mergers_[channel_index].set_pushed(false);
    return;
  }

  channel_heap_.push_back(std::make_pair(timestamp, channel_index));

  // The default sort puts the newest message first. Use a custom comparator to
  // put the oldest message first.
  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
                 ChannelHeapCompare);

  if (timestamp_mergers_[channel_index].has_timestamps()) {
    timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                   ChannelHeapCompare);
  }
}
1358
Austin Schuh2f8fd752020-09-01 22:38:28 -07001359void ChannelMerger::VerifyHeaps() {
1360 {
1361 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
1362 channel_heap_;
1363 std::make_heap(channel_heap.begin(), channel_heap.end(),
1364 &ChannelHeapCompare);
1365
1366 for (size_t i = 0; i < channel_heap_.size(); ++i) {
1367 CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
1368 CHECK_EQ(std::get<0>(channel_heap[i]),
1369 timestamp_mergers_[std::get<1>(channel_heap[i])]
1370 .channel_merger_time());
1371 }
1372 }
1373 {
1374 std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap =
1375 timestamp_heap_;
1376 std::make_heap(timestamp_heap.begin(), timestamp_heap.end(),
1377 &ChannelHeapCompare);
1378
1379 for (size_t i = 0; i < timestamp_heap_.size(); ++i) {
1380 CHECK(timestamp_heap_[i] == timestamp_heap[i]) << ": Heaps diverged...";
1381 }
1382 }
1383}
1384
// Pops the overall-oldest message across all channels.  Returns the delivery
// timestamps, the channel index it came from, and the message itself.
// Requires at least one message to be queued (CHECK-fails otherwise).
std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
  CHECK_GT(channel_heap_.size(), 0u);
  // The heap front is the oldest (time, channel) entry; grab it, then remove
  // it from the heap.
  std::pair<monotonic_clock::time_point, int> oldest_channel_data =
      channel_heap_.front();
  int channel_index = oldest_channel_data.second;
  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
                &ChannelHeapCompare);
  channel_heap_.pop_back();

  // This channel is no longer represented in the heap; mark it so it gets
  // re-pushed when its next message is queued.
  timestamp_mergers_[channel_index].set_pushed(false);

  TimestampMerger *merger = &timestamp_mergers_[channel_index];

  if (merger->has_timestamps()) {
    // Channels with timestamps are mirrored in timestamp_heap_; the two heaps
    // must pop the same entry in lockstep.
    CHECK_GT(timestamp_heap_.size(), 0u);
    std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
        timestamp_heap_.front();
    CHECK(oldest_timestamp_data == oldest_channel_data)
        << ": Timestamp heap out of sync.";
    std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                  &ChannelHeapCompare);
    timestamp_heap_.pop_back();
  }

  // Merger handles any queueing needed from here.
  std::tuple<TimestampMerger::DeliveryTimestamp,
             FlatbufferVector<MessageHeader>>
      message = merger->PopOldest();
  // The popped message's event time must match the heap entry that selected
  // it; a mismatch means channel_heap_ went stale.
  DCHECK_EQ(std::get<0>(message).monotonic_event_time,
            oldest_channel_data.first)
      << ": channel_heap_ was corrupted for " << channel_index << ": "
      << DebugString();

  // Enforce global monotonic ordering of everything popped from this merger.
  CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
      << ": " << MaybeNodeName(log_file_header()->node())
      << "Messages came off the queue out of order. " << DebugString();
  last_popped_time_ = std::get<0>(message).monotonic_event_time;

  VLOG(1) << "Popped " << last_popped_time_ << " "
          << configuration::StrippedChannelToString(
                 configuration()->channels()->Get(channel_index))
          << " (" << channel_index << ")";

  return std::make_tuple(std::get<0>(message), channel_index,
                         std::move(std::get<1>(message)));
}
1433
Austin Schuhcde938c2020-02-02 17:30:07 -08001434std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
1435 std::stringstream ss;
1436 for (size_t i = 0; i < data_.size(); ++i) {
Austin Schuh2f8fd752020-09-01 22:38:28 -07001437 if (i < 5 || i + 5 > data_.size()) {
1438 if (timestamps) {
1439 ss << " msg: ";
1440 } else {
1441 ss << " timestamp: ";
1442 }
1443 ss << monotonic_clock::time_point(
1444 chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
Austin Schuhcde938c2020-02-02 17:30:07 -08001445 << " ("
Austin Schuh2f8fd752020-09-01 22:38:28 -07001446 << realtime_clock::time_point(
1447 chrono::nanoseconds(data_[i].message().realtime_sent_time()))
1448 << ") " << data_[i].message().queue_index();
1449 if (timestamps) {
1450 ss << " <- remote "
1451 << monotonic_clock::time_point(chrono::nanoseconds(
1452 data_[i].message().monotonic_remote_time()))
1453 << " ("
1454 << realtime_clock::time_point(chrono::nanoseconds(
1455 data_[i].message().realtime_remote_time()))
1456 << ")";
1457 }
1458 ss << "\n";
1459 } else if (i == 5) {
1460 ss << " ...\n";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001461 }
Austin Schuhcde938c2020-02-02 17:30:07 -08001462 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001463
Austin Schuhcde938c2020-02-02 17:30:07 -08001464 return ss.str();
1465}
Austin Schuh6f3babe2020-01-26 20:34:50 -08001466
Austin Schuhcde938c2020-02-02 17:30:07 -08001467std::string SplitMessageReader::DebugString(int channel) const {
1468 std::stringstream ss;
1469 ss << "[\n";
1470 ss << channels_[channel].data.DebugString();
1471 ss << " ]";
1472 return ss.str();
1473}
Austin Schuh6f3babe2020-01-26 20:34:50 -08001474
Austin Schuhcde938c2020-02-02 17:30:07 -08001475std::string SplitMessageReader::DebugString(int channel, int node_index) const {
1476 std::stringstream ss;
1477 ss << "[\n";
1478 ss << channels_[channel].timestamps[node_index].DebugString();
1479 ss << " ]";
1480 return ss.str();
1481}
1482
1483std::string TimestampMerger::DebugString() const {
1484 std::stringstream ss;
1485
1486 if (timestamp_heap_.size() > 0) {
1487 ss << " timestamp_heap {\n";
1488 std::vector<
1489 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
1490 timestamp_heap = timestamp_heap_;
1491 while (timestamp_heap.size() > 0u) {
1492 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
1493 oldest_timestamp_reader = timestamp_heap.front();
1494
1495 ss << " " << std::get<2>(oldest_timestamp_reader) << " "
1496 << std::get<0>(oldest_timestamp_reader) << " queue_index ("
1497 << std::get<1>(oldest_timestamp_reader) << ") ttq "
1498 << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
1499 << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
1500 << std::get<2>(oldest_timestamp_reader)
1501 ->DebugString(channel_index_, node_index_)
1502 << "\n";
1503
1504 std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
1505 &SplitMessageReaderHeapCompare);
1506 timestamp_heap.pop_back();
1507 }
1508 ss << " }\n";
1509 }
1510
1511 ss << " message_heap {\n";
1512 {
1513 std::vector<
1514 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
1515 message_heap = message_heap_;
Brian Silverman8a32ce62020-08-12 12:02:38 -07001516 while (!message_heap.empty()) {
Austin Schuhcde938c2020-02-02 17:30:07 -08001517 std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
1518 oldest_message_reader = message_heap.front();
1519
1520 ss << " " << std::get<2>(oldest_message_reader) << " "
1521 << std::get<0>(oldest_message_reader) << " queue_index ("
1522 << std::get<1>(oldest_message_reader) << ") ttq "
1523 << std::get<2>(oldest_message_reader)->time_to_queue() << " "
1524 << std::get<2>(oldest_message_reader)->filename() << " -> "
1525 << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
1526 << "\n";
1527
1528 std::pop_heap(message_heap.begin(), message_heap.end(),
1529 &SplitMessageReaderHeapCompare);
1530 message_heap.pop_back();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001531 }
Austin Schuh05b70472020-01-01 17:11:17 -08001532 }
Austin Schuhcde938c2020-02-02 17:30:07 -08001533 ss << " }";
1534
1535 return ss.str();
1536}
1537
1538std::string ChannelMerger::DebugString() const {
1539 std::stringstream ss;
1540 ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
1541 << "\n";
1542 ss << "channel_heap {\n";
1543 std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
1544 channel_heap_;
Brian Silverman8a32ce62020-08-12 12:02:38 -07001545 while (!channel_heap.empty()) {
Austin Schuhcde938c2020-02-02 17:30:07 -08001546 std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
1547 ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
1548 << configuration::CleanedChannelToString(
1549 configuration()->channels()->Get(std::get<1>(channel)))
1550 << "\n";
1551
1552 ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
1553
1554 std::pop_heap(channel_heap.begin(), channel_heap.end(),
1555 &ChannelHeapCompare);
1556 channel_heap.pop_back();
1557 }
1558 ss << "}";
1559
1560 return ss.str();
Austin Schuh05b70472020-01-01 17:11:17 -08001561}
1562
Austin Schuhee711052020-08-24 16:06:09 -07001563std::string MaybeNodeName(const Node *node) {
1564 if (node != nullptr) {
1565 return node->name()->str() + " ";
1566 }
1567 return "";
1568}
1569
Austin Schuha36c8902019-12-30 18:07:15 -08001570} // namespace logger
1571} // namespace aos