blob: 58c0fc353526a6d95762cb0973aa6ede5f1317cb [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
Austin Schuh86110712022-09-16 15:40:54 -070031#if ENABLE_S3
32#include "aos/events/logging/s3_fetcher.h"
33#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070034
// Write-side tuning for DetachedBufferWriter: FlushAtThreshold flushes once
// more than flush_size bytes are queued or flush_period seconds have passed
// since the last flush.
DEFINE_int32(flush_size, 128 * 1024,
             "Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
    flush_period, 5.0,
    "Max time to let data sit in the queue before flushing in seconds.");

// The consumers of the following flags are outside the visible part of this
// file; see their help text for intent.
DEFINE_double(
    max_network_delay, 1.0,
    "Max time to assume a message takes to cross the network before we are "
    "willing to drop it from our buffers and assume it didn't make it. "
    "Increasing this number can increase memory usage depending on the packet "
    "loss of your network or if the timestamps aren't logged for a message.");

// Negative (the default) means "not set".
DEFINE_double(
    max_out_of_order, -1,
    "If set, this overrides the max out of order duration for a log file.");

DEFINE_bool(workaround_double_headers, true,
            "Some old log files have two headers at the beginning. Use the "
            "last header as the actual header.");

DEFINE_bool(crash_on_corrupt_message, true,
            "When true, MessageReader will crash the first time a message "
            "with corrupted format is found. When false, the crash will be "
            "suppressed, and any remaining readable messages will be "
            "evaluated to present verified vs corrupted stats.");

// NOTE(review): the help text below is missing a verb ("... found by
// MessageReader [will] be silently ignored").
DEFINE_bool(ignore_corrupt_messages, false,
            "When true, and crash_on_corrupt_message is false, then any "
            "corrupt message found by MessageReader be silently ignored, "
            "providing access to all uncorrupted messages in a logfile.");

// Seeds DetachedBufferWriter::supports_odirect_ in its constructor.
DEFINE_bool(direct, false,
            "If true, write using O_DIRECT and write 512 byte aligned blocks "
            "whenever possible.");
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Streams the contained value of `t` to `*os`, or the literal text "null" when
// `t` is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}

}  // namespace
85
Austin Schuh48d10d62022-10-16 22:19:23 -070086DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
87 std::unique_ptr<DataEncoder> encoder)
Austin Schuh4c3cdb72023-02-11 15:05:26 -080088 : filename_(filename),
89 encoder_(std::move(encoder)),
90 supports_odirect_(FLAGS_direct) {
91 iovec_.reserve(10);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070092 if (!util::MkdirPIfSpace(filename, 0777)) {
93 ran_out_of_space_ = true;
94 } else {
James Kuszmaul9776b392023-01-14 14:08:08 -080095 fd_ = open(filename_.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070096 if (fd_ == -1 && errno == ENOSPC) {
97 ran_out_of_space_ = true;
98 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070099 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
100 << " for writing";
101 VLOG(1) << "Opened " << this->filename() << " for writing";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800102
103 flags_ = fcntl(fd_, F_GETFL, 0);
104 PCHECK(flags_ >= 0) << ": Failed to get flags for " << this->filename();
105
106 EnableDirect();
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700107 }
108 }
Austin Schuha36c8902019-12-30 18:07:15 -0800109}
110
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800111void DetachedBufferWriter::EnableDirect() {
112 if (supports_odirect_ && !ODirectEnabled()) {
113 const int new_flags = flags_ | O_DIRECT;
114 // Track if we failed to set O_DIRECT. Note: Austin hasn't seen this call
115 // fail. The write call tends to fail instead.
116 if (fcntl(fd_, F_SETFL, new_flags) == -1) {
117 PLOG(WARNING) << "Failed to set O_DIRECT on " << filename();
118 supports_odirect_ = false;
119 } else {
120 VLOG(1) << "Enabled O_DIRECT on " << filename();
121 flags_ = new_flags;
122 }
123 }
124}
125
126void DetachedBufferWriter::DisableDirect() {
127 if (supports_odirect_ && ODirectEnabled()) {
128 flags_ = flags_ & (~O_DIRECT);
129 PCHECK(fcntl(fd_, F_SETFL, flags_) != -1) << ": Failed to disable O_DIRECT";
130 VLOG(1) << "Disabled O_DIRECT on " << filename();
131 }
132}
133
Austin Schuha36c8902019-12-30 18:07:15 -0800134DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700135 Close();
136 if (ran_out_of_space_) {
137 CHECK(acknowledge_ran_out_of_space_)
138 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700139 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700140}
141
Brian Silvermand90905f2020-09-23 14:42:56 -0700142DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700143 *this = std::move(other);
144}
145
Brian Silverman87ac0402020-09-17 14:47:01 -0700146// When other is destroyed "soon" (which it should be because we're getting an
147// rvalue reference to it), it will flush etc all the data we have queued up
148// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700149DetachedBufferWriter &DetachedBufferWriter::operator=(
150 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700151 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700152 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700153 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700154 std::swap(ran_out_of_space_, other.ran_out_of_space_);
155 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700156 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700157 std::swap(max_write_time_, other.max_write_time_);
158 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
159 std::swap(max_write_time_messages_, other.max_write_time_messages_);
160 std::swap(total_write_time_, other.total_write_time_);
161 std::swap(total_write_count_, other.total_write_count_);
162 std::swap(total_write_messages_, other.total_write_messages_);
163 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800164 std::swap(last_synced_bytes_, other.last_synced_bytes_);
165 std::swap(supports_odirect_, other.supports_odirect_);
166 std::swap(flags_, other.flags_);
167 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700168 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800169}
170
Austin Schuh8bdfc492023-02-11 12:53:13 -0800171void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800172 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700173 if (ran_out_of_space_) {
174 // We don't want any later data to be written after space becomes
175 // available, so refuse to write anything more once we've dropped data
176 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700177 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800178 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700179
Austin Schuh8bdfc492023-02-11 12:53:13 -0800180 const size_t message_size = copier->size();
181 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700182
Austin Schuh8bdfc492023-02-11 12:53:13 -0800183 // Keep writing chunks until we've written it all. If we end up with a
184 // partial write, this means we need to flush to disk.
185 do {
186 const size_t bytes_written = encoder_->Encode(copier, overall_bytes_written);
187 CHECK(bytes_written != 0);
188
189 overall_bytes_written += bytes_written;
190 if (overall_bytes_written < message_size) {
191 VLOG(1) << "Flushing because of a partial write, tried to write "
192 << message_size << " wrote " << overall_bytes_written;
193 Flush(now);
194 }
195 } while (overall_bytes_written < message_size);
196
Austin Schuhbd06ae42021-03-31 22:48:21 -0700197 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800198}
199
Brian Silverman0465fcf2020-09-24 00:29:18 -0700200void DetachedBufferWriter::Close() {
201 if (fd_ == -1) {
202 return;
203 }
204 encoder_->Finish();
205 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800206 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700207 }
208 if (close(fd_) == -1) {
209 if (errno == ENOSPC) {
210 ran_out_of_space_ = true;
211 } else {
212 PLOG(ERROR) << "Closing log file failed";
213 }
214 }
215 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700216 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700217}
218
Austin Schuh8bdfc492023-02-11 12:53:13 -0800219void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
220 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700221 if (ran_out_of_space_) {
222 // We don't want any later data to be written after space becomes available,
223 // so refuse to write anything more once we've dropped data because we ran
224 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700225 if (encoder_) {
226 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
227 encoder_->Clear(encoder_->queue().size());
228 } else {
229 VLOG(1) << "No queue to ignore";
230 }
231 return;
232 }
233
234 const auto queue = encoder_->queue();
235 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700236 return;
237 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700238
Austin Schuha36c8902019-12-30 18:07:15 -0800239 iovec_.clear();
Austin Schuh313d1ba2023-03-24 15:06:30 -0700240 const size_t original_iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
241 size_t iovec_size = original_iovec_size;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700242 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800243 size_t counted_size = 0;
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800244
245 // Ok, we now need to figure out if we were aligned, and if we were, how much
246 // of the data we are being asked to write is aligned.
247 //
248 // The file is aligned if it is a multiple of kSector in length. The data is
249 // aligned if it's memory is kSector aligned, and the length is a multiple of
250 // kSector in length.
Austin Schuh313d1ba2023-03-24 15:06:30 -0700251 bool aligned = (file_written_bytes_ % kSector) == 0;
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800252 size_t write_index = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700253 for (size_t i = 0; i < iovec_size; ++i) {
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800254 iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
255
256 // Make sure the address is aligned, or give up. This should be uncommon,
257 // but is always possible.
258 if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
259 (kSector - 1)) != 0) {
260 aligned = false;
261 }
262
263 // Now, see if the length is a multiple of kSector. The goal is to figure
264 // out if/how much memory we can write out with O_DIRECT so that only the
265 // last little bit is done with non-direct IO to keep it fast.
266 iovec_[write_index].iov_len = queue[i].size();
267 if ((iovec_[write_index].iov_len % kSector) != 0) {
268 VLOG(1) << "Unaligned length on " << filename();
269 // If we've got over a sector of data to write, write it out with O_DIRECT
270 // and then continue writing the rest unaligned.
271 if (aligned && iovec_[write_index].iov_len > kSector) {
272 const size_t aligned_size =
273 iovec_[write_index].iov_len & (~(kSector - 1));
274 VLOG(1) << "Was aligned, writing last chunk rounded from "
275 << queue[i].size() << " to " << aligned_size;
276 iovec_[write_index].iov_len = aligned_size;
277
278 WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
279
280 // Now, everything before here has been written. Make an iovec out of
281 // the last bytes, and keep going.
282 iovec_size -= write_index;
283 iovec_.resize(iovec_size);
284 write_index = 0;
285 counted_size = 0;
286
287 iovec_[write_index].iov_base =
288 const_cast<uint8_t *>(queue[i].data() + aligned_size);
289 iovec_[write_index].iov_len = queue[i].size() - aligned_size;
290 }
291 aligned = false;
292 }
293 VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
294 << filename();
295 counted_size += iovec_[write_index].iov_len;
296 ++write_index;
297 }
298
Austin Schuh313d1ba2023-03-24 15:06:30 -0700299 if (counted_size > 0) {
300 // Either write the aligned data if it is all aligned, or write the rest
301 // unaligned if we wrote aligned up above.
302 WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800303
Austin Schuh313d1ba2023-03-24 15:06:30 -0700304 encoder_->Clear(original_iovec_size);
305 }
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800306}
307
308size_t DetachedBufferWriter::WriteV(struct iovec *iovec_data, size_t iovec_size,
309 bool aligned, size_t counted_size) {
310 // Configure the file descriptor to match the mode we should be in. This is
311 // safe to over-call since it only does the syscall if needed.
312 if (aligned) {
313 EnableDirect();
314 } else {
315 DisableDirect();
Austin Schuha36c8902019-12-30 18:07:15 -0800316 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700317
318 const auto start = aos::monotonic_clock::now();
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800319 const ssize_t written = writev(fd_, iovec_data, iovec_size);
320
321 if (written > 0) {
322 // Flush asynchronously and force the data out of the cache.
Austin Schuh313d1ba2023-03-24 15:06:30 -0700323 sync_file_range(fd_, file_written_bytes_, written, SYNC_FILE_RANGE_WRITE);
324 if (file_written_bytes_ != 0) {
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800325 // Per Linus' recommendation online on how to do fast file IO, do a
326 // blocking flush of the previous write chunk, and then tell the kernel to
327 // drop the pages from the cache. This makes sure we can't get too far
328 // ahead.
329 sync_file_range(fd_, last_synced_bytes_,
Austin Schuh313d1ba2023-03-24 15:06:30 -0700330 file_written_bytes_ - last_synced_bytes_,
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800331 SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
332 SYNC_FILE_RANGE_WAIT_AFTER);
333 posix_fadvise(fd_, last_synced_bytes_,
Austin Schuh313d1ba2023-03-24 15:06:30 -0700334 file_written_bytes_ - last_synced_bytes_,
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800335 POSIX_FADV_DONTNEED);
336
Austin Schuh313d1ba2023-03-24 15:06:30 -0700337 last_synced_bytes_ = file_written_bytes_;
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800338 }
339 }
340
Brian Silvermanf51499a2020-09-21 12:49:08 -0700341 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700342 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700343
Brian Silvermanf51499a2020-09-21 12:49:08 -0700344 UpdateStatsForWrite(end - start, written, iovec_size);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800345
346 return written;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700347}
348
Brian Silverman0465fcf2020-09-24 00:29:18 -0700349void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
350 size_t write_size) {
351 if (write_return == -1 && errno == ENOSPC) {
352 ran_out_of_space_ = true;
353 return;
354 }
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800355 PCHECK(write_return >= 0) << ": write failed, got " << write_return;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700356 if (write_return < static_cast<ssize_t>(write_size)) {
357 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
358 // never seems to happen in any other case. If we ever want to log to a
359 // socket, this will happen more often. However, until we get there, we'll
360 // just assume it means we ran out of space.
361 ran_out_of_space_ = true;
362 return;
363 }
364}
365
Brian Silvermanf51499a2020-09-21 12:49:08 -0700366void DetachedBufferWriter::UpdateStatsForWrite(
367 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
368 if (duration > max_write_time_) {
369 max_write_time_ = duration;
370 max_write_time_bytes_ = written;
371 max_write_time_messages_ = iovec_size;
372 }
373 total_write_time_ += duration;
374 ++total_write_count_;
375 total_write_messages_ += iovec_size;
376 total_write_bytes_ += written;
Austin Schuh313d1ba2023-03-24 15:06:30 -0700377 file_written_bytes_ += written;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700378}
379
Austin Schuhbd06ae42021-03-31 22:48:21 -0700380void DetachedBufferWriter::FlushAtThreshold(
381 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700382 if (ran_out_of_space_) {
383 // We don't want any later data to be written after space becomes available,
384 // so refuse to write anything more once we've dropped data because we ran
385 // out of space.
386 if (encoder_) {
387 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
388 encoder_->Clear(encoder_->queue().size());
389 } else {
390 VLOG(1) << "No queue to ignore";
391 }
392 return;
393 }
394
Austin Schuhbd06ae42021-03-31 22:48:21 -0700395 // We don't want to flush the first time through. Otherwise we will flush as
396 // the log file header might be compressing, defeating any parallelism and
397 // queueing there.
398 if (last_flush_time_ == aos::monotonic_clock::min_time) {
399 last_flush_time_ = now;
400 }
401
Brian Silvermanf51499a2020-09-21 12:49:08 -0700402 // Flush if we are at the max number of iovs per writev, because there's no
403 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700404 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800405 while (encoder_->space() == 0 ||
406 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700407 encoder_->queue_size() >= IOV_MAX ||
408 now > last_flush_time_ +
409 chrono::duration_cast<chrono::nanoseconds>(
410 chrono::duration<double>(FLAGS_flush_period))) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800411 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_;
412 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700413 }
Austin Schuha36c8902019-12-30 18:07:15 -0800414}
415
Austin Schuhf2d0e682022-10-16 14:20:58 -0700416// Do the magic dance to convert the endianness of the data and append it to the
417// buffer.
418namespace {
419
420// TODO(austin): Look at the generated code to see if building the header is
421// efficient or not.
422template <typename T>
423uint8_t *Push(uint8_t *buffer, const T data) {
424 const T endian_data = flatbuffers::EndianScalar<T>(data);
425 std::memcpy(buffer, &endian_data, sizeof(T));
426 return buffer + sizeof(T);
427}
428
429uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
430 std::memcpy(buffer, data, size);
431 return buffer + size;
432}
433
434uint8_t *Pad(uint8_t *buffer, size_t padding) {
435 std::memset(buffer, 0, padding);
436 return buffer + padding;
437}
438} // namespace
439
440flatbuffers::Offset<MessageHeader> PackRemoteMessage(
441 flatbuffers::FlatBufferBuilder *fbb,
442 const message_bridge::RemoteMessage *msg, int channel_index,
443 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
444 logger::MessageHeader::Builder message_header_builder(*fbb);
445 // Note: this must match the same order as MessageBridgeServer and
446 // PackMessage. We want identical headers to have identical
447 // on-the-wire formats to make comparing them easier.
448
449 message_header_builder.add_channel_index(channel_index);
450
451 message_header_builder.add_queue_index(msg->queue_index());
452 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
453 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
454
455 message_header_builder.add_monotonic_remote_time(
456 msg->monotonic_remote_time());
457 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
458 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
459
460 message_header_builder.add_monotonic_timestamp_time(
461 monotonic_timestamp_time.time_since_epoch().count());
462
463 return message_header_builder.Finish();
464}
465
// Writes the byte range [start_byte, end_byte) of a hand-laid-out MessageHeader
// flatbuffer image into `buffer`. The image carries the same fields as
// PackRemoteMessage (presumably so a caller can emit the message in pieces
// without a FlatBufferBuilder).
//
// The annotated dump in the comments below describes the image. Each `case`
// label is an 8-byte boundary. A case writes the bytes from its label up to
// the next label, then falls through, until the label equal to end_byte is
// reached.
//
// Preconditions (DCHECKed, i.e. only enforced in debug builds): start_byte and
// end_byte are multiples of 8, and start_byte <= end_byte <=
// PackRemoteMessageSize().
// NOTE(review): in release builds a start_byte matching no case writes nothing
// but still returns end_byte - start_byte.
//
// Returns end_byte - start_byte, the number of bytes written.
size_t PackRemoteMessageInline(
    uint8_t *buffer, const message_bridge::RemoteMessage *msg,
    int channel_index,
    const aos::monotonic_clock::time_point monotonic_timestamp_time,
    size_t start_byte, size_t end_byte) {
  const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
  DCHECK_EQ((start_byte % 8u), 0u);
  DCHECK_EQ((end_byte % 8u), 0u);
  DCHECK_LE(start_byte, end_byte);
  DCHECK_LE(end_byte, message_size);

  // Enter at the chunk containing start_byte; stop at the chunk equal to
  // end_byte.
  switch (start_byte) {
    case 0x00u:
      if ((end_byte) == 0x00u) {
        break;
      }
      // clang-format off
      // header:
      // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
      buffer = Push<flatbuffers::uoffset_t>(
          buffer, message_size - sizeof(flatbuffers::uoffset_t));
      // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
      [[fallthrough]];
    case 0x08u:
      if ((end_byte) == 0x08u) {
        break;
      }
      //
      // padding:
      // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
      buffer = Pad(buffer, 6);
      //
      // vtable (aos.logger.MessageHeader):
      // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x10u:
      if ((end_byte) == 0x10u) {
        break;
      }
      // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
      // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
      // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
      // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
      [[fallthrough]];
    case 0x18u:
      if ((end_byte) == 0x18u) {
        break;
      }
      // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
      // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
      // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
      // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
      [[fallthrough]];
    case 0x20u:
      if ((end_byte) == 0x20u) {
        break;
      }
      // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
      // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
      //
      // root_table (aos.logger.MessageHeader):
      // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
      // NOTE(review): pushed as uoffset_t; for 0x16 the 4 bytes are identical to
      // the SOffset32 encoding.
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x28u:
      if ((end_byte) == 0x28u) {
        break;
      }
      // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
      buffer = Push<int64_t>(buffer,
                             monotonic_timestamp_time.time_since_epoch().count());
      [[fallthrough]];
    case 0x30u:
      if ((end_byte) == 0x30u) {
        break;
      }
      // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
      // TODO(austin): Can we re-arrange the order to ditch the padding?
      // (Answer is yes, but what is the impact elsewhere? It will change the
      // binary format)
      buffer = Pad(buffer, 4);
      // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
      [[fallthrough]];
    case 0x38u:
      if ((end_byte) == 0x38u) {
        break;
      }
      // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
      [[fallthrough]];
    case 0x40u:
      if ((end_byte) == 0x40u) {
        break;
      }
      // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
      [[fallthrough]];
    case 0x48u:
      if ((end_byte) == 0x48u) {
        break;
      }
      // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
      [[fallthrough]];
    case 0x50u:
      if ((end_byte) == 0x50u) {
        break;
      }
      // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
      [[fallthrough]];
    case 0x58u:
      if ((end_byte) == 0x58u) {
        break;
      }
      // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->queue_index());
      // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
      buffer = Push<uint32_t>(buffer, channel_index);
      // clang-format on
      [[fallthrough]];
    case 0x60u:
      // End of the image: nothing left to write.
      if ((end_byte) == 0x60u) {
        break;
      }
  }

  return end_byte - start_byte;
}
608
Austin Schuha36c8902019-12-30 18:07:15 -0800609flatbuffers::Offset<MessageHeader> PackMessage(
610 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
611 int channel_index, LogType log_type) {
612 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
613
614 switch (log_type) {
615 case LogType::kLogMessage:
616 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800617 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700618 // Since the timestamps are 8 byte aligned, we are going to end up adding
619 // padding in the middle of the message to pad everything out to 8 byte
620 // alignment. That's rather wasteful. To make things efficient to mmap
621 // while reading uncompressed logs, we'd actually rather the message be
622 // aligned. So, force 8 byte alignment (enough to preserve alignment
623 // inside the nested message so that we can read it without moving it)
624 // here.
625 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700626 data_offset = fbb->CreateVector(
627 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800628 break;
629
630 case LogType::kLogDeliveryTimeOnly:
631 break;
632 }
633
634 MessageHeader::Builder message_header_builder(*fbb);
635 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800636
Austin Schuhfa30c352022-10-16 11:12:02 -0700637 // These are split out into very explicit serialization calls because the
638 // order here changes the order things are written out on the wire, and we
639 // want to control and understand it here. Changing the order can increase
640 // the amount of padding bytes in the middle.
641 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800642 // It is also easier to follow... And doesn't actually make things much
643 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800644 switch (log_type) {
645 case LogType::kLogRemoteMessage:
646 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700647 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800648 message_header_builder.add_monotonic_sent_time(
649 context.monotonic_remote_time.time_since_epoch().count());
650 message_header_builder.add_realtime_sent_time(
651 context.realtime_remote_time.time_since_epoch().count());
652 break;
653
Austin Schuh6f3babe2020-01-26 20:34:50 -0800654 case LogType::kLogDeliveryTimeOnly:
655 message_header_builder.add_queue_index(context.queue_index);
656 message_header_builder.add_monotonic_sent_time(
657 context.monotonic_event_time.time_since_epoch().count());
658 message_header_builder.add_realtime_sent_time(
659 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800660 message_header_builder.add_monotonic_remote_time(
661 context.monotonic_remote_time.time_since_epoch().count());
662 message_header_builder.add_realtime_remote_time(
663 context.realtime_remote_time.time_since_epoch().count());
664 message_header_builder.add_remote_queue_index(context.remote_queue_index);
665 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700666
667 case LogType::kLogMessage:
668 message_header_builder.add_queue_index(context.queue_index);
669 message_header_builder.add_data(data_offset);
670 message_header_builder.add_monotonic_sent_time(
671 context.monotonic_event_time.time_since_epoch().count());
672 message_header_builder.add_realtime_sent_time(
673 context.realtime_event_time.time_since_epoch().count());
674 break;
675
676 case LogType::kLogMessageAndDeliveryTime:
677 message_header_builder.add_queue_index(context.queue_index);
678 message_header_builder.add_remote_queue_index(context.remote_queue_index);
679 message_header_builder.add_monotonic_sent_time(
680 context.monotonic_event_time.time_since_epoch().count());
681 message_header_builder.add_realtime_sent_time(
682 context.realtime_event_time.time_since_epoch().count());
683 message_header_builder.add_monotonic_remote_time(
684 context.monotonic_remote_time.time_since_epoch().count());
685 message_header_builder.add_realtime_remote_time(
686 context.realtime_remote_time.time_since_epoch().count());
687 message_header_builder.add_data(data_offset);
688 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800689 }
690
691 return message_header_builder.Finish();
692}
693
Austin Schuhfa30c352022-10-16 11:12:02 -0700694flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
695 switch (log_type) {
696 case LogType::kLogMessage:
697 return
698 // Root table size + offset.
699 sizeof(flatbuffers::uoffset_t) * 2 +
700 // 6 padding bytes to pad the header out properly.
701 6 +
702 // vtable header (size + size of table)
703 sizeof(flatbuffers::voffset_t) * 2 +
704 // offsets to all the fields.
705 sizeof(flatbuffers::voffset_t) * 5 +
706 // pointer to vtable
707 sizeof(flatbuffers::soffset_t) +
708 // pointer to data
709 sizeof(flatbuffers::uoffset_t) +
710 // realtime_sent_time, monotonic_sent_time
711 sizeof(int64_t) * 2 +
712 // queue_index, channel_index
713 sizeof(uint32_t) * 2;
714
715 case LogType::kLogDeliveryTimeOnly:
716 return
717 // Root table size + offset.
718 sizeof(flatbuffers::uoffset_t) * 2 +
719 // 6 padding bytes to pad the header out properly.
720 4 +
721 // vtable header (size + size of table)
722 sizeof(flatbuffers::voffset_t) * 2 +
723 // offsets to all the fields.
724 sizeof(flatbuffers::voffset_t) * 8 +
725 // pointer to vtable
726 sizeof(flatbuffers::soffset_t) +
727 // remote_queue_index
728 sizeof(uint32_t) +
729 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
730 // monotonic_sent_time
731 sizeof(int64_t) * 4 +
732 // queue_index, channel_index
733 sizeof(uint32_t) * 2;
734
735 case LogType::kLogMessageAndDeliveryTime:
736 return
737 // Root table size + offset.
738 sizeof(flatbuffers::uoffset_t) * 2 +
739 // 4 padding bytes to pad the header out properly.
740 4 +
741 // vtable header (size + size of table)
742 sizeof(flatbuffers::voffset_t) * 2 +
743 // offsets to all the fields.
744 sizeof(flatbuffers::voffset_t) * 8 +
745 // pointer to vtable
746 sizeof(flatbuffers::soffset_t) +
747 // pointer to data
748 sizeof(flatbuffers::uoffset_t) +
749 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
750 // monotonic_sent_time
751 sizeof(int64_t) * 4 +
752 // remote_queue_index, queue_index, channel_index
753 sizeof(uint32_t) * 3;
754
755 case LogType::kLogRemoteMessage:
756 return
757 // Root table size + offset.
758 sizeof(flatbuffers::uoffset_t) * 2 +
759 // 6 padding bytes to pad the header out properly.
760 6 +
761 // vtable header (size + size of table)
762 sizeof(flatbuffers::voffset_t) * 2 +
763 // offsets to all the fields.
764 sizeof(flatbuffers::voffset_t) * 5 +
765 // pointer to vtable
766 sizeof(flatbuffers::soffset_t) +
767 // realtime_sent_time, monotonic_sent_time
768 sizeof(int64_t) * 2 +
769 // pointer to data
770 sizeof(flatbuffers::uoffset_t) +
771 // queue_index, channel_index
772 sizeof(uint32_t) * 2;
773 }
774 LOG(FATAL);
775}
776
James Kuszmaul9776b392023-01-14 14:08:08 -0800777flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700778 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
779 "Update size logic please.");
780 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700781 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700782 switch (log_type) {
783 case LogType::kLogDeliveryTimeOnly:
784 return PackMessageHeaderSize(log_type);
785
786 case LogType::kLogMessage:
787 case LogType::kLogMessageAndDeliveryTime:
788 case LogType::kLogRemoteMessage:
789 return PackMessageHeaderSize(log_type) +
790 // Vector...
791 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
792 }
793 LOG(FATAL);
794}
795
Austin Schuhfa30c352022-10-16 11:12:02 -0700796size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800797 int channel_index, LogType log_type, size_t start_byte,
798 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700799 // TODO(austin): Figure out how to copy directly from shared memory instead of
800 // first into the fetcher's memory and then into here. That would save a lot
801 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700802 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700803 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800804 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
805 DCHECK_EQ((start_byte % 8u), 0u);
806 DCHECK_EQ((end_byte % 8u), 0u);
807 DCHECK_LE(start_byte, end_byte);
808 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700809
810 // Pack all the data in. This is brittle but easy to change. Use the
811 // InlinePackMessage.Equivilent unit test to verify everything matches.
812 switch (log_type) {
813 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800814 switch (start_byte) {
815 case 0x00u:
816 if ((end_byte) == 0x00u) {
817 break;
818 }
819 // clang-format off
820 // header:
821 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
822 buffer = Push<flatbuffers::uoffset_t>(
823 buffer, message_size - sizeof(flatbuffers::uoffset_t));
824
825 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
826 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
827 [[fallthrough]];
828 case 0x08u:
829 if ((end_byte) == 0x08u) {
830 break;
831 }
832 //
833 // padding:
834 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
835 buffer = Pad(buffer, 6);
836 //
837 // vtable (aos.logger.MessageHeader):
838 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
839 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
840 [[fallthrough]];
841 case 0x10u:
842 if ((end_byte) == 0x10u) {
843 break;
844 }
845 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
846 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
847 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
848 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
849 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
850 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
851 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
852 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
853 [[fallthrough]];
854 case 0x18u:
855 if ((end_byte) == 0x18u) {
856 break;
857 }
858 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
859 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
860 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
861 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
862 //
863 // root_table (aos.logger.MessageHeader):
864 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
865 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
866 [[fallthrough]];
867 case 0x20u:
868 if ((end_byte) == 0x20u) {
869 break;
870 }
871 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
872 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
873 [[fallthrough]];
874 case 0x28u:
875 if ((end_byte) == 0x28u) {
876 break;
877 }
878 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
879 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
880 [[fallthrough]];
881 case 0x30u:
882 if ((end_byte) == 0x30u) {
883 break;
884 }
885 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
886 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
887 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
888 buffer = Push<uint32_t>(buffer, context.queue_index);
889 [[fallthrough]];
890 case 0x38u:
891 if ((end_byte) == 0x38u) {
892 break;
893 }
894 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
895 buffer = Push<uint32_t>(buffer, channel_index);
896 //
897 // vector (aos.logger.MessageHeader.data):
898 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
899 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
900 [[fallthrough]];
901 case 0x40u:
902 if ((end_byte) == 0x40u) {
903 break;
904 }
905 [[fallthrough]];
906 default:
907 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
908 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
909 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
910 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
911 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
912 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
913 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
914 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
915 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
916 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
917 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
918 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
919 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
920 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
921 //
922 // padding:
923 // +0x4E | 00 00 | uint8_t[2] | .. | padding
924 // clang-format on
925 if (start_byte <= 0x40 && end_byte == message_size) {
926 // The easy one, slap it all down.
927 buffer = PushBytes(buffer, context.data, context.size);
928 buffer =
929 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
930 } else {
931 const size_t data_start_byte =
932 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
933 const size_t data_end_byte = end_byte - 0x40;
934 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
935 if (data_start_byte < padded_size) {
936 buffer = PushBytes(
937 buffer,
938 reinterpret_cast<const uint8_t *>(context.data) +
939 data_start_byte,
940 std::min(context.size, data_end_byte) - data_start_byte);
941 if (data_end_byte == padded_size) {
942 // We can only pad the last 7 bytes, so this only gets written
943 // if we write the last byte.
944 buffer = Pad(buffer,
945 ((context.size + 7) & 0xfffffff8u) - context.size);
946 }
947 }
948 }
949 break;
950 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700951 break;
952
953 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800954 switch (start_byte) {
955 case 0x00u:
956 if ((end_byte) == 0x00u) {
957 break;
958 }
959 // clang-format off
960 // header:
961 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
962 buffer = Push<flatbuffers::uoffset_t>(
963 buffer, message_size - sizeof(flatbuffers::uoffset_t));
964 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
965 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700966
Austin Schuh71a40d42023-02-04 21:22:22 -0800967 [[fallthrough]];
968 case 0x08u:
969 if ((end_byte) == 0x08u) {
970 break;
971 }
972 //
973 // padding:
974 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
975 buffer = Pad(buffer, 4);
976 //
977 // vtable (aos.logger.MessageHeader):
978 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
979 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
980 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
981 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
982 [[fallthrough]];
983 case 0x10u:
984 if ((end_byte) == 0x10u) {
985 break;
986 }
987 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
988 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
989 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
990 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
991 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
992 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
993 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
994 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
995 [[fallthrough]];
996 case 0x18u:
997 if ((end_byte) == 0x18u) {
998 break;
999 }
1000 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
1001 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
1002 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
1003 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
1004 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
1005 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
1006 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
1007 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1008 [[fallthrough]];
1009 case 0x20u:
1010 if ((end_byte) == 0x20u) {
1011 break;
1012 }
1013 //
1014 // root_table (aos.logger.MessageHeader):
1015 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
1016 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
1017 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
1018 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1019 [[fallthrough]];
1020 case 0x28u:
1021 if ((end_byte) == 0x28u) {
1022 break;
1023 }
1024 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
1025 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1026 [[fallthrough]];
1027 case 0x30u:
1028 if ((end_byte) == 0x30u) {
1029 break;
1030 }
1031 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
1032 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1033 [[fallthrough]];
1034 case 0x38u:
1035 if ((end_byte) == 0x38u) {
1036 break;
1037 }
1038 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
1039 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1040 [[fallthrough]];
1041 case 0x40u:
1042 if ((end_byte) == 0x40u) {
1043 break;
1044 }
1045 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
1046 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1047 [[fallthrough]];
1048 case 0x48u:
1049 if ((end_byte) == 0x48u) {
1050 break;
1051 }
1052 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
1053 buffer = Push<uint32_t>(buffer, context.queue_index);
1054 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
1055 buffer = Push<uint32_t>(buffer, channel_index);
1056
1057 // clang-format on
1058 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001059 break;
1060
1061 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -08001062 switch (start_byte) {
1063 case 0x00u:
1064 if ((end_byte) == 0x00u) {
1065 break;
1066 }
1067 // clang-format off
1068 // header:
1069 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1070 buffer = Push<flatbuffers::uoffset_t>(
1071 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1072 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
1073 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
1074 [[fallthrough]];
1075 case 0x08u:
1076 if ((end_byte) == 0x08u) {
1077 break;
1078 }
1079 //
1080 // padding:
1081 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
1082 buffer = Pad(buffer, 4);
1083 //
1084 // vtable (aos.logger.MessageHeader):
1085 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
1086 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1087 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
1088 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
1089 [[fallthrough]];
1090 case 0x10u:
1091 if ((end_byte) == 0x10u) {
1092 break;
1093 }
1094 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
1095 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
1096 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
1097 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1098 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
1099 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1100 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
1101 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
1102 [[fallthrough]];
1103 case 0x18u:
1104 if ((end_byte) == 0x18u) {
1105 break;
1106 }
1107 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
1108 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1109 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
1110 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
1111 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
1112 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
1113 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
1114 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
1115 [[fallthrough]];
1116 case 0x20u:
1117 if ((end_byte) == 0x20u) {
1118 break;
1119 }
1120 //
1121 // root_table (aos.logger.MessageHeader):
1122 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
1123 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
1124 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
1125 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
1126 [[fallthrough]];
1127 case 0x28u:
1128 if ((end_byte) == 0x28u) {
1129 break;
1130 }
1131 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
1132 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1133 [[fallthrough]];
1134 case 0x30u:
1135 if ((end_byte) == 0x30u) {
1136 break;
1137 }
1138 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
1139 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1140 [[fallthrough]];
1141 case 0x38u:
1142 if ((end_byte) == 0x38u) {
1143 break;
1144 }
1145 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
1146 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1147 [[fallthrough]];
1148 case 0x40u:
1149 if ((end_byte) == 0x40u) {
1150 break;
1151 }
1152 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
1153 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1154 [[fallthrough]];
1155 case 0x48u:
1156 if ((end_byte) == 0x48u) {
1157 break;
1158 }
1159 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
1160 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1161 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1162 buffer = Push<uint32_t>(buffer, context.queue_index);
1163 [[fallthrough]];
1164 case 0x50u:
1165 if ((end_byte) == 0x50u) {
1166 break;
1167 }
1168 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1169 buffer = Push<uint32_t>(buffer, channel_index);
1170 //
1171 // vector (aos.logger.MessageHeader.data):
1172 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1173 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1174 [[fallthrough]];
1175 case 0x58u:
1176 if ((end_byte) == 0x58u) {
1177 break;
1178 }
1179 [[fallthrough]];
1180 default:
1181 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1182 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1183 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1184 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1185 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1186 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1187 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1188 //
1189 // padding:
1190 // +0x5F | 00 | uint8_t[1] | . | padding
1191 // clang-format on
1192
1193 if (start_byte <= 0x58 && end_byte == message_size) {
1194 // The easy one, slap it all down.
1195 buffer = PushBytes(buffer, context.data, context.size);
1196 buffer =
1197 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1198 } else {
1199 const size_t data_start_byte =
1200 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1201 const size_t data_end_byte = end_byte - 0x58;
1202 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1203 if (data_start_byte < padded_size) {
1204 buffer = PushBytes(
1205 buffer,
1206 reinterpret_cast<const uint8_t *>(context.data) +
1207 data_start_byte,
1208 std::min(context.size, data_end_byte) - data_start_byte);
1209 if (data_end_byte == padded_size) {
1210 // We can only pad the last 7 bytes, so this only gets written
1211 // if we write the last byte.
1212 buffer = Pad(buffer,
1213 ((context.size + 7) & 0xfffffff8u) - context.size);
1214 }
1215 }
1216 }
1217
1218 break;
1219 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001220
1221 break;
1222
1223 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001224 switch (start_byte) {
1225 case 0x00u:
1226 if ((end_byte) == 0x00u) {
1227 break;
1228 }
1229 // This is the message we need to recreate.
1230 //
1231 // clang-format off
1232 // header:
1233 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1234 buffer = Push<flatbuffers::uoffset_t>(
1235 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1236 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1237 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1238 [[fallthrough]];
1239 case 0x08u:
1240 if ((end_byte) == 0x08u) {
1241 break;
1242 }
1243 //
1244 // padding:
1245 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1246 buffer = Pad(buffer, 6);
1247 //
1248 // vtable (aos.logger.MessageHeader):
1249 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1250 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1251 [[fallthrough]];
1252 case 0x10u:
1253 if ((end_byte) == 0x10u) {
1254 break;
1255 }
1256 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1257 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1258 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1259 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1260 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1261 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1262 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1263 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1264 [[fallthrough]];
1265 case 0x18u:
1266 if ((end_byte) == 0x18u) {
1267 break;
1268 }
1269 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1270 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1271 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1272 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1273 //
1274 // root_table (aos.logger.MessageHeader):
1275 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1276 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1277 [[fallthrough]];
1278 case 0x20u:
1279 if ((end_byte) == 0x20u) {
1280 break;
1281 }
1282 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1283 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1284 [[fallthrough]];
1285 case 0x28u:
1286 if ((end_byte) == 0x28u) {
1287 break;
1288 }
1289 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1290 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1291 [[fallthrough]];
1292 case 0x30u:
1293 if ((end_byte) == 0x30u) {
1294 break;
1295 }
1296 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1297 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1298 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1299 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1300 [[fallthrough]];
1301 case 0x38u:
1302 if ((end_byte) == 0x38u) {
1303 break;
1304 }
1305 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1306 buffer = Push<uint32_t>(buffer, channel_index);
1307 //
1308 // vector (aos.logger.MessageHeader.data):
1309 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1310 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1311 [[fallthrough]];
1312 case 0x40u:
1313 if ((end_byte) == 0x40u) {
1314 break;
1315 }
1316 [[fallthrough]];
1317 default:
1318 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1319 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1320 // ...
1321 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1322 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1323 //
1324 // padding:
1325 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1326 // clang-format on
1327 if (start_byte <= 0x40 && end_byte == message_size) {
1328 // The easy one, slap it all down.
1329 buffer = PushBytes(buffer, context.data, context.size);
1330 buffer =
1331 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1332 } else {
1333 const size_t data_start_byte =
1334 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1335 const size_t data_end_byte = end_byte - 0x40;
1336 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1337 if (data_start_byte < padded_size) {
1338 buffer = PushBytes(
1339 buffer,
1340 reinterpret_cast<const uint8_t *>(context.data) +
1341 data_start_byte,
1342 std::min(context.size, data_end_byte) - data_start_byte);
1343 if (data_end_byte == padded_size) {
1344 // We can only pad the last 7 bytes, so this only gets written
1345 // if we write the last byte.
1346 buffer = Pad(buffer,
1347 ((context.size + 7) & 0xfffffff8u) - context.size);
1348 }
1349 }
1350 }
1351 break;
1352 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001353 }
1354
Austin Schuh71a40d42023-02-04 21:22:22 -08001355 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001356}
1357
Austin Schuhcd368422021-11-22 21:23:29 -08001358SpanReader::SpanReader(std::string_view filename, bool quiet)
1359 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -07001360 static constexpr std::string_view kS3 = "s3:";
1361 if (filename.substr(0, kS3.size()) == kS3) {
1362#if ENABLE_S3
1363 decoder_ = std::make_unique<S3Fetcher>(filename);
1364#else
1365 LOG(FATAL) << "Reading files from S3 not supported on this platform";
1366#endif
1367 } else {
1368 decoder_ = std::make_unique<DummyDecoder>(filename);
1369 }
Tyler Chatow2015bc62021-08-04 21:15:09 -07001370
1371 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -07001372 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001373 if (filename.substr(filename.size() - kXz.size()) == kXz) {
1374#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -08001375 decoder_ =
1376 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001377#else
Austin Schuhcd368422021-11-22 21:23:29 -08001378 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001379 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
1380#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -07001381 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
1382 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001383 }
Austin Schuh05b70472020-01-01 17:11:17 -08001384}
1385
Austin Schuhcf5f6442021-07-06 10:43:28 -07001386absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001387 // Make sure we have enough for the size.
1388 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1389 if (!ReadBlock()) {
1390 return absl::Span<const uint8_t>();
1391 }
1392 }
1393
1394 // Now make sure we have enough for the message.
1395 const size_t data_size =
1396 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1397 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001398 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1399 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1400 LOG(ERROR) << " Rest of log file is "
1401 << absl::BytesToHexString(std::string_view(
1402 reinterpret_cast<const char *>(data_.data() +
1403 consumed_data_),
1404 data_.size() - consumed_data_));
1405 return absl::Span<const uint8_t>();
1406 }
Austin Schuh05b70472020-01-01 17:11:17 -08001407 while (data_.size() < consumed_data_ + data_size) {
1408 if (!ReadBlock()) {
1409 return absl::Span<const uint8_t>();
1410 }
1411 }
1412
1413 // And return it, consuming the data.
1414 const uint8_t *data_ptr = data_.data() + consumed_data_;
1415
Austin Schuh05b70472020-01-01 17:11:17 -08001416 return absl::Span<const uint8_t>(data_ptr, data_size);
1417}
1418
Austin Schuhcf5f6442021-07-06 10:43:28 -07001419void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001420 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001421 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1422 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001423 consumed_data_ += consumed_size;
1424 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001425}
1426
1427absl::Span<const uint8_t> SpanReader::ReadMessage() {
1428 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001429 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001430 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001431 } else {
1432 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001433 }
1434 return result;
1435}
1436
Austin Schuh05b70472020-01-01 17:11:17 -08001437bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001438 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1439 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001440 constexpr size_t kReadSize = 256 * 1024;
1441
1442 // Strip off any unused data at the front.
1443 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001444 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001445 consumed_data_ = 0;
1446 }
1447
1448 const size_t starting_size = data_.size();
1449
1450 // This should automatically grow the backing store. It won't shrink if we
1451 // get a small chunk later. This reduces allocations when we want to append
1452 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001453 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001454
Brian Silvermanf51499a2020-09-21 12:49:08 -07001455 const size_t count =
1456 decoder_->Read(data_.begin() + starting_size, data_.end());
1457 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001458 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001459 return false;
1460 }
Austin Schuh05b70472020-01-01 17:11:17 -08001461
Brian Smarttea913d42021-12-10 15:02:38 -08001462 total_read_ += count;
1463
Austin Schuh05b70472020-01-01 17:11:17 -08001464 return true;
1465}
1466
Austin Schuhadd6eb32020-11-09 21:24:26 -08001467std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001468 SpanReader *span_reader) {
1469 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001470
1471 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001472 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001473 return std::nullopt;
1474 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001475
Austin Schuh5212cad2020-09-09 23:12:09 -07001476 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001477 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001478 if (!result.Verify()) {
1479 return std::nullopt;
1480 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001481
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001482 // We only know of busted headers in the versions of the log file header
1483 // *before* the logger_sha1 field was added. At some point before that point,
1484 // the logic to track when a header has been written was rewritten in such a
1485 // way that it can't happen anymore. We've seen some logs where the body
1486 // parses as a header recently, so the simple solution of always looking is
1487 // failing us.
1488 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001489 while (true) {
1490 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001491 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001492 break;
1493 }
1494
1495 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1496 maybe_header_data);
1497 if (maybe_header.Verify()) {
1498 LOG(WARNING) << "Found duplicate LogFileHeader in "
1499 << span_reader->filename();
1500 ResizeableBuffer header_data_copy;
1501 header_data_copy.resize(maybe_header_data.size());
1502 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1503 header_data_copy.size());
1504 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1505 std::move(header_data_copy));
1506
1507 span_reader->ConsumeMessage();
1508 } else {
1509 break;
1510 }
1511 }
1512 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001513 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001514}
1515
Austin Schuh0e8db662021-07-06 10:43:47 -07001516std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1517 std::string_view filename) {
1518 SpanReader span_reader(filename);
1519 return ReadHeader(&span_reader);
1520}
1521
Austin Schuhadd6eb32020-11-09 21:24:26 -08001522std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001523 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001524 SpanReader span_reader(filename);
1525 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1526 for (size_t i = 0; i < n + 1; ++i) {
1527 data_span = span_reader.ReadMessage();
1528
1529 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001530 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001531 return std::nullopt;
1532 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001533 }
1534
Brian Silverman354697a2020-09-22 21:06:32 -07001535 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001536 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001537 if (!result.Verify()) {
1538 return std::nullopt;
1539 }
1540 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001541}
1542
Austin Schuh05b70472020-01-01 17:11:17 -08001543MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001544 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001545 raw_log_file_header_(
1546 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001547 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1548 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1549
Austin Schuh0e8db662021-07-06 10:43:47 -07001550 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1551 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001552
1553 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001554 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001555
Austin Schuh0e8db662021-07-06 10:43:47 -07001556 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001557
Austin Schuh5b728b72021-06-16 14:57:15 -07001558 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1559
Brian Smarttea913d42021-12-10 15:02:38 -08001560 total_verified_before_ = span_reader_.TotalConsumed();
1561
Austin Schuhcde938c2020-02-02 17:30:07 -08001562 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001563 FLAGS_max_out_of_order > 0
1564 ? chrono::duration_cast<chrono::nanoseconds>(
1565 chrono::duration<double>(FLAGS_max_out_of_order))
1566 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001567
1568 VLOG(1) << "Opened " << filename << " as node "
1569 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001570}
1571
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001572std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001573 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001574 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001575 if (is_corrupted()) {
1576 LOG(ERROR) << "Total corrupted volumes: before = "
1577 << total_verified_before_
1578 << " | corrupted = " << total_corrupted_
1579 << " | during = " << total_verified_during_
1580 << " | after = " << total_verified_after_ << std::endl;
1581 }
1582
1583 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001584 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1585 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001586 << span_reader_.TotalConsumed() << " bytes usable."
1587 << std::endl;
1588 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001589 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001590 }
1591
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001592 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001593
1594 if (crash_on_corrupt_message_flag_) {
1595 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001596 << total_verified_before_ << " found within "
1597 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001598 << "; set --nocrash_on_corrupt_message to see summary;"
1599 << " also set --ignore_corrupt_messages to process"
1600 << " anyway";
1601
1602 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001603 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001604 << " from " << filename() << std::endl;
1605
1606 total_corrupted_ += msg_data.size();
1607
1608 while (true) {
1609 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1610
James Kuszmaul9776b392023-01-14 14:08:08 -08001611 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001612 if (!ignore_corrupt_messages_flag_) {
1613 LOG(ERROR) << "Total corrupted volumes: before = "
1614 << total_verified_before_
1615 << " | corrupted = " << total_corrupted_
1616 << " | during = " << total_verified_during_
1617 << " | after = " << total_verified_after_ << std::endl;
1618
1619 if (span_reader_.IsIncomplete()) {
1620 LOG(ERROR) << "Unable to access some messages in " << filename()
1621 << " : " << span_reader_.TotalRead() << " bytes read, "
1622 << span_reader_.TotalConsumed() << " bytes usable."
1623 << std::endl;
1624 }
1625 return nullptr;
1626 }
1627 break;
1628 }
1629
1630 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1631
1632 if (!next_msg.Verify()) {
1633 total_corrupted_ += msg_data.size();
1634 total_verified_during_ += total_verified_after_;
1635 total_verified_after_ = 0;
1636
1637 } else {
1638 total_verified_after_ += msg_data.size();
1639 if (ignore_corrupt_messages_flag_) {
1640 msg = next_msg;
1641 break;
1642 }
1643 }
1644 }
1645 }
1646
1647 if (is_corrupted()) {
1648 total_verified_after_ += msg_data.size();
1649 } else {
1650 total_verified_before_ += msg_data.size();
1651 }
Austin Schuh05b70472020-01-01 17:11:17 -08001652
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001653 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001654
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001655 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001656
1657 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001658
1659 if (VLOG_IS_ON(3)) {
1660 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1661 } else if (VLOG_IS_ON(2)) {
1662 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1663 msg_copy.mutable_message()->clear_data();
1664 VLOG(2) << "Read from " << filename() << " data "
1665 << FlatbufferToJson(msg_copy);
1666 }
1667
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001668 return result;
1669}
1670
1671std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1672 const MessageHeader &message) {
1673 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1674
1675 UnpackedMessageHeader *const unpacked_message =
1676 reinterpret_cast<UnpackedMessageHeader *>(
1677 malloc(sizeof(UnpackedMessageHeader) + data_size +
1678 kChannelDataAlignment - 1));
1679
1680 CHECK(message.has_channel_index());
1681 CHECK(message.has_monotonic_sent_time());
1682
1683 absl::Span<uint8_t> span;
1684 if (data_size > 0) {
1685 span =
1686 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1687 &unpacked_message->actual_data[0], data_size)),
1688 data_size);
1689 }
1690
Austin Schuh826e6ce2021-11-18 20:33:10 -08001691 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001692 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001693 monotonic_remote_time = aos::monotonic_clock::time_point(
1694 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001695 }
1696 std::optional<realtime_clock::time_point> realtime_remote_time;
1697 if (message.has_realtime_remote_time()) {
1698 realtime_remote_time = realtime_clock::time_point(
1699 chrono::nanoseconds(message.realtime_remote_time()));
1700 }
1701
1702 std::optional<uint32_t> remote_queue_index;
1703 if (message.has_remote_queue_index()) {
1704 remote_queue_index = message.remote_queue_index();
1705 }
1706
James Kuszmaul9776b392023-01-14 14:08:08 -08001707 new (unpacked_message) UnpackedMessageHeader(
1708 message.channel_index(),
1709 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001710 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001711 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001712 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001713 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1714 remote_queue_index,
1715 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001716 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001717 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001718
1719 if (data_size > 0) {
1720 memcpy(span.data(), message.data()->data(), data_size);
1721 }
1722
1723 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1724 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001725}
1726
Austin Schuhc41603c2020-10-11 16:17:37 -07001727PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001728 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001729 if (parts_.parts.size() >= 2) {
1730 next_message_reader_.emplace(parts_.parts[1]);
1731 }
Austin Schuh48507722021-07-17 17:29:24 -07001732 ComputeBootCounts();
1733}
1734
1735void PartsMessageReader::ComputeBootCounts() {
1736 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1737 std::nullopt);
1738
1739 // We have 3 vintages of log files with different amounts of information.
1740 if (log_file_header()->has_boot_uuids()) {
1741 // The new hotness with the boots explicitly listed out. We can use the log
1742 // file header to compute the boot count of all relevant nodes.
1743 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1744 size_t node_index = 0;
1745 for (const flatbuffers::String *boot_uuid :
1746 *log_file_header()->boot_uuids()) {
1747 CHECK(parts_.boots);
1748 if (boot_uuid->size() != 0) {
1749 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1750 if (it != parts_.boots->boot_count_map.end()) {
1751 boot_counts_[node_index] = it->second;
1752 }
1753 } else if (parts().boots->boots[node_index].size() == 1u) {
1754 boot_counts_[node_index] = 0;
1755 }
1756 ++node_index;
1757 }
1758 } else {
1759 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1760 // single node log files with boot UUIDs in the header. We only know how to
1761 // order certain boots in certain circumstances.
1762 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1763 for (size_t node_index = 0; node_index < boot_counts_.size();
1764 ++node_index) {
1765 CHECK(parts_.boots);
1766 if (parts().boots->boots[node_index].size() == 1u) {
1767 boot_counts_[node_index] = 0;
1768 }
1769 }
1770 } else {
1771 // Really old single node logs without any UUIDs. They can't reboot.
1772 CHECK_EQ(boot_counts_.size(), 1u);
1773 boot_counts_[0] = 0u;
1774 }
1775 }
1776}
Austin Schuhc41603c2020-10-11 16:17:37 -07001777
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001778std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001779 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001780 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001781 message_reader_.ReadMessage();
1782 if (message) {
1783 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001784 const monotonic_clock::time_point monotonic_sent_time =
1785 message->monotonic_sent_time;
1786
1787 // TODO(austin): Does this work with startup? Might need to use the
1788 // start time.
1789 // TODO(austin): Does this work with startup when we don't know the
1790 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001791 if (monotonic_sent_time >
1792 parts_.monotonic_start_time + max_out_of_order_duration()) {
1793 after_start_ = true;
1794 }
1795 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001796 CHECK_GE(monotonic_sent_time,
1797 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001798 << ": Max out of order of " << max_out_of_order_duration().count()
1799 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001800 << parts_.monotonic_start_time << " currently reading "
1801 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001802 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001803 return message;
1804 }
1805 NextLog();
1806 }
Austin Schuh32f68492020-11-08 21:45:51 -08001807 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001808 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001809}
1810
1811void PartsMessageReader::NextLog() {
1812 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001813 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001814 done_ = true;
1815 return;
1816 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001817 CHECK(next_message_reader_);
1818 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001819 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001820 if (next_part_index_ + 1 < parts_.parts.size()) {
1821 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1822 } else {
1823 next_message_reader_.reset();
1824 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001825 ++next_part_index_;
1826}
1827
Austin Schuh1be0ce42020-11-29 22:43:26 -08001828bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001829 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001830
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001831 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001832 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001833 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001834 return false;
1835 }
1836
1837 if (this->channel_index < m2.channel_index) {
1838 return true;
1839 } else if (this->channel_index > m2.channel_index) {
1840 return false;
1841 }
1842
1843 return this->queue_index < m2.queue_index;
1844}
1845
1846bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001847bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001848 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001849
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001850 return timestamp.time == m2.timestamp.time &&
1851 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001852}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001853
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001854std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1855 os << "{.channel_index=" << m.channel_index
1856 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1857 << ", .realtime_sent_time=" << m.realtime_sent_time
1858 << ", .queue_index=" << m.queue_index;
1859 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001860 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001861 }
1862 os << ", .realtime_remote_time=";
1863 PrintOptionalOrNull(&os, m.realtime_remote_time);
1864 os << ", .remote_queue_index=";
1865 PrintOptionalOrNull(&os, m.remote_queue_index);
1866 if (m.has_monotonic_timestamp_time) {
1867 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1868 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001869 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001870 return os;
1871}
1872
Austin Schuh1be0ce42020-11-29 22:43:26 -08001873std::ostream &operator<<(std::ostream &os, const Message &m) {
1874 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001875 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001876 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001877 if (m.data->remote_queue_index.has_value()) {
1878 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1879 }
1880 if (m.data->monotonic_remote_time.has_value()) {
1881 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1882 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001883 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001884 }
1885 os << "}";
1886 return os;
1887}
1888
1889std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1890 os << "{.channel_index=" << m.channel_index
1891 << ", .queue_index=" << m.queue_index
1892 << ", .monotonic_event_time=" << m.monotonic_event_time
1893 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001894 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001895 os << ", .remote_queue_index=" << m.remote_queue_index;
1896 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001897 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001898 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1899 }
1900 if (m.realtime_remote_time != realtime_clock::min_time) {
1901 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1902 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001903 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001904 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1905 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001906 if (m.data != nullptr) {
1907 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001908 } else {
1909 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001910 }
1911 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001912 return os;
1913}
1914
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001915LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001916 : parts_message_reader_(log_parts),
1917 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1918}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001919
1920Message *LogPartsSorter::Front() {
1921 // Queue up data until enough data has been queued that the front message is
1922 // sorted enough to be safe to pop. This may do nothing, so we should make
1923 // sure the nothing path is checked quickly.
1924 if (sorted_until() != monotonic_clock::max_time) {
1925 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001926 if (!messages_.empty() &&
1927 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001928 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001929 break;
1930 }
1931
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001932 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001933 parts_message_reader_.ReadMessage();
1934 // No data left, sorted forever, work through what is left.
1935 if (!m) {
1936 sorted_until_ = monotonic_clock::max_time;
1937 break;
1938 }
1939
Austin Schuh48507722021-07-17 17:29:24 -07001940 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001941 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001942 monotonic_timestamp_boot = parts().logger_boot_count;
1943 }
1944 size_t monotonic_remote_boot = 0xffffff;
1945
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001946 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001947 const Node *node =
1948 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001949
Austin Schuh48507722021-07-17 17:29:24 -07001950 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001951 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001952 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001953 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001954 monotonic_remote_boot = *boot;
1955 }
1956
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001957 messages_.insert(
1958 Message{.channel_index = m->channel_index,
1959 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1960 .index = m->queue_index},
1961 .timestamp = BootTimestamp{.boot = parts().boot_count,
1962 .time = m->monotonic_sent_time},
1963 .monotonic_remote_boot = monotonic_remote_boot,
1964 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1965 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001966
1967 // Now, update sorted_until_ to match the new message.
1968 if (parts_message_reader_.newest_timestamp() >
1969 monotonic_clock::min_time +
1970 parts_message_reader_.max_out_of_order_duration()) {
1971 sorted_until_ = parts_message_reader_.newest_timestamp() -
1972 parts_message_reader_.max_out_of_order_duration();
1973 } else {
1974 sorted_until_ = monotonic_clock::min_time;
1975 }
1976 }
1977 }
1978
1979 // Now that we have enough data queued, return a pointer to the oldest piece
1980 // of data if it exists.
1981 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001982 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001983 return nullptr;
1984 }
1985
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001986 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001987 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001988 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001989 return &(*messages_.begin());
1990}
1991
1992void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1993
1994std::string LogPartsSorter::DebugString() const {
1995 std::stringstream ss;
1996 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001997 int count = 0;
1998 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001999 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08002000 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
2001 ss << m << "\n";
2002 } else if (no_dots) {
2003 ss << "...\n";
2004 no_dots = false;
2005 }
2006 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08002007 }
2008 ss << "] <- " << parts_message_reader_.filename();
2009 return ss.str();
2010}
2011
Austin Schuhd2f96102020-12-01 20:27:29 -08002012NodeMerger::NodeMerger(std::vector<LogParts> parts) {
2013 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07002014 // Enforce that we are sorting things only from a single node from a single
2015 // boot.
2016 const std::string_view part0_node = parts[0].node;
2017 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08002018 for (size_t i = 1; i < parts.size(); ++i) {
2019 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07002020 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
2021 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08002022 }
Austin Schuh715adc12021-06-29 22:07:39 -07002023
2024 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
2025
Austin Schuhd2f96102020-12-01 20:27:29 -08002026 for (LogParts &part : parts) {
2027 parts_sorters_.emplace_back(std::move(part));
2028 }
2029
Austin Schuhd2f96102020-12-01 20:27:29 -08002030 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07002031 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002032 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07002033 // We want to capture the earliest meaningful start time here. The start
2034 // time defaults to min_time when there's no meaningful value to report, so
2035 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07002036 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
2037 bool accept = false;
2038 // We want to prioritize start times from the logger node. Really, we
2039 // want to prioritize start times with a valid realtime_clock time. So,
2040 // if we have a start time without a RT clock, prefer a start time with a
2041 // RT clock, even it if is later.
2042 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
2043 // We've got a good one. See if the current start time has a good RT
2044 // clock, or if we should use this one instead.
2045 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
2046 accept = true;
2047 } else if (realtime_start_time_ == realtime_clock::min_time) {
2048 // The previous start time doesn't have a good RT time, so it is very
2049 // likely the start time from a remote part file. We just found a
2050 // better start time with a real RT time, so switch to that instead.
2051 accept = true;
2052 }
2053 } else if (realtime_start_time_ == realtime_clock::min_time) {
2054 // We don't have a RT time, so take the oldest.
2055 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
2056 accept = true;
2057 }
2058 }
2059
2060 if (accept) {
2061 monotonic_start_time_ = parts_sorter.monotonic_start_time();
2062 realtime_start_time_ = parts_sorter.realtime_start_time();
2063 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002064 }
2065 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07002066
2067 // If there was no meaningful start time reported, just use min_time.
2068 if (monotonic_start_time_ == monotonic_clock::max_time) {
2069 monotonic_start_time_ = monotonic_clock::min_time;
2070 realtime_start_time_ = realtime_clock::min_time;
2071 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002072}
Austin Schuh8f52ed52020-11-30 23:12:39 -08002073
Austin Schuh0ca51f32020-12-25 21:51:45 -08002074std::vector<const LogParts *> NodeMerger::Parts() const {
2075 std::vector<const LogParts *> p;
2076 p.reserve(parts_sorters_.size());
2077 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
2078 p.emplace_back(&parts_sorter.parts());
2079 }
2080 return p;
2081}
2082
Austin Schuh8f52ed52020-11-30 23:12:39 -08002083Message *NodeMerger::Front() {
2084 // Return the current Front if we have one, otherwise go compute one.
2085 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08002086 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002087 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08002088 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08002089 }
2090
2091 // Otherwise, do a simple search for the oldest message, deduplicating any
2092 // duplicates.
2093 Message *oldest = nullptr;
2094 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002095 for (LogPartsSorter &parts_sorter : parts_sorters_) {
2096 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08002097 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002098 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08002099 continue;
2100 }
2101 if (oldest == nullptr || *m < *oldest) {
2102 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08002103 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08002104 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002105 // Found a duplicate. If there is a choice, we want the one which has
2106 // the timestamp time.
2107 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08002108 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002109 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08002110 current_->PopFront();
2111 current_ = &parts_sorter;
2112 oldest = m;
2113 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002114 CHECK_EQ(m->data->monotonic_timestamp_time,
2115 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08002116 parts_sorter.PopFront();
2117 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08002118 }
2119
2120 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08002121 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08002122 }
2123
Austin Schuhb000de62020-12-03 22:00:40 -08002124 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002125 CHECK_GE(oldest->timestamp.time, last_message_time_);
2126 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08002127 monotonic_oldest_time_ =
2128 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08002129 } else {
2130 last_message_time_ = monotonic_clock::max_time;
2131 }
2132
Austin Schuh8f52ed52020-11-30 23:12:39 -08002133 // Return the oldest message found. This will be nullptr if nothing was
2134 // found, indicating there is nothing left.
2135 return oldest;
2136}
2137
2138void NodeMerger::PopFront() {
2139 CHECK(current_ != nullptr) << "Popping before calling Front()";
2140 current_->PopFront();
2141 current_ = nullptr;
2142}
2143
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002144BootMerger::BootMerger(std::vector<LogParts> files) {
2145 std::vector<std::vector<LogParts>> boots;
2146
2147 // Now, we need to split things out by boot.
2148 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002149 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002150 if (boot_count + 1 > boots.size()) {
2151 boots.resize(boot_count + 1);
2152 }
2153 boots[boot_count].emplace_back(std::move(files[i]));
2154 }
2155
2156 node_mergers_.reserve(boots.size());
2157 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07002158 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002159 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07002160 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002161 }
2162 node_mergers_.emplace_back(
2163 std::make_unique<NodeMerger>(std::move(boots[i])));
2164 }
2165}
2166
2167Message *BootMerger::Front() {
2168 Message *result = node_mergers_[index_]->Front();
2169
2170 if (result != nullptr) {
2171 return result;
2172 }
2173
2174 if (index_ + 1u == node_mergers_.size()) {
2175 // At the end of the last node merger, just return.
2176 return nullptr;
2177 } else {
2178 ++index_;
2179 return Front();
2180 }
2181}
2182
2183void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
2184
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002185std::vector<const LogParts *> BootMerger::Parts() const {
2186 std::vector<const LogParts *> results;
2187 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
2188 std::vector<const LogParts *> node_parts = node_merger->Parts();
2189
2190 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2191 std::make_move_iterator(node_parts.end()));
2192 }
2193
2194 return results;
2195}
2196
Austin Schuhd2f96102020-12-01 20:27:29 -08002197TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002198 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002199 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002200 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002201 if (!configuration_) {
2202 configuration_ = part->config;
2203 } else {
2204 CHECK_EQ(configuration_.get(), part->config.get());
2205 }
2206 }
2207 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002208 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2209 // pretty simple.
2210 if (configuration::MultiNode(config)) {
2211 nodes_data_.resize(config->nodes()->size());
2212 const Node *my_node = config->nodes()->Get(node());
2213 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2214 const Node *node = config->nodes()->Get(node_index);
2215 NodeData *node_data = &nodes_data_[node_index];
2216 node_data->channels.resize(config->channels()->size());
2217 // We should save the channel if it is delivered to the node represented
2218 // by the NodeData, but not sent by that node. That combo means it is
2219 // forwarded.
2220 size_t channel_index = 0;
2221 node_data->any_delivered = false;
2222 for (const Channel *channel : *config->channels()) {
2223 node_data->channels[channel_index].delivered =
2224 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002225 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2226 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002227 node_data->any_delivered = node_data->any_delivered ||
2228 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002229 if (node_data->channels[channel_index].delivered) {
2230 const Connection *connection =
2231 configuration::ConnectionToNode(channel, node);
2232 node_data->channels[channel_index].time_to_live =
2233 chrono::nanoseconds(connection->time_to_live());
2234 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002235 ++channel_index;
2236 }
2237 }
2238
2239 for (const Channel *channel : *config->channels()) {
2240 source_node_.emplace_back(configuration::GetNodeIndex(
2241 config, channel->source_node()->string_view()));
2242 }
2243 }
2244}
2245
2246void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002247 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002248 CHECK_NE(timestamp_mapper->node(), node());
2249 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2250
2251 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002252 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002253 // we could needlessly save data.
2254 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002255 VLOG(1) << "Registering on node " << node() << " for peer node "
2256 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002257 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2258
2259 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002260
2261 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002262 }
2263}
2264
Austin Schuh79b30942021-01-24 22:32:21 -08002265void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002266 matched_messages_.emplace_back(
2267 TimestampedMessage{.channel_index = m->channel_index,
2268 .queue_index = m->queue_index,
2269 .monotonic_event_time = m->timestamp,
2270 .realtime_event_time = m->data->realtime_sent_time,
2271 .remote_queue_index = BootQueueIndex::Invalid(),
2272 .monotonic_remote_time = BootTimestamp::min_time(),
2273 .realtime_remote_time = realtime_clock::min_time,
2274 .monotonic_timestamp_time = BootTimestamp::min_time(),
2275 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002276}
2277
2278TimestampedMessage *TimestampMapper::Front() {
2279 // No need to fetch anything new. A previous message still exists.
2280 switch (first_message_) {
2281 case FirstMessage::kNeedsUpdate:
2282 break;
2283 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002284 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002285 case FirstMessage::kNullptr:
2286 return nullptr;
2287 }
2288
Austin Schuh79b30942021-01-24 22:32:21 -08002289 if (matched_messages_.empty()) {
2290 if (!QueueMatched()) {
2291 first_message_ = FirstMessage::kNullptr;
2292 return nullptr;
2293 }
2294 }
2295 first_message_ = FirstMessage::kInMessage;
2296 return &matched_messages_.front();
2297}
2298
2299bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002300 MatchResult result = MatchResult::kEndOfFile;
2301 do {
2302 result = MaybeQueueMatched();
2303 } while (result == MatchResult::kSkipped);
2304 return result == MatchResult::kQueued;
2305}
2306
2307bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2308 const TimestampedMessage & /*message*/) {
2309 if (replay_channels_callback_ &&
2310 !replay_channels_callback_(matched_messages_.back())) {
2311 matched_messages_.pop_back();
2312 return true;
2313 }
2314 return false;
2315}
2316
2317TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002318 if (nodes_data_.empty()) {
2319 // Simple path. We are single node, so there are no timestamps to match!
2320 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002321 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002322 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002323 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002324 }
Austin Schuh79b30942021-01-24 22:32:21 -08002325 // Enqueue this message into matched_messages_ so we have a place to
2326 // associate remote timestamps, and return it.
2327 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002328
Austin Schuh79b30942021-01-24 22:32:21 -08002329 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2330 last_message_time_ = matched_messages_.back().monotonic_event_time;
2331
2332 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002333 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002334 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002335 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2336 return MatchResult::kSkipped;
2337 }
2338 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002339 }
2340
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002341 // We need to only add messages to the list so they get processed for
2342 // messages which are delivered. Reuse the flow below which uses messages_
2343 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002344 if (messages_.empty()) {
2345 if (!Queue()) {
2346 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002347 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002348 }
2349
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002350 // Now that it has been added (and cannibalized), forget about it
2351 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002352 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002353 }
2354
2355 Message *m = &(messages_.front());
2356
2357 if (source_node_[m->channel_index] == node()) {
2358 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002359 QueueMessage(m);
2360 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2361 last_message_time_ = matched_messages_.back().monotonic_event_time;
2362 messages_.pop_front();
2363 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002364 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2365 return MatchResult::kSkipped;
2366 }
2367 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002368 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002369 // Got a timestamp, find the matching remote data, match it, and return
2370 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002371 Message data = MatchingMessageFor(*m);
2372
2373 // Return the data from the remote. The local message only has timestamp
2374 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002375 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002376 .channel_index = m->channel_index,
2377 .queue_index = m->queue_index,
2378 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002379 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002380 .remote_queue_index =
2381 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002382 .index = m->data->remote_queue_index.value()},
2383 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002384 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002385 .realtime_remote_time = m->data->realtime_remote_time.value(),
2386 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2387 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002388 .data = std::move(data.data)});
2389 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2390 last_message_time_ = matched_messages_.back().monotonic_event_time;
2391 // Since messages_ holds the data, drop it.
2392 messages_.pop_front();
2393 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002394 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2395 return MatchResult::kSkipped;
2396 }
2397 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002398 }
2399}
2400
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002401void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002402 while (last_message_time_ <= queue_time) {
2403 if (!QueueMatched()) {
2404 return;
2405 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002406 }
2407}
2408
Austin Schuhe639ea12021-01-25 13:00:22 -08002409void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002410 // Note: queueing for time doesn't really work well across boots. So we
2411 // just assume that if you are using this, you only care about the current
2412 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002413 //
2414 // TODO(austin): Is that the right concept?
2415 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002416 // Make sure we have something queued first. This makes the end time
2417 // calculation simpler, and is typically what folks want regardless.
2418 if (matched_messages_.empty()) {
2419 if (!QueueMatched()) {
2420 return;
2421 }
2422 }
2423
2424 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002425 std::max(monotonic_start_time(
2426 matched_messages_.front().monotonic_event_time.boot),
2427 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002428 time_estimation_buffer;
2429
2430 // Place sorted messages on the list until we have
2431 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2432 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002433 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002434 if (!QueueMatched()) {
2435 return;
2436 }
2437 }
2438}
2439
Austin Schuhd2f96102020-12-01 20:27:29 -08002440void TimestampMapper::PopFront() {
2441 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002442 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002443 first_message_ = FirstMessage::kNeedsUpdate;
2444
Austin Schuh79b30942021-01-24 22:32:21 -08002445 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002446}
2447
2448Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002449 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002450 CHECK_NOTNULL(message.data);
2451 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002452 const BootQueueIndex remote_queue_index =
2453 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002454 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002455
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002456 CHECK(message.data->monotonic_remote_time.has_value());
2457 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002458
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002459 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002460 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002461 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002462 const realtime_clock::time_point realtime_remote_time =
2463 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002464
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002465 TimestampMapper *peer =
2466 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002467
2468 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002469 // asked to pull a timestamp from a peer which doesn't exist, return an
2470 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002471 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002472 // TODO(austin): Make sure the tests hit all these paths with a boot count
2473 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002474 return Message{.channel_index = message.channel_index,
2475 .queue_index = remote_queue_index,
2476 .timestamp = monotonic_remote_time,
2477 .monotonic_remote_boot = 0xffffff,
2478 .monotonic_timestamp_boot = 0xffffff,
2479 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002480 }
2481
2482 // The queue which will have the matching data, if available.
2483 std::deque<Message> *data_queue =
2484 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2485
Austin Schuh79b30942021-01-24 22:32:21 -08002486 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002487
2488 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002489 return Message{.channel_index = message.channel_index,
2490 .queue_index = remote_queue_index,
2491 .timestamp = monotonic_remote_time,
2492 .monotonic_remote_boot = 0xffffff,
2493 .monotonic_timestamp_boot = 0xffffff,
2494 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002495 }
2496
Austin Schuhd2f96102020-12-01 20:27:29 -08002497 if (remote_queue_index < data_queue->front().queue_index ||
2498 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002499 return Message{.channel_index = message.channel_index,
2500 .queue_index = remote_queue_index,
2501 .timestamp = monotonic_remote_time,
2502 .monotonic_remote_boot = 0xffffff,
2503 .monotonic_timestamp_boot = 0xffffff,
2504 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002505 }
2506
Austin Schuh993ccb52020-12-12 15:59:32 -08002507 // The algorithm below is constant time with some assumptions. We need there
2508 // to be no missing messages in the data stream. This also assumes a queue
2509 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002510 if (data_queue->back().queue_index.boot ==
2511 data_queue->front().queue_index.boot &&
2512 (data_queue->back().queue_index.index -
2513 data_queue->front().queue_index.index + 1u ==
2514 data_queue->size())) {
2515 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002516 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002517 //
2518 // TODO(austin): Move if not reliable.
2519 Message result = (*data_queue)[remote_queue_index.index -
2520 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002521
2522 CHECK_EQ(result.timestamp, monotonic_remote_time)
2523 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002524 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002525 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2526 // Now drop the data off the front. We have deduplicated timestamps, so we
2527 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002528 data_queue->erase(
2529 data_queue->begin(),
2530 data_queue->begin() +
2531 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002532 return result;
2533 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002534 // TODO(austin): Binary search.
2535 auto it = std::find_if(
2536 data_queue->begin(), data_queue->end(),
2537 [remote_queue_index,
2538 remote_boot = monotonic_remote_time.boot](const Message &m) {
2539 return m.queue_index == remote_queue_index &&
2540 m.timestamp.boot == remote_boot;
2541 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002542 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002543 return Message{.channel_index = message.channel_index,
2544 .queue_index = remote_queue_index,
2545 .timestamp = monotonic_remote_time,
2546 .monotonic_remote_boot = 0xffffff,
2547 .monotonic_timestamp_boot = 0xffffff,
2548 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002549 }
2550
2551 Message result = std::move(*it);
2552
2553 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002554 << ": Queue index matches, but timestamp doesn't. Please "
2555 "investigate!";
2556 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2557 << ": Queue index matches, but timestamp doesn't. Please "
2558 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002559
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002560 // Erase everything up to this message. We want to keep 1 message in the
2561 // queue so we can handle reliable messages forwarded across boots.
2562 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002563
2564 return result;
2565 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002566}
2567
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002568void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002569 if (queued_until_ > t) {
2570 return;
2571 }
2572 while (true) {
2573 if (!messages_.empty() && messages_.back().timestamp > t) {
2574 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2575 return;
2576 }
2577
2578 if (!Queue()) {
2579 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002580 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002581 return;
2582 }
2583
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002584 // Now that it has been added (and cannibalized), forget about it
2585 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002586 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002587 }
2588}
2589
2590bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002591 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002592 if (m == nullptr) {
2593 return false;
2594 }
2595 for (NodeData &node_data : nodes_data_) {
2596 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002597 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002598 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002599 // If we have data but no timestamps (logs where the timestamps didn't get
2600 // logged are classic), we can grow this indefinitely. We don't need to
2601 // keep anything that is older than the last message returned.
2602
2603 // We have the time on the source node.
2604 // We care to wait until we have the time on the destination node.
2605 std::deque<Message> &messages =
2606 node_data.channels[m->channel_index].messages;
2607 // Max delay over the network is the TTL, so let's take the queue time and
2608 // add TTL to it. Don't forget any messages which are reliable until
2609 // someone can come up with a good reason to forget those too.
2610 if (node_data.channels[m->channel_index].time_to_live >
2611 chrono::nanoseconds(0)) {
2612 // We need to make *some* assumptions about network delay for this to
2613 // work. We want to only look at the RX side. This means we need to
2614 // track the last time a message was popped from any channel from the
2615 // node sending this message, and compare that to the max time we expect
2616 // that a message will take to be delivered across the network. This
2617 // assumes that messages are popped in time order as a proxy for
2618 // measuring the distributed time at this layer.
2619 //
2620 // Leave at least 1 message in here so we can handle reboots and
2621 // messages getting sent twice.
2622 while (messages.size() > 1u &&
2623 messages.begin()->timestamp +
2624 node_data.channels[m->channel_index].time_to_live +
2625 chrono::duration_cast<chrono::nanoseconds>(
2626 chrono::duration<double>(FLAGS_max_network_delay)) <
2627 last_popped_message_time_) {
2628 messages.pop_front();
2629 }
2630 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002631 node_data.channels[m->channel_index].messages.emplace_back(*m);
2632 }
2633 }
2634
2635 messages_.emplace_back(std::move(*m));
2636 return true;
2637}
2638
2639std::string TimestampMapper::DebugString() const {
2640 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002641 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002642 for (const Message &message : messages_) {
2643 ss << " " << message << "\n";
2644 }
2645 ss << "] queued_until " << queued_until_;
2646 for (const NodeData &ns : nodes_data_) {
2647 if (ns.peer == nullptr) continue;
2648 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2649 size_t channel_index = 0;
2650 for (const NodeData::ChannelData &channel_data :
2651 ns.peer->nodes_data_[node()].channels) {
2652 if (channel_data.messages.empty()) {
2653 continue;
2654 }
Austin Schuhb000de62020-12-03 22:00:40 -08002655
Austin Schuhd2f96102020-12-01 20:27:29 -08002656 ss << " channel " << channel_index << " [\n";
2657 for (const Message &m : channel_data.messages) {
2658 ss << " " << m << "\n";
2659 }
2660 ss << " ]\n";
2661 ++channel_index;
2662 }
2663 ss << "] queued_until " << ns.peer->queued_until_;
2664 }
2665 return ss.str();
2666}
2667
Austin Schuhee711052020-08-24 16:06:09 -07002668std::string MaybeNodeName(const Node *node) {
2669 if (node != nullptr) {
2670 return node->name()->str() + " ";
2671 }
2672 return "";
2673}
2674
Brian Silvermanf51499a2020-09-21 12:49:08 -07002675} // namespace aos::logger