blob: 02a9fdda7113e71363c1e2241b9cc21ee16ee079 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
Austin Schuh86110712022-09-16 15:40:54 -070031#if ENABLE_S3
32#include "aos/events/logging/s3_fetcher.h"
33#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070034
Austin Schuh48d10d62022-10-16 22:19:23 -070035DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080036 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070037DEFINE_double(
38 flush_period, 5.0,
39 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080040
Austin Schuha040c3f2021-02-13 16:09:07 -080041DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080042 max_network_delay, 1.0,
43 "Max time to assume a message takes to cross the network before we are "
44 "willing to drop it from our buffers and assume it didn't make it. "
45 "Increasing this number can increase memory usage depending on the packet "
46 "loss of your network or if the timestamps aren't logged for a message.");
47
48DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080049 max_out_of_order, -1,
50 "If set, this overrides the max out of order duration for a log file.");
51
Austin Schuh0e8db662021-07-06 10:43:47 -070052DEFINE_bool(workaround_double_headers, true,
53 "Some old log files have two headers at the beginning. Use the "
54 "last header as the actual header.");
55
Brian Smarttea913d42021-12-10 15:02:38 -080056DEFINE_bool(crash_on_corrupt_message, true,
57 "When true, MessageReader will crash the first time a message "
58 "with corrupted format is found. When false, the crash will be "
59 "suppressed, and any remaining readable messages will be "
60 "evaluated to present verified vs corrupted stats.");
61
62DEFINE_bool(ignore_corrupt_messages, false,
63 "When true, and crash_on_corrupt_message is false, then any "
64 "corrupt message found by MessageReader be silently ignored, "
65 "providing access to all uncorrupted messages in a logfile.");
66
Austin Schuh4c3cdb72023-02-11 15:05:26 -080067DEFINE_bool(direct, false,
68 "If true, write using O_DIRECT and write 512 byte aligned blocks "
69 "whenever possible.");
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070072namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080073
Austin Schuh05b70472020-01-01 17:11:17 -080074namespace chrono = std::chrono;
75
// Streams the value held by `t` to `os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << t.value();
}
84} // namespace
85
Austin Schuh48d10d62022-10-16 22:19:23 -070086DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
87 std::unique_ptr<DataEncoder> encoder)
Austin Schuh4c3cdb72023-02-11 15:05:26 -080088 : filename_(filename),
89 encoder_(std::move(encoder)),
90 supports_odirect_(FLAGS_direct) {
91 iovec_.reserve(10);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070092 if (!util::MkdirPIfSpace(filename, 0777)) {
93 ran_out_of_space_ = true;
94 } else {
James Kuszmaul9776b392023-01-14 14:08:08 -080095 fd_ = open(filename_.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070096 if (fd_ == -1 && errno == ENOSPC) {
97 ran_out_of_space_ = true;
98 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070099 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
100 << " for writing";
101 VLOG(1) << "Opened " << this->filename() << " for writing";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800102
103 flags_ = fcntl(fd_, F_GETFL, 0);
104 PCHECK(flags_ >= 0) << ": Failed to get flags for " << this->filename();
105
106 EnableDirect();
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700107 }
108 }
Austin Schuha36c8902019-12-30 18:07:15 -0800109}
110
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800111void DetachedBufferWriter::EnableDirect() {
112 if (supports_odirect_ && !ODirectEnabled()) {
113 const int new_flags = flags_ | O_DIRECT;
114 // Track if we failed to set O_DIRECT. Note: Austin hasn't seen this call
115 // fail. The write call tends to fail instead.
116 if (fcntl(fd_, F_SETFL, new_flags) == -1) {
117 PLOG(WARNING) << "Failed to set O_DIRECT on " << filename();
118 supports_odirect_ = false;
119 } else {
120 VLOG(1) << "Enabled O_DIRECT on " << filename();
121 flags_ = new_flags;
122 }
123 }
124}
125
126void DetachedBufferWriter::DisableDirect() {
127 if (supports_odirect_ && ODirectEnabled()) {
128 flags_ = flags_ & (~O_DIRECT);
129 PCHECK(fcntl(fd_, F_SETFL, flags_) != -1) << ": Failed to disable O_DIRECT";
130 VLOG(1) << "Disabled O_DIRECT on " << filename();
131 }
132}
133
Austin Schuha36c8902019-12-30 18:07:15 -0800134DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700135 Close();
136 if (ran_out_of_space_) {
137 CHECK(acknowledge_ran_out_of_space_)
138 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700139 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700140}
141
Brian Silvermand90905f2020-09-23 14:42:56 -0700142DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700143 *this = std::move(other);
144}
145
Brian Silverman87ac0402020-09-17 14:47:01 -0700146// When other is destroyed "soon" (which it should be because we're getting an
147// rvalue reference to it), it will flush etc all the data we have queued up
148// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700149DetachedBufferWriter &DetachedBufferWriter::operator=(
150 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700151 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700152 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700153 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700154 std::swap(ran_out_of_space_, other.ran_out_of_space_);
155 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700156 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700157 std::swap(max_write_time_, other.max_write_time_);
158 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
159 std::swap(max_write_time_messages_, other.max_write_time_messages_);
160 std::swap(total_write_time_, other.total_write_time_);
161 std::swap(total_write_count_, other.total_write_count_);
162 std::swap(total_write_messages_, other.total_write_messages_);
163 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800164 std::swap(last_synced_bytes_, other.last_synced_bytes_);
165 std::swap(supports_odirect_, other.supports_odirect_);
166 std::swap(flags_, other.flags_);
167 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700168 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800169}
170
Austin Schuh8bdfc492023-02-11 12:53:13 -0800171void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800172 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700173 if (ran_out_of_space_) {
174 // We don't want any later data to be written after space becomes
175 // available, so refuse to write anything more once we've dropped data
176 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700177 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800178 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700179
Austin Schuh8bdfc492023-02-11 12:53:13 -0800180 const size_t message_size = copier->size();
181 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700182
Austin Schuh8bdfc492023-02-11 12:53:13 -0800183 // Keep writing chunks until we've written it all. If we end up with a
184 // partial write, this means we need to flush to disk.
185 do {
186 const size_t bytes_written = encoder_->Encode(copier, overall_bytes_written);
187 CHECK(bytes_written != 0);
188
189 overall_bytes_written += bytes_written;
190 if (overall_bytes_written < message_size) {
191 VLOG(1) << "Flushing because of a partial write, tried to write "
192 << message_size << " wrote " << overall_bytes_written;
193 Flush(now);
194 }
195 } while (overall_bytes_written < message_size);
196
Austin Schuhbd06ae42021-03-31 22:48:21 -0700197 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800198}
199
Brian Silverman0465fcf2020-09-24 00:29:18 -0700200void DetachedBufferWriter::Close() {
201 if (fd_ == -1) {
202 return;
203 }
204 encoder_->Finish();
205 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800206 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700207 }
208 if (close(fd_) == -1) {
209 if (errno == ENOSPC) {
210 ran_out_of_space_ = true;
211 } else {
212 PLOG(ERROR) << "Closing log file failed";
213 }
214 }
215 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700216 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700217}
218
Austin Schuh8bdfc492023-02-11 12:53:13 -0800219void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
220 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700221 if (ran_out_of_space_) {
222 // We don't want any later data to be written after space becomes available,
223 // so refuse to write anything more once we've dropped data because we ran
224 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700225 if (encoder_) {
226 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
227 encoder_->Clear(encoder_->queue().size());
228 } else {
229 VLOG(1) << "No queue to ignore";
230 }
231 return;
232 }
233
234 const auto queue = encoder_->queue();
235 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700236 return;
237 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700238
Austin Schuha36c8902019-12-30 18:07:15 -0800239 iovec_.clear();
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800240 size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700241 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800242 size_t counted_size = 0;
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800243
244 // Ok, we now need to figure out if we were aligned, and if we were, how much
245 // of the data we are being asked to write is aligned.
246 //
247 // The file is aligned if it is a multiple of kSector in length. The data is
248 // aligned if it's memory is kSector aligned, and the length is a multiple of
249 // kSector in length.
250 bool aligned = (total_write_bytes_ % kSector) == 0;
251 size_t write_index = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700252 for (size_t i = 0; i < iovec_size; ++i) {
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800253 iovec_[write_index].iov_base = const_cast<uint8_t *>(queue[i].data());
254
255 // Make sure the address is aligned, or give up. This should be uncommon,
256 // but is always possible.
257 if ((reinterpret_cast<size_t>(iovec_[write_index].iov_base) &
258 (kSector - 1)) != 0) {
259 aligned = false;
260 }
261
262 // Now, see if the length is a multiple of kSector. The goal is to figure
263 // out if/how much memory we can write out with O_DIRECT so that only the
264 // last little bit is done with non-direct IO to keep it fast.
265 iovec_[write_index].iov_len = queue[i].size();
266 if ((iovec_[write_index].iov_len % kSector) != 0) {
267 VLOG(1) << "Unaligned length on " << filename();
268 // If we've got over a sector of data to write, write it out with O_DIRECT
269 // and then continue writing the rest unaligned.
270 if (aligned && iovec_[write_index].iov_len > kSector) {
271 const size_t aligned_size =
272 iovec_[write_index].iov_len & (~(kSector - 1));
273 VLOG(1) << "Was aligned, writing last chunk rounded from "
274 << queue[i].size() << " to " << aligned_size;
275 iovec_[write_index].iov_len = aligned_size;
276
277 WriteV(iovec_.data(), i + 1, true, counted_size + aligned_size);
278
279 // Now, everything before here has been written. Make an iovec out of
280 // the last bytes, and keep going.
281 iovec_size -= write_index;
282 iovec_.resize(iovec_size);
283 write_index = 0;
284 counted_size = 0;
285
286 iovec_[write_index].iov_base =
287 const_cast<uint8_t *>(queue[i].data() + aligned_size);
288 iovec_[write_index].iov_len = queue[i].size() - aligned_size;
289 }
290 aligned = false;
291 }
292 VLOG(1) << "Writing " << iovec_[write_index].iov_len << " to "
293 << filename();
294 counted_size += iovec_[write_index].iov_len;
295 ++write_index;
296 }
297
298 // Either write the aligned data if it is all aligned, or write the rest
299 // unaligned if we wrote aligned up above.
300 WriteV(iovec_.data(), iovec_.size(), aligned, counted_size);
301
302 encoder_->Clear(iovec_size);
303}
304
305size_t DetachedBufferWriter::WriteV(struct iovec *iovec_data, size_t iovec_size,
306 bool aligned, size_t counted_size) {
307 // Configure the file descriptor to match the mode we should be in. This is
308 // safe to over-call since it only does the syscall if needed.
309 if (aligned) {
310 EnableDirect();
311 } else {
312 DisableDirect();
Austin Schuha36c8902019-12-30 18:07:15 -0800313 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700314
315 const auto start = aos::monotonic_clock::now();
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800316 const ssize_t written = writev(fd_, iovec_data, iovec_size);
317
318 if (written > 0) {
319 // Flush asynchronously and force the data out of the cache.
320 sync_file_range(fd_, total_write_bytes_, written, SYNC_FILE_RANGE_WRITE);
321 if (last_synced_bytes_ != 0) {
322 // Per Linus' recommendation online on how to do fast file IO, do a
323 // blocking flush of the previous write chunk, and then tell the kernel to
324 // drop the pages from the cache. This makes sure we can't get too far
325 // ahead.
326 sync_file_range(fd_, last_synced_bytes_,
327 total_write_bytes_ - last_synced_bytes_,
328 SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
329 SYNC_FILE_RANGE_WAIT_AFTER);
330 posix_fadvise(fd_, last_synced_bytes_,
331 total_write_bytes_ - last_synced_bytes_,
332 POSIX_FADV_DONTNEED);
333
334 last_synced_bytes_ = total_write_bytes_;
335 }
336 }
337
Brian Silvermanf51499a2020-09-21 12:49:08 -0700338 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700339 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700340
Brian Silvermanf51499a2020-09-21 12:49:08 -0700341 UpdateStatsForWrite(end - start, written, iovec_size);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800342
343 return written;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700344}
345
Brian Silverman0465fcf2020-09-24 00:29:18 -0700346void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
347 size_t write_size) {
348 if (write_return == -1 && errno == ENOSPC) {
349 ran_out_of_space_ = true;
350 return;
351 }
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800352 PCHECK(write_return >= 0) << ": write failed, got " << write_return;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700353 if (write_return < static_cast<ssize_t>(write_size)) {
354 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
355 // never seems to happen in any other case. If we ever want to log to a
356 // socket, this will happen more often. However, until we get there, we'll
357 // just assume it means we ran out of space.
358 ran_out_of_space_ = true;
359 return;
360 }
361}
362
Brian Silvermanf51499a2020-09-21 12:49:08 -0700363void DetachedBufferWriter::UpdateStatsForWrite(
364 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
365 if (duration > max_write_time_) {
366 max_write_time_ = duration;
367 max_write_time_bytes_ = written;
368 max_write_time_messages_ = iovec_size;
369 }
370 total_write_time_ += duration;
371 ++total_write_count_;
372 total_write_messages_ += iovec_size;
373 total_write_bytes_ += written;
374}
375
Austin Schuhbd06ae42021-03-31 22:48:21 -0700376void DetachedBufferWriter::FlushAtThreshold(
377 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700378 if (ran_out_of_space_) {
379 // We don't want any later data to be written after space becomes available,
380 // so refuse to write anything more once we've dropped data because we ran
381 // out of space.
382 if (encoder_) {
383 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
384 encoder_->Clear(encoder_->queue().size());
385 } else {
386 VLOG(1) << "No queue to ignore";
387 }
388 return;
389 }
390
Austin Schuhbd06ae42021-03-31 22:48:21 -0700391 // We don't want to flush the first time through. Otherwise we will flush as
392 // the log file header might be compressing, defeating any parallelism and
393 // queueing there.
394 if (last_flush_time_ == aos::monotonic_clock::min_time) {
395 last_flush_time_ = now;
396 }
397
Brian Silvermanf51499a2020-09-21 12:49:08 -0700398 // Flush if we are at the max number of iovs per writev, because there's no
399 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700400 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800401 while (encoder_->space() == 0 ||
402 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700403 encoder_->queue_size() >= IOV_MAX ||
404 now > last_flush_time_ +
405 chrono::duration_cast<chrono::nanoseconds>(
406 chrono::duration<double>(FLAGS_flush_period))) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800407 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_;
408 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700409 }
Austin Schuha36c8902019-12-30 18:07:15 -0800410}
411
Austin Schuhf2d0e682022-10-16 14:20:58 -0700412// Do the magic dance to convert the endianness of the data and append it to the
413// buffer.
414namespace {
415
416// TODO(austin): Look at the generated code to see if building the header is
417// efficient or not.
418template <typename T>
419uint8_t *Push(uint8_t *buffer, const T data) {
420 const T endian_data = flatbuffers::EndianScalar<T>(data);
421 std::memcpy(buffer, &endian_data, sizeof(T));
422 return buffer + sizeof(T);
423}
424
// Copies `size` bytes from `data` to `buffer` and returns the address just
// past the copied bytes.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  void *const destination = std::memcpy(buffer, data, size);
  return static_cast<uint8_t *>(destination) + size;
}
429
// Writes `padding` zero bytes to `buffer` and returns the address just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  void *const destination = std::memset(buffer, 0, padding);
  return static_cast<uint8_t *>(destination) + padding;
}
434} // namespace
435
436flatbuffers::Offset<MessageHeader> PackRemoteMessage(
437 flatbuffers::FlatBufferBuilder *fbb,
438 const message_bridge::RemoteMessage *msg, int channel_index,
439 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
440 logger::MessageHeader::Builder message_header_builder(*fbb);
441 // Note: this must match the same order as MessageBridgeServer and
442 // PackMessage. We want identical headers to have identical
443 // on-the-wire formats to make comparing them easier.
444
445 message_header_builder.add_channel_index(channel_index);
446
447 message_header_builder.add_queue_index(msg->queue_index());
448 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
449 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
450
451 message_header_builder.add_monotonic_remote_time(
452 msg->monotonic_remote_time());
453 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
454 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
455
456 message_header_builder.add_monotonic_timestamp_time(
457 monotonic_timestamp_time.time_since_epoch().count());
458
459 return message_header_builder.Finish();
460}
461
462size_t PackRemoteMessageInline(
463 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
464 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800465 const aos::monotonic_clock::time_point monotonic_timestamp_time,
466 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700467 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800468 DCHECK_EQ((start_byte % 8u), 0u);
469 DCHECK_EQ((end_byte % 8u), 0u);
470 DCHECK_LE(start_byte, end_byte);
471 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700472
Austin Schuh71a40d42023-02-04 21:22:22 -0800473 switch (start_byte) {
474 case 0x00u:
475 if ((end_byte) == 0x00u) {
476 break;
477 }
478 // clang-format off
479 // header:
480 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
481 buffer = Push<flatbuffers::uoffset_t>(
482 buffer, message_size - sizeof(flatbuffers::uoffset_t));
483 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
484 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
485 [[fallthrough]];
486 case 0x08u:
487 if ((end_byte) == 0x08u) {
488 break;
489 }
490 //
491 // padding:
492 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
493 buffer = Pad(buffer, 6);
494 //
495 // vtable (aos.logger.MessageHeader):
496 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
497 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
498 [[fallthrough]];
499 case 0x10u:
500 if ((end_byte) == 0x10u) {
501 break;
502 }
503 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
504 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
505 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
506 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
507 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
508 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
509 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
510 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
511 [[fallthrough]];
512 case 0x18u:
513 if ((end_byte) == 0x18u) {
514 break;
515 }
516 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
517 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
518 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
519 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
520 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
521 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
522 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
523 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
524 [[fallthrough]];
525 case 0x20u:
526 if ((end_byte) == 0x20u) {
527 break;
528 }
529 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
530 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
531 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
532 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
533 //
534 // root_table (aos.logger.MessageHeader):
535 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
536 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
537 [[fallthrough]];
538 case 0x28u:
539 if ((end_byte) == 0x28u) {
540 break;
541 }
542 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
543 buffer = Push<int64_t>(buffer,
544 monotonic_timestamp_time.time_since_epoch().count());
545 [[fallthrough]];
546 case 0x30u:
547 if ((end_byte) == 0x30u) {
548 break;
549 }
550 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
551 // TODO(austin): Can we re-arrange the order to ditch the padding?
552 // (Answer is yes, but what is the impact elsewhere? It will change the
553 // binary format)
554 buffer = Pad(buffer, 4);
555 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
556 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
557 [[fallthrough]];
558 case 0x38u:
559 if ((end_byte) == 0x38u) {
560 break;
561 }
562 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
563 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
564 [[fallthrough]];
565 case 0x40u:
566 if ((end_byte) == 0x40u) {
567 break;
568 }
569 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
570 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
571 [[fallthrough]];
572 case 0x48u:
573 if ((end_byte) == 0x48u) {
574 break;
575 }
576 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
577 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
578 [[fallthrough]];
579 case 0x50u:
580 if ((end_byte) == 0x50u) {
581 break;
582 }
583 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
584 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
585 [[fallthrough]];
586 case 0x58u:
587 if ((end_byte) == 0x58u) {
588 break;
589 }
590 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
591 buffer = Push<uint32_t>(buffer, msg->queue_index());
592 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
593 buffer = Push<uint32_t>(buffer, channel_index);
594 // clang-format on
595 [[fallthrough]];
596 case 0x60u:
597 if ((end_byte) == 0x60u) {
598 break;
599 }
600 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700601
Austin Schuh71a40d42023-02-04 21:22:22 -0800602 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700603}
604
Austin Schuha36c8902019-12-30 18:07:15 -0800605flatbuffers::Offset<MessageHeader> PackMessage(
606 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
607 int channel_index, LogType log_type) {
608 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
609
610 switch (log_type) {
611 case LogType::kLogMessage:
612 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800613 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700614 // Since the timestamps are 8 byte aligned, we are going to end up adding
615 // padding in the middle of the message to pad everything out to 8 byte
616 // alignment. That's rather wasteful. To make things efficient to mmap
617 // while reading uncompressed logs, we'd actually rather the message be
618 // aligned. So, force 8 byte alignment (enough to preserve alignment
619 // inside the nested message so that we can read it without moving it)
620 // here.
621 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700622 data_offset = fbb->CreateVector(
623 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800624 break;
625
626 case LogType::kLogDeliveryTimeOnly:
627 break;
628 }
629
630 MessageHeader::Builder message_header_builder(*fbb);
631 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800632
Austin Schuhfa30c352022-10-16 11:12:02 -0700633 // These are split out into very explicit serialization calls because the
634 // order here changes the order things are written out on the wire, and we
635 // want to control and understand it here. Changing the order can increase
636 // the amount of padding bytes in the middle.
637 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800638 // It is also easier to follow... And doesn't actually make things much
639 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800640 switch (log_type) {
641 case LogType::kLogRemoteMessage:
642 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700643 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800644 message_header_builder.add_monotonic_sent_time(
645 context.monotonic_remote_time.time_since_epoch().count());
646 message_header_builder.add_realtime_sent_time(
647 context.realtime_remote_time.time_since_epoch().count());
648 break;
649
Austin Schuh6f3babe2020-01-26 20:34:50 -0800650 case LogType::kLogDeliveryTimeOnly:
651 message_header_builder.add_queue_index(context.queue_index);
652 message_header_builder.add_monotonic_sent_time(
653 context.monotonic_event_time.time_since_epoch().count());
654 message_header_builder.add_realtime_sent_time(
655 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800656 message_header_builder.add_monotonic_remote_time(
657 context.monotonic_remote_time.time_since_epoch().count());
658 message_header_builder.add_realtime_remote_time(
659 context.realtime_remote_time.time_since_epoch().count());
660 message_header_builder.add_remote_queue_index(context.remote_queue_index);
661 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700662
663 case LogType::kLogMessage:
664 message_header_builder.add_queue_index(context.queue_index);
665 message_header_builder.add_data(data_offset);
666 message_header_builder.add_monotonic_sent_time(
667 context.monotonic_event_time.time_since_epoch().count());
668 message_header_builder.add_realtime_sent_time(
669 context.realtime_event_time.time_since_epoch().count());
670 break;
671
672 case LogType::kLogMessageAndDeliveryTime:
673 message_header_builder.add_queue_index(context.queue_index);
674 message_header_builder.add_remote_queue_index(context.remote_queue_index);
675 message_header_builder.add_monotonic_sent_time(
676 context.monotonic_event_time.time_since_epoch().count());
677 message_header_builder.add_realtime_sent_time(
678 context.realtime_event_time.time_since_epoch().count());
679 message_header_builder.add_monotonic_remote_time(
680 context.monotonic_remote_time.time_since_epoch().count());
681 message_header_builder.add_realtime_remote_time(
682 context.realtime_remote_time.time_since_epoch().count());
683 message_header_builder.add_data(data_offset);
684 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800685 }
686
687 return message_header_builder.Finish();
688}
689
Austin Schuhfa30c352022-10-16 11:12:02 -0700690flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
691 switch (log_type) {
692 case LogType::kLogMessage:
693 return
694 // Root table size + offset.
695 sizeof(flatbuffers::uoffset_t) * 2 +
696 // 6 padding bytes to pad the header out properly.
697 6 +
698 // vtable header (size + size of table)
699 sizeof(flatbuffers::voffset_t) * 2 +
700 // offsets to all the fields.
701 sizeof(flatbuffers::voffset_t) * 5 +
702 // pointer to vtable
703 sizeof(flatbuffers::soffset_t) +
704 // pointer to data
705 sizeof(flatbuffers::uoffset_t) +
706 // realtime_sent_time, monotonic_sent_time
707 sizeof(int64_t) * 2 +
708 // queue_index, channel_index
709 sizeof(uint32_t) * 2;
710
711 case LogType::kLogDeliveryTimeOnly:
712 return
713 // Root table size + offset.
714 sizeof(flatbuffers::uoffset_t) * 2 +
715 // 6 padding bytes to pad the header out properly.
716 4 +
717 // vtable header (size + size of table)
718 sizeof(flatbuffers::voffset_t) * 2 +
719 // offsets to all the fields.
720 sizeof(flatbuffers::voffset_t) * 8 +
721 // pointer to vtable
722 sizeof(flatbuffers::soffset_t) +
723 // remote_queue_index
724 sizeof(uint32_t) +
725 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
726 // monotonic_sent_time
727 sizeof(int64_t) * 4 +
728 // queue_index, channel_index
729 sizeof(uint32_t) * 2;
730
731 case LogType::kLogMessageAndDeliveryTime:
732 return
733 // Root table size + offset.
734 sizeof(flatbuffers::uoffset_t) * 2 +
735 // 4 padding bytes to pad the header out properly.
736 4 +
737 // vtable header (size + size of table)
738 sizeof(flatbuffers::voffset_t) * 2 +
739 // offsets to all the fields.
740 sizeof(flatbuffers::voffset_t) * 8 +
741 // pointer to vtable
742 sizeof(flatbuffers::soffset_t) +
743 // pointer to data
744 sizeof(flatbuffers::uoffset_t) +
745 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
746 // monotonic_sent_time
747 sizeof(int64_t) * 4 +
748 // remote_queue_index, queue_index, channel_index
749 sizeof(uint32_t) * 3;
750
751 case LogType::kLogRemoteMessage:
752 return
753 // Root table size + offset.
754 sizeof(flatbuffers::uoffset_t) * 2 +
755 // 6 padding bytes to pad the header out properly.
756 6 +
757 // vtable header (size + size of table)
758 sizeof(flatbuffers::voffset_t) * 2 +
759 // offsets to all the fields.
760 sizeof(flatbuffers::voffset_t) * 5 +
761 // pointer to vtable
762 sizeof(flatbuffers::soffset_t) +
763 // realtime_sent_time, monotonic_sent_time
764 sizeof(int64_t) * 2 +
765 // pointer to data
766 sizeof(flatbuffers::uoffset_t) +
767 // queue_index, channel_index
768 sizeof(uint32_t) * 2;
769 }
770 LOG(FATAL);
771}
772
James Kuszmaul9776b392023-01-14 14:08:08 -0800773flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700774 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
775 "Update size logic please.");
776 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700777 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700778 switch (log_type) {
779 case LogType::kLogDeliveryTimeOnly:
780 return PackMessageHeaderSize(log_type);
781
782 case LogType::kLogMessage:
783 case LogType::kLogMessageAndDeliveryTime:
784 case LogType::kLogRemoteMessage:
785 return PackMessageHeaderSize(log_type) +
786 // Vector...
787 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
788 }
789 LOG(FATAL);
790}
791
Austin Schuhfa30c352022-10-16 11:12:02 -0700792size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800793 int channel_index, LogType log_type, size_t start_byte,
794 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700795 // TODO(austin): Figure out how to copy directly from shared memory instead of
796 // first into the fetcher's memory and then into here. That would save a lot
797 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700798 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700799 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800800 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
801 DCHECK_EQ((start_byte % 8u), 0u);
802 DCHECK_EQ((end_byte % 8u), 0u);
803 DCHECK_LE(start_byte, end_byte);
804 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700805
806 // Pack all the data in. This is brittle but easy to change. Use the
807 // InlinePackMessage.Equivilent unit test to verify everything matches.
808 switch (log_type) {
809 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800810 switch (start_byte) {
811 case 0x00u:
812 if ((end_byte) == 0x00u) {
813 break;
814 }
815 // clang-format off
816 // header:
817 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
818 buffer = Push<flatbuffers::uoffset_t>(
819 buffer, message_size - sizeof(flatbuffers::uoffset_t));
820
821 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
822 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
823 [[fallthrough]];
824 case 0x08u:
825 if ((end_byte) == 0x08u) {
826 break;
827 }
828 //
829 // padding:
830 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
831 buffer = Pad(buffer, 6);
832 //
833 // vtable (aos.logger.MessageHeader):
834 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
835 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
836 [[fallthrough]];
837 case 0x10u:
838 if ((end_byte) == 0x10u) {
839 break;
840 }
841 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
842 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
843 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
844 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
845 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
846 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
847 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
848 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
849 [[fallthrough]];
850 case 0x18u:
851 if ((end_byte) == 0x18u) {
852 break;
853 }
854 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
855 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
856 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
857 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
858 //
859 // root_table (aos.logger.MessageHeader):
860 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
861 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
862 [[fallthrough]];
863 case 0x20u:
864 if ((end_byte) == 0x20u) {
865 break;
866 }
867 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
868 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
869 [[fallthrough]];
870 case 0x28u:
871 if ((end_byte) == 0x28u) {
872 break;
873 }
874 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
875 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
876 [[fallthrough]];
877 case 0x30u:
878 if ((end_byte) == 0x30u) {
879 break;
880 }
881 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
882 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
883 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
884 buffer = Push<uint32_t>(buffer, context.queue_index);
885 [[fallthrough]];
886 case 0x38u:
887 if ((end_byte) == 0x38u) {
888 break;
889 }
890 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
891 buffer = Push<uint32_t>(buffer, channel_index);
892 //
893 // vector (aos.logger.MessageHeader.data):
894 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
895 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
896 [[fallthrough]];
897 case 0x40u:
898 if ((end_byte) == 0x40u) {
899 break;
900 }
901 [[fallthrough]];
902 default:
903 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
904 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
905 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
906 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
907 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
908 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
909 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
910 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
911 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
912 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
913 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
914 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
915 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
916 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
917 //
918 // padding:
919 // +0x4E | 00 00 | uint8_t[2] | .. | padding
920 // clang-format on
921 if (start_byte <= 0x40 && end_byte == message_size) {
922 // The easy one, slap it all down.
923 buffer = PushBytes(buffer, context.data, context.size);
924 buffer =
925 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
926 } else {
927 const size_t data_start_byte =
928 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
929 const size_t data_end_byte = end_byte - 0x40;
930 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
931 if (data_start_byte < padded_size) {
932 buffer = PushBytes(
933 buffer,
934 reinterpret_cast<const uint8_t *>(context.data) +
935 data_start_byte,
936 std::min(context.size, data_end_byte) - data_start_byte);
937 if (data_end_byte == padded_size) {
938 // We can only pad the last 7 bytes, so this only gets written
939 // if we write the last byte.
940 buffer = Pad(buffer,
941 ((context.size + 7) & 0xfffffff8u) - context.size);
942 }
943 }
944 }
945 break;
946 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700947 break;
948
949 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800950 switch (start_byte) {
951 case 0x00u:
952 if ((end_byte) == 0x00u) {
953 break;
954 }
955 // clang-format off
956 // header:
957 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
958 buffer = Push<flatbuffers::uoffset_t>(
959 buffer, message_size - sizeof(flatbuffers::uoffset_t));
960 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
961 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700962
Austin Schuh71a40d42023-02-04 21:22:22 -0800963 [[fallthrough]];
964 case 0x08u:
965 if ((end_byte) == 0x08u) {
966 break;
967 }
968 //
969 // padding:
970 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
971 buffer = Pad(buffer, 4);
972 //
973 // vtable (aos.logger.MessageHeader):
974 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
975 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
976 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
977 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
978 [[fallthrough]];
979 case 0x10u:
980 if ((end_byte) == 0x10u) {
981 break;
982 }
983 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
984 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
985 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
986 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
987 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
988 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
989 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
990 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
991 [[fallthrough]];
992 case 0x18u:
993 if ((end_byte) == 0x18u) {
994 break;
995 }
996 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
997 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
998 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
999 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
1000 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
1001 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
1002 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
1003 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1004 [[fallthrough]];
1005 case 0x20u:
1006 if ((end_byte) == 0x20u) {
1007 break;
1008 }
1009 //
1010 // root_table (aos.logger.MessageHeader):
1011 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
1012 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
1013 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
1014 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1015 [[fallthrough]];
1016 case 0x28u:
1017 if ((end_byte) == 0x28u) {
1018 break;
1019 }
1020 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
1021 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1022 [[fallthrough]];
1023 case 0x30u:
1024 if ((end_byte) == 0x30u) {
1025 break;
1026 }
1027 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
1028 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1029 [[fallthrough]];
1030 case 0x38u:
1031 if ((end_byte) == 0x38u) {
1032 break;
1033 }
1034 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
1035 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1036 [[fallthrough]];
1037 case 0x40u:
1038 if ((end_byte) == 0x40u) {
1039 break;
1040 }
1041 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
1042 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1043 [[fallthrough]];
1044 case 0x48u:
1045 if ((end_byte) == 0x48u) {
1046 break;
1047 }
1048 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
1049 buffer = Push<uint32_t>(buffer, context.queue_index);
1050 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
1051 buffer = Push<uint32_t>(buffer, channel_index);
1052
1053 // clang-format on
1054 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001055 break;
1056
1057 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -08001058 switch (start_byte) {
1059 case 0x00u:
1060 if ((end_byte) == 0x00u) {
1061 break;
1062 }
1063 // clang-format off
1064 // header:
1065 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1066 buffer = Push<flatbuffers::uoffset_t>(
1067 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1068 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
1069 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
1070 [[fallthrough]];
1071 case 0x08u:
1072 if ((end_byte) == 0x08u) {
1073 break;
1074 }
1075 //
1076 // padding:
1077 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
1078 buffer = Pad(buffer, 4);
1079 //
1080 // vtable (aos.logger.MessageHeader):
1081 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
1082 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1083 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
1084 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
1085 [[fallthrough]];
1086 case 0x10u:
1087 if ((end_byte) == 0x10u) {
1088 break;
1089 }
1090 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
1091 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
1092 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
1093 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1094 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
1095 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1096 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
1097 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
1098 [[fallthrough]];
1099 case 0x18u:
1100 if ((end_byte) == 0x18u) {
1101 break;
1102 }
1103 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
1104 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1105 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
1106 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
1107 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
1108 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
1109 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
1110 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
1111 [[fallthrough]];
1112 case 0x20u:
1113 if ((end_byte) == 0x20u) {
1114 break;
1115 }
1116 //
1117 // root_table (aos.logger.MessageHeader):
1118 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
1119 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
1120 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
1121 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
1122 [[fallthrough]];
1123 case 0x28u:
1124 if ((end_byte) == 0x28u) {
1125 break;
1126 }
1127 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
1128 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1129 [[fallthrough]];
1130 case 0x30u:
1131 if ((end_byte) == 0x30u) {
1132 break;
1133 }
1134 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
1135 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1136 [[fallthrough]];
1137 case 0x38u:
1138 if ((end_byte) == 0x38u) {
1139 break;
1140 }
1141 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
1142 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1143 [[fallthrough]];
1144 case 0x40u:
1145 if ((end_byte) == 0x40u) {
1146 break;
1147 }
1148 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
1149 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1150 [[fallthrough]];
1151 case 0x48u:
1152 if ((end_byte) == 0x48u) {
1153 break;
1154 }
1155 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
1156 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1157 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1158 buffer = Push<uint32_t>(buffer, context.queue_index);
1159 [[fallthrough]];
1160 case 0x50u:
1161 if ((end_byte) == 0x50u) {
1162 break;
1163 }
1164 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1165 buffer = Push<uint32_t>(buffer, channel_index);
1166 //
1167 // vector (aos.logger.MessageHeader.data):
1168 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1169 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1170 [[fallthrough]];
1171 case 0x58u:
1172 if ((end_byte) == 0x58u) {
1173 break;
1174 }
1175 [[fallthrough]];
1176 default:
1177 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1178 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1179 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1180 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1181 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1182 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1183 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1184 //
1185 // padding:
1186 // +0x5F | 00 | uint8_t[1] | . | padding
1187 // clang-format on
1188
1189 if (start_byte <= 0x58 && end_byte == message_size) {
1190 // The easy one, slap it all down.
1191 buffer = PushBytes(buffer, context.data, context.size);
1192 buffer =
1193 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1194 } else {
1195 const size_t data_start_byte =
1196 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1197 const size_t data_end_byte = end_byte - 0x58;
1198 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1199 if (data_start_byte < padded_size) {
1200 buffer = PushBytes(
1201 buffer,
1202 reinterpret_cast<const uint8_t *>(context.data) +
1203 data_start_byte,
1204 std::min(context.size, data_end_byte) - data_start_byte);
1205 if (data_end_byte == padded_size) {
1206 // We can only pad the last 7 bytes, so this only gets written
1207 // if we write the last byte.
1208 buffer = Pad(buffer,
1209 ((context.size + 7) & 0xfffffff8u) - context.size);
1210 }
1211 }
1212 }
1213
1214 break;
1215 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001216
1217 break;
1218
1219 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001220 switch (start_byte) {
1221 case 0x00u:
1222 if ((end_byte) == 0x00u) {
1223 break;
1224 }
1225 // This is the message we need to recreate.
1226 //
1227 // clang-format off
1228 // header:
1229 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1230 buffer = Push<flatbuffers::uoffset_t>(
1231 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1232 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1233 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1234 [[fallthrough]];
1235 case 0x08u:
1236 if ((end_byte) == 0x08u) {
1237 break;
1238 }
1239 //
1240 // padding:
1241 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1242 buffer = Pad(buffer, 6);
1243 //
1244 // vtable (aos.logger.MessageHeader):
1245 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1246 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1247 [[fallthrough]];
1248 case 0x10u:
1249 if ((end_byte) == 0x10u) {
1250 break;
1251 }
1252 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1253 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1254 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1255 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1256 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1257 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1258 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1259 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1260 [[fallthrough]];
1261 case 0x18u:
1262 if ((end_byte) == 0x18u) {
1263 break;
1264 }
1265 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1266 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1267 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1268 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1269 //
1270 // root_table (aos.logger.MessageHeader):
1271 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1272 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1273 [[fallthrough]];
1274 case 0x20u:
1275 if ((end_byte) == 0x20u) {
1276 break;
1277 }
1278 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1279 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1280 [[fallthrough]];
1281 case 0x28u:
1282 if ((end_byte) == 0x28u) {
1283 break;
1284 }
1285 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1286 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1287 [[fallthrough]];
1288 case 0x30u:
1289 if ((end_byte) == 0x30u) {
1290 break;
1291 }
1292 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1293 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1294 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1295 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1296 [[fallthrough]];
1297 case 0x38u:
1298 if ((end_byte) == 0x38u) {
1299 break;
1300 }
1301 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1302 buffer = Push<uint32_t>(buffer, channel_index);
1303 //
1304 // vector (aos.logger.MessageHeader.data):
1305 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1306 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1307 [[fallthrough]];
1308 case 0x40u:
1309 if ((end_byte) == 0x40u) {
1310 break;
1311 }
1312 [[fallthrough]];
1313 default:
1314 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1315 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1316 // ...
1317 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1318 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1319 //
1320 // padding:
1321 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1322 // clang-format on
1323 if (start_byte <= 0x40 && end_byte == message_size) {
1324 // The easy one, slap it all down.
1325 buffer = PushBytes(buffer, context.data, context.size);
1326 buffer =
1327 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1328 } else {
1329 const size_t data_start_byte =
1330 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1331 const size_t data_end_byte = end_byte - 0x40;
1332 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1333 if (data_start_byte < padded_size) {
1334 buffer = PushBytes(
1335 buffer,
1336 reinterpret_cast<const uint8_t *>(context.data) +
1337 data_start_byte,
1338 std::min(context.size, data_end_byte) - data_start_byte);
1339 if (data_end_byte == padded_size) {
1340 // We can only pad the last 7 bytes, so this only gets written
1341 // if we write the last byte.
1342 buffer = Pad(buffer,
1343 ((context.size + 7) & 0xfffffff8u) - context.size);
1344 }
1345 }
1346 }
1347 break;
1348 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001349 }
1350
Austin Schuh71a40d42023-02-04 21:22:22 -08001351 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001352}
1353
Austin Schuhcd368422021-11-22 21:23:29 -08001354SpanReader::SpanReader(std::string_view filename, bool quiet)
1355 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -07001356 static constexpr std::string_view kS3 = "s3:";
1357 if (filename.substr(0, kS3.size()) == kS3) {
1358#if ENABLE_S3
1359 decoder_ = std::make_unique<S3Fetcher>(filename);
1360#else
1361 LOG(FATAL) << "Reading files from S3 not supported on this platform";
1362#endif
1363 } else {
1364 decoder_ = std::make_unique<DummyDecoder>(filename);
1365 }
Tyler Chatow2015bc62021-08-04 21:15:09 -07001366
1367 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -07001368 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001369 if (filename.substr(filename.size() - kXz.size()) == kXz) {
1370#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -08001371 decoder_ =
1372 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001373#else
Austin Schuhcd368422021-11-22 21:23:29 -08001374 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001375 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
1376#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -07001377 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
1378 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001379 }
Austin Schuh05b70472020-01-01 17:11:17 -08001380}
1381
Austin Schuhcf5f6442021-07-06 10:43:28 -07001382absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001383 // Make sure we have enough for the size.
1384 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1385 if (!ReadBlock()) {
1386 return absl::Span<const uint8_t>();
1387 }
1388 }
1389
1390 // Now make sure we have enough for the message.
1391 const size_t data_size =
1392 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1393 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001394 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1395 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1396 LOG(ERROR) << " Rest of log file is "
1397 << absl::BytesToHexString(std::string_view(
1398 reinterpret_cast<const char *>(data_.data() +
1399 consumed_data_),
1400 data_.size() - consumed_data_));
1401 return absl::Span<const uint8_t>();
1402 }
Austin Schuh05b70472020-01-01 17:11:17 -08001403 while (data_.size() < consumed_data_ + data_size) {
1404 if (!ReadBlock()) {
1405 return absl::Span<const uint8_t>();
1406 }
1407 }
1408
1409 // And return it, consuming the data.
1410 const uint8_t *data_ptr = data_.data() + consumed_data_;
1411
Austin Schuh05b70472020-01-01 17:11:17 -08001412 return absl::Span<const uint8_t>(data_ptr, data_size);
1413}
1414
Austin Schuhcf5f6442021-07-06 10:43:28 -07001415void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001416 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001417 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1418 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001419 consumed_data_ += consumed_size;
1420 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001421}
1422
1423absl::Span<const uint8_t> SpanReader::ReadMessage() {
1424 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001425 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001426 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001427 } else {
1428 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001429 }
1430 return result;
1431}
1432
Austin Schuh05b70472020-01-01 17:11:17 -08001433bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001434 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1435 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001436 constexpr size_t kReadSize = 256 * 1024;
1437
1438 // Strip off any unused data at the front.
1439 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001440 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001441 consumed_data_ = 0;
1442 }
1443
1444 const size_t starting_size = data_.size();
1445
1446 // This should automatically grow the backing store. It won't shrink if we
1447 // get a small chunk later. This reduces allocations when we want to append
1448 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001449 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001450
Brian Silvermanf51499a2020-09-21 12:49:08 -07001451 const size_t count =
1452 decoder_->Read(data_.begin() + starting_size, data_.end());
1453 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001454 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001455 return false;
1456 }
Austin Schuh05b70472020-01-01 17:11:17 -08001457
Brian Smarttea913d42021-12-10 15:02:38 -08001458 total_read_ += count;
1459
Austin Schuh05b70472020-01-01 17:11:17 -08001460 return true;
1461}
1462
Austin Schuhadd6eb32020-11-09 21:24:26 -08001463std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001464 SpanReader *span_reader) {
1465 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001466
1467 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001468 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001469 return std::nullopt;
1470 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001471
Austin Schuh5212cad2020-09-09 23:12:09 -07001472 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001473 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001474 if (!result.Verify()) {
1475 return std::nullopt;
1476 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001477
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001478 // We only know of busted headers in the versions of the log file header
1479 // *before* the logger_sha1 field was added. At some point before that point,
1480 // the logic to track when a header has been written was rewritten in such a
1481 // way that it can't happen anymore. We've seen some logs where the body
1482 // parses as a header recently, so the simple solution of always looking is
1483 // failing us.
1484 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001485 while (true) {
1486 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001487 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001488 break;
1489 }
1490
1491 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1492 maybe_header_data);
1493 if (maybe_header.Verify()) {
1494 LOG(WARNING) << "Found duplicate LogFileHeader in "
1495 << span_reader->filename();
1496 ResizeableBuffer header_data_copy;
1497 header_data_copy.resize(maybe_header_data.size());
1498 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1499 header_data_copy.size());
1500 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1501 std::move(header_data_copy));
1502
1503 span_reader->ConsumeMessage();
1504 } else {
1505 break;
1506 }
1507 }
1508 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001509 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001510}
1511
Austin Schuh0e8db662021-07-06 10:43:47 -07001512std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1513 std::string_view filename) {
1514 SpanReader span_reader(filename);
1515 return ReadHeader(&span_reader);
1516}
1517
Austin Schuhadd6eb32020-11-09 21:24:26 -08001518std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001519 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001520 SpanReader span_reader(filename);
1521 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1522 for (size_t i = 0; i < n + 1; ++i) {
1523 data_span = span_reader.ReadMessage();
1524
1525 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001526 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001527 return std::nullopt;
1528 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001529 }
1530
Brian Silverman354697a2020-09-22 21:06:32 -07001531 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001532 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001533 if (!result.Verify()) {
1534 return std::nullopt;
1535 }
1536 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001537}
1538
Austin Schuh05b70472020-01-01 17:11:17 -08001539MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001540 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001541 raw_log_file_header_(
1542 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001543 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1544 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1545
Austin Schuh0e8db662021-07-06 10:43:47 -07001546 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1547 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001548
1549 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001550 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001551
Austin Schuh0e8db662021-07-06 10:43:47 -07001552 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001553
Austin Schuh5b728b72021-06-16 14:57:15 -07001554 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1555
Brian Smarttea913d42021-12-10 15:02:38 -08001556 total_verified_before_ = span_reader_.TotalConsumed();
1557
Austin Schuhcde938c2020-02-02 17:30:07 -08001558 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001559 FLAGS_max_out_of_order > 0
1560 ? chrono::duration_cast<chrono::nanoseconds>(
1561 chrono::duration<double>(FLAGS_max_out_of_order))
1562 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001563
1564 VLOG(1) << "Opened " << filename << " as node "
1565 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001566}
1567
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001568std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001569 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001570 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001571 if (is_corrupted()) {
1572 LOG(ERROR) << "Total corrupted volumes: before = "
1573 << total_verified_before_
1574 << " | corrupted = " << total_corrupted_
1575 << " | during = " << total_verified_during_
1576 << " | after = " << total_verified_after_ << std::endl;
1577 }
1578
1579 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001580 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1581 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001582 << span_reader_.TotalConsumed() << " bytes usable."
1583 << std::endl;
1584 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001585 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001586 }
1587
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001589
1590 if (crash_on_corrupt_message_flag_) {
1591 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001592 << total_verified_before_ << " found within "
1593 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001594 << "; set --nocrash_on_corrupt_message to see summary;"
1595 << " also set --ignore_corrupt_messages to process"
1596 << " anyway";
1597
1598 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001599 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001600 << " from " << filename() << std::endl;
1601
1602 total_corrupted_ += msg_data.size();
1603
1604 while (true) {
1605 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1606
James Kuszmaul9776b392023-01-14 14:08:08 -08001607 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001608 if (!ignore_corrupt_messages_flag_) {
1609 LOG(ERROR) << "Total corrupted volumes: before = "
1610 << total_verified_before_
1611 << " | corrupted = " << total_corrupted_
1612 << " | during = " << total_verified_during_
1613 << " | after = " << total_verified_after_ << std::endl;
1614
1615 if (span_reader_.IsIncomplete()) {
1616 LOG(ERROR) << "Unable to access some messages in " << filename()
1617 << " : " << span_reader_.TotalRead() << " bytes read, "
1618 << span_reader_.TotalConsumed() << " bytes usable."
1619 << std::endl;
1620 }
1621 return nullptr;
1622 }
1623 break;
1624 }
1625
1626 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1627
1628 if (!next_msg.Verify()) {
1629 total_corrupted_ += msg_data.size();
1630 total_verified_during_ += total_verified_after_;
1631 total_verified_after_ = 0;
1632
1633 } else {
1634 total_verified_after_ += msg_data.size();
1635 if (ignore_corrupt_messages_flag_) {
1636 msg = next_msg;
1637 break;
1638 }
1639 }
1640 }
1641 }
1642
1643 if (is_corrupted()) {
1644 total_verified_after_ += msg_data.size();
1645 } else {
1646 total_verified_before_ += msg_data.size();
1647 }
Austin Schuh05b70472020-01-01 17:11:17 -08001648
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001649 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001650
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001651 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001652
1653 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001654
1655 if (VLOG_IS_ON(3)) {
1656 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1657 } else if (VLOG_IS_ON(2)) {
1658 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1659 msg_copy.mutable_message()->clear_data();
1660 VLOG(2) << "Read from " << filename() << " data "
1661 << FlatbufferToJson(msg_copy);
1662 }
1663
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001664 return result;
1665}
1666
1667std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1668 const MessageHeader &message) {
1669 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1670
1671 UnpackedMessageHeader *const unpacked_message =
1672 reinterpret_cast<UnpackedMessageHeader *>(
1673 malloc(sizeof(UnpackedMessageHeader) + data_size +
1674 kChannelDataAlignment - 1));
1675
1676 CHECK(message.has_channel_index());
1677 CHECK(message.has_monotonic_sent_time());
1678
1679 absl::Span<uint8_t> span;
1680 if (data_size > 0) {
1681 span =
1682 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1683 &unpacked_message->actual_data[0], data_size)),
1684 data_size);
1685 }
1686
Austin Schuh826e6ce2021-11-18 20:33:10 -08001687 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001688 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001689 monotonic_remote_time = aos::monotonic_clock::time_point(
1690 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001691 }
1692 std::optional<realtime_clock::time_point> realtime_remote_time;
1693 if (message.has_realtime_remote_time()) {
1694 realtime_remote_time = realtime_clock::time_point(
1695 chrono::nanoseconds(message.realtime_remote_time()));
1696 }
1697
1698 std::optional<uint32_t> remote_queue_index;
1699 if (message.has_remote_queue_index()) {
1700 remote_queue_index = message.remote_queue_index();
1701 }
1702
James Kuszmaul9776b392023-01-14 14:08:08 -08001703 new (unpacked_message) UnpackedMessageHeader(
1704 message.channel_index(),
1705 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001706 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001707 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001708 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001709 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1710 remote_queue_index,
1711 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001712 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001713 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001714
1715 if (data_size > 0) {
1716 memcpy(span.data(), message.data()->data(), data_size);
1717 }
1718
1719 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1720 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001721}
1722
Austin Schuhc41603c2020-10-11 16:17:37 -07001723PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001724 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001725 if (parts_.parts.size() >= 2) {
1726 next_message_reader_.emplace(parts_.parts[1]);
1727 }
Austin Schuh48507722021-07-17 17:29:24 -07001728 ComputeBootCounts();
1729}
1730
1731void PartsMessageReader::ComputeBootCounts() {
1732 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1733 std::nullopt);
1734
1735 // We have 3 vintages of log files with different amounts of information.
1736 if (log_file_header()->has_boot_uuids()) {
1737 // The new hotness with the boots explicitly listed out. We can use the log
1738 // file header to compute the boot count of all relevant nodes.
1739 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1740 size_t node_index = 0;
1741 for (const flatbuffers::String *boot_uuid :
1742 *log_file_header()->boot_uuids()) {
1743 CHECK(parts_.boots);
1744 if (boot_uuid->size() != 0) {
1745 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1746 if (it != parts_.boots->boot_count_map.end()) {
1747 boot_counts_[node_index] = it->second;
1748 }
1749 } else if (parts().boots->boots[node_index].size() == 1u) {
1750 boot_counts_[node_index] = 0;
1751 }
1752 ++node_index;
1753 }
1754 } else {
1755 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1756 // single node log files with boot UUIDs in the header. We only know how to
1757 // order certain boots in certain circumstances.
1758 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1759 for (size_t node_index = 0; node_index < boot_counts_.size();
1760 ++node_index) {
1761 CHECK(parts_.boots);
1762 if (parts().boots->boots[node_index].size() == 1u) {
1763 boot_counts_[node_index] = 0;
1764 }
1765 }
1766 } else {
1767 // Really old single node logs without any UUIDs. They can't reboot.
1768 CHECK_EQ(boot_counts_.size(), 1u);
1769 boot_counts_[0] = 0u;
1770 }
1771 }
1772}
Austin Schuhc41603c2020-10-11 16:17:37 -07001773
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001774std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001775 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001776 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001777 message_reader_.ReadMessage();
1778 if (message) {
1779 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001780 const monotonic_clock::time_point monotonic_sent_time =
1781 message->monotonic_sent_time;
1782
1783 // TODO(austin): Does this work with startup? Might need to use the
1784 // start time.
1785 // TODO(austin): Does this work with startup when we don't know the
1786 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001787 if (monotonic_sent_time >
1788 parts_.monotonic_start_time + max_out_of_order_duration()) {
1789 after_start_ = true;
1790 }
1791 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001792 CHECK_GE(monotonic_sent_time,
1793 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001794 << ": Max out of order of " << max_out_of_order_duration().count()
1795 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001796 << parts_.monotonic_start_time << " currently reading "
1797 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001798 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001799 return message;
1800 }
1801 NextLog();
1802 }
Austin Schuh32f68492020-11-08 21:45:51 -08001803 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001804 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001805}
1806
1807void PartsMessageReader::NextLog() {
1808 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001809 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001810 done_ = true;
1811 return;
1812 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001813 CHECK(next_message_reader_);
1814 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001815 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001816 if (next_part_index_ + 1 < parts_.parts.size()) {
1817 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1818 } else {
1819 next_message_reader_.reset();
1820 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001821 ++next_part_index_;
1822}
1823
Austin Schuh1be0ce42020-11-29 22:43:26 -08001824bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001825 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001826
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001827 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001828 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001829 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001830 return false;
1831 }
1832
1833 if (this->channel_index < m2.channel_index) {
1834 return true;
1835 } else if (this->channel_index > m2.channel_index) {
1836 return false;
1837 }
1838
1839 return this->queue_index < m2.queue_index;
1840}
1841
1842bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001843bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001844 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001845
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001846 return timestamp.time == m2.timestamp.time &&
1847 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001848}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001849
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001850std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1851 os << "{.channel_index=" << m.channel_index
1852 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1853 << ", .realtime_sent_time=" << m.realtime_sent_time
1854 << ", .queue_index=" << m.queue_index;
1855 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001856 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001857 }
1858 os << ", .realtime_remote_time=";
1859 PrintOptionalOrNull(&os, m.realtime_remote_time);
1860 os << ", .remote_queue_index=";
1861 PrintOptionalOrNull(&os, m.remote_queue_index);
1862 if (m.has_monotonic_timestamp_time) {
1863 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1864 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001865 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001866 return os;
1867}
1868
Austin Schuh1be0ce42020-11-29 22:43:26 -08001869std::ostream &operator<<(std::ostream &os, const Message &m) {
1870 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001871 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001872 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001873 if (m.data->remote_queue_index.has_value()) {
1874 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1875 }
1876 if (m.data->monotonic_remote_time.has_value()) {
1877 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1878 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001879 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001880 }
1881 os << "}";
1882 return os;
1883}
1884
1885std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1886 os << "{.channel_index=" << m.channel_index
1887 << ", .queue_index=" << m.queue_index
1888 << ", .monotonic_event_time=" << m.monotonic_event_time
1889 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001890 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001891 os << ", .remote_queue_index=" << m.remote_queue_index;
1892 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001893 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001894 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1895 }
1896 if (m.realtime_remote_time != realtime_clock::min_time) {
1897 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1898 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001899 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001900 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1901 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001902 if (m.data != nullptr) {
1903 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001904 } else {
1905 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001906 }
1907 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001908 return os;
1909}
1910
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001911LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001912 : parts_message_reader_(log_parts),
1913 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1914}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001915
1916Message *LogPartsSorter::Front() {
1917 // Queue up data until enough data has been queued that the front message is
1918 // sorted enough to be safe to pop. This may do nothing, so we should make
1919 // sure the nothing path is checked quickly.
1920 if (sorted_until() != monotonic_clock::max_time) {
1921 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001922 if (!messages_.empty() &&
1923 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001924 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001925 break;
1926 }
1927
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001928 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001929 parts_message_reader_.ReadMessage();
1930 // No data left, sorted forever, work through what is left.
1931 if (!m) {
1932 sorted_until_ = monotonic_clock::max_time;
1933 break;
1934 }
1935
Austin Schuh48507722021-07-17 17:29:24 -07001936 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001937 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001938 monotonic_timestamp_boot = parts().logger_boot_count;
1939 }
1940 size_t monotonic_remote_boot = 0xffffff;
1941
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001942 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001943 const Node *node =
1944 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001945
Austin Schuh48507722021-07-17 17:29:24 -07001946 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001947 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001948 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001949 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001950 monotonic_remote_boot = *boot;
1951 }
1952
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001953 messages_.insert(
1954 Message{.channel_index = m->channel_index,
1955 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1956 .index = m->queue_index},
1957 .timestamp = BootTimestamp{.boot = parts().boot_count,
1958 .time = m->monotonic_sent_time},
1959 .monotonic_remote_boot = monotonic_remote_boot,
1960 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1961 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001962
1963 // Now, update sorted_until_ to match the new message.
1964 if (parts_message_reader_.newest_timestamp() >
1965 monotonic_clock::min_time +
1966 parts_message_reader_.max_out_of_order_duration()) {
1967 sorted_until_ = parts_message_reader_.newest_timestamp() -
1968 parts_message_reader_.max_out_of_order_duration();
1969 } else {
1970 sorted_until_ = monotonic_clock::min_time;
1971 }
1972 }
1973 }
1974
1975 // Now that we have enough data queued, return a pointer to the oldest piece
1976 // of data if it exists.
1977 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001978 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001979 return nullptr;
1980 }
1981
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001982 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001983 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001984 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001985 return &(*messages_.begin());
1986}
1987
1988void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1989
1990std::string LogPartsSorter::DebugString() const {
1991 std::stringstream ss;
1992 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001993 int count = 0;
1994 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001995 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001996 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1997 ss << m << "\n";
1998 } else if (no_dots) {
1999 ss << "...\n";
2000 no_dots = false;
2001 }
2002 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08002003 }
2004 ss << "] <- " << parts_message_reader_.filename();
2005 return ss.str();
2006}
2007
Austin Schuhd2f96102020-12-01 20:27:29 -08002008NodeMerger::NodeMerger(std::vector<LogParts> parts) {
2009 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07002010 // Enforce that we are sorting things only from a single node from a single
2011 // boot.
2012 const std::string_view part0_node = parts[0].node;
2013 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08002014 for (size_t i = 1; i < parts.size(); ++i) {
2015 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07002016 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
2017 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08002018 }
Austin Schuh715adc12021-06-29 22:07:39 -07002019
2020 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
2021
Austin Schuhd2f96102020-12-01 20:27:29 -08002022 for (LogParts &part : parts) {
2023 parts_sorters_.emplace_back(std::move(part));
2024 }
2025
Austin Schuhd2f96102020-12-01 20:27:29 -08002026 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07002027 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002028 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07002029 // We want to capture the earliest meaningful start time here. The start
2030 // time defaults to min_time when there's no meaningful value to report, so
2031 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07002032 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
2033 bool accept = false;
2034 // We want to prioritize start times from the logger node. Really, we
2035 // want to prioritize start times with a valid realtime_clock time. So,
2036 // if we have a start time without a RT clock, prefer a start time with a
2037 // RT clock, even it if is later.
2038 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
2039 // We've got a good one. See if the current start time has a good RT
2040 // clock, or if we should use this one instead.
2041 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
2042 accept = true;
2043 } else if (realtime_start_time_ == realtime_clock::min_time) {
2044 // The previous start time doesn't have a good RT time, so it is very
2045 // likely the start time from a remote part file. We just found a
2046 // better start time with a real RT time, so switch to that instead.
2047 accept = true;
2048 }
2049 } else if (realtime_start_time_ == realtime_clock::min_time) {
2050 // We don't have a RT time, so take the oldest.
2051 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
2052 accept = true;
2053 }
2054 }
2055
2056 if (accept) {
2057 monotonic_start_time_ = parts_sorter.monotonic_start_time();
2058 realtime_start_time_ = parts_sorter.realtime_start_time();
2059 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002060 }
2061 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07002062
2063 // If there was no meaningful start time reported, just use min_time.
2064 if (monotonic_start_time_ == monotonic_clock::max_time) {
2065 monotonic_start_time_ = monotonic_clock::min_time;
2066 realtime_start_time_ = realtime_clock::min_time;
2067 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002068}
Austin Schuh8f52ed52020-11-30 23:12:39 -08002069
Austin Schuh0ca51f32020-12-25 21:51:45 -08002070std::vector<const LogParts *> NodeMerger::Parts() const {
2071 std::vector<const LogParts *> p;
2072 p.reserve(parts_sorters_.size());
2073 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
2074 p.emplace_back(&parts_sorter.parts());
2075 }
2076 return p;
2077}
2078
Austin Schuh8f52ed52020-11-30 23:12:39 -08002079Message *NodeMerger::Front() {
2080 // Return the current Front if we have one, otherwise go compute one.
2081 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08002082 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002083 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08002084 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08002085 }
2086
2087 // Otherwise, do a simple search for the oldest message, deduplicating any
2088 // duplicates.
2089 Message *oldest = nullptr;
2090 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002091 for (LogPartsSorter &parts_sorter : parts_sorters_) {
2092 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08002093 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002094 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08002095 continue;
2096 }
2097 if (oldest == nullptr || *m < *oldest) {
2098 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08002099 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08002100 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002101 // Found a duplicate. If there is a choice, we want the one which has
2102 // the timestamp time.
2103 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08002104 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002105 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08002106 current_->PopFront();
2107 current_ = &parts_sorter;
2108 oldest = m;
2109 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002110 CHECK_EQ(m->data->monotonic_timestamp_time,
2111 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08002112 parts_sorter.PopFront();
2113 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08002114 }
2115
2116 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08002117 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08002118 }
2119
Austin Schuhb000de62020-12-03 22:00:40 -08002120 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002121 CHECK_GE(oldest->timestamp.time, last_message_time_);
2122 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08002123 monotonic_oldest_time_ =
2124 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08002125 } else {
2126 last_message_time_ = monotonic_clock::max_time;
2127 }
2128
Austin Schuh8f52ed52020-11-30 23:12:39 -08002129 // Return the oldest message found. This will be nullptr if nothing was
2130 // found, indicating there is nothing left.
2131 return oldest;
2132}
2133
2134void NodeMerger::PopFront() {
2135 CHECK(current_ != nullptr) << "Popping before calling Front()";
2136 current_->PopFront();
2137 current_ = nullptr;
2138}
2139
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002140BootMerger::BootMerger(std::vector<LogParts> files) {
2141 std::vector<std::vector<LogParts>> boots;
2142
2143 // Now, we need to split things out by boot.
2144 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002145 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002146 if (boot_count + 1 > boots.size()) {
2147 boots.resize(boot_count + 1);
2148 }
2149 boots[boot_count].emplace_back(std::move(files[i]));
2150 }
2151
2152 node_mergers_.reserve(boots.size());
2153 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07002154 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002155 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07002156 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002157 }
2158 node_mergers_.emplace_back(
2159 std::make_unique<NodeMerger>(std::move(boots[i])));
2160 }
2161}
2162
2163Message *BootMerger::Front() {
2164 Message *result = node_mergers_[index_]->Front();
2165
2166 if (result != nullptr) {
2167 return result;
2168 }
2169
2170 if (index_ + 1u == node_mergers_.size()) {
2171 // At the end of the last node merger, just return.
2172 return nullptr;
2173 } else {
2174 ++index_;
2175 return Front();
2176 }
2177}
2178
2179void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
2180
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002181std::vector<const LogParts *> BootMerger::Parts() const {
2182 std::vector<const LogParts *> results;
2183 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
2184 std::vector<const LogParts *> node_parts = node_merger->Parts();
2185
2186 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2187 std::make_move_iterator(node_parts.end()));
2188 }
2189
2190 return results;
2191}
2192
Austin Schuhd2f96102020-12-01 20:27:29 -08002193TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002194 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002195 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002196 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002197 if (!configuration_) {
2198 configuration_ = part->config;
2199 } else {
2200 CHECK_EQ(configuration_.get(), part->config.get());
2201 }
2202 }
2203 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002204 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2205 // pretty simple.
2206 if (configuration::MultiNode(config)) {
2207 nodes_data_.resize(config->nodes()->size());
2208 const Node *my_node = config->nodes()->Get(node());
2209 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2210 const Node *node = config->nodes()->Get(node_index);
2211 NodeData *node_data = &nodes_data_[node_index];
2212 node_data->channels.resize(config->channels()->size());
2213 // We should save the channel if it is delivered to the node represented
2214 // by the NodeData, but not sent by that node. That combo means it is
2215 // forwarded.
2216 size_t channel_index = 0;
2217 node_data->any_delivered = false;
2218 for (const Channel *channel : *config->channels()) {
2219 node_data->channels[channel_index].delivered =
2220 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002221 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2222 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002223 node_data->any_delivered = node_data->any_delivered ||
2224 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002225 if (node_data->channels[channel_index].delivered) {
2226 const Connection *connection =
2227 configuration::ConnectionToNode(channel, node);
2228 node_data->channels[channel_index].time_to_live =
2229 chrono::nanoseconds(connection->time_to_live());
2230 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002231 ++channel_index;
2232 }
2233 }
2234
2235 for (const Channel *channel : *config->channels()) {
2236 source_node_.emplace_back(configuration::GetNodeIndex(
2237 config, channel->source_node()->string_view()));
2238 }
2239 }
2240}
2241
2242void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002243 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002244 CHECK_NE(timestamp_mapper->node(), node());
2245 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2246
2247 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002248 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002249 // we could needlessly save data.
2250 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002251 VLOG(1) << "Registering on node " << node() << " for peer node "
2252 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002253 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2254
2255 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002256
2257 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002258 }
2259}
2260
Austin Schuh79b30942021-01-24 22:32:21 -08002261void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002262 matched_messages_.emplace_back(
2263 TimestampedMessage{.channel_index = m->channel_index,
2264 .queue_index = m->queue_index,
2265 .monotonic_event_time = m->timestamp,
2266 .realtime_event_time = m->data->realtime_sent_time,
2267 .remote_queue_index = BootQueueIndex::Invalid(),
2268 .monotonic_remote_time = BootTimestamp::min_time(),
2269 .realtime_remote_time = realtime_clock::min_time,
2270 .monotonic_timestamp_time = BootTimestamp::min_time(),
2271 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002272}
2273
2274TimestampedMessage *TimestampMapper::Front() {
2275 // No need to fetch anything new. A previous message still exists.
2276 switch (first_message_) {
2277 case FirstMessage::kNeedsUpdate:
2278 break;
2279 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002280 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002281 case FirstMessage::kNullptr:
2282 return nullptr;
2283 }
2284
Austin Schuh79b30942021-01-24 22:32:21 -08002285 if (matched_messages_.empty()) {
2286 if (!QueueMatched()) {
2287 first_message_ = FirstMessage::kNullptr;
2288 return nullptr;
2289 }
2290 }
2291 first_message_ = FirstMessage::kInMessage;
2292 return &matched_messages_.front();
2293}
2294
2295bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002296 MatchResult result = MatchResult::kEndOfFile;
2297 do {
2298 result = MaybeQueueMatched();
2299 } while (result == MatchResult::kSkipped);
2300 return result == MatchResult::kQueued;
2301}
2302
2303bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2304 const TimestampedMessage & /*message*/) {
2305 if (replay_channels_callback_ &&
2306 !replay_channels_callback_(matched_messages_.back())) {
2307 matched_messages_.pop_back();
2308 return true;
2309 }
2310 return false;
2311}
2312
2313TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002314 if (nodes_data_.empty()) {
2315 // Simple path. We are single node, so there are no timestamps to match!
2316 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002317 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002318 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002319 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002320 }
Austin Schuh79b30942021-01-24 22:32:21 -08002321 // Enqueue this message into matched_messages_ so we have a place to
2322 // associate remote timestamps, and return it.
2323 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002324
Austin Schuh79b30942021-01-24 22:32:21 -08002325 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2326 last_message_time_ = matched_messages_.back().monotonic_event_time;
2327
2328 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002329 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002330 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002331 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2332 return MatchResult::kSkipped;
2333 }
2334 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002335 }
2336
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002337 // We need to only add messages to the list so they get processed for
2338 // messages which are delivered. Reuse the flow below which uses messages_
2339 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002340 if (messages_.empty()) {
2341 if (!Queue()) {
2342 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002343 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002344 }
2345
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002346 // Now that it has been added (and cannibalized), forget about it
2347 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002348 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002349 }
2350
2351 Message *m = &(messages_.front());
2352
2353 if (source_node_[m->channel_index] == node()) {
2354 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002355 QueueMessage(m);
2356 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2357 last_message_time_ = matched_messages_.back().monotonic_event_time;
2358 messages_.pop_front();
2359 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002360 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2361 return MatchResult::kSkipped;
2362 }
2363 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002364 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002365 // Got a timestamp, find the matching remote data, match it, and return
2366 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002367 Message data = MatchingMessageFor(*m);
2368
2369 // Return the data from the remote. The local message only has timestamp
2370 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002371 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002372 .channel_index = m->channel_index,
2373 .queue_index = m->queue_index,
2374 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002375 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002376 .remote_queue_index =
2377 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002378 .index = m->data->remote_queue_index.value()},
2379 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002380 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002381 .realtime_remote_time = m->data->realtime_remote_time.value(),
2382 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2383 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002384 .data = std::move(data.data)});
2385 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2386 last_message_time_ = matched_messages_.back().monotonic_event_time;
2387 // Since messages_ holds the data, drop it.
2388 messages_.pop_front();
2389 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002390 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2391 return MatchResult::kSkipped;
2392 }
2393 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002394 }
2395}
2396
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002397void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002398 while (last_message_time_ <= queue_time) {
2399 if (!QueueMatched()) {
2400 return;
2401 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002402 }
2403}
2404
Austin Schuhe639ea12021-01-25 13:00:22 -08002405void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002406 // Note: queueing for time doesn't really work well across boots. So we
2407 // just assume that if you are using this, you only care about the current
2408 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002409 //
2410 // TODO(austin): Is that the right concept?
2411 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002412 // Make sure we have something queued first. This makes the end time
2413 // calculation simpler, and is typically what folks want regardless.
2414 if (matched_messages_.empty()) {
2415 if (!QueueMatched()) {
2416 return;
2417 }
2418 }
2419
2420 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002421 std::max(monotonic_start_time(
2422 matched_messages_.front().monotonic_event_time.boot),
2423 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002424 time_estimation_buffer;
2425
2426 // Place sorted messages on the list until we have
2427 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2428 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002429 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002430 if (!QueueMatched()) {
2431 return;
2432 }
2433 }
2434}
2435
Austin Schuhd2f96102020-12-01 20:27:29 -08002436void TimestampMapper::PopFront() {
2437 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002438 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002439 first_message_ = FirstMessage::kNeedsUpdate;
2440
Austin Schuh79b30942021-01-24 22:32:21 -08002441 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002442}
2443
2444Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002445 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002446 CHECK_NOTNULL(message.data);
2447 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002448 const BootQueueIndex remote_queue_index =
2449 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002450 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002451
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002452 CHECK(message.data->monotonic_remote_time.has_value());
2453 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002454
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002455 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002456 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002457 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002458 const realtime_clock::time_point realtime_remote_time =
2459 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002460
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002461 TimestampMapper *peer =
2462 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002463
2464 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002465 // asked to pull a timestamp from a peer which doesn't exist, return an
2466 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002467 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002468 // TODO(austin): Make sure the tests hit all these paths with a boot count
2469 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002470 return Message{.channel_index = message.channel_index,
2471 .queue_index = remote_queue_index,
2472 .timestamp = monotonic_remote_time,
2473 .monotonic_remote_boot = 0xffffff,
2474 .monotonic_timestamp_boot = 0xffffff,
2475 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002476 }
2477
2478 // The queue which will have the matching data, if available.
2479 std::deque<Message> *data_queue =
2480 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2481
Austin Schuh79b30942021-01-24 22:32:21 -08002482 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002483
2484 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002485 return Message{.channel_index = message.channel_index,
2486 .queue_index = remote_queue_index,
2487 .timestamp = monotonic_remote_time,
2488 .monotonic_remote_boot = 0xffffff,
2489 .monotonic_timestamp_boot = 0xffffff,
2490 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002491 }
2492
Austin Schuhd2f96102020-12-01 20:27:29 -08002493 if (remote_queue_index < data_queue->front().queue_index ||
2494 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002495 return Message{.channel_index = message.channel_index,
2496 .queue_index = remote_queue_index,
2497 .timestamp = monotonic_remote_time,
2498 .monotonic_remote_boot = 0xffffff,
2499 .monotonic_timestamp_boot = 0xffffff,
2500 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002501 }
2502
Austin Schuh993ccb52020-12-12 15:59:32 -08002503 // The algorithm below is constant time with some assumptions. We need there
2504 // to be no missing messages in the data stream. This also assumes a queue
2505 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002506 if (data_queue->back().queue_index.boot ==
2507 data_queue->front().queue_index.boot &&
2508 (data_queue->back().queue_index.index -
2509 data_queue->front().queue_index.index + 1u ==
2510 data_queue->size())) {
2511 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002512 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002513 //
2514 // TODO(austin): Move if not reliable.
2515 Message result = (*data_queue)[remote_queue_index.index -
2516 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002517
2518 CHECK_EQ(result.timestamp, monotonic_remote_time)
2519 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002520 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002521 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2522 // Now drop the data off the front. We have deduplicated timestamps, so we
2523 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002524 data_queue->erase(
2525 data_queue->begin(),
2526 data_queue->begin() +
2527 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002528 return result;
2529 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002530 // TODO(austin): Binary search.
2531 auto it = std::find_if(
2532 data_queue->begin(), data_queue->end(),
2533 [remote_queue_index,
2534 remote_boot = monotonic_remote_time.boot](const Message &m) {
2535 return m.queue_index == remote_queue_index &&
2536 m.timestamp.boot == remote_boot;
2537 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002538 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002539 return Message{.channel_index = message.channel_index,
2540 .queue_index = remote_queue_index,
2541 .timestamp = monotonic_remote_time,
2542 .monotonic_remote_boot = 0xffffff,
2543 .monotonic_timestamp_boot = 0xffffff,
2544 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002545 }
2546
2547 Message result = std::move(*it);
2548
2549 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002550 << ": Queue index matches, but timestamp doesn't. Please "
2551 "investigate!";
2552 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2553 << ": Queue index matches, but timestamp doesn't. Please "
2554 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002555
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002556 // Erase everything up to this message. We want to keep 1 message in the
2557 // queue so we can handle reliable messages forwarded across boots.
2558 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002559
2560 return result;
2561 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002562}
2563
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002564void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002565 if (queued_until_ > t) {
2566 return;
2567 }
2568 while (true) {
2569 if (!messages_.empty() && messages_.back().timestamp > t) {
2570 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2571 return;
2572 }
2573
2574 if (!Queue()) {
2575 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002576 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002577 return;
2578 }
2579
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002580 // Now that it has been added (and cannibalized), forget about it
2581 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002582 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002583 }
2584}
2585
2586bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002587 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002588 if (m == nullptr) {
2589 return false;
2590 }
2591 for (NodeData &node_data : nodes_data_) {
2592 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002593 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002594 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002595 // If we have data but no timestamps (logs where the timestamps didn't get
2596 // logged are classic), we can grow this indefinitely. We don't need to
2597 // keep anything that is older than the last message returned.
2598
2599 // We have the time on the source node.
2600 // We care to wait until we have the time on the destination node.
2601 std::deque<Message> &messages =
2602 node_data.channels[m->channel_index].messages;
2603 // Max delay over the network is the TTL, so let's take the queue time and
2604 // add TTL to it. Don't forget any messages which are reliable until
2605 // someone can come up with a good reason to forget those too.
2606 if (node_data.channels[m->channel_index].time_to_live >
2607 chrono::nanoseconds(0)) {
2608 // We need to make *some* assumptions about network delay for this to
2609 // work. We want to only look at the RX side. This means we need to
2610 // track the last time a message was popped from any channel from the
2611 // node sending this message, and compare that to the max time we expect
2612 // that a message will take to be delivered across the network. This
2613 // assumes that messages are popped in time order as a proxy for
2614 // measuring the distributed time at this layer.
2615 //
2616 // Leave at least 1 message in here so we can handle reboots and
2617 // messages getting sent twice.
2618 while (messages.size() > 1u &&
2619 messages.begin()->timestamp +
2620 node_data.channels[m->channel_index].time_to_live +
2621 chrono::duration_cast<chrono::nanoseconds>(
2622 chrono::duration<double>(FLAGS_max_network_delay)) <
2623 last_popped_message_time_) {
2624 messages.pop_front();
2625 }
2626 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002627 node_data.channels[m->channel_index].messages.emplace_back(*m);
2628 }
2629 }
2630
2631 messages_.emplace_back(std::move(*m));
2632 return true;
2633}
2634
2635std::string TimestampMapper::DebugString() const {
2636 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002637 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002638 for (const Message &message : messages_) {
2639 ss << " " << message << "\n";
2640 }
2641 ss << "] queued_until " << queued_until_;
2642 for (const NodeData &ns : nodes_data_) {
2643 if (ns.peer == nullptr) continue;
2644 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2645 size_t channel_index = 0;
2646 for (const NodeData::ChannelData &channel_data :
2647 ns.peer->nodes_data_[node()].channels) {
2648 if (channel_data.messages.empty()) {
2649 continue;
2650 }
Austin Schuhb000de62020-12-03 22:00:40 -08002651
Austin Schuhd2f96102020-12-01 20:27:29 -08002652 ss << " channel " << channel_index << " [\n";
2653 for (const Message &m : channel_data.messages) {
2654 ss << " " << m << "\n";
2655 }
2656 ss << " ]\n";
2657 ++channel_index;
2658 }
2659 ss << "] queued_until " << ns.peer->queued_until_;
2660 }
2661 return ss.str();
2662}
2663
Austin Schuhee711052020-08-24 16:06:09 -07002664std::string MaybeNodeName(const Node *node) {
2665 if (node != nullptr) {
2666 return node->name()->str() + " ";
2667 }
2668 return "";
2669}
2670
Brian Silvermanf51499a2020-09-21 12:49:08 -07002671} // namespace aos::logger