#include "aos/events/logging/logfile_utils.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>

#include <algorithm>
#include <climits>

#include "absl/strings/escaping.h"
#include "aos/configuration.h"
#include "aos/flatbuffer_merge.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"

#if defined(__x86_64__)
#define ENABLE_LZMA 1
#elif defined(__aarch64__)
#define ENABLE_LZMA 1
#else
#define ENABLE_LZMA 0
#endif

#if ENABLE_LZMA
#include "aos/events/logging/lzma_encoder.h"
#endif

DEFINE_int32(flush_size, 128000,
             "Number of outstanding bytes to allow before flushing to disk.");

DEFINE_double(
    max_out_of_order, -1,
    "If set, this overrides the max out of order duration for a log file.");

namespace aos::logger {

namespace chrono = std::chrono;

DetachedBufferWriter::DetachedBufferWriter(
    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
    : filename_(filename), encoder_(std::move(encoder)) {
  if (!util::MkdirPIfSpace(filename, 0777)) {
    ran_out_of_space_ = true;
  } else {
    fd_ = open(std::string(filename).c_str(),
               O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
    if (fd_ == -1 && errno == ENOSPC) {
      ran_out_of_space_ = true;
    } else {
      PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
      VLOG(1) << "Opened " << filename << " for writing";
    }
  }
}

DetachedBufferWriter::~DetachedBufferWriter() {
  Close();
  if (ran_out_of_space_) {
    CHECK(acknowledge_ran_out_of_space_)
        << ": Unacknowledged out of disk space, log file was not completed";
  }
}

DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
  *this = std::move(other);
}

// When other is destroyed "soon" (which it should be because we're getting an
// rvalue reference to it), it will flush etc all the data we have queued up
// (because that data will then be its data).
DetachedBufferWriter &DetachedBufferWriter::operator=(
    DetachedBufferWriter &&other) {
  std::swap(filename_, other.filename_);
  std::swap(encoder_, other.encoder_);
  std::swap(fd_, other.fd_);
  std::swap(ran_out_of_space_, other.ran_out_of_space_);
  std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
  std::swap(iovec_, other.iovec_);
  std::swap(max_write_time_, other.max_write_time_);
  std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
  std::swap(max_write_time_messages_, other.max_write_time_messages_);
  std::swap(total_write_time_, other.total_write_time_);
  std::swap(total_write_count_, other.total_write_count_);
  std::swap(total_write_messages_, other.total_write_messages_);
  std::swap(total_write_bytes_, other.total_write_bytes_);
  return *this;
}

void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes
    // available, so refuse to write anything more once we've dropped data
    // because we ran out of space.
    VLOG(1) << "Ignoring span: " << span.size();
    return;
  }

  if (encoder_->may_bypass() && span.size() > 4096u) {
    // Over this threshold, we'll assume it's cheaper to add an extra
    // syscall to write the data immediately instead of copying it to
    // enqueue.

    // First, flush everything.
    while (encoder_->queue_size() > 0u) {
      Flush();
    }

    // Then, write it directly.
    const auto start = aos::monotonic_clock::now();
    const ssize_t written = write(fd_, span.data(), span.size());
    const auto end = aos::monotonic_clock::now();
    HandleWriteReturn(written, span.size());
    UpdateStatsForWrite(end - start, written, 1);
  } else {
    encoder_->Encode(CopySpanAsDetachedBuffer(span));
  }

  FlushAtThreshold();
}

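// Finishes the encoder, flushes any remaining queued data to disk, and closes
// the file descriptor. ENOSPC from close() is treated the same as running out
// of space during a write.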
void DetachedBufferWriter::Close() {
  if (fd_ == -1) {
    return;
  }
  encoder_->Finish();
  while (encoder_->queue_size() > 0) {
    Flush();
  }
  if (close(fd_) == -1) {
    if (errno == ENOSPC) {
      ran_out_of_space_ = true;
    } else {
      PLOG(ERROR) << "Closing log file failed";
    }
  }
  fd_ = -1;
  VLOG(1) << "Closed " << filename_;
}

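// Writes the encoder's queued buffers to disk with a single writev() call (up
// to IOV_MAX buffers at a time) and records write statistics.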
void DetachedBufferWriter::Flush() {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes available,
    // so refuse to write anything more once we've dropped data because we ran
    // out of space.
    if (encoder_) {
      VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
      encoder_->Clear(encoder_->queue().size());
    } else {
      VLOG(1) << "No queue to ignore";
    }
    return;
  }

  const auto queue = encoder_->queue();
  if (queue.empty()) {
    return;
  }

  iovec_.clear();
  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
  iovec_.resize(iovec_size);
  size_t counted_size = 0;
  for (size_t i = 0; i < iovec_size; ++i) {
    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
    iovec_[i].iov_len = queue[i].size();
    counted_size += iovec_[i].iov_len;
  }

  const auto start = aos::monotonic_clock::now();
  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
  const auto end = aos::monotonic_clock::now();
  HandleWriteReturn(written, counted_size);

  encoder_->Clear(iovec_size);

  UpdateStatsForWrite(end - start, written, iovec_size);
}

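// Inspects the return value of write()/writev(). ENOSPC and short writes both
// mark the writer as out of space; any other failure is fatal.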
void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
                                             size_t write_size) {
  if (write_return == -1 && errno == ENOSPC) {
    ran_out_of_space_ = true;
    return;
  }
  PCHECK(write_return >= 0) << ": write failed";
  if (write_return < static_cast<ssize_t>(write_size)) {
    // Sometimes this happens instead of ENOSPC. On a real filesystem, this
    // never seems to happen in any other case. If we ever want to log to a
    // socket, this will happen more often. However, until we get there, we'll
    // just assume it means we ran out of space.
    ran_out_of_space_ = true;
    return;
  }
}

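// Tracks the slowest single write along with running totals of write time,
// bytes, and messages.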
void DetachedBufferWriter::UpdateStatsForWrite(
    aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
  if (duration > max_write_time_) {
    max_write_time_ = duration;
    max_write_time_bytes_ = written;
    max_write_time_messages_ = iovec_size;
  }
  total_write_time_ += duration;
  ++total_write_count_;
  total_write_messages_ += iovec_size;
  total_write_bytes_ += written;
}

void DetachedBufferWriter::FlushAtThreshold() {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes available,
    // so refuse to write anything more once we've dropped data because we ran
    // out of space.
    if (encoder_) {
      VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
      encoder_->Clear(encoder_->queue().size());
    } else {
      VLOG(1) << "No queue to ignore";
    }
    return;
  }

  // Flush if we are at the max number of iovs per writev, because there's no
  // point queueing up any more data in memory. Also flush once we have enough
  // data queued up.
  while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
         encoder_->queue_size() >= IOV_MAX) {
    Flush();
  }
}

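// Serializes the message in the given context (and/or its delivery timestamps,
// depending on log_type) into a MessageHeader flatbuffer.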
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  switch (log_type) {
    case LogType::kLogRemoteMessage:
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      message_header_builder.add_data(data_offset);
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_data(data_offset);
      [[fallthrough]];

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;
  }

  return message_header_builder.Finish();
}

SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
  static const std::string_view kXz = ".xz";
  if (filename.substr(filename.size() - kXz.size()) == kXz) {
#if ENABLE_LZMA
    decoder_ = std::make_unique<LzmaDecoder>(filename);
#else
    LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
#endif
  } else {
    decoder_ = std::make_unique<DummyDecoder>(filename);
  }
}

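// Returns a span over the next size-prefixed flatbuffer in the file, reading
// more blocks from the decoder as needed. Returns an empty span at end of file
// or if the trailing message is corrupted.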
absl::Span<const uint8_t> SpanReader::ReadMessage() {
  // Make sure we have enough for the size.
  if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // Now make sure we have enough for the message.
  const size_t data_size =
      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
      sizeof(flatbuffers::uoffset_t);
  if (data_size == sizeof(flatbuffers::uoffset_t)) {
    LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
    LOG(ERROR) << " Rest of log file is "
               << absl::BytesToHexString(std::string_view(
                      reinterpret_cast<const char *>(data_.data() +
                                                     consumed_data_),
                      data_.size() - consumed_data_));
    return absl::Span<const uint8_t>();
  }
  while (data_.size() < consumed_data_ + data_size) {
    if (!ReadBlock()) {
      return absl::Span<const uint8_t>();
    }
  }

  // And return it, consuming the data.
  const uint8_t *data_ptr = data_.data() + consumed_data_;

  consumed_data_ += data_size;

  return absl::Span<const uint8_t>(data_ptr, data_size);
}

bool SpanReader::ReadBlock() {
  // This is the amount of data we grab at a time. Doing larger chunks minimizes
  // syscalls and helps decompressors batch things more efficiently.
  constexpr size_t kReadSize = 256 * 1024;

  // Strip off any unused data at the front.
  if (consumed_data_ != 0) {
    data_.erase_front(consumed_data_);
    consumed_data_ = 0;
  }

  const size_t starting_size = data_.size();

  // This should automatically grow the backing store. It won't shrink if we
  // get a small chunk later. This reduces allocations when we want to append
  // more data.
  data_.resize(starting_size + kReadSize);

  const size_t count =
      decoder_->Read(data_.begin() + starting_size, data_.end());
  data_.resize(starting_size + count);
  if (count == 0) {
    return false;
  }

  return true;
}

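// Reads and verifies the log file header (the first message in the file).
// Returns nullopt if the file is empty or the header fails verification.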
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
    std::string_view filename) {
  SpanReader span_reader(filename);
  absl::Span<const uint8_t> config_data = span_reader.ReadMessage();

  // Make sure something was read.
  if (config_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  // And copy the config so we have it forever, removing the size prefix.
  ResizeableBuffer data;
  data.resize(config_data.size());
  memcpy(data.data(), config_data.begin(), data.size());
  SizePrefixedFlatbufferVector<LogFileHeader> result(std::move(data));
  if (!result.Verify()) {
    return std::nullopt;
  }
  return result;
}

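// Reads the n-th message (zero-indexed) after the log file header, verifying
// it. Returns nullopt if the file ends first.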
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
    std::string_view filename, size_t n) {
  SpanReader span_reader(filename);
  absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
  for (size_t i = 0; i < n + 1; ++i) {
    data_span = span_reader.ReadMessage();

    // Make sure something was read.
    if (data_span == absl::Span<const uint8_t>()) {
      return std::nullopt;
    }
  }

  // And copy the message so we have it forever, removing the size prefix.
  ResizeableBuffer data;
  data.resize(data_span.size());
  memcpy(data.data(), data_span.begin(), data.size());
  SizePrefixedFlatbufferVector<MessageHeader> result(std::move(data));
  if (!result.Verify()) {
    return std::nullopt;
  }
  return result;
}

MessageReader::MessageReader(std::string_view filename)
    : span_reader_(filename),
      raw_log_file_header_(
          SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
  // Make sure we have enough to read the size.
  absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();

  // Make sure something was read.
  CHECK(header_data != absl::Span<const uint8_t>())
      << ": Failed to read header from: " << filename;

  // And copy the header data so we have it forever.
  ResizeableBuffer header_data_copy;
  header_data_copy.resize(header_data.size());
  memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
  raw_log_file_header_ =
      SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));

  max_out_of_order_duration_ =
      FLAGS_max_out_of_order > 0
          ? chrono::duration_cast<chrono::nanoseconds>(
                chrono::duration<double>(FLAGS_max_out_of_order))
          : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());

  VLOG(1) << "Opened " << filename << " as node "
          << FlatbufferToJson(log_file_header()->node());
}

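// Reads the next message from the log part, tracking the newest timestamp seen
// so far.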
std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
MessageReader::ReadMessage() {
  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
  if (msg_data == absl::Span<const uint8_t>()) {
    return std::nullopt;
  }

  ResizeableBuffer result_buffer;
  result_buffer.resize(msg_data.size());
  memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
  SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));

  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
      chrono::nanoseconds(result.message().monotonic_sent_time()));

  newest_timestamp_ = std::max(newest_timestamp_, timestamp);
  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
  return std::move(result);
}

PartsMessageReader::PartsMessageReader(LogParts log_parts)
    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}

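// Reads the next message across all parts in order, opening the next part file
// as each one is exhausted, and CHECKs that messages never violate the
// max-out-of-order guarantee once past startup.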
std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
PartsMessageReader::ReadMessage() {
  while (!done_) {
    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
        message_reader_.ReadMessage();
    if (message) {
      newest_timestamp_ = message_reader_.newest_timestamp();
      const monotonic_clock::time_point monotonic_sent_time(
          chrono::nanoseconds(message->message().monotonic_sent_time()));
      // TODO(austin): Does this work with startup? Might need to use the start
      // time.
      // TODO(austin): Does this work with startup when we don't know the remote
      // start time too? Look at one of those logs to compare.
      if (monotonic_sent_time >
          parts_.monotonic_start_time + max_out_of_order_duration()) {
        after_start_ = true;
      }
      if (after_start_) {
        CHECK_GE(monotonic_sent_time,
                 newest_timestamp_ - max_out_of_order_duration())
            << ": Max out of order of " << max_out_of_order_duration().count()
            << "ns exceeded. " << parts_ << ", start time is "
            << parts_.monotonic_start_time << " currently reading "
            << filename();
      }
      return message;
    }
    NextLog();
  }
  newest_timestamp_ = monotonic_clock::max_time;
  return std::nullopt;
}

void PartsMessageReader::NextLog() {
  if (next_part_index_ == parts_.parts.size()) {
    done_ = true;
    return;
  }
  message_reader_ = MessageReader(parts_.parts[next_part_index_]);
  ++next_part_index_;
}

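// Messages are ordered by timestamp, then channel index, then queue index so
// that ties are broken deterministically when sorting.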
bool Message::operator<(const Message &m2) const {
  if (this->timestamp < m2.timestamp) {
    return true;
  } else if (this->timestamp > m2.timestamp) {
    return false;
  }

  if (this->channel_index < m2.channel_index) {
    return true;
  } else if (this->channel_index > m2.channel_index) {
    return false;
  }

  return this->queue_index < m2.queue_index;
}

bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
  return timestamp == m2.timestamp && channel_index == m2.channel_index &&
         queue_index == m2.queue_index;
}

std::ostream &operator<<(std::ostream &os, const Message &m) {
  os << "{.channel_index=" << m.channel_index
     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
  if (m.data.Verify()) {
    os << ", .data="
       << aos::FlatbufferToJson(m.data,
                                {.multi_line = false, .max_vector_size = 1});
  }
  os << "}";
  return os;
}

std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
  os << "{.channel_index=" << m.channel_index
     << ", .queue_index=" << m.queue_index
     << ", .monotonic_event_time=" << m.monotonic_event_time
     << ", .realtime_event_time=" << m.realtime_event_time;
  if (m.remote_queue_index != 0xffffffff) {
    os << ", .remote_queue_index=" << m.remote_queue_index;
  }
  if (m.monotonic_remote_time != monotonic_clock::min_time) {
    os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
  }
  if (m.realtime_remote_time != realtime_clock::min_time) {
    os << ", .realtime_remote_time=" << m.realtime_remote_time;
  }
  if (m.monotonic_timestamp_time != monotonic_clock::min_time) {
    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
  }
  if (m.data.Verify()) {
    os << ", .data="
       << aos::FlatbufferToJson(m.data,
                                {.multi_line = false, .max_vector_size = 1});
  }
  os << "}";
  return os;
}

LogPartsSorter::LogPartsSorter(LogParts log_parts)
    : parts_message_reader_(log_parts) {}

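// Returns a pointer to the oldest message which is guaranteed to be sorted,
// queueing more data from the part files as needed. Returns nullptr once all
// messages have been consumed.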
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      messages_.insert(
          {.channel_index = m.value().message().channel_index(),
           .queue_index = m.value().message().queue_index(),
           .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
               m.value().message().monotonic_sent_time())),
           .data = std::move(m.value())});

      // Now, update sorted_until_ to match the new message.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  CHECK_GE(messages_.begin()->timestamp, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp;
  return &(*messages_.begin());
}

void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }

std::string LogPartsSorter::DebugString() const {
  std::stringstream ss;
  ss << "messages: [\n";
  int count = 0;
  bool no_dots = true;
  for (const Message &m : messages_) {
    if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
      ss << m << "\n";
    } else if (no_dots) {
      ss << "...\n";
      no_dots = false;
    }
    ++count;
  }
  ss << "] <- " << parts_message_reader_.filename();
  return ss.str();
}

NodeMerger::NodeMerger(std::vector<LogParts> parts) {
  CHECK_GE(parts.size(), 1u);
  const std::string part0_node = parts[0].node;
  for (size_t i = 1; i < parts.size(); ++i) {
    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
  }
  for (LogParts &part : parts) {
    parts_sorters_.emplace_back(std::move(part));
  }

  node_ = configuration::GetNodeIndex(configuration(), part0_node);

  monotonic_start_time_ = monotonic_clock::max_time;
  realtime_start_time_ = realtime_clock::max_time;
  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
    if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
      monotonic_start_time_ = parts_sorter.monotonic_start_time();
      realtime_start_time_ = parts_sorter.realtime_start_time();
    }
  }
}

std::vector<const LogParts *> NodeMerger::Parts() const {
  std::vector<const LogParts *> p;
  p.reserve(parts_sorters_.size());
  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
    p.emplace_back(&parts_sorter.parts());
  }
  return p;
}

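// Returns the oldest message across all parts sorters, deduplicating identical
// messages and preferring the copy which carries a monotonic_timestamp_time.
// Returns nullptr once everything has been consumed.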
Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has the
      // timestamp time.
      if (!m->data.message().has_monotonic_timestamp_time()) {
        parts_sorter.PopFront();
      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
                 oldest->data.message().monotonic_timestamp_time());
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  if (oldest) {
    CHECK_GE(oldest->timestamp, last_message_time_);
    last_message_time_ = oldest->timestamp;
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found. This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}

void NodeMerger::PopFront() {
  CHECK(current_ != nullptr) << "Popping before calling Front()";
  current_->PopFront();
  current_ = nullptr;
}

TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
    : node_merger_(std::move(parts)),
      timestamp_callback_([](TimestampedMessage *) {}) {
  for (const LogParts *part : node_merger_.Parts()) {
    if (!configuration_) {
      configuration_ = part->config;
    } else {
      CHECK_EQ(configuration_.get(), part->config.get());
    }
  }
  const Configuration *config = configuration_.get();
  // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
  // pretty simple.
  if (configuration::MultiNode(config)) {
    nodes_data_.resize(config->nodes()->size());
    const Node *my_node = config->nodes()->Get(node());
    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
      const Node *node = config->nodes()->Get(node_index);
      NodeData *node_data = &nodes_data_[node_index];
      node_data->channels.resize(config->channels()->size());
      // We should save the channel if it is delivered to the node represented
      // by the NodeData, but not sent by that node. That combo means it is
      // forwarded.
      size_t channel_index = 0;
      node_data->any_delivered = false;
      for (const Channel *channel : *config->channels()) {
        node_data->channels[channel_index].delivered =
            configuration::ChannelIsReadableOnNode(channel, node) &&
            configuration::ChannelIsSendableOnNode(channel, my_node) &&
            (my_node != node);
        node_data->any_delivered = node_data->any_delivered ||
                                   node_data->channels[channel_index].delivered;
        ++channel_index;
      }
    }

    for (const Channel *channel : *config->channels()) {
      source_node_.emplace_back(configuration::GetNodeIndex(
          config, channel->source_node()->string_view()));
    }
  }
}

void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
  CHECK(configuration::MultiNode(configuration()));
  CHECK_NE(timestamp_mapper->node(), node());
  CHECK_LT(timestamp_mapper->node(), nodes_data_.size());

  NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
  // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
  // we could needlessly save data.
  if (node_data->any_delivered) {
    VLOG(1) << "Registering on node " << node() << " for peer node "
            << timestamp_mapper->node();
    CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);

    timestamp_mapper->nodes_data_[node()].peer = this;
  }
}

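// Wraps a local message in a TimestampedMessage with no remote metadata filled
// in and appends it to matched_messages_.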
void TimestampMapper::QueueMessage(Message *m) {
  matched_messages_.emplace_back(TimestampedMessage{
      .channel_index = m->channel_index,
      .queue_index = m->queue_index,
      .monotonic_event_time = m->timestamp,
      .realtime_event_time = aos::realtime_clock::time_point(
          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
      .remote_queue_index = 0xffffffff,
      .monotonic_remote_time = monotonic_clock::min_time,
      .realtime_remote_time = realtime_clock::min_time,
      .monotonic_timestamp_time = monotonic_clock::min_time,
      .data = std::move(m->data)});
}

TimestampedMessage *TimestampMapper::Front() {
  // No need to fetch anything new. A previous message still exists.
  switch (first_message_) {
    case FirstMessage::kNeedsUpdate:
      break;
    case FirstMessage::kInMessage:
      return &matched_messages_.front();
    case FirstMessage::kNullptr:
      return nullptr;
  }

  if (matched_messages_.empty()) {
    if (!QueueMatched()) {
      first_message_ = FirstMessage::kNullptr;
      return nullptr;
    }
  }
  first_message_ = FirstMessage::kInMessage;
  return &matched_messages_.front();
}

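// Queues the next matched message into matched_messages_. For forwarded
// channels this pairs the local timestamp message with the corresponding
// remote data message. Returns false once there is no data left.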
bool TimestampMapper::QueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path. We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    Message *m = node_merger_.Front();
    if (!m) {
      return false;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(m);

    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are a thin wrapper around node_merger. Call it directly.
    node_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }

  // We need to only add messages to the list so they get processed for messages
  // which are delivered. Reuse the flow below which uses messages_ by just
  // adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return false;
    }

    // Now that it has been added (and cannibalized), forget about it upstream.
    node_merger_.PopFront();
  }

  Message *m = &(messages_.front());

  if (source_node_[m->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(m);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return it.
    Message data = MatchingMessageFor(*m);

    // Return the data from the remote. The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = m->channel_index,
        .queue_index = m->queue_index,
        .monotonic_event_time = m->timestamp,
        .realtime_event_time = aos::realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
        .remote_queue_index = m->data.message().remote_queue_index(),
        .monotonic_remote_time =
            monotonic_clock::time_point(std::chrono::nanoseconds(
                m->data.message().monotonic_remote_time())),
        .realtime_remote_time = realtime_clock::time_point(
            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
        .monotonic_timestamp_time =
            monotonic_clock::time_point(std::chrono::nanoseconds(
                m->data.message().monotonic_timestamp_time())),
        .data = std::move(data.data)});
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }
}

void TimestampMapper::QueueUntil(monotonic_clock::time_point queue_time) {
  while (last_message_time_ <= queue_time) {
    if (!QueueMatched()) {
      return;
    }
  }
}

void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
  // Make sure we have something queued first. This makes the end time
  // calculation simpler, and is typically what folks want regardless.
  if (matched_messages_.empty()) {
    if (!QueueMatched()) {
      return;
    }
  }

  const aos::monotonic_clock::time_point end_queue_time =
      std::max(monotonic_start_time(),
               matched_messages_.front().monotonic_event_time) +
      time_estimation_buffer;

  // Place sorted messages on the list until we have
  // --time_estimation_buffer_seconds seconds queued up (but queue at least
  // until the log starts).
  while (end_queue_time >= last_message_time_) {
    if (!QueueMatched()) {
      return;
    }
  }
}

void TimestampMapper::PopFront() {
  CHECK(first_message_ != FirstMessage::kNeedsUpdate);
  first_message_ = FirstMessage::kNeedsUpdate;

  matched_messages_.pop_front();
}

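// Finds the remote data message matching the given timestamp message. Uses the
// remote queue index directly when the peer's queue is contiguous, and falls
// back to a linear search otherwise. Returns an empty message if the data is
// not available.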
Message TimestampMapper::MatchingMessageFor(const Message &message) {
  // Figure out what queue index we are looking for.
  CHECK(message.data.message().has_remote_queue_index());
  const uint32_t remote_queue_index =
      message.data.message().remote_queue_index();

  CHECK(message.data.message().has_monotonic_remote_time());
  CHECK(message.data.message().has_realtime_remote_time());

  const monotonic_clock::time_point monotonic_remote_time(
      std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
  const realtime_clock::time_point realtime_remote_time(
      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));

  TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;

  // We only register the peers which we have data for. So, if we are being
  // asked to pull a timestamp from a peer which doesn't exist, return an empty
  // message.
  if (peer == nullptr) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The queue which will have the matching data, if available.
  std::deque<Message> *data_queue =
      &peer->nodes_data_[node()].channels[message.channel_index].messages;

  peer->QueueUnmatchedUntil(monotonic_remote_time);

  if (data_queue->empty()) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  if (remote_queue_index < data_queue->front().queue_index ||
      remote_queue_index > data_queue->back().queue_index) {
    return Message{
        .channel_index = message.channel_index,
        .queue_index = remote_queue_index,
        .timestamp = monotonic_remote_time,
        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
  }

  // The algorithm below is constant time with some assumptions. We need there
  // to be no missing messages in the data stream. This also assumes a queue
  // hasn't wrapped. That is conservative, but should let us get started.
  if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
      data_queue->size()) {
    // Pull the data out and confirm that the timestamps match as expected.
    Message result = std::move(
        (*data_queue)[remote_queue_index - data_queue->front().queue_index]);

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    // Now drop the data off the front. We have deduplicated timestamps, so we
    // are done. And all the data is in order.
    data_queue->erase(data_queue->begin(),
                      data_queue->begin() + (1 + remote_queue_index -
                                             data_queue->front().queue_index));
    return result;
  } else {
    auto it = std::find_if(data_queue->begin(), data_queue->end(),
                           [remote_queue_index](const Message &m) {
                             return m.queue_index == remote_queue_index;
                           });
    if (it == data_queue->end()) {
      return Message{
          .channel_index = message.channel_index,
          .queue_index = remote_queue_index,
          .timestamp = monotonic_remote_time,
          .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
    }

    Message result = std::move(*it);

    CHECK_EQ(result.timestamp, monotonic_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";
    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
                 result.data.message().realtime_sent_time())),
             realtime_remote_time)
        << ": Queue index matches, but timestamp doesn't. Please investigate!";

    data_queue->erase(it);

    return result;
  }
}

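// Pulls raw messages from the node merger into messages_ until everything at
// or before time t has been queued.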
void TimestampMapper::QueueUnmatchedUntil(monotonic_clock::time_point t) {
  if (queued_until_ > t) {
    return;
  }
  while (true) {
    if (!messages_.empty() && messages_.back().timestamp > t) {
      queued_until_ = std::max(queued_until_, messages_.back().timestamp);
      return;
    }

    if (!Queue()) {
      // Found nothing to add, we are out of data!
      queued_until_ = monotonic_clock::max_time;
      return;
    }

    // Now that it has been added (and cannibalized), forget about it upstream.
    node_merger_.PopFront();
  }
}

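// Queues the next message from the node merger into messages_, copying it into
// the per-node channel queues for any peer nodes it was forwarded to. Returns
// false when there is no data left.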
bool TimestampMapper::Queue() {
  Message *m = node_merger_.Front();
  if (m == nullptr) {
    return false;
  }
  for (NodeData &node_data : nodes_data_) {
    if (!node_data.any_delivered) continue;
    if (node_data.channels[m->channel_index].delivered) {
      // TODO(austin): This copies the data... Probably not worth stressing
      // about yet.
      // TODO(austin): Bound how big this can get. We tend not to send massive
      // data, so we can probably ignore this for a bit.
      node_data.channels[m->channel_index].messages.emplace_back(*m);
    }
  }

  messages_.emplace_back(std::move(*m));
  return true;
}

std::string TimestampMapper::DebugString() const {
  std::stringstream ss;
  ss << "node " << node() << " [\n";
  for (const Message &message : messages_) {
    ss << " " << message << "\n";
  }
  ss << "] queued_until " << queued_until_;
  for (const NodeData &ns : nodes_data_) {
    if (ns.peer == nullptr) continue;
    ss << "\nnode " << ns.peer->node() << " remote_data [\n";
    size_t channel_index = 0;
    for (const NodeData::ChannelData &channel_data :
         ns.peer->nodes_data_[node()].channels) {
      if (channel_data.messages.empty()) {
        continue;
      }

      ss << " channel " << channel_index << " [\n";
      for (const Message &m : channel_data.messages) {
        ss << " " << m << "\n";
      }
      ss << " ]\n";
      ++channel_index;
    }
    ss << "] queued_until " << ns.peer->queued_until_;
  }
  return ss.str();
}

std::string MaybeNodeName(const Node *node) {
  if (node != nullptr) {
    return node->name()->str() + " ";
  }
  return "";
}

}  // namespace aos::logger