blob: f32b3372382fe0476eddfb47e9f2f3949d92f799 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
Austin Schuhfa895892020-01-07 20:07:41 -080013#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080014#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080015#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080016#include "gflags/gflags.h"
17#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080018
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070019#if defined(__x86_64__)
20#define ENABLE_LZMA 1
21#elif defined(__aarch64__)
22#define ENABLE_LZMA 1
23#else
24#define ENABLE_LZMA 0
25#endif
26
27#if ENABLE_LZMA
28#include "aos/events/logging/lzma_encoder.h"
29#endif
30
// FlushAtThreshold() flushes once more than this many encoded bytes are
// waiting in the encoder's queue.
DEFINE_int32(flush_size, 128000,
             "Number of outstanding bytes to allow before flushing to disk.");
// FlushAtThreshold() also flushes once more than this many seconds have
// passed since the last flush (checked whenever a span is queued).
DEFINE_double(
    flush_period, 5.0,
    "Max time to let data sit in the queue before flushing in seconds.");

// When positive, MessageReader uses this (in seconds) instead of the
// max_out_of_order_duration stored in the log file header.
DEFINE_double(
    max_out_of_order, -1,
    "If set, this overrides the max out of order duration for a log file.");
39 "If set, this overrides the max out of order duration for a log file.");
40
Brian Silvermanf51499a2020-09-21 12:49:08 -070041namespace aos::logger {
Austin Schuha36c8902019-12-30 18:07:15 -080042
// Short alias for the duration/time_point conversions used throughout.
namespace chrono = std::chrono;
44
Brian Silvermanf51499a2020-09-21 12:49:08 -070045DetachedBufferWriter::DetachedBufferWriter(
46 std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
47 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070048 if (!util::MkdirPIfSpace(filename, 0777)) {
49 ran_out_of_space_ = true;
50 } else {
51 fd_ = open(std::string(filename).c_str(),
52 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
53 if (fd_ == -1 && errno == ENOSPC) {
54 ran_out_of_space_ = true;
55 } else {
56 PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
57 VLOG(1) << "Opened " << filename << " for writing";
58 }
59 }
Austin Schuha36c8902019-12-30 18:07:15 -080060}
61
62DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070063 Close();
64 if (ran_out_of_space_) {
65 CHECK(acknowledge_ran_out_of_space_)
66 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070067 }
Austin Schuh2f8fd752020-09-01 22:38:28 -070068}
69
Brian Silvermand90905f2020-09-23 14:42:56 -070070DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070071 *this = std::move(other);
72}
73
Brian Silverman87ac0402020-09-17 14:47:01 -070074// When other is destroyed "soon" (which it should be because we're getting an
75// rvalue reference to it), it will flush etc all the data we have queued up
76// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -070077DetachedBufferWriter &DetachedBufferWriter::operator=(
78 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -070079 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070080 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070081 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -070082 std::swap(ran_out_of_space_, other.ran_out_of_space_);
83 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070084 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070085 std::swap(max_write_time_, other.max_write_time_);
86 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
87 std::swap(max_write_time_messages_, other.max_write_time_messages_);
88 std::swap(total_write_time_, other.total_write_time_);
89 std::swap(total_write_count_, other.total_write_count_);
90 std::swap(total_write_messages_, other.total_write_messages_);
91 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -070092 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -080093}
94
Brian Silvermanf51499a2020-09-21 12:49:08 -070095void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070096 if (ran_out_of_space_) {
97 // We don't want any later data to be written after space becomes
98 // available, so refuse to write anything more once we've dropped data
99 // because we ran out of space.
100 VLOG(1) << "Ignoring span: " << span.size();
101 return;
102 }
103
Austin Schuhbd06ae42021-03-31 22:48:21 -0700104 aos::monotonic_clock::time_point now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700105 if (encoder_->may_bypass() && span.size() > 4096u) {
106 // Over this threshold, we'll assume it's cheaper to add an extra
107 // syscall to write the data immediately instead of copying it to
108 // enqueue.
Austin Schuha36c8902019-12-30 18:07:15 -0800109
Brian Silvermanf51499a2020-09-21 12:49:08 -0700110 // First, flush everything.
111 while (encoder_->queue_size() > 0u) {
112 Flush();
113 }
Austin Schuhde031b72020-01-10 19:34:41 -0800114
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 // Then, write it directly.
116 const auto start = aos::monotonic_clock::now();
117 const ssize_t written = write(fd_, span.data(), span.size());
118 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700119 HandleWriteReturn(written, span.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120 UpdateStatsForWrite(end - start, written, 1);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700121 now = end;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700122 } else {
123 encoder_->Encode(CopySpanAsDetachedBuffer(span));
Austin Schuhbd06ae42021-03-31 22:48:21 -0700124 now = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800125 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700126
Austin Schuhbd06ae42021-03-31 22:48:21 -0700127 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800128}
129
Brian Silverman0465fcf2020-09-24 00:29:18 -0700130void DetachedBufferWriter::Close() {
131 if (fd_ == -1) {
132 return;
133 }
134 encoder_->Finish();
135 while (encoder_->queue_size() > 0) {
136 Flush();
137 }
138 if (close(fd_) == -1) {
139 if (errno == ENOSPC) {
140 ran_out_of_space_ = true;
141 } else {
142 PLOG(ERROR) << "Closing log file failed";
143 }
144 }
145 fd_ = -1;
146 VLOG(1) << "Closed " << filename_;
147}
148
Austin Schuha36c8902019-12-30 18:07:15 -0800149void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700150 if (ran_out_of_space_) {
151 // We don't want any later data to be written after space becomes available,
152 // so refuse to write anything more once we've dropped data because we ran
153 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700154 if (encoder_) {
155 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
156 encoder_->Clear(encoder_->queue().size());
157 } else {
158 VLOG(1) << "No queue to ignore";
159 }
160 return;
161 }
162
163 const auto queue = encoder_->queue();
164 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700165 return;
166 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700167
Austin Schuha36c8902019-12-30 18:07:15 -0800168 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700169 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
170 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800171 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700172 for (size_t i = 0; i < iovec_size; ++i) {
173 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
174 iovec_[i].iov_len = queue[i].size();
175 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800176 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700177
178 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800179 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700180 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700181 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700182
183 encoder_->Clear(iovec_size);
184
185 UpdateStatsForWrite(end - start, written, iovec_size);
186}
187
Brian Silverman0465fcf2020-09-24 00:29:18 -0700188void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
189 size_t write_size) {
190 if (write_return == -1 && errno == ENOSPC) {
191 ran_out_of_space_ = true;
192 return;
193 }
194 PCHECK(write_return >= 0) << ": write failed";
195 if (write_return < static_cast<ssize_t>(write_size)) {
196 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
197 // never seems to happen in any other case. If we ever want to log to a
198 // socket, this will happen more often. However, until we get there, we'll
199 // just assume it means we ran out of space.
200 ran_out_of_space_ = true;
201 return;
202 }
203}
204
Brian Silvermanf51499a2020-09-21 12:49:08 -0700205void DetachedBufferWriter::UpdateStatsForWrite(
206 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
207 if (duration > max_write_time_) {
208 max_write_time_ = duration;
209 max_write_time_bytes_ = written;
210 max_write_time_messages_ = iovec_size;
211 }
212 total_write_time_ += duration;
213 ++total_write_count_;
214 total_write_messages_ += iovec_size;
215 total_write_bytes_ += written;
216}
217
Austin Schuhbd06ae42021-03-31 22:48:21 -0700218void DetachedBufferWriter::FlushAtThreshold(
219 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700220 if (ran_out_of_space_) {
221 // We don't want any later data to be written after space becomes available,
222 // so refuse to write anything more once we've dropped data because we ran
223 // out of space.
224 if (encoder_) {
225 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
226 encoder_->Clear(encoder_->queue().size());
227 } else {
228 VLOG(1) << "No queue to ignore";
229 }
230 return;
231 }
232
Austin Schuhbd06ae42021-03-31 22:48:21 -0700233 // We don't want to flush the first time through. Otherwise we will flush as
234 // the log file header might be compressing, defeating any parallelism and
235 // queueing there.
236 if (last_flush_time_ == aos::monotonic_clock::min_time) {
237 last_flush_time_ = now;
238 }
239
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240 // Flush if we are at the max number of iovs per writev, because there's no
241 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700242 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700243 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700244 encoder_->queue_size() >= IOV_MAX ||
245 now > last_flush_time_ +
246 chrono::duration_cast<chrono::nanoseconds>(
247 chrono::duration<double>(FLAGS_flush_period))) {
248 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700249 Flush();
250 }
Austin Schuha36c8902019-12-30 18:07:15 -0800251}
252
// Serializes one message described by |context| into |fbb| as a MessageHeader
// table and returns the table's offset. Which fields are filled in depends on
// |log_type|:
//   - kLogMessage: payload, local (event) send times and queue index.
//   - kLogMessageAndDeliveryTime: as kLogMessage, plus the remote send times
//     and remote queue index.
//   - kLogDeliveryTimeOnly: no payload; local send times and queue index plus
//     the remote send times and remote queue index.
//   - kLogRemoteMessage: payload, with the remote times and remote queue index
//     stored in the queue_index / *_sent_time fields.
// NOTE(review): the returned table is not finished as a buffer root here;
// presumably the caller does that.
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  // The payload vector must be serialized before MessageHeader::Builder is
  // created below, because flatbuffers does not allow building nested objects
  // while a table is under construction.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  // Sent-time fields: remote messages record the remote node's view, all other
  // types record the local event time.
  switch (log_type) {
    case LogType::kLogRemoteMessage:
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  // Payload and delivery-time fields. The add_* order is kept as-is since it
  // determines the exact serialized layout.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      message_header_builder.add_data(data_offset);
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_data(data_offset);
      [[fallthrough]];

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;
  }

  return message_header_builder.Finish();
}
314
Brian Silvermanf51499a2020-09-21 12:49:08 -0700315SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700316 static const std::string_view kXz = ".xz";
317 if (filename.substr(filename.size() - kXz.size()) == kXz) {
318#if ENABLE_LZMA
319 decoder_ = std::make_unique<LzmaDecoder>(filename);
320#else
321 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
322#endif
323 } else {
324 decoder_ = std::make_unique<DummyDecoder>(filename);
325 }
Austin Schuh05b70472020-01-01 17:11:17 -0800326}
327
Austin Schuhcf5f6442021-07-06 10:43:28 -0700328absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800329 // Make sure we have enough for the size.
330 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
331 if (!ReadBlock()) {
332 return absl::Span<const uint8_t>();
333 }
334 }
335
336 // Now make sure we have enough for the message.
337 const size_t data_size =
338 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
339 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800340 if (data_size == sizeof(flatbuffers::uoffset_t)) {
341 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
342 LOG(ERROR) << " Rest of log file is "
343 << absl::BytesToHexString(std::string_view(
344 reinterpret_cast<const char *>(data_.data() +
345 consumed_data_),
346 data_.size() - consumed_data_));
347 return absl::Span<const uint8_t>();
348 }
Austin Schuh05b70472020-01-01 17:11:17 -0800349 while (data_.size() < consumed_data_ + data_size) {
350 if (!ReadBlock()) {
351 return absl::Span<const uint8_t>();
352 }
353 }
354
355 // And return it, consuming the data.
356 const uint8_t *data_ptr = data_.data() + consumed_data_;
357
Austin Schuh05b70472020-01-01 17:11:17 -0800358 return absl::Span<const uint8_t>(data_ptr, data_size);
359}
360
Austin Schuhcf5f6442021-07-06 10:43:28 -0700361void SpanReader::ConsumeMessage() {
362 consumed_data_ +=
363 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
364 sizeof(flatbuffers::uoffset_t);
365}
366
367absl::Span<const uint8_t> SpanReader::ReadMessage() {
368 absl::Span<const uint8_t> result = PeekMessage();
369 if (result != absl::Span<const uint8_t>()) {
370 ConsumeMessage();
371 }
372 return result;
373}
374
Austin Schuh05b70472020-01-01 17:11:17 -0800375bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700376 // This is the amount of data we grab at a time. Doing larger chunks minimizes
377 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800378 constexpr size_t kReadSize = 256 * 1024;
379
380 // Strip off any unused data at the front.
381 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700382 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800383 consumed_data_ = 0;
384 }
385
386 const size_t starting_size = data_.size();
387
388 // This should automatically grow the backing store. It won't shrink if we
389 // get a small chunk later. This reduces allocations when we want to append
390 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700391 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800392
Brian Silvermanf51499a2020-09-21 12:49:08 -0700393 const size_t count =
394 decoder_->Read(data_.begin() + starting_size, data_.end());
395 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800396 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800397 return false;
398 }
Austin Schuh05b70472020-01-01 17:11:17 -0800399
400 return true;
401}
402
Austin Schuhadd6eb32020-11-09 21:24:26 -0800403std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800404 std::string_view filename) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800405 SpanReader span_reader(filename);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800406 absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
407
408 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800409 if (config_data == absl::Span<const uint8_t>()) {
410 return std::nullopt;
411 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800412
Austin Schuh5212cad2020-09-09 23:12:09 -0700413 // And copy the config so we have it forever, removing the size prefix.
Brian Silverman354697a2020-09-22 21:06:32 -0700414 ResizeableBuffer data;
Austin Schuhadd6eb32020-11-09 21:24:26 -0800415 data.resize(config_data.size());
416 memcpy(data.data(), config_data.begin(), data.size());
Austin Schuhe09beb12020-12-11 20:04:27 -0800417 SizePrefixedFlatbufferVector<LogFileHeader> result(std::move(data));
418 if (!result.Verify()) {
419 return std::nullopt;
420 }
421 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800422}
423
Austin Schuhadd6eb32020-11-09 21:24:26 -0800424std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -0800425 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -0700426 SpanReader span_reader(filename);
427 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
428 for (size_t i = 0; i < n + 1; ++i) {
429 data_span = span_reader.ReadMessage();
430
431 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800432 if (data_span == absl::Span<const uint8_t>()) {
433 return std::nullopt;
434 }
Austin Schuh5212cad2020-09-09 23:12:09 -0700435 }
436
Brian Silverman354697a2020-09-22 21:06:32 -0700437 // And copy the config so we have it forever, removing the size prefix.
438 ResizeableBuffer data;
Austin Schuhadd6eb32020-11-09 21:24:26 -0800439 data.resize(data_span.size());
440 memcpy(data.data(), data_span.begin(), data.size());
Austin Schuhe09beb12020-12-11 20:04:27 -0800441 SizePrefixedFlatbufferVector<MessageHeader> result(std::move(data));
442 if (!result.Verify()) {
443 return std::nullopt;
444 }
445 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -0700446}
447
Austin Schuh05b70472020-01-01 17:11:17 -0800448MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -0700449 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -0800450 raw_log_file_header_(
451 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Austin Schuh05b70472020-01-01 17:11:17 -0800452 // Make sure we have enough to read the size.
Austin Schuh97789fc2020-08-01 14:42:45 -0700453 absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
Austin Schuh05b70472020-01-01 17:11:17 -0800454
455 // Make sure something was read.
Austin Schuh97789fc2020-08-01 14:42:45 -0700456 CHECK(header_data != absl::Span<const uint8_t>())
457 << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -0800458
Austin Schuh97789fc2020-08-01 14:42:45 -0700459 // And copy the header data so we have it forever.
Brian Silverman354697a2020-09-22 21:06:32 -0700460 ResizeableBuffer header_data_copy;
Austin Schuhadd6eb32020-11-09 21:24:26 -0800461 header_data_copy.resize(header_data.size());
462 memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
Austin Schuh97789fc2020-08-01 14:42:45 -0700463 raw_log_file_header_ =
Austin Schuhadd6eb32020-11-09 21:24:26 -0800464 SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
Austin Schuh05b70472020-01-01 17:11:17 -0800465
Austin Schuhcde938c2020-02-02 17:30:07 -0800466 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -0800467 FLAGS_max_out_of_order > 0
468 ? chrono::duration_cast<chrono::nanoseconds>(
469 chrono::duration<double>(FLAGS_max_out_of_order))
470 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -0800471
472 VLOG(1) << "Opened " << filename << " as node "
473 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -0800474}
475
Austin Schuhadd6eb32020-11-09 21:24:26 -0800476std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
477MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800478 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
479 if (msg_data == absl::Span<const uint8_t>()) {
480 return std::nullopt;
481 }
482
Brian Silverman354697a2020-09-22 21:06:32 -0700483 ResizeableBuffer result_buffer;
Austin Schuhadd6eb32020-11-09 21:24:26 -0800484 result_buffer.resize(msg_data.size());
485 memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
486 SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));
Austin Schuh05b70472020-01-01 17:11:17 -0800487
488 const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
489 chrono::nanoseconds(result.message().monotonic_sent_time()));
490
491 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuh8bd96322020-02-13 21:18:22 -0800492 VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800493 return std::move(result);
Austin Schuh05b70472020-01-01 17:11:17 -0800494}
495
Austin Schuhc41603c2020-10-11 16:17:37 -0700496PartsMessageReader::PartsMessageReader(LogParts log_parts)
497 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
498
Austin Schuhadd6eb32020-11-09 21:24:26 -0800499std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
Austin Schuhc41603c2020-10-11 16:17:37 -0700500PartsMessageReader::ReadMessage() {
501 while (!done_) {
Austin Schuhadd6eb32020-11-09 21:24:26 -0800502 std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
Austin Schuhc41603c2020-10-11 16:17:37 -0700503 message_reader_.ReadMessage();
504 if (message) {
505 newest_timestamp_ = message_reader_.newest_timestamp();
Austin Schuh32f68492020-11-08 21:45:51 -0800506 const monotonic_clock::time_point monotonic_sent_time(
507 chrono::nanoseconds(message->message().monotonic_sent_time()));
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800508 // TODO(austin): Does this work with startup? Might need to use the start
509 // time.
510 // TODO(austin): Does this work with startup when we don't know the remote
511 // start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -0800512 if (monotonic_sent_time >
513 parts_.monotonic_start_time + max_out_of_order_duration()) {
514 after_start_ = true;
515 }
516 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -0800517 CHECK_GE(monotonic_sent_time,
518 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -0800519 << ": Max out of order of " << max_out_of_order_duration().count()
520 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -0800521 << parts_.monotonic_start_time << " currently reading "
522 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -0800523 }
Austin Schuhc41603c2020-10-11 16:17:37 -0700524 return message;
525 }
526 NextLog();
527 }
Austin Schuh32f68492020-11-08 21:45:51 -0800528 newest_timestamp_ = monotonic_clock::max_time;
Austin Schuhc41603c2020-10-11 16:17:37 -0700529 return std::nullopt;
530}
531
532void PartsMessageReader::NextLog() {
533 if (next_part_index_ == parts_.parts.size()) {
534 done_ = true;
535 return;
536 }
537 message_reader_ = MessageReader(parts_.parts[next_part_index_]);
538 ++next_part_index_;
539}
540
Austin Schuh1be0ce42020-11-29 22:43:26 -0800541bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700542 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700543
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700544 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800545 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700546 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -0800547 return false;
548 }
549
550 if (this->channel_index < m2.channel_index) {
551 return true;
552 } else if (this->channel_index > m2.channel_index) {
553 return false;
554 }
555
556 return this->queue_index < m2.queue_index;
557}
558
559bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800560bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700561 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700562
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700563 return timestamp.time == m2.timestamp.time &&
564 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800565}
Austin Schuh1be0ce42020-11-29 22:43:26 -0800566
567std::ostream &operator<<(std::ostream &os, const Message &m) {
568 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700569 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Austin Schuhd2f96102020-12-01 20:27:29 -0800570 if (m.data.Verify()) {
571 os << ", .data="
572 << aos::FlatbufferToJson(m.data,
573 {.multi_line = false, .max_vector_size = 1});
574 }
575 os << "}";
576 return os;
577}
578
579std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
580 os << "{.channel_index=" << m.channel_index
581 << ", .queue_index=" << m.queue_index
582 << ", .monotonic_event_time=" << m.monotonic_event_time
583 << ", .realtime_event_time=" << m.realtime_event_time;
584 if (m.remote_queue_index != 0xffffffff) {
585 os << ", .remote_queue_index=" << m.remote_queue_index;
586 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700587 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800588 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
589 }
590 if (m.realtime_remote_time != realtime_clock::min_time) {
591 os << ", .realtime_remote_time=" << m.realtime_remote_time;
592 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700593 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800594 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
595 }
Austin Schuhd2f96102020-12-01 20:27:29 -0800596 if (m.data.Verify()) {
597 os << ", .data="
598 << aos::FlatbufferToJson(m.data,
599 {.multi_line = false, .max_vector_size = 1});
600 }
601 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -0800602 return os;
603}
604
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800605LogPartsSorter::LogPartsSorter(LogParts log_parts)
606 : parts_message_reader_(log_parts) {}
607
// Returns the oldest queued message once enough data has been read for it to
// be safe to pop, or nullptr when every part has been exhausted. The pointer
// refers to the first element of messages_ (which relies on Message's
// ordering); presumably it stays valid until PopFront() erases it.
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      // sorted_until() trails the newest timestamp read by the max out of
      // order duration. Once past the start, PartsMessageReader CHECKs that no
      // later message is older than that, so anything before it is final.
      if (!messages_.empty() && messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      messages_.insert(Message{
          .channel_index = m.value().message().channel_index(),
          .queue_index = m.value().message().queue_index(),
          .timestamp =
              BootTimestamp{
                  .boot = parts().boot_count,
                  .time = monotonic_clock::time_point(std::chrono::nanoseconds(
                      m.value().message().monotonic_sent_time()))},
          .data = std::move(m.value())});

      // Now, update sorted_until_ to match the new message.
      // The comparison guards the subtraction from going below min_time.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Messages must come out in non-decreasing time order.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}
661
662void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
663
664std::string LogPartsSorter::DebugString() const {
665 std::stringstream ss;
666 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -0800667 int count = 0;
668 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800669 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800670 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
671 ss << m << "\n";
672 } else if (no_dots) {
673 ss << "...\n";
674 no_dots = false;
675 }
676 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -0800677 }
678 ss << "] <- " << parts_message_reader_.filename();
679 return ss.str();
680}
681
Austin Schuhd2f96102020-12-01 20:27:29 -0800682NodeMerger::NodeMerger(std::vector<LogParts> parts) {
683 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -0700684 // Enforce that we are sorting things only from a single node from a single
685 // boot.
686 const std::string_view part0_node = parts[0].node;
687 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -0800688 for (size_t i = 1; i < parts.size(); ++i) {
689 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -0700690 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
691 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -0800692 }
Austin Schuh715adc12021-06-29 22:07:39 -0700693
694 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
695
Austin Schuhd2f96102020-12-01 20:27:29 -0800696 for (LogParts &part : parts) {
697 parts_sorters_.emplace_back(std::move(part));
698 }
699
Austin Schuhd2f96102020-12-01 20:27:29 -0800700 monotonic_start_time_ = monotonic_clock::max_time;
701 realtime_start_time_ = realtime_clock::max_time;
702 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
703 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
704 monotonic_start_time_ = parts_sorter.monotonic_start_time();
705 realtime_start_time_ = parts_sorter.realtime_start_time();
706 }
707 }
708}
Austin Schuh8f52ed52020-11-30 23:12:39 -0800709
Austin Schuh0ca51f32020-12-25 21:51:45 -0800710std::vector<const LogParts *> NodeMerger::Parts() const {
711 std::vector<const LogParts *> p;
712 p.reserve(parts_sorters_.size());
713 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
714 p.emplace_back(&parts_sorter.parts());
715 }
716 return p;
717}
718
Austin Schuh8f52ed52020-11-30 23:12:39 -0800719Message *NodeMerger::Front() {
720 // Return the current Front if we have one, otherwise go compute one.
721 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -0800722 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700723 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -0800724 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800725 }
726
727 // Otherwise, do a simple search for the oldest message, deduplicating any
728 // duplicates.
729 Message *oldest = nullptr;
730 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -0800731 for (LogPartsSorter &parts_sorter : parts_sorters_) {
732 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -0800733 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -0800734 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800735 continue;
736 }
737 if (oldest == nullptr || *m < *oldest) {
738 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -0800739 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -0800740 } else if (*m == *oldest) {
Austin Schuh8bf1e632021-01-02 22:41:04 -0800741 // Found a duplicate. If there is a choice, we want the one which has the
742 // timestamp time.
743 if (!m->data.message().has_monotonic_timestamp_time()) {
744 parts_sorter.PopFront();
745 } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
746 current_->PopFront();
747 current_ = &parts_sorter;
748 oldest = m;
749 } else {
750 CHECK_EQ(m->data.message().monotonic_timestamp_time(),
751 oldest->data.message().monotonic_timestamp_time());
752 parts_sorter.PopFront();
753 }
Austin Schuh8f52ed52020-11-30 23:12:39 -0800754 }
755
756 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -0800757 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -0800758 }
759
Austin Schuhb000de62020-12-03 22:00:40 -0800760 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700761 CHECK_GE(oldest->timestamp.time, last_message_time_);
762 last_message_time_ = oldest->timestamp.time;
Austin Schuhb000de62020-12-03 22:00:40 -0800763 } else {
764 last_message_time_ = monotonic_clock::max_time;
765 }
766
Austin Schuh8f52ed52020-11-30 23:12:39 -0800767 // Return the oldest message found. This will be nullptr if nothing was
768 // found, indicating there is nothing left.
769 return oldest;
770}
771
772void NodeMerger::PopFront() {
773 CHECK(current_ != nullptr) << "Popping before calling Front()";
774 current_->PopFront();
775 current_ = nullptr;
776}
777
Austin Schuhf16ef6a2021-06-30 21:48:17 -0700778BootMerger::BootMerger(std::vector<LogParts> files) {
779 std::vector<std::vector<LogParts>> boots;
780
781 // Now, we need to split things out by boot.
782 for (size_t i = 0; i < files.size(); ++i) {
783 LOG(INFO) << "Trying file " << i;
784 const size_t boot_count = files[i].boot_count;
785 LOG(INFO) << "Boot count " << boot_count;
786 if (boot_count + 1 > boots.size()) {
787 boots.resize(boot_count + 1);
788 }
789 boots[boot_count].emplace_back(std::move(files[i]));
790 }
791
792 node_mergers_.reserve(boots.size());
793 for (size_t i = 0; i < boots.size(); ++i) {
794 LOG(INFO) << "Boot " << i;
795 for (auto &p : boots[i]) {
796 LOG(INFO) << "Part " << p;
797 }
798 node_mergers_.emplace_back(
799 std::make_unique<NodeMerger>(std::move(boots[i])));
800 }
801}
802
803Message *BootMerger::Front() {
804 Message *result = node_mergers_[index_]->Front();
805
806 if (result != nullptr) {
807 return result;
808 }
809
810 if (index_ + 1u == node_mergers_.size()) {
811 // At the end of the last node merger, just return.
812 return nullptr;
813 } else {
814 ++index_;
815 return Front();
816 }
817}
818
819void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
820
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700821std::vector<const LogParts *> BootMerger::Parts() const {
822 std::vector<const LogParts *> results;
823 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
824 std::vector<const LogParts *> node_parts = node_merger->Parts();
825
826 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
827 std::make_move_iterator(node_parts.end()));
828 }
829
830 return results;
831}
832
Austin Schuhd2f96102020-12-01 20:27:29 -0800833TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700834 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -0800835 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700836 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800837 if (!configuration_) {
838 configuration_ = part->config;
839 } else {
840 CHECK_EQ(configuration_.get(), part->config.get());
841 }
842 }
843 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -0800844 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
845 // pretty simple.
846 if (configuration::MultiNode(config)) {
847 nodes_data_.resize(config->nodes()->size());
848 const Node *my_node = config->nodes()->Get(node());
849 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
850 const Node *node = config->nodes()->Get(node_index);
851 NodeData *node_data = &nodes_data_[node_index];
852 node_data->channels.resize(config->channels()->size());
853 // We should save the channel if it is delivered to the node represented
854 // by the NodeData, but not sent by that node. That combo means it is
855 // forwarded.
856 size_t channel_index = 0;
857 node_data->any_delivered = false;
858 for (const Channel *channel : *config->channels()) {
859 node_data->channels[channel_index].delivered =
860 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -0800861 configuration::ChannelIsSendableOnNode(channel, my_node) &&
862 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -0800863 node_data->any_delivered = node_data->any_delivered ||
864 node_data->channels[channel_index].delivered;
865 ++channel_index;
866 }
867 }
868
869 for (const Channel *channel : *config->channels()) {
870 source_node_.emplace_back(configuration::GetNodeIndex(
871 config, channel->source_node()->string_view()));
872 }
873 }
874}
875
876void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800877 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -0800878 CHECK_NE(timestamp_mapper->node(), node());
879 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
880
881 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
882 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
883 // we could needlessly save data.
884 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -0800885 VLOG(1) << "Registering on node " << node() << " for peer node "
886 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -0800887 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
888
889 timestamp_mapper->nodes_data_[node()].peer = this;
890 }
891}
892
Austin Schuh79b30942021-01-24 22:32:21 -0800893void TimestampMapper::QueueMessage(Message *m) {
894 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -0800895 .channel_index = m->channel_index,
896 .queue_index = m->queue_index,
897 .monotonic_event_time = m->timestamp,
898 .realtime_event_time = aos::realtime_clock::time_point(
899 std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
900 .remote_queue_index = 0xffffffff,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700901 .monotonic_remote_time = BootTimestamp::min_time(),
Austin Schuhd2f96102020-12-01 20:27:29 -0800902 .realtime_remote_time = realtime_clock::min_time,
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700903 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh79b30942021-01-24 22:32:21 -0800904 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -0800905}
906
907TimestampedMessage *TimestampMapper::Front() {
908 // No need to fetch anything new. A previous message still exists.
909 switch (first_message_) {
910 case FirstMessage::kNeedsUpdate:
911 break;
912 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -0800913 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -0800914 case FirstMessage::kNullptr:
915 return nullptr;
916 }
917
Austin Schuh79b30942021-01-24 22:32:21 -0800918 if (matched_messages_.empty()) {
919 if (!QueueMatched()) {
920 first_message_ = FirstMessage::kNullptr;
921 return nullptr;
922 }
923 }
924 first_message_ = FirstMessage::kInMessage;
925 return &matched_messages_.front();
926}
927
928bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -0800929 if (nodes_data_.empty()) {
930 // Simple path. We are single node, so there are no timestamps to match!
931 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700932 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -0800933 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -0800934 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -0800935 }
Austin Schuh79b30942021-01-24 22:32:21 -0800936 // Enqueue this message into matched_messages_ so we have a place to
937 // associate remote timestamps, and return it.
938 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -0800939
Austin Schuh79b30942021-01-24 22:32:21 -0800940 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
941 last_message_time_ = matched_messages_.back().monotonic_event_time;
942
943 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700944 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -0800945 timestamp_callback_(&matched_messages_.back());
946 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -0800947 }
948
949 // We need to only add messages to the list so they get processed for messages
950 // which are delivered. Reuse the flow below which uses messages_ by just
951 // adding the new message to messages_ and continuing.
952 if (messages_.empty()) {
953 if (!Queue()) {
954 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -0800955 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -0800956 }
957
958 // Now that it has been added (and cannibalized), forget about it upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700959 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -0800960 }
961
962 Message *m = &(messages_.front());
963
964 if (source_node_[m->channel_index] == node()) {
965 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -0800966 QueueMessage(m);
967 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
968 last_message_time_ = matched_messages_.back().monotonic_event_time;
969 messages_.pop_front();
970 timestamp_callback_(&matched_messages_.back());
971 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -0800972 } else {
973 // Got a timestamp, find the matching remote data, match it, and return it.
974 Message data = MatchingMessageFor(*m);
975
976 // Return the data from the remote. The local message only has timestamp
977 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -0800978 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -0800979 .channel_index = m->channel_index,
980 .queue_index = m->queue_index,
981 .monotonic_event_time = m->timestamp,
982 .realtime_event_time = aos::realtime_clock::time_point(
983 std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
984 .remote_queue_index = m->data.message().remote_queue_index(),
985 .monotonic_remote_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700986 // TODO(austin): 0 is wrong...
987 {0, monotonic_clock::time_point(std::chrono::nanoseconds(
988 m->data.message().monotonic_remote_time()))},
Austin Schuhd2f96102020-12-01 20:27:29 -0800989 .realtime_remote_time = realtime_clock::time_point(
990 std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
Austin Schuh8bf1e632021-01-02 22:41:04 -0800991 .monotonic_timestamp_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -0700992 {0, monotonic_clock::time_point(std::chrono::nanoseconds(
993 m->data.message().monotonic_timestamp_time()))},
Austin Schuh79b30942021-01-24 22:32:21 -0800994 .data = std::move(data.data)});
995 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
996 last_message_time_ = matched_messages_.back().monotonic_event_time;
997 // Since messages_ holds the data, drop it.
998 messages_.pop_front();
999 timestamp_callback_(&matched_messages_.back());
1000 return true;
1001 }
1002}
1003
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001004void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001005 while (last_message_time_ <= queue_time) {
1006 if (!QueueMatched()) {
1007 return;
1008 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001009 }
1010}
1011
Austin Schuhe639ea12021-01-25 13:00:22 -08001012void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001013 // Note: queueing for time doesn't really work well across boots. So we just
1014 // assume that if you are using this, you only care about the current boot.
1015 //
1016 // TODO(austin): Is that the right concept?
1017 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001018 // Make sure we have something queued first. This makes the end time
1019 // calculation simpler, and is typically what folks want regardless.
1020 if (matched_messages_.empty()) {
1021 if (!QueueMatched()) {
1022 return;
1023 }
1024 }
1025
1026 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001027 std::max(monotonic_start_time(
1028 matched_messages_.front().monotonic_event_time.boot),
1029 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001030 time_estimation_buffer;
1031
1032 // Place sorted messages on the list until we have
1033 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1034 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001035 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001036 if (!QueueMatched()) {
1037 return;
1038 }
1039 }
1040}
1041
Austin Schuhd2f96102020-12-01 20:27:29 -08001042void TimestampMapper::PopFront() {
1043 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
1044 first_message_ = FirstMessage::kNeedsUpdate;
1045
Austin Schuh79b30942021-01-24 22:32:21 -08001046 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001047}
1048
1049Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001050 // Figure out what queue index we are looking for.
1051 CHECK(message.data.message().has_remote_queue_index());
1052 const uint32_t remote_queue_index =
1053 message.data.message().remote_queue_index();
1054
1055 CHECK(message.data.message().has_monotonic_remote_time());
1056 CHECK(message.data.message().has_realtime_remote_time());
1057
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001058 const BootTimestamp monotonic_remote_time{
1059 .boot = 0,
1060 .time = monotonic_clock::time_point(std::chrono::nanoseconds(
1061 message.data.message().monotonic_remote_time()))};
Austin Schuhd2f96102020-12-01 20:27:29 -08001062 const realtime_clock::time_point realtime_remote_time(
1063 std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
1064
Austin Schuhfecf1d82020-12-19 16:57:28 -08001065 TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
1066
1067 // We only register the peers which we have data for. So, if we are being
1068 // asked to pull a timestamp from a peer which doesn't exist, return an empty
1069 // message.
1070 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001071 // TODO(austin): Make sure the tests hit all these paths with a boot count
1072 // of 1...
Austin Schuhfecf1d82020-12-19 16:57:28 -08001073 return Message{
1074 .channel_index = message.channel_index,
1075 .queue_index = remote_queue_index,
1076 .timestamp = monotonic_remote_time,
1077 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
1078 }
1079
1080 // The queue which will have the matching data, if available.
1081 std::deque<Message> *data_queue =
1082 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1083
Austin Schuh79b30942021-01-24 22:32:21 -08001084 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001085
1086 if (data_queue->empty()) {
1087 return Message{
1088 .channel_index = message.channel_index,
1089 .queue_index = remote_queue_index,
1090 .timestamp = monotonic_remote_time,
1091 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
1092 }
1093
Austin Schuhd2f96102020-12-01 20:27:29 -08001094 if (remote_queue_index < data_queue->front().queue_index ||
1095 remote_queue_index > data_queue->back().queue_index) {
1096 return Message{
1097 .channel_index = message.channel_index,
1098 .queue_index = remote_queue_index,
1099 .timestamp = monotonic_remote_time,
1100 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
1101 }
1102
Austin Schuh993ccb52020-12-12 15:59:32 -08001103 // The algorithm below is constant time with some assumptions. We need there
1104 // to be no missing messages in the data stream. This also assumes a queue
1105 // hasn't wrapped. That is conservative, but should let us get started.
1106 if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
1107 data_queue->size()) {
1108 // Pull the data out and confirm that the timestamps match as expected.
1109 Message result = std::move(
1110 (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
1111
1112 CHECK_EQ(result.timestamp, monotonic_remote_time)
1113 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1114 CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
1115 result.data.message().realtime_sent_time())),
1116 realtime_remote_time)
1117 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1118 // Now drop the data off the front. We have deduplicated timestamps, so we
1119 // are done. And all the data is in order.
1120 data_queue->erase(data_queue->begin(),
1121 data_queue->begin() + (1 + remote_queue_index -
1122 data_queue->front().queue_index));
1123 return result;
1124 } else {
1125 auto it = std::find_if(data_queue->begin(), data_queue->end(),
1126 [remote_queue_index](const Message &m) {
1127 return m.queue_index == remote_queue_index;
1128 });
1129 if (it == data_queue->end()) {
1130 return Message{
1131 .channel_index = message.channel_index,
1132 .queue_index = remote_queue_index,
1133 .timestamp = monotonic_remote_time,
1134 .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
1135 }
1136
1137 Message result = std::move(*it);
1138
1139 CHECK_EQ(result.timestamp, monotonic_remote_time)
1140 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1141 CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
1142 result.data.message().realtime_sent_time())),
1143 realtime_remote_time)
1144 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1145
1146 data_queue->erase(it);
1147
1148 return result;
1149 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001150}
1151
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001152void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001153 if (queued_until_ > t) {
1154 return;
1155 }
1156 while (true) {
1157 if (!messages_.empty() && messages_.back().timestamp > t) {
1158 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
1159 return;
1160 }
1161
1162 if (!Queue()) {
1163 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001164 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001165 return;
1166 }
1167
1168 // Now that it has been added (and cannibalized), forget about it upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001169 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001170 }
1171}
1172
1173bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001174 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001175 if (m == nullptr) {
1176 return false;
1177 }
1178 for (NodeData &node_data : nodes_data_) {
1179 if (!node_data.any_delivered) continue;
1180 if (node_data.channels[m->channel_index].delivered) {
1181 // TODO(austin): This copies the data... Probably not worth stressing
1182 // about yet.
1183 // TODO(austin): Bound how big this can get. We tend not to send massive
1184 // data, so we can probably ignore this for a bit.
1185 node_data.channels[m->channel_index].messages.emplace_back(*m);
1186 }
1187 }
1188
1189 messages_.emplace_back(std::move(*m));
1190 return true;
1191}
1192
1193std::string TimestampMapper::DebugString() const {
1194 std::stringstream ss;
1195 ss << "node " << node() << " [\n";
1196 for (const Message &message : messages_) {
1197 ss << " " << message << "\n";
1198 }
1199 ss << "] queued_until " << queued_until_;
1200 for (const NodeData &ns : nodes_data_) {
1201 if (ns.peer == nullptr) continue;
1202 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
1203 size_t channel_index = 0;
1204 for (const NodeData::ChannelData &channel_data :
1205 ns.peer->nodes_data_[node()].channels) {
1206 if (channel_data.messages.empty()) {
1207 continue;
1208 }
Austin Schuhb000de62020-12-03 22:00:40 -08001209
Austin Schuhd2f96102020-12-01 20:27:29 -08001210 ss << " channel " << channel_index << " [\n";
1211 for (const Message &m : channel_data.messages) {
1212 ss << " " << m << "\n";
1213 }
1214 ss << " ]\n";
1215 ++channel_index;
1216 }
1217 ss << "] queued_until " << ns.peer->queued_until_;
1218 }
1219 return ss.str();
1220}
1221
Austin Schuhee711052020-08-24 16:06:09 -07001222std::string MaybeNodeName(const Node *node) {
1223 if (node != nullptr) {
1224 return node->name()->str() + " ";
1225 }
1226 return "";
1227}
1228
Brian Silvermanf51499a2020-09-21 12:49:08 -07001229} // namespace aos::logger