blob: 0d10a9986b2f998b1bee5391fc665cd6c34ca622 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "flatbuffers/flatbuffers.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070018#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080019#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080020#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070025#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070026#else
27#define ENABLE_LZMA 0
28#endif
29
30#if ENABLE_LZMA
31#include "aos/events/logging/lzma_encoder.h"
32#endif
Austin Schuh86110712022-09-16 15:40:54 -070033#if ENABLE_S3
34#include "aos/events/logging/s3_fetcher.h"
35#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070036
Austin Schuh48d10d62022-10-16 22:19:23 -070037DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080038 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070039DEFINE_double(
40 flush_period, 5.0,
41 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080042
Austin Schuha040c3f2021-02-13 16:09:07 -080043DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080044 max_network_delay, 1.0,
45 "Max time to assume a message takes to cross the network before we are "
46 "willing to drop it from our buffers and assume it didn't make it. "
47 "Increasing this number can increase memory usage depending on the packet "
48 "loss of your network or if the timestamps aren't logged for a message.");
49
50DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080051 max_out_of_order, -1,
52 "If set, this overrides the max out of order duration for a log file.");
53
Austin Schuh0e8db662021-07-06 10:43:47 -070054DEFINE_bool(workaround_double_headers, true,
55 "Some old log files have two headers at the beginning. Use the "
56 "last header as the actual header.");
57
Brian Smarttea913d42021-12-10 15:02:38 -080058DEFINE_bool(crash_on_corrupt_message, true,
59 "When true, MessageReader will crash the first time a message "
60 "with corrupted format is found. When false, the crash will be "
61 "suppressed, and any remaining readable messages will be "
62 "evaluated to present verified vs corrupted stats.");
63
64DEFINE_bool(ignore_corrupt_messages, false,
65 "When true, and crash_on_corrupt_message is false, then any "
66 "corrupt message found by MessageReader be silently ignored, "
67 "providing access to all uncorrupted messages in a logfile.");
68
Alexei Strotsa3194712023-04-21 23:30:50 -070069DECLARE_bool(quiet_sorting);
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070072namespace {
Austin Schuh05b70472020-01-01 17:11:17 -080073namespace chrono = std::chrono;
74
Alexei Strotscee7b372023-04-21 11:57:54 -070075std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename,
76 bool quiet) {
77 static constexpr std::string_view kS3 = "s3:";
78
79 std::unique_ptr<DataDecoder> decoder;
80
81 if (filename.substr(0, kS3.size()) == kS3) {
82#if ENABLE_S3
83 decoder = std::make_unique<S3Fetcher>(filename);
84#else
85 LOG(FATAL) << "Reading files from S3 not supported on this platform";
86#endif
87 } else {
88 decoder = std::make_unique<DummyDecoder>(filename);
89 }
90
91 static constexpr std::string_view kXz = ".xz";
92 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
93 if (filename.substr(filename.size() - kXz.size()) == kXz) {
94#if ENABLE_LZMA
95 decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet);
96#else
97 (void)quiet;
98 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
99#endif
100 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
101 decoder = std::make_unique<SnappyDecoder>(std::move(decoder));
102 }
103 return decoder;
104}
105
// Streams the value held by `t` to `*os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << *t;
}
Philipp Schrader10397952023-06-15 11:43:07 -0700114
115// A dummy LogSink implementation that handles the special case when we create
116// a DetachedBufferWriter when there's no space left on the system. The
117// DetachedBufferWriter frequently dereferences log_sink_, so we want a class
118// here that effectively refuses to do anything meaningful.
119class OutOfDiskSpaceLogSink : public LogSink {
120 public:
121 WriteCode OpenForWrite() override { return WriteCode::kOutOfSpace; }
122 WriteCode Close() override { return WriteCode::kOk; }
123 bool is_open() const override { return false; }
124 WriteResult Write(
125 const absl::Span<const absl::Span<const uint8_t>> &) override {
126 return WriteResult{
127 .code = WriteCode::kOutOfSpace,
128 .messages_written = 0,
129 };
130 }
131 std::string_view name() const override { return "<out_of_disk_space>"; }
132};
133
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700134} // namespace
135
Alexei Strotsbc082d82023-05-03 08:43:42 -0700136DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink,
137 std::unique_ptr<DataEncoder> encoder)
138 : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) {
139 CHECK(log_sink_);
140 ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace;
Alexei Strots01395492023-03-20 13:59:56 -0700141 if (ran_out_of_space_) {
142 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800143 }
144}
145
Philipp Schrader10397952023-06-15 11:43:07 -0700146DetachedBufferWriter::DetachedBufferWriter(already_out_of_space_t)
147 : DetachedBufferWriter(std::make_unique<OutOfDiskSpaceLogSink>(), nullptr) {
148}
149
Austin Schuha36c8902019-12-30 18:07:15 -0800150DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700151 Close();
152 if (ran_out_of_space_) {
153 CHECK(acknowledge_ran_out_of_space_)
154 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700156}
157
Brian Silvermand90905f2020-09-23 14:42:56 -0700158DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700159 *this = std::move(other);
160}
161
Brian Silverman87ac0402020-09-17 14:47:01 -0700162// When other is destroyed "soon" (which it should be because we're getting an
163// rvalue reference to it), it will flush etc all the data we have queued up
164// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700165DetachedBufferWriter &DetachedBufferWriter::operator=(
166 DetachedBufferWriter &&other) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700167 std::swap(log_sink_, other.log_sink_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700168 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700169 std::swap(ran_out_of_space_, other.ran_out_of_space_);
170 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800171 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700172 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800173}
174
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600175std::chrono::nanoseconds DetachedBufferWriter::CopyMessage(
176 DataEncoder::Copier *copier, aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700177 if (ran_out_of_space_) {
178 // We don't want any later data to be written after space becomes
179 // available, so refuse to write anything more once we've dropped data
180 // because we ran out of space.
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600181 return std::chrono::nanoseconds::zero();
Austin Schuha36c8902019-12-30 18:07:15 -0800182 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700183
Austin Schuh8bdfc492023-02-11 12:53:13 -0800184 const size_t message_size = copier->size();
185 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700186
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600187 std::chrono::nanoseconds total_encode_duration =
188 std::chrono::nanoseconds::zero();
189
Austin Schuh8bdfc492023-02-11 12:53:13 -0800190 // Keep writing chunks until we've written it all. If we end up with a
191 // partial write, this means we need to flush to disk.
192 do {
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600193 // Initialize encode_duration for the case that the encoder cannot measure
194 // encode duration for a single message.
195 std::chrono::nanoseconds encode_duration = std::chrono::nanoseconds::zero();
Alexei Strots01395492023-03-20 13:59:56 -0700196 const size_t bytes_written =
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600197 encoder_->Encode(copier, overall_bytes_written, &encode_duration);
198
Austin Schuh8bdfc492023-02-11 12:53:13 -0800199 CHECK(bytes_written != 0);
200
201 overall_bytes_written += bytes_written;
202 if (overall_bytes_written < message_size) {
203 VLOG(1) << "Flushing because of a partial write, tried to write "
204 << message_size << " wrote " << overall_bytes_written;
205 Flush(now);
206 }
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600207 total_encode_duration += encode_duration;
Austin Schuh8bdfc492023-02-11 12:53:13 -0800208 } while (overall_bytes_written < message_size);
209
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600210 WriteStatistics()->UpdateEncodeDuration(total_encode_duration);
211
Austin Schuhbd06ae42021-03-31 22:48:21 -0700212 FlushAtThreshold(now);
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600213 return total_encode_duration;
Austin Schuha36c8902019-12-30 18:07:15 -0800214}
215
Brian Silverman0465fcf2020-09-24 00:29:18 -0700216void DetachedBufferWriter::Close() {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700217 if (!log_sink_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700218 return;
219 }
Maxwell Gumleyd26e6292024-04-24 10:45:07 -0600220 // Initialize encode_duration for the case that the encoder cannot measure
221 // encode duration for a single message.
222 std::chrono::nanoseconds encode_duration = std::chrono::nanoseconds::zero();
223 encoder_->Finish(&encode_duration);
224 WriteStats().UpdateEncodeDuration(encode_duration);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700225 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800226 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700227 }
Austin Schuhb2461652023-05-01 08:30:56 -0700228 encoder_.reset();
Alexei Strotsbc082d82023-05-03 08:43:42 -0700229 ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700230}
231
Austin Schuh8bdfc492023-02-11 12:53:13 -0800232void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
233 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700234 if (ran_out_of_space_) {
235 // We don't want any later data to be written after space becomes available,
236 // so refuse to write anything more once we've dropped data because we ran
237 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700238 if (encoder_) {
239 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
240 encoder_->Clear(encoder_->queue().size());
241 } else {
242 VLOG(1) << "No queue to ignore";
243 }
244 return;
245 }
246
247 const auto queue = encoder_->queue();
248 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700249 return;
250 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700251
Alexei Strotsbc082d82023-05-03 08:43:42 -0700252 const WriteResult result = log_sink_->Write(queue);
Alexei Strots01395492023-03-20 13:59:56 -0700253 encoder_->Clear(result.messages_written);
254 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700255}
256
Austin Schuhbd06ae42021-03-31 22:48:21 -0700257void DetachedBufferWriter::FlushAtThreshold(
258 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700259 if (ran_out_of_space_) {
260 // We don't want any later data to be written after space becomes available,
261 // so refuse to write anything more once we've dropped data because we ran
262 // out of space.
263 if (encoder_) {
264 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
265 encoder_->Clear(encoder_->queue().size());
266 } else {
267 VLOG(1) << "No queue to ignore";
268 }
269 return;
270 }
271
Austin Schuhbd06ae42021-03-31 22:48:21 -0700272 // We don't want to flush the first time through. Otherwise we will flush as
273 // the log file header might be compressing, defeating any parallelism and
274 // queueing there.
275 if (last_flush_time_ == aos::monotonic_clock::min_time) {
276 last_flush_time_ = now;
277 }
278
Brian Silvermanf51499a2020-09-21 12:49:08 -0700279 // Flush if we are at the max number of iovs per writev, because there's no
280 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700281 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800282 while (encoder_->space() == 0 ||
283 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700284 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700285 (now > last_flush_time_ +
286 chrono::duration_cast<chrono::nanoseconds>(
287 chrono::duration<double>(FLAGS_flush_period)) &&
288 encoder_->queued_bytes() != 0)) {
289 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
290 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800291 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700292 }
Austin Schuha36c8902019-12-30 18:07:15 -0800293}
294
Austin Schuhf2d0e682022-10-16 14:20:58 -0700295// Do the magic dance to convert the endianness of the data and append it to the
296// buffer.
297namespace {
298
299// TODO(austin): Look at the generated code to see if building the header is
300// efficient or not.
301template <typename T>
302uint8_t *Push(uint8_t *buffer, const T data) {
303 const T endian_data = flatbuffers::EndianScalar<T>(data);
304 std::memcpy(buffer, &endian_data, sizeof(T));
305 return buffer + sizeof(T);
306}
307
// Copies `size` raw bytes from `data` to `buffer` and returns the position
// just past the copied bytes.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  // memcpy returns its destination, i.e. `buffer`.
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
312
// Writes `padding` zero bytes at `buffer` and returns the position just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  // fill_n returns the iterator one past the last element written.
  return std::fill_n(buffer, padding, static_cast<uint8_t>(0));
}
317} // namespace
318
319flatbuffers::Offset<MessageHeader> PackRemoteMessage(
320 flatbuffers::FlatBufferBuilder *fbb,
321 const message_bridge::RemoteMessage *msg, int channel_index,
322 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
323 logger::MessageHeader::Builder message_header_builder(*fbb);
324 // Note: this must match the same order as MessageBridgeServer and
325 // PackMessage. We want identical headers to have identical
326 // on-the-wire formats to make comparing them easier.
327
328 message_header_builder.add_channel_index(channel_index);
329
330 message_header_builder.add_queue_index(msg->queue_index());
331 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
332 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
333
Austin Schuhb5224ec2024-03-27 15:20:09 -0700334 message_header_builder.add_monotonic_timestamp_time(
335 monotonic_timestamp_time.time_since_epoch().count());
336
337 message_header_builder.add_monotonic_remote_transmit_time(
338 msg->monotonic_remote_transmit_time());
339
Austin Schuhf2d0e682022-10-16 14:20:58 -0700340 message_header_builder.add_monotonic_remote_time(
341 msg->monotonic_remote_time());
342 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
343 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
344
Austin Schuhf2d0e682022-10-16 14:20:58 -0700345 return message_header_builder.Finish();
346}
347
348size_t PackRemoteMessageInline(
349 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
350 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800351 const aos::monotonic_clock::time_point monotonic_timestamp_time,
352 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700353 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800354 DCHECK_EQ((start_byte % 8u), 0u);
355 DCHECK_EQ((end_byte % 8u), 0u);
356 DCHECK_LE(start_byte, end_byte);
357 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700358
Austin Schuh71a40d42023-02-04 21:22:22 -0800359 switch (start_byte) {
360 case 0x00u:
361 if ((end_byte) == 0x00u) {
362 break;
363 }
364 // clang-format off
365 // header:
Austin Schuhb5224ec2024-03-27 15:20:09 -0700366 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
367
Austin Schuh71a40d42023-02-04 21:22:22 -0800368 buffer = Push<flatbuffers::uoffset_t>(
369 buffer, message_size - sizeof(flatbuffers::uoffset_t));
Austin Schuhb5224ec2024-03-27 15:20:09 -0700370 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
371 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1C);
Austin Schuh71a40d42023-02-04 21:22:22 -0800372 [[fallthrough]];
373 case 0x08u:
374 if ((end_byte) == 0x08u) {
375 break;
376 }
377 //
Austin Schuh71a40d42023-02-04 21:22:22 -0800378 // vtable (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700379 // +0x08 | 18 00 | uint16_t | 0x0018 (24) | size of this vtable
380 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
381 // +0x0A | 40 00 | uint16_t | 0x0040 (64) | size of referring table
382 buffer = Push<flatbuffers::voffset_t>(buffer, 0x40);
383 // +0x0C | 3C 00 | VOffset16 | 0x003C (60) | offset to field `channel_index` (id: 0)
384 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
385 // +0x0E | 30 00 | VOffset16 | 0x0030 (48) | offset to field `monotonic_sent_time` (id: 1)
386 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
Austin Schuh71a40d42023-02-04 21:22:22 -0800387 [[fallthrough]];
388 case 0x10u:
389 if ((end_byte) == 0x10u) {
390 break;
391 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700392 // +0x10 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `realtime_sent_time` (id: 2)
393 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
394 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `queue_index` (id: 3)
Austin Schuh71a40d42023-02-04 21:22:22 -0800395 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700396 // +0x14 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
397 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
398 // +0x16 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
399 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
Austin Schuh71a40d42023-02-04 21:22:22 -0800400 [[fallthrough]];
401 case 0x18u:
402 if ((end_byte) == 0x18u) {
403 break;
404 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700405 // +0x18 | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
406 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
407 // +0x1A | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
408 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
409 // +0x1C | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_timestamp_time` (id: 8)
410 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
411 // +0x1E | 18 00 | VOffset16 | 0x0018 (24) | offset to field `monotonic_remote_transmit_time` (id: 9)
412 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
Austin Schuh71a40d42023-02-04 21:22:22 -0800413 [[fallthrough]];
414 case 0x20u:
415 if ((end_byte) == 0x20u) {
416 break;
417 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700418
419
Austin Schuh71a40d42023-02-04 21:22:22 -0800420 // root_table (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700421 // +0x20 | 18 00 00 00 | SOffset32 | 0x00000018 (24) Loc: +0x08 | offset to vtable
422 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
423 // +0x24 | 8B 00 00 00 | uint32_t | 0x0000008B (139) | table field `remote_queue_index` (UInt)
424 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
Austin Schuh71a40d42023-02-04 21:22:22 -0800425 [[fallthrough]];
426 case 0x28u:
427 if ((end_byte) == 0x28u) {
428 break;
429 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700430 // +0x28 | D4 C9 48 86 92 8B 6A AF | int64_t | 0xAF6A8B928648C9D4 (-5806675308106429996) | table field `realtime_remote_time` (Long)
431 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800432 [[fallthrough]];
433 case 0x30u:
434 if ((end_byte) == 0x30u) {
435 break;
436 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700437 // +0x30 | 65 B1 32 50 FE 54 50 6B | int64_t | 0x6B5054FE5032B165 (7732774011439067493) | table field `monotonic_remote_time` (Long)
438 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800439 [[fallthrough]];
440 case 0x38u:
441 if ((end_byte) == 0x38u) {
442 break;
443 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700444 // +0x38 | EA 4D CC E0 FC 20 86 71 | int64_t | 0x718620FCE0CC4DEA (8180262043640417770) | table field `monotonic_remote_transmit_time` (Long)
445 buffer = Push<int64_t>(buffer, msg->monotonic_remote_transmit_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800446 [[fallthrough]];
447 case 0x40u:
448 if ((end_byte) == 0x40u) {
449 break;
450 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700451 // +0x40 | 8E 59 CF 88 9D DF 02 07 | int64_t | 0x0702DF9D88CF598E (505211975917066638) | table field `monotonic_timestamp_time` (Long)
452 buffer = Push<int64_t>(buffer,
453 monotonic_timestamp_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800454 [[fallthrough]];
455 case 0x48u:
456 if ((end_byte) == 0x48u) {
457 break;
458 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700459 // +0x48 | 14 D5 A7 D8 B2 E4 EF 89 | int64_t | 0x89EFE4B2D8A7D514 (-8507329714289388268) | table field `realtime_sent_time` (Long)
Austin Schuh71a40d42023-02-04 21:22:22 -0800460 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
461 [[fallthrough]];
462 case 0x50u:
463 if ((end_byte) == 0x50u) {
464 break;
465 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700466 // +0x50 | 19 7D 7F EF 86 8D 92 65 | int64_t | 0x65928D86EF7F7D19 (7319067955113721113) | table field `monotonic_sent_time` (Long)
Austin Schuh71a40d42023-02-04 21:22:22 -0800467 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
468 [[fallthrough]];
469 case 0x58u:
470 if ((end_byte) == 0x58u) {
471 break;
472 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700473 // +0x58 | FC 00 00 00 | uint32_t | 0x000000FC (252) | table field `queue_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800474 buffer = Push<uint32_t>(buffer, msg->queue_index());
Austin Schuhb5224ec2024-03-27 15:20:09 -0700475 // +0x5C | 9C 00 00 00 | uint32_t | 0x0000009C (156) | table field `channel_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800476 buffer = Push<uint32_t>(buffer, channel_index);
477 // clang-format on
478 [[fallthrough]];
479 case 0x60u:
480 if ((end_byte) == 0x60u) {
481 break;
482 }
483 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700484
Austin Schuh71a40d42023-02-04 21:22:22 -0800485 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700486}
487
Austin Schuha36c8902019-12-30 18:07:15 -0800488flatbuffers::Offset<MessageHeader> PackMessage(
489 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
490 int channel_index, LogType log_type) {
491 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
492
493 switch (log_type) {
494 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800495 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700496 // Since the timestamps are 8 byte aligned, we are going to end up adding
497 // padding in the middle of the message to pad everything out to 8 byte
498 // alignment. That's rather wasteful. To make things efficient to mmap
499 // while reading uncompressed logs, we'd actually rather the message be
500 // aligned. So, force 8 byte alignment (enough to preserve alignment
501 // inside the nested message so that we can read it without moving it)
502 // here.
503 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700504 data_offset = fbb->CreateVector(
505 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800506 break;
507
508 case LogType::kLogDeliveryTimeOnly:
509 break;
510 }
511
512 MessageHeader::Builder message_header_builder(*fbb);
513 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800514
Austin Schuhfa30c352022-10-16 11:12:02 -0700515 // These are split out into very explicit serialization calls because the
516 // order here changes the order things are written out on the wire, and we
517 // want to control and understand it here. Changing the order can increase
518 // the amount of padding bytes in the middle.
519 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800520 // It is also easier to follow... And doesn't actually make things much
521 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800522 switch (log_type) {
523 case LogType::kLogRemoteMessage:
524 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700525 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800526 message_header_builder.add_monotonic_sent_time(
527 context.monotonic_remote_time.time_since_epoch().count());
528 message_header_builder.add_realtime_sent_time(
529 context.realtime_remote_time.time_since_epoch().count());
530 break;
531
Austin Schuh6f3babe2020-01-26 20:34:50 -0800532 case LogType::kLogDeliveryTimeOnly:
533 message_header_builder.add_queue_index(context.queue_index);
534 message_header_builder.add_monotonic_sent_time(
535 context.monotonic_event_time.time_since_epoch().count());
536 message_header_builder.add_realtime_sent_time(
537 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800538 message_header_builder.add_monotonic_remote_time(
539 context.monotonic_remote_time.time_since_epoch().count());
540 message_header_builder.add_realtime_remote_time(
541 context.realtime_remote_time.time_since_epoch().count());
Austin Schuhb5224ec2024-03-27 15:20:09 -0700542 message_header_builder.add_monotonic_remote_transmit_time(
543 context.monotonic_remote_transmit_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800544 message_header_builder.add_remote_queue_index(context.remote_queue_index);
545 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700546
547 case LogType::kLogMessage:
548 message_header_builder.add_queue_index(context.queue_index);
549 message_header_builder.add_data(data_offset);
550 message_header_builder.add_monotonic_sent_time(
551 context.monotonic_event_time.time_since_epoch().count());
552 message_header_builder.add_realtime_sent_time(
553 context.realtime_event_time.time_since_epoch().count());
554 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800555 }
556
557 return message_header_builder.Finish();
558}
559
Austin Schuhfa30c352022-10-16 11:12:02 -0700560flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
561 switch (log_type) {
562 case LogType::kLogMessage:
563 return
564 // Root table size + offset.
565 sizeof(flatbuffers::uoffset_t) * 2 +
566 // 6 padding bytes to pad the header out properly.
567 6 +
568 // vtable header (size + size of table)
569 sizeof(flatbuffers::voffset_t) * 2 +
570 // offsets to all the fields.
571 sizeof(flatbuffers::voffset_t) * 5 +
572 // pointer to vtable
573 sizeof(flatbuffers::soffset_t) +
574 // pointer to data
575 sizeof(flatbuffers::uoffset_t) +
576 // realtime_sent_time, monotonic_sent_time
577 sizeof(int64_t) * 2 +
578 // queue_index, channel_index
579 sizeof(uint32_t) * 2;
580
581 case LogType::kLogDeliveryTimeOnly:
582 return
583 // Root table size + offset.
584 sizeof(flatbuffers::uoffset_t) * 2 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700585 // vtable header (size + size of table)
586 sizeof(flatbuffers::voffset_t) * 2 +
587 // offsets to all the fields.
Austin Schuhb5224ec2024-03-27 15:20:09 -0700588 sizeof(flatbuffers::voffset_t) * 10 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700589 // pointer to vtable
590 sizeof(flatbuffers::soffset_t) +
591 // remote_queue_index
592 sizeof(uint32_t) +
593 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
594 // monotonic_sent_time
Austin Schuhb5224ec2024-03-27 15:20:09 -0700595 sizeof(int64_t) * 5 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700596 // queue_index, channel_index
597 sizeof(uint32_t) * 2;
598
Austin Schuhfa30c352022-10-16 11:12:02 -0700599 case LogType::kLogRemoteMessage:
600 return
601 // Root table size + offset.
602 sizeof(flatbuffers::uoffset_t) * 2 +
603 // 6 padding bytes to pad the header out properly.
604 6 +
605 // vtable header (size + size of table)
606 sizeof(flatbuffers::voffset_t) * 2 +
607 // offsets to all the fields.
608 sizeof(flatbuffers::voffset_t) * 5 +
609 // pointer to vtable
610 sizeof(flatbuffers::soffset_t) +
611 // realtime_sent_time, monotonic_sent_time
612 sizeof(int64_t) * 2 +
613 // pointer to data
614 sizeof(flatbuffers::uoffset_t) +
615 // queue_index, channel_index
616 sizeof(uint32_t) * 2;
617 }
618 LOG(FATAL);
619}
620
James Kuszmaul9776b392023-01-14 14:08:08 -0800621flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700622 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
623 "Update size logic please.");
624 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700625 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700626 switch (log_type) {
627 case LogType::kLogDeliveryTimeOnly:
628 return PackMessageHeaderSize(log_type);
629
630 case LogType::kLogMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700631 case LogType::kLogRemoteMessage:
632 return PackMessageHeaderSize(log_type) +
633 // Vector...
634 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
635 }
636 LOG(FATAL);
637}
638
// Hand-packs bytes [start_byte, end_byte) of the size prefixed
// aos.logger.MessageHeader flatbuffer describing `context` into `buffer`.
//
// The total size of the message is PackMessageSize(log_type, context.size).
// start_byte and end_byte are offsets into that message; both must be
// multiples of 8 with start_byte <= end_byte <= message size (DCHECKed below).
// Writing starts at `buffer` itself, i.e. `buffer` corresponds to offset
// start_byte of the message, not to offset 0.
//
// The header is emitted in 8 byte groups.  Each inner switch is entered at the
// group matching start_byte and falls through group by group; every group
// first checks whether end_byte has been reached and stops if so.  Offsets
// past the header land in the `default` label, which copies the payload.
//
// Push(), Pad() and PushBytes() are defined elsewhere; they are presumably
// helpers which write at `buffer` and return the advanced pointer -- confirm
// against their definitions.
//
// Returns the number of bytes requested, end_byte - start_byte.
size_t PackMessageInline(uint8_t *buffer, const Context &context,
                         int channel_index, LogType log_type, size_t start_byte,
                         size_t end_byte) {
  // TODO(austin): Figure out how to copy directly from shared memory instead of
  // first into the fetcher's memory and then into here.  That would save a lot
  // of memory.
  const flatbuffers::uoffset_t message_size =
      PackMessageSize(log_type, context.size);
  DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
  DCHECK_EQ((start_byte % 8u), 0u);
  DCHECK_EQ((end_byte % 8u), 0u);
  DCHECK_LE(start_byte, end_byte);
  DCHECK_LE(end_byte, message_size);

  // Pack all the data in.  This is brittle but easy to change.  Use the
  // InlinePackMessage.Equivilent unit test to verify everything matches.
  switch (log_type) {
    case LogType::kLogMessage:
      switch (start_byte) {
        case 0x00u:
          if ((end_byte) == 0x00u) {
            break;
          }
          // clang-format off
          // header:
          //   +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
          // The size prefix does not count itself, hence the subtraction.
          buffer = Push<flatbuffers::uoffset_t>(
              buffer, message_size - sizeof(flatbuffers::uoffset_t));

          //   +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
          [[fallthrough]];
        case 0x08u:
          if ((end_byte) == 0x08u) {
            break;
          }
          //
          // padding:
          //   +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
          buffer = Pad(buffer, 6);
          //
          // vtable (aos.logger.MessageHeader):
          //   +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
          buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
          [[fallthrough]];
        case 0x10u:
          if ((end_byte) == 0x10u) {
            break;
          }
          //   +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
          //   +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
          //   +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
          //   +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
          [[fallthrough]];
        case 0x18u:
          if ((end_byte) == 0x18u) {
            break;
          }
          //   +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
          //   +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
          //
          // root_table (aos.logger.MessageHeader):
          //   +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
          [[fallthrough]];
        case 0x20u:
          if ((end_byte) == 0x20u) {
            break;
          }
          //   +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x28u:
          if ((end_byte) == 0x28u) {
            break;
          }
          //   +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x30u:
          if ((end_byte) == 0x30u) {
            break;
          }
          //   +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
          //   +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
          buffer = Push<uint32_t>(buffer, context.queue_index);
          [[fallthrough]];
        case 0x38u:
          if ((end_byte) == 0x38u) {
            break;
          }
          //   +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
          buffer = Push<uint32_t>(buffer, channel_index);
          //
          // vector (aos.logger.MessageHeader.data):
          //   +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
          buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
          [[fallthrough]];
        case 0x40u:
          if ((end_byte) == 0x40u) {
            break;
          }
          [[fallthrough]];
        default:
          // Reached either by falling through the header above or directly
          // when start_byte is past 0x40, i.e. inside the payload.
          //   +0x40 | FF | uint8_t | 0xFF (255) | value[0]
          //   +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
          //   +0x42 | EE | uint8_t | 0xEE (238) | value[2]
          //   +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
          //   +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
          //   +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
          //   +0x46 | FF | uint8_t | 0xFF (255) | value[6]
          //   +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
          //   +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
          //   +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
          //   +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
          //   +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
          //   +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
          //   +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
          //
          // padding:
          //   +0x4E | 00 00 | uint8_t[2] | .. | padding
          // clang-format on
          if (start_byte <= 0x40 && end_byte == message_size) {
            // The easy one, slap it all down.
            buffer = PushBytes(buffer, context.data, context.size);
            buffer =
                Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
          } else {
            // Only a slice of the payload was requested.  Convert the message
            // offsets into offsets relative to the start of the payload
            // (which begins at 0x40).
            const size_t data_start_byte =
                start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
            const size_t data_end_byte = end_byte - 0x40;
            const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
            if (data_start_byte < padded_size) {
              // Never copy past the real payload, even when the requested
              // range extends into the trailing padding.
              buffer = PushBytes(
                  buffer,
                  reinterpret_cast<const uint8_t *>(context.data) +
                      data_start_byte,
                  std::min(context.size, data_end_byte) - data_start_byte);
              if (data_end_byte == padded_size) {
                // We can only pad the last 7 bytes, so this only gets written
                // if we write the last byte.
                buffer = Pad(buffer,
                             ((context.size + 7) & 0xfffffff8u) - context.size);
              }
            }
          }
          break;
      }
      break;

    case LogType::kLogDeliveryTimeOnly:
      // This type writes no data vector (its vtable entry for `data` is 0),
      // so there is no `default` payload label in this switch.
      switch (start_byte) {
        case 0x00u:
          if ((end_byte) == 0x00u) {
            break;
          }
          // clang-format off
          // header:
          //   +0x00 | 54 00 00 00 | UOffset32 | 0x00000054 (84) Loc: +0x54 | size prefix
          buffer = Push<flatbuffers::uoffset_t>(
              buffer, message_size - sizeof(flatbuffers::uoffset_t));
          //   +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);

          [[fallthrough]];
        case 0x08u:
          if ((end_byte) == 0x08u) {
            break;
          }
          //
          // vtable (aos.logger.MessageHeader):
          //   +0x08 | 18 00 | uint16_t | 0x0018 (24) | size of this vtable
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
          //   +0x0A | 38 00 | uint16_t | 0x0038 (56) | size of referring table
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
          //   +0x0C | 34 00 | VOffset16 | 0x0034 (52) | offset to field `channel_index` (id: 0)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
          //   +0x0E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `monotonic_sent_time` (id: 1)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
          [[fallthrough]];
        case 0x10u:
          if ((end_byte) == 0x10u) {
            break;
          }
          //   +0x10 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `realtime_sent_time` (id: 2)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
          //   +0x12 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `queue_index` (id: 3)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
          //   +0x14 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
          //   +0x16 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `monotonic_remote_time` (id: 5)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
          [[fallthrough]];
        case 0x18u:
          if ((end_byte) == 0x18u) {
            break;
          }
          //   +0x18 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `realtime_remote_time` (id: 6)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
          //   +0x1A | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
          //   +0x1C | 00 00 | VOffset16 | 0x0000 (0) | offset to field `monotonic_timestamp_time` (id: 8) <defaults to -9223372036854775808> (Long)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
          //   +0x1E | 08 00 | VOffset16 | 0x0008 (8) | offset to field `monotonic_remote_transmit_time` (id: 9)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
          [[fallthrough]];
        case 0x20u:
          if ((end_byte) == 0x20u) {
            break;
          }
          // root_table (aos.logger.MessageHeader):
          //   +0x20 | 18 00 00 00 | SOffset32 | 0x00000018 (24) Loc: +0x08 | offset to vtable
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
          //   +0x24 | 3F 9A 69 37 | uint32_t | 0x37699A3F (929667647) | table field `remote_queue_index` (UInt)
          buffer = Push<uint32_t>(buffer, context.remote_queue_index);
          [[fallthrough]];
        case 0x28u:
          if ((end_byte) == 0x28u) {
            break;
          }
          //   +0x28 | 00 00 00 00 00 00 00 80 | int64_t | 0x8000000000000000 (-9223372036854775808) | table field `monotonic_remote_transmit_time` (Long)
          buffer = Push<int64_t>(buffer, context.monotonic_remote_transmit_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x30u:
          if ((end_byte) == 0x30u) {
            break;
          }
          //   +0x30 | 1D CE 4A 38 54 33 C9 F8 | int64_t | 0xF8C93354384ACE1D (-519827845169885667) | table field `realtime_remote_time` (Long)
          buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x38u:
          if ((end_byte) == 0x38u) {
            break;
          }
          //   +0x38 | FE EA DF 1D C7 3F C6 03 | int64_t | 0x03C63FC71DDFEAFE (271974951934749438) | table field `monotonic_remote_time` (Long)
          buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x40u:
          if ((end_byte) == 0x40u) {
            break;
          }
          //   +0x40 | 4E 0C 96 6E FB B5 CE 12 | int64_t | 0x12CEB5FB6E960C4E (1355220629381844046) | table field `realtime_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x48u:
          if ((end_byte) == 0x48u) {
            break;
          }
          //   +0x48 | 51 56 56 F9 0A 0B 0F 12 | int64_t | 0x120F0B0AF9565651 (1301270959094126161) | table field `monotonic_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x50u:
          if ((end_byte) == 0x50u) {
            break;
          }
          //   +0x50 | 0C A5 42 18 | uint32_t | 0x1842A50C (407020812) | table field `queue_index` (UInt)
          buffer = Push<uint32_t>(buffer, context.queue_index);
          //   +0x54 | 87 10 7C D7 | uint32_t | 0xD77C1087 (3615232135) | table field `channel_index` (UInt)
          buffer = Push<uint32_t>(buffer, channel_index);

          // clang-format on
      }
      break;

    case LogType::kLogRemoteMessage:
      // Same layout as kLogMessage, but the sent time and queue index fields
      // are filled from the remote_* members of `context`.
      switch (start_byte) {
        case 0x00u:
          if ((end_byte) == 0x00u) {
            break;
          }
          // This is the message we need to recreate.
          //
          // clang-format off
          // header:
          //   +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
          buffer = Push<flatbuffers::uoffset_t>(
              buffer, message_size - sizeof(flatbuffers::uoffset_t));
          //   +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
          [[fallthrough]];
        case 0x08u:
          if ((end_byte) == 0x08u) {
            break;
          }
          //
          // padding:
          //   +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
          buffer = Pad(buffer, 6);
          //
          // vtable (aos.logger.MessageHeader):
          //   +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
          [[fallthrough]];
        case 0x10u:
          if ((end_byte) == 0x10u) {
            break;
          }
          //   +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
          //   +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
          //   +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
          //   +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
          [[fallthrough]];
        case 0x18u:
          if ((end_byte) == 0x18u) {
            break;
          }
          //   +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
          //   +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
          buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
          //
          // root_table (aos.logger.MessageHeader):
          //   +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
          [[fallthrough]];
        case 0x20u:
          if ((end_byte) == 0x20u) {
            break;
          }
          //   +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x28u:
          if ((end_byte) == 0x28u) {
            break;
          }
          //   +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
          buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
          [[fallthrough]];
        case 0x30u:
          if ((end_byte) == 0x30u) {
            break;
          }
          //   +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
          buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
          //   +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
          buffer = Push<uint32_t>(buffer, context.remote_queue_index);
          [[fallthrough]];
        case 0x38u:
          if ((end_byte) == 0x38u) {
            break;
          }
          //   +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
          buffer = Push<uint32_t>(buffer, channel_index);
          //
          // vector (aos.logger.MessageHeader.data):
          //   +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
          buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
          [[fallthrough]];
        case 0x40u:
          if ((end_byte) == 0x40u) {
            break;
          }
          [[fallthrough]];
        default:
          // Reached either by falling through the header above or directly
          // when start_byte is past 0x40, i.e. inside the payload.
          //   +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
          //   +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
          //   ...
          //   +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
          //   +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
          //
          // padding:
          //   +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
          // clang-format on
          if (start_byte <= 0x40 && end_byte == message_size) {
            // The easy one, slap it all down.
            buffer = PushBytes(buffer, context.data, context.size);
            buffer =
                Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
          } else {
            // Only a slice of the payload was requested.  Convert the message
            // offsets into offsets relative to the start of the payload
            // (which begins at 0x40).
            const size_t data_start_byte =
                start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
            const size_t data_end_byte = end_byte - 0x40;
            const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
            if (data_start_byte < padded_size) {
              // Never copy past the real payload, even when the requested
              // range extends into the trailing padding.
              buffer = PushBytes(
                  buffer,
                  reinterpret_cast<const uint8_t *>(context.data) +
                      data_start_byte,
                  std::min(context.size, data_end_byte) - data_start_byte);
              if (data_end_byte == padded_size) {
                // We can only pad the last 7 bytes, so this only gets written
                // if we write the last byte.
                buffer = Pad(buffer,
                             ((context.size + 7) & 0xfffffff8u) - context.size);
              }
            }
          }
          break;
      }
  }

  return end_byte - start_byte;
}
1044
// Constructs a reader for `filename`, delegating to the decoder-taking
// constructor with whatever decoder ResolveDecoder(filename, quiet) returns.
SpanReader::SpanReader(std::string_view filename, bool quiet)
    : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
Tyler Chatow2015bc62021-08-04 21:15:09 -07001047
// Constructs a reader which pulls its bytes from `decoder` (ownership is
// taken) and remembers `filename` for later reporting.
SpanReader::SpanReader(std::string_view filename,
                       std::unique_ptr<DataDecoder> decoder)
    : filename_(filename), decoder_(std::move(decoder)) {}
Austin Schuh05b70472020-01-01 17:11:17 -08001051
Austin Schuhcf5f6442021-07-06 10:43:28 -07001052absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001053 // Make sure we have enough for the size.
1054 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1055 if (!ReadBlock()) {
1056 return absl::Span<const uint8_t>();
1057 }
1058 }
1059
1060 // Now make sure we have enough for the message.
1061 const size_t data_size =
1062 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1063 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001064 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1065 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1066 LOG(ERROR) << " Rest of log file is "
1067 << absl::BytesToHexString(std::string_view(
1068 reinterpret_cast<const char *>(data_.data() +
1069 consumed_data_),
1070 data_.size() - consumed_data_));
1071 return absl::Span<const uint8_t>();
1072 }
Austin Schuh05b70472020-01-01 17:11:17 -08001073 while (data_.size() < consumed_data_ + data_size) {
1074 if (!ReadBlock()) {
1075 return absl::Span<const uint8_t>();
1076 }
1077 }
1078
1079 // And return it, consuming the data.
1080 const uint8_t *data_ptr = data_.data() + consumed_data_;
1081
Austin Schuh05b70472020-01-01 17:11:17 -08001082 return absl::Span<const uint8_t>(data_ptr, data_size);
1083}
1084
Austin Schuhcf5f6442021-07-06 10:43:28 -07001085void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001086 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001087 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1088 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001089 consumed_data_ += consumed_size;
1090 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001091}
1092
1093absl::Span<const uint8_t> SpanReader::ReadMessage() {
1094 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001095 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001096 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001097 } else {
1098 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001099 }
1100 return result;
1101}
1102
Austin Schuh05b70472020-01-01 17:11:17 -08001103bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001104 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1105 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001106 constexpr size_t kReadSize = 256 * 1024;
1107
1108 // Strip off any unused data at the front.
1109 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001110 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001111 consumed_data_ = 0;
1112 }
1113
1114 const size_t starting_size = data_.size();
1115
1116 // This should automatically grow the backing store. It won't shrink if we
1117 // get a small chunk later. This reduces allocations when we want to append
1118 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001119 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001120
Brian Silvermanf51499a2020-09-21 12:49:08 -07001121 const size_t count =
1122 decoder_->Read(data_.begin() + starting_size, data_.end());
1123 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001124 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001125 return false;
1126 }
Austin Schuh05b70472020-01-01 17:11:17 -08001127
Brian Smarttea913d42021-12-10 15:02:38 -08001128 total_read_ += count;
1129
Austin Schuh05b70472020-01-01 17:11:17 -08001130 return true;
1131}
1132
// Stores the (possibly null) log source to fetch decoders from and the
// reader count above which BorrowReader() closes all open readers.
LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
    : log_source_(log_source), pool_size_(pool_size) {}
1135
1136SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
1137 if (part_readers_.size() > pool_size_) {
1138 // Don't leave arbitrary numbers of readers open, because they each take
1139 // resources, so close a big batch at once periodically.
1140 part_readers_.clear();
1141 }
1142 if (log_source_ == nullptr) {
1143 part_readers_.emplace_back(id, FLAGS_quiet_sorting);
1144 } else {
1145 part_readers_.emplace_back(id, log_source_->GetDecoder(id));
1146 }
1147 return &part_readers_.back();
1148}
1149
Austin Schuhadd6eb32020-11-09 21:24:26 -08001150std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001151 SpanReader *span_reader) {
1152 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001153
1154 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001155 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001156 return std::nullopt;
1157 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001158
Austin Schuh5212cad2020-09-09 23:12:09 -07001159 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001160 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001161 if (!result.Verify()) {
1162 return std::nullopt;
1163 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001164
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001165 // We only know of busted headers in the versions of the log file header
1166 // *before* the logger_sha1 field was added. At some point before that point,
1167 // the logic to track when a header has been written was rewritten in such a
1168 // way that it can't happen anymore. We've seen some logs where the body
1169 // parses as a header recently, so the simple solution of always looking is
1170 // failing us.
1171 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001172 while (true) {
1173 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001174 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001175 break;
1176 }
1177
1178 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1179 maybe_header_data);
1180 if (maybe_header.Verify()) {
1181 LOG(WARNING) << "Found duplicate LogFileHeader in "
1182 << span_reader->filename();
1183 ResizeableBuffer header_data_copy;
1184 header_data_copy.resize(maybe_header_data.size());
1185 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1186 header_data_copy.size());
1187 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1188 std::move(header_data_copy));
1189
1190 span_reader->ConsumeMessage();
1191 } else {
1192 break;
1193 }
1194 }
1195 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001196 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001197}
1198
Austin Schuh0e8db662021-07-06 10:43:47 -07001199std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1200 std::string_view filename) {
1201 SpanReader span_reader(filename);
1202 return ReadHeader(&span_reader);
1203}
1204
Austin Schuhadd6eb32020-11-09 21:24:26 -08001205std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001206 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001207 SpanReader span_reader(filename);
1208 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1209 for (size_t i = 0; i < n + 1; ++i) {
1210 data_span = span_reader.ReadMessage();
1211
1212 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001213 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001214 return std::nullopt;
1215 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001216 }
1217
Brian Silverman354697a2020-09-22 21:06:32 -07001218 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001219 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001220 if (!result.Verify()) {
1221 return std::nullopt;
1222 }
1223 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001224}
1225
Alexei Strots58017402023-05-03 22:05:06 -07001226MessageReader::MessageReader(SpanReader span_reader)
1227 : span_reader_(std::move(span_reader)),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001228 raw_log_file_header_(
1229 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001230 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1231 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1232
Austin Schuh0e8db662021-07-06 10:43:47 -07001233 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1234 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001235
1236 // Make sure something was read.
Alexei Strots58017402023-05-03 22:05:06 -07001237 CHECK(raw_log_file_header)
1238 << ": Failed to read header from: " << span_reader_.filename();
Austin Schuh05b70472020-01-01 17:11:17 -08001239
Austin Schuh0e8db662021-07-06 10:43:47 -07001240 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001241
Austin Schuh5b728b72021-06-16 14:57:15 -07001242 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1243
Brian Smarttea913d42021-12-10 15:02:38 -08001244 total_verified_before_ = span_reader_.TotalConsumed();
1245
Austin Schuhcde938c2020-02-02 17:30:07 -08001246 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001247 FLAGS_max_out_of_order > 0
1248 ? chrono::duration_cast<chrono::nanoseconds>(
1249 chrono::duration<double>(FLAGS_max_out_of_order))
1250 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001251
Alexei Strots58017402023-05-03 22:05:06 -07001252 VLOG(1) << "Opened " << span_reader_.filename() << " as node "
Austin Schuhcde938c2020-02-02 17:30:07 -08001253 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001254}
1255
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001256std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001257 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001258 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001259 if (is_corrupted()) {
1260 LOG(ERROR) << "Total corrupted volumes: before = "
1261 << total_verified_before_
1262 << " | corrupted = " << total_corrupted_
1263 << " | during = " << total_verified_during_
1264 << " | after = " << total_verified_after_ << std::endl;
1265 }
1266
1267 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001268 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1269 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001270 << span_reader_.TotalConsumed() << " bytes usable."
1271 << std::endl;
1272 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001273 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001274 }
1275
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001276 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001277
1278 if (crash_on_corrupt_message_flag_) {
1279 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001280 << total_verified_before_ << " found within "
1281 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001282 << "; set --nocrash_on_corrupt_message to see summary;"
1283 << " also set --ignore_corrupt_messages to process"
1284 << " anyway";
1285
1286 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001287 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001288 << " from " << filename() << std::endl;
1289
1290 total_corrupted_ += msg_data.size();
1291
1292 while (true) {
1293 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1294
James Kuszmaul9776b392023-01-14 14:08:08 -08001295 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001296 if (!ignore_corrupt_messages_flag_) {
1297 LOG(ERROR) << "Total corrupted volumes: before = "
1298 << total_verified_before_
1299 << " | corrupted = " << total_corrupted_
1300 << " | during = " << total_verified_during_
1301 << " | after = " << total_verified_after_ << std::endl;
1302
1303 if (span_reader_.IsIncomplete()) {
1304 LOG(ERROR) << "Unable to access some messages in " << filename()
1305 << " : " << span_reader_.TotalRead() << " bytes read, "
1306 << span_reader_.TotalConsumed() << " bytes usable."
1307 << std::endl;
1308 }
1309 return nullptr;
1310 }
1311 break;
1312 }
1313
1314 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1315
1316 if (!next_msg.Verify()) {
1317 total_corrupted_ += msg_data.size();
1318 total_verified_during_ += total_verified_after_;
1319 total_verified_after_ = 0;
1320
1321 } else {
1322 total_verified_after_ += msg_data.size();
1323 if (ignore_corrupt_messages_flag_) {
1324 msg = next_msg;
1325 break;
1326 }
1327 }
1328 }
1329 }
1330
1331 if (is_corrupted()) {
1332 total_verified_after_ += msg_data.size();
1333 } else {
1334 total_verified_before_ += msg_data.size();
1335 }
Austin Schuh05b70472020-01-01 17:11:17 -08001336
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001337 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001338
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001339 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001340
1341 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001342
1343 if (VLOG_IS_ON(3)) {
1344 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1345 } else if (VLOG_IS_ON(2)) {
1346 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1347 msg_copy.mutable_message()->clear_data();
1348 VLOG(2) << "Read from " << filename() << " data "
1349 << FlatbufferToJson(msg_copy);
1350 }
1351
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001352 return result;
1353}
1354
1355std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1356 const MessageHeader &message) {
1357 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1358
1359 UnpackedMessageHeader *const unpacked_message =
1360 reinterpret_cast<UnpackedMessageHeader *>(
1361 malloc(sizeof(UnpackedMessageHeader) + data_size +
1362 kChannelDataAlignment - 1));
1363
1364 CHECK(message.has_channel_index());
1365 CHECK(message.has_monotonic_sent_time());
1366
1367 absl::Span<uint8_t> span;
1368 if (data_size > 0) {
1369 span =
1370 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1371 &unpacked_message->actual_data[0], data_size)),
1372 data_size);
1373 }
1374
Austin Schuh826e6ce2021-11-18 20:33:10 -08001375 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001376 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001377 monotonic_remote_time = aos::monotonic_clock::time_point(
1378 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001379 }
1380 std::optional<realtime_clock::time_point> realtime_remote_time;
1381 if (message.has_realtime_remote_time()) {
1382 realtime_remote_time = realtime_clock::time_point(
1383 chrono::nanoseconds(message.realtime_remote_time()));
1384 }
Austin Schuhb5224ec2024-03-27 15:20:09 -07001385 aos::monotonic_clock::time_point monotonic_remote_transmit_time =
1386 aos::monotonic_clock::time_point(
1387 std::chrono::nanoseconds(message.monotonic_remote_transmit_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001388
1389 std::optional<uint32_t> remote_queue_index;
1390 if (message.has_remote_queue_index()) {
1391 remote_queue_index = message.remote_queue_index();
1392 }
1393
James Kuszmaul9776b392023-01-14 14:08:08 -08001394 new (unpacked_message) UnpackedMessageHeader(
1395 message.channel_index(),
1396 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001397 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001398 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001399 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001400 message.queue_index(), monotonic_remote_time, realtime_remote_time,
Austin Schuhb5224ec2024-03-27 15:20:09 -07001401 monotonic_remote_transmit_time, remote_queue_index,
James Kuszmaul9776b392023-01-14 14:08:08 -08001402 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001403 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001404 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001405
1406 if (data_size > 0) {
1407 memcpy(span.data(), message.data()->data(), data_size);
1408 }
1409
1410 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1411 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001412}
1413
Alexei Strots58017402023-05-03 22:05:06 -07001414SpanReader PartsMessageReader::MakeSpanReader(
1415 const LogPartsAccess &log_parts_access, size_t part_number) {
1416 const auto part = log_parts_access.GetPartAt(part_number);
1417 if (log_parts_access.log_source().has_value()) {
1418 return SpanReader(part,
1419 log_parts_access.log_source().value()->GetDecoder(part));
1420 } else {
1421 return SpanReader(part);
1422 }
1423}
1424
1425PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
1426 : log_parts_access_(std::move(log_parts_access)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001427 message_reader_(MakeSpanReader(log_parts_access_, 0)),
1428 max_out_of_order_duration_(
1429 log_parts_access_.max_out_of_order_duration()) {
Alexei Strots58017402023-05-03 22:05:06 -07001430 if (log_parts_access_.size() >= 2) {
1431 next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001432 }
Austin Schuh48507722021-07-17 17:29:24 -07001433 ComputeBootCounts();
1434}
1435
1436void PartsMessageReader::ComputeBootCounts() {
Austin Schuh63097262023-08-16 17:04:29 -07001437 boot_counts_.assign(
1438 configuration::NodesCount(log_parts_access_.config().get()),
1439 std::nullopt);
Austin Schuh48507722021-07-17 17:29:24 -07001440
Alexei Strots58017402023-05-03 22:05:06 -07001441 const auto boots = log_parts_access_.parts().boots;
1442
Austin Schuh48507722021-07-17 17:29:24 -07001443 // We have 3 vintages of log files with different amounts of information.
1444 if (log_file_header()->has_boot_uuids()) {
1445 // The new hotness with the boots explicitly listed out. We can use the log
1446 // file header to compute the boot count of all relevant nodes.
1447 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1448 size_t node_index = 0;
1449 for (const flatbuffers::String *boot_uuid :
1450 *log_file_header()->boot_uuids()) {
Alexei Strots58017402023-05-03 22:05:06 -07001451 CHECK(boots);
Austin Schuh48507722021-07-17 17:29:24 -07001452 if (boot_uuid->size() != 0) {
Alexei Strots58017402023-05-03 22:05:06 -07001453 auto it = boots->boot_count_map.find(boot_uuid->str());
1454 if (it != boots->boot_count_map.end()) {
Austin Schuh48507722021-07-17 17:29:24 -07001455 boot_counts_[node_index] = it->second;
1456 }
1457 } else if (parts().boots->boots[node_index].size() == 1u) {
1458 boot_counts_[node_index] = 0;
1459 }
1460 ++node_index;
1461 }
1462 } else {
1463 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1464 // single node log files with boot UUIDs in the header. We only know how to
1465 // order certain boots in certain circumstances.
Austin Schuh63097262023-08-16 17:04:29 -07001466 if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
Austin Schuh48507722021-07-17 17:29:24 -07001467 for (size_t node_index = 0; node_index < boot_counts_.size();
1468 ++node_index) {
Alexei Strots58017402023-05-03 22:05:06 -07001469 if (boots->boots[node_index].size() == 1u) {
Austin Schuh48507722021-07-17 17:29:24 -07001470 boot_counts_[node_index] = 0;
1471 }
1472 }
1473 } else {
1474 // Really old single node logs without any UUIDs. They can't reboot.
1475 CHECK_EQ(boot_counts_.size(), 1u);
1476 boot_counts_[0] = 0u;
1477 }
1478 }
1479}
Austin Schuhc41603c2020-10-11 16:17:37 -07001480
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001481std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001482 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001483 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001484 message_reader_.ReadMessage();
1485 if (message) {
1486 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001487 const monotonic_clock::time_point monotonic_sent_time =
1488 message->monotonic_sent_time;
1489
1490 // TODO(austin): Does this work with startup? Might need to use the
1491 // start time.
1492 // TODO(austin): Does this work with startup when we don't know the
1493 // remote start time too? Look at one of those logs to compare.
Alexei Strots58017402023-05-03 22:05:06 -07001494 if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
1495 max_out_of_order_duration()) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001496 after_start_ = true;
1497 }
1498 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001499 CHECK_GE(monotonic_sent_time,
1500 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001501 << ": Max out of order of " << max_out_of_order_duration().count()
Alexei Strots58017402023-05-03 22:05:06 -07001502 << "ns exceeded. " << log_parts_access_.parts()
1503 << ", start time is "
1504 << log_parts_access_.parts().monotonic_start_time
1505 << " currently reading " << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001506 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001507 return message;
1508 }
1509 NextLog();
1510 }
Austin Schuh32f68492020-11-08 21:45:51 -08001511 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001512 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001513}
1514
1515void PartsMessageReader::NextLog() {
Alexei Strots58017402023-05-03 22:05:06 -07001516 if (next_part_index_ == log_parts_access_.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001517 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001518 done_ = true;
1519 return;
1520 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001521 CHECK(next_message_reader_);
1522 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001523 ComputeBootCounts();
Alexei Strots58017402023-05-03 22:05:06 -07001524 if (next_part_index_ + 1 < log_parts_access_.size()) {
1525 next_message_reader_.emplace(
1526 MakeSpanReader(log_parts_access_, next_part_index_ + 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001527 } else {
1528 next_message_reader_.reset();
1529 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001530 ++next_part_index_;
1531}
1532
Austin Schuh1be0ce42020-11-29 22:43:26 -08001533bool Message::operator<(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001534 if (this->timestamp < m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001535 return true;
Austin Schuh63097262023-08-16 17:04:29 -07001536 } else if (this->timestamp > m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001537 return false;
1538 }
1539
1540 if (this->channel_index < m2.channel_index) {
1541 return true;
1542 } else if (this->channel_index > m2.channel_index) {
1543 return false;
1544 }
1545
1546 return this->queue_index < m2.queue_index;
1547}
1548
1549bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001550bool Message::operator==(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001551 return timestamp == m2.timestamp && channel_index == m2.channel_index &&
1552 queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001553}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001554
Austin Schuh63097262023-08-16 17:04:29 -07001555bool Message::operator<=(const Message &m2) const {
1556 return *this == m2 || *this < m2;
1557}
1558
1559std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
1560 os << "{.channel_index=" << msg.channel_index
1561 << ", .monotonic_sent_time=" << msg.monotonic_sent_time
1562 << ", .realtime_sent_time=" << msg.realtime_sent_time
1563 << ", .queue_index=" << msg.queue_index;
1564 if (msg.monotonic_remote_time) {
1565 os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001566 }
1567 os << ", .realtime_remote_time=";
Austin Schuh63097262023-08-16 17:04:29 -07001568 PrintOptionalOrNull(&os, msg.realtime_remote_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001569 os << ", .remote_queue_index=";
Austin Schuh63097262023-08-16 17:04:29 -07001570 PrintOptionalOrNull(&os, msg.remote_queue_index);
1571 if (msg.has_monotonic_timestamp_time) {
1572 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001573 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001574 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001575 return os;
1576}
1577
Austin Schuh63097262023-08-16 17:04:29 -07001578std::ostream &operator<<(std::ostream &os, const Message &msg) {
1579 os << "{.channel_index=" << msg.channel_index
1580 << ", .queue_index=" << msg.queue_index
1581 << ", .timestamp=" << msg.timestamp;
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001582 if (msg.header != nullptr) {
1583 if (msg.header->remote_queue_index.has_value()) {
1584 os << ", .remote_queue_index=" << *msg.header->remote_queue_index;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001585 }
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001586 if (msg.header->monotonic_remote_time.has_value()) {
1587 os << ", .monotonic_remote_time=" << *msg.header->monotonic_remote_time;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001588 }
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001589 os << ", .header=" << msg.header;
Austin Schuhd2f96102020-12-01 20:27:29 -08001590 }
1591 os << "}";
1592 return os;
1593}
1594
Austin Schuh63097262023-08-16 17:04:29 -07001595std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
1596 os << "{.channel_index=" << msg.channel_index
1597 << ", .queue_index=" << msg.queue_index
1598 << ", .monotonic_event_time=" << msg.monotonic_event_time
1599 << ", .realtime_event_time=" << msg.realtime_event_time;
1600 if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
1601 os << ", .remote_queue_index=" << msg.remote_queue_index;
Austin Schuhd2f96102020-12-01 20:27:29 -08001602 }
Austin Schuh63097262023-08-16 17:04:29 -07001603 if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
1604 os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001605 }
Austin Schuh63097262023-08-16 17:04:29 -07001606 if (msg.realtime_remote_time != realtime_clock::min_time) {
1607 os << ", .realtime_remote_time=" << msg.realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001608 }
Austin Schuhb5224ec2024-03-27 15:20:09 -07001609 if (msg.monotonic_remote_transmit_time != BootTimestamp::min_time()) {
1610 os << ", .monotonic_remote_transmit_time="
1611 << msg.monotonic_remote_transmit_time;
1612 }
Austin Schuh63097262023-08-16 17:04:29 -07001613 if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
1614 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001615 }
Austin Schuh63097262023-08-16 17:04:29 -07001616 if (msg.data != nullptr) {
Austin Schuh3c9f92c2024-04-30 17:56:42 -07001617 os << ", .data=" << msg.data.get();
Austin Schuh22cf7862022-09-19 19:09:42 -07001618 } else {
1619 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001620 }
1621 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001622 return os;
1623}
1624
Alexei Strots58017402023-05-03 22:05:06 -07001625MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
1626 : parts_message_reader_(log_parts_access),
Austin Schuh48507722021-07-17 17:29:24 -07001627 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1628}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001629
// Returns a pointer to the oldest queued message, reading more messages from
// the parts reader first until that message is known to be sorted.  Returns
// nullptr when the reader is exhausted and nothing is queued.
//
// The returned pointer refers to the first element of messages_.
// CHECK-fails if the front message is older than the previously returned one.
const Message *MessageSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop.  This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      // Done once the front message is older than the sorted horizon and the
      // horizon has reached the start time.
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::shared_ptr<UnpackedMessageHeader> msg =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!msg) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // The timestamp boot is only filled in when the message carries a
      // monotonic_timestamp_time; it is then the logger's boot count.
      size_t monotonic_timestamp_boot = 0;
      if (msg->has_monotonic_timestamp_time) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      // Placeholder kept when the message has no monotonic_remote_time.
      size_t monotonic_remote_boot = 0xffffff;

      if (msg->monotonic_remote_time.has_value()) {
        // Resolve the boot of the node this channel's messages come from.
        CHECK_LT(msg->channel_index, source_node_index_.size());
        const Node *node = parts().config->nodes()->Get(
            source_node_index_[msg->channel_index]);

        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[msg->channel_index]);
        CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
                    << "', with index "
                    << source_node_index_[msg->channel_index];
        monotonic_remote_boot = *boot;
      }

      // Must be built before `msg` is moved into the Message below.
      // NOTE(review): presumably this shares ownership of `msg` while
      // pointing at msg->span -- confirm against the SharedSpan definition.
      std::shared_ptr<SharedSpan> data =
          std::make_shared<SharedSpan>(msg, &msg->span);

      // `msg` is read by the earlier initializers and only moved from in
      // .header, so the initializer order here matters.
      messages_.insert(Message{
          .channel_index = msg->channel_index,
          .queue_index = BootQueueIndex{.boot = parts().boot_count,
                                        .index = msg->queue_index},
          .timestamp = BootTimestamp{.boot = parts().boot_count,
                                     .time = msg->monotonic_sent_time},
          .monotonic_remote_boot = monotonic_remote_boot,
          .monotonic_timestamp_boot = monotonic_timestamp_boot,
          .header = std::move(msg),
          .data = std::move(data),
      });

      // Now, update sorted_until_ to match the new message.  The comparison
      // guards the subtraction so it cannot go below min_time.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Messages must come out in non-decreasing time order.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
          << (*messages_.begin()) << " on " << parts_message_reader_.filename();
  return &(*messages_.begin());
}
1710
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001711void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001712
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001713std::string MessageSorter::DebugString() const {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001714 std::stringstream ss;
1715 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001716 int count = 0;
1717 bool no_dots = true;
Austin Schuh63097262023-08-16 17:04:29 -07001718 for (const Message &msg : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001719 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
Austin Schuh63097262023-08-16 17:04:29 -07001720 ss << msg << "\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001721 } else if (no_dots) {
1722 ss << "...\n";
1723 no_dots = false;
1724 }
1725 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001726 }
1727 ss << "] <- " << parts_message_reader_.filename();
1728 return ss.str();
1729}
1730
Austin Schuh63097262023-08-16 17:04:29 -07001731// Class to merge start times cleanly, reusably, and incrementally.
1732class StartTimes {
1733 public:
1734 void Update(monotonic_clock::time_point new_monotonic_start_time,
1735 realtime_clock::time_point new_realtime_start_time) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001736 // We want to capture the earliest meaningful start time here. The start
1737 // time defaults to min_time when there's no meaningful value to report, so
1738 // let's ignore those.
Austin Schuh63097262023-08-16 17:04:29 -07001739 if (new_monotonic_start_time != monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001740 bool accept = false;
1741 // We want to prioritize start times from the logger node. Really, we
1742 // want to prioritize start times with a valid realtime_clock time. So,
1743 // if we have a start time without a RT clock, prefer a start time with a
1744 // RT clock, even it if is later.
Austin Schuh63097262023-08-16 17:04:29 -07001745 if (new_realtime_start_time != realtime_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001746 // We've got a good one. See if the current start time has a good RT
1747 // clock, or if we should use this one instead.
Austin Schuh63097262023-08-16 17:04:29 -07001748 if (new_monotonic_start_time < monotonic_start_time_ ||
1749 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001750 accept = true;
1751 } else if (realtime_start_time_ == realtime_clock::min_time) {
1752 // The previous start time doesn't have a good RT time, so it is very
1753 // likely the start time from a remote part file. We just found a
1754 // better start time with a real RT time, so switch to that instead.
1755 accept = true;
1756 }
1757 } else if (realtime_start_time_ == realtime_clock::min_time) {
1758 // We don't have a RT time, so take the oldest.
Austin Schuh63097262023-08-16 17:04:29 -07001759 if (new_monotonic_start_time < monotonic_start_time_ ||
1760 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001761 accept = true;
1762 }
1763 }
1764
1765 if (accept) {
Austin Schuh63097262023-08-16 17:04:29 -07001766 monotonic_start_time_ = new_monotonic_start_time;
1767 realtime_start_time_ = new_realtime_start_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001768 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001769 }
1770 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001771
Austin Schuh63097262023-08-16 17:04:29 -07001772 monotonic_clock::time_point monotonic_start_time() const {
1773 return monotonic_start_time_;
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001774 }
Austin Schuh63097262023-08-16 17:04:29 -07001775 realtime_clock::time_point realtime_start_time() const {
1776 return realtime_start_time_;
1777 }
1778
1779 private:
1780 monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
1781 realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time;
1782};
1783
1784PartsMerger::PartsMerger(SelectedLogParts &&parts) {
1785 node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name());
1786
1787 for (LogPartsAccess part : parts) {
1788 message_sorters_.emplace_back(std::move(part));
1789 }
1790
1791 StartTimes start_times;
1792 for (const MessageSorter &message_sorter : message_sorters_) {
1793 start_times.Update(message_sorter.monotonic_start_time(),
1794 message_sorter.realtime_start_time());
1795 }
1796 monotonic_start_time_ = start_times.monotonic_start_time();
1797 realtime_start_time_ = start_times.realtime_start_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001798}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001799
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001800std::vector<const LogParts *> PartsMerger::Parts() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001801 std::vector<const LogParts *> p;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001802 p.reserve(message_sorters_.size());
1803 for (const MessageSorter &message_sorter : message_sorters_) {
1804 p.emplace_back(&message_sorter.parts());
Austin Schuh0ca51f32020-12-25 21:51:45 -08001805 }
1806 return p;
1807}
1808
// Returns the oldest message across all sorters, or nullptr when every sorter
// is empty.  The sorter that owns the returned message is remembered in
// current_ so that repeated calls are cheap and PopFront() knows what to pop.
// Messages that compare equal across sorters are treated as duplicates and
// all but one are dropped.
const Message *PartsMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    const Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
            << *result;
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  const Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (MessageSorter &message_sorter : message_sorters_) {
    const Message *msg = message_sorter.Front();
    if (!msg) {
      // Nothing queued in this sorter; it still bounds how far we are sorted.
      sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *msg < *oldest) {
      oldest = msg;
      current_ = &message_sorter;
    } else if (*msg == *oldest) {
      // Found a duplicate.  If there is a choice, we want the one which has
      // the timestamp time.
      if (!msg->header->has_monotonic_timestamp_time) {
        // The new one has no timestamp time; drop it and keep `oldest`.
        message_sorter.PopFront();
      } else if (!oldest->header->has_monotonic_timestamp_time) {
        // Only the new one has a timestamp time; drop the previous pick and
        // switch over to this sorter.
        current_->PopFront();
        current_ = &message_sorter;
        oldest = msg;
      } else {
        // Both have one; they must agree, so either copy can be dropped.
        CHECK_EQ(msg->header->monotonic_timestamp_time,
                 oldest->header->monotonic_timestamp_time);
        message_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
  }

  if (oldest) {
    // Output must be non-decreasing in time.
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
    if (monotonic_oldest_time_ > oldest->timestamp.time) {
      VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
              << " for node " << node_name() << " with a start time of "
              << monotonic_start_time_ << " " << *oldest;
    }
    monotonic_oldest_time_ =
        std::min(monotonic_oldest_time_, oldest->timestamp.time);
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found.  This will be nullptr if nothing was
  // found, indicating there is nothing left.
  if (oldest) {
    VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
            << *oldest;
  } else {
    VLOG(1) << this << " PartsMerger::Front for node " << node_name();
  }
  return oldest;
}
1876
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001877void PartsMerger::PopFront() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001878 CHECK(current_ != nullptr) << "Popping before calling Front()";
1879 current_->PopFront();
1880 current_ = nullptr;
1881}
1882
Alexei Strots1f51ac72023-05-15 10:14:54 -07001883BootMerger::BootMerger(std::string_view node_name,
Austin Schuh63097262023-08-16 17:04:29 -07001884 const LogFilesContainer &log_files,
1885 const std::vector<StoredDataType> &types)
1886 : configuration_(log_files.config()),
1887 node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
Alexei Strots1f51ac72023-05-15 10:14:54 -07001888 size_t number_of_boots = log_files.BootsForNode(node_name);
1889 parts_mergers_.reserve(number_of_boots);
1890 for (size_t i = 0; i < number_of_boots; ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001891 VLOG(2) << "Boot " << i;
Austin Schuh63097262023-08-16 17:04:29 -07001892 SelectedLogParts selected_parts =
1893 log_files.SelectParts(node_name, i, types);
1894 // We are guarenteed to have something each boot, but not guarenteed to have
1895 // both timestamps and data for each boot. If we don't have anything, don't
1896 // create a parts merger. The rest of this class will detect that and
1897 // ignore it as required.
1898 if (selected_parts.empty()) {
1899 parts_mergers_.emplace_back(nullptr);
1900 } else {
1901 parts_mergers_.emplace_back(
1902 std::make_unique<PartsMerger>(std::move(selected_parts)));
1903 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001904 }
1905}
1906
Austin Schuh63097262023-08-16 17:04:29 -07001907std::string_view BootMerger::node_name() const {
1908 return configuration::NodeName(configuration().get(), node());
1909}
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001910
Adam Snaider13d48d92023-08-03 12:20:15 -07001911const Message *BootMerger::Front() {
Austin Schuh63097262023-08-16 17:04:29 -07001912 if (parts_mergers_[index_].get() != nullptr) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001913 const Message *result = parts_mergers_[index_]->Front();
Austin Schuh63097262023-08-16 17:04:29 -07001914
1915 if (result != nullptr) {
1916 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
1917 return result;
1918 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001919 }
1920
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001921 if (index_ + 1u == parts_mergers_.size()) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001922 // At the end of the last node merger, just return.
Austin Schuh63097262023-08-16 17:04:29 -07001923 VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr";
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001924 return nullptr;
1925 } else {
1926 ++index_;
Adam Snaider13d48d92023-08-03 12:20:15 -07001927 const Message *result = Front();
Austin Schuh63097262023-08-16 17:04:29 -07001928 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
1929 return result;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001930 }
1931}
1932
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001933void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001934
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001935std::vector<const LogParts *> BootMerger::Parts() const {
1936 std::vector<const LogParts *> results;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001937 for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
Austin Schuh63097262023-08-16 17:04:29 -07001938 if (!parts_merger) continue;
1939
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001940 std::vector<const LogParts *> node_parts = parts_merger->Parts();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001941
1942 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1943 std::make_move_iterator(node_parts.end()));
1944 }
1945
1946 return results;
1947}
1948
Austin Schuh63097262023-08-16 17:04:29 -07001949monotonic_clock::time_point BootMerger::monotonic_start_time(
1950 size_t boot) const {
1951 CHECK_LT(boot, parts_mergers_.size());
1952 if (parts_mergers_[boot]) {
1953 return parts_mergers_[boot]->monotonic_start_time();
Austin Schuh0ca51f32020-12-25 21:51:45 -08001954 }
Austin Schuh63097262023-08-16 17:04:29 -07001955 return monotonic_clock::min_time;
1956}
1957
1958realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const {
1959 CHECK_LT(boot, parts_mergers_.size());
1960 if (parts_mergers_[boot]) {
1961 return parts_mergers_[boot]->realtime_start_time();
1962 }
1963 return realtime_clock::min_time;
1964}
1965
1966monotonic_clock::time_point BootMerger::monotonic_oldest_time(
1967 size_t boot) const {
1968 CHECK_LT(boot, parts_mergers_.size());
1969 if (parts_mergers_[boot]) {
1970 return parts_mergers_[boot]->monotonic_oldest_time();
1971 }
1972 return monotonic_clock::max_time;
1973}
1974
1975bool BootMerger::started() const {
1976 if (index_ == 0) {
1977 if (!parts_mergers_[0]) {
1978 return false;
1979 }
1980 return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time;
1981 }
1982 return true;
1983}
1984
1985SplitTimestampBootMerger::SplitTimestampBootMerger(
1986 std::string_view node_name, const LogFilesContainer &log_files,
1987 TimestampQueueStrategy timestamp_queue_strategy)
1988 : boot_merger_(node_name, log_files,
1989 (timestamp_queue_strategy ==
1990 TimestampQueueStrategy::kQueueTimestampsAtStartup)
1991 ? std::vector<StoredDataType>{StoredDataType::DATA}
1992 : std::vector<StoredDataType>{
1993 StoredDataType::DATA, StoredDataType::TIMESTAMPS,
1994 StoredDataType::REMOTE_TIMESTAMPS}) {
1995 // Make the timestamp_boot_merger_ only if we are asked to, and if there are
1996 // files to put in it. We don't need it for a data only log.
1997 if (timestamp_queue_strategy ==
1998 TimestampQueueStrategy::kQueueTimestampsAtStartup &&
1999 log_files.HasTimestamps(node_name)) {
2000 timestamp_boot_merger_ = std::make_unique<BootMerger>(
2001 node_name, log_files,
2002 std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
2003 StoredDataType::REMOTE_TIMESTAMPS});
2004 }
2005
2006 size_t number_of_boots = log_files.BootsForNode(node_name);
2007 monotonic_start_time_.reserve(number_of_boots);
2008 realtime_start_time_.reserve(number_of_boots);
2009
2010 // Start times are split across the timestamp boot merger, and data boot
2011 // merger. Pull from both and combine them to get the same answer as before.
2012 for (size_t i = 0u; i < number_of_boots; ++i) {
2013 StartTimes start_times;
2014
2015 if (timestamp_boot_merger_) {
2016 start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
2017 timestamp_boot_merger_->realtime_start_time(i));
2018 }
2019
2020 start_times.Update(boot_merger_.monotonic_start_time(i),
2021 boot_merger_.realtime_start_time(i));
2022
2023 monotonic_start_time_.push_back(start_times.monotonic_start_time());
2024 realtime_start_time_.push_back(start_times.realtime_start_time());
2025 }
2026}
2027
2028void SplitTimestampBootMerger::QueueTimestamps(
2029 std::function<void(TimestampedMessage *)> fn,
2030 const std::vector<size_t> &source_node) {
2031 if (!timestamp_boot_merger_) {
2032 return;
2033 }
2034
2035 while (true) {
2036 // Load all the timestamps. If we find data, ignore it and drop it on the
2037 // floor. It will be read when boot_merger_ is used.
Adam Snaider13d48d92023-08-03 12:20:15 -07002038 const Message *msg = timestamp_boot_merger_->Front();
Austin Schuh63097262023-08-16 17:04:29 -07002039 if (!msg) {
2040 queue_timestamps_ran_ = true;
2041 return;
2042 }
Philipp Schrader416505b2024-03-28 11:59:45 -07002043 CHECK_LT(msg->channel_index, source_node.size());
Austin Schuh63097262023-08-16 17:04:29 -07002044 if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002045 TimestampedMessage timestamped_message{
Austin Schuh63097262023-08-16 17:04:29 -07002046 .channel_index = msg->channel_index,
2047 .queue_index = msg->queue_index,
2048 .monotonic_event_time = msg->timestamp,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002049 .realtime_event_time = msg->header->realtime_sent_time,
Austin Schuh63097262023-08-16 17:04:29 -07002050 .remote_queue_index =
2051 BootQueueIndex{.boot = msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002052 .index = msg->header->remote_queue_index.value()},
Austin Schuh63097262023-08-16 17:04:29 -07002053 .monotonic_remote_time = {msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002054 msg->header->monotonic_remote_time.value()},
2055 .realtime_remote_time = msg->header->realtime_remote_time.value(),
Austin Schuhb5224ec2024-03-27 15:20:09 -07002056 .monotonic_remote_transmit_time =
2057 {msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002058 msg->header->monotonic_remote_transmit_time},
Austin Schuh63097262023-08-16 17:04:29 -07002059 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002060 msg->header->monotonic_timestamp_time},
2061 .data = msg->data,
2062 };
Austin Schuh63097262023-08-16 17:04:29 -07002063
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002064 fn(&timestamped_message);
2065
2066 VLOG(2) << this << " Queued timestamp of " << timestamped_message;
2067
2068 timestamp_messages_.emplace_back(std::move(*msg));
Austin Schuh63097262023-08-16 17:04:29 -07002069 } else {
2070 VLOG(2) << this << " Dropped data";
2071 }
2072 timestamp_boot_merger_->PopFront();
2073 }
2074
2075 // TODO(austin): Push the queue into TimestampMapper instead. Have it pull
2076 // all the timestamps. That will also make it so we don't have to clear the
2077 // function.
2078}
2079
2080std::string_view SplitTimestampBootMerger::node_name() const {
2081 return configuration::NodeName(configuration().get(), node());
2082}
2083
2084monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
2085 size_t boot) const {
2086 CHECK_LT(boot, monotonic_start_time_.size());
2087 return monotonic_start_time_[boot];
2088}
2089
2090realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
2091 size_t boot) const {
2092 CHECK_LT(boot, realtime_start_time_.size());
2093 return realtime_start_time_[boot];
2094}
2095
2096monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time(
2097 size_t boot) const {
2098 if (!timestamp_boot_merger_) {
2099 return boot_merger_.monotonic_oldest_time(boot);
2100 }
2101 return std::min(boot_merger_.monotonic_oldest_time(boot),
2102 timestamp_boot_merger_->monotonic_oldest_time(boot));
2103}
2104
Adam Snaider13d48d92023-08-03 12:20:15 -07002105const Message *SplitTimestampBootMerger::Front() {
2106 const Message *boot_merger_front = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002107
2108 if (timestamp_boot_merger_) {
2109 CHECK(queue_timestamps_ran_);
2110 }
2111
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002112 const Message *timestamp_messages_front = nullptr;
2113 if (!timestamp_messages_.empty()) {
2114 timestamp_messages_front = &timestamp_messages_.front();
Austin Schuh63097262023-08-16 17:04:29 -07002115 }
2116
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002117 if (!timestamp_messages_front) {
Austin Schuh63097262023-08-16 17:04:29 -07002118 message_source_ = MessageSource::kBootMerger;
2119 if (boot_merger_front != nullptr) {
2120 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2121 << " " << *boot_merger_front;
2122 } else {
2123 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2124 << " nullptr";
2125 }
2126 return boot_merger_front;
2127 }
2128
2129 if (boot_merger_front == nullptr) {
2130 message_source_ = MessageSource::kTimestampMessage;
2131
2132 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002133 << *timestamp_messages_front;
2134 return timestamp_messages_front;
Austin Schuh63097262023-08-16 17:04:29 -07002135 }
2136
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002137 if (*boot_merger_front <= *timestamp_messages_front) {
2138 if (*boot_merger_front == *timestamp_messages_front) {
Austin Schuh63097262023-08-16 17:04:29 -07002139 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2140 << " Dropping duplicate timestamp.";
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002141 timestamp_messages_.pop_front();
Austin Schuh63097262023-08-16 17:04:29 -07002142 }
2143 message_source_ = MessageSource::kBootMerger;
2144 if (boot_merger_front != nullptr) {
2145 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2146 << " " << *boot_merger_front;
2147 } else {
2148 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2149 << " nullptr";
2150 }
2151 return boot_merger_front;
2152 } else {
2153 message_source_ = MessageSource::kTimestampMessage;
2154 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002155 << *timestamp_messages_front;
2156 return timestamp_messages_front;
Austin Schuh63097262023-08-16 17:04:29 -07002157 }
2158}
2159
2160void SplitTimestampBootMerger::PopFront() {
2161 switch (message_source_) {
2162 case MessageSource::kTimestampMessage:
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002163 CHECK(!timestamp_messages_.empty());
2164 timestamp_messages_.pop_front();
Austin Schuh63097262023-08-16 17:04:29 -07002165 break;
2166 case MessageSource::kBootMerger:
2167 boot_merger_.PopFront();
2168 break;
2169 }
2170}
2171
2172TimestampMapper::TimestampMapper(
2173 std::string_view node_name, const LogFilesContainer &log_files,
2174 TimestampQueueStrategy timestamp_queue_strategy)
2175 : boot_merger_(node_name, log_files, timestamp_queue_strategy),
2176 timestamp_callback_([](TimestampedMessage *) {}) {
2177 configuration_ = boot_merger_.configuration();
2178
Austin Schuh0ca51f32020-12-25 21:51:45 -08002179 const Configuration *config = configuration_.get();
Alexei Strots1f51ac72023-05-15 10:14:54 -07002180 // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
Austin Schuhd2f96102020-12-01 20:27:29 -08002181 // pretty simple.
2182 if (configuration::MultiNode(config)) {
2183 nodes_data_.resize(config->nodes()->size());
2184 const Node *my_node = config->nodes()->Get(node());
2185 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2186 const Node *node = config->nodes()->Get(node_index);
2187 NodeData *node_data = &nodes_data_[node_index];
2188 node_data->channels.resize(config->channels()->size());
2189 // We should save the channel if it is delivered to the node represented
2190 // by the NodeData, but not sent by that node. That combo means it is
2191 // forwarded.
2192 size_t channel_index = 0;
2193 node_data->any_delivered = false;
2194 for (const Channel *channel : *config->channels()) {
2195 node_data->channels[channel_index].delivered =
2196 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002197 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2198 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002199 node_data->any_delivered = node_data->any_delivered ||
2200 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002201 if (node_data->channels[channel_index].delivered) {
2202 const Connection *connection =
2203 configuration::ConnectionToNode(channel, node);
2204 node_data->channels[channel_index].time_to_live =
2205 chrono::nanoseconds(connection->time_to_live());
2206 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002207 ++channel_index;
2208 }
2209 }
2210
2211 for (const Channel *channel : *config->channels()) {
2212 source_node_.emplace_back(configuration::GetNodeIndex(
2213 config, channel->source_node()->string_view()));
2214 }
Philipp Schrader416505b2024-03-28 11:59:45 -07002215 } else {
2216 // The node index for single-node logs is always 0.
2217 source_node_.resize(config->channels()->size(), 0);
Austin Schuhd2f96102020-12-01 20:27:29 -08002218 }
2219}
2220
2221void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002222 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002223 CHECK_NE(timestamp_mapper->node(), node());
2224 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2225
2226 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002227 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002228 // we could needlessly save data.
2229 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002230 VLOG(1) << "Registering on node " << node() << " for peer node "
2231 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002232 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2233
2234 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002235
2236 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002237 }
2238}
2239
Adam Snaider13d48d92023-08-03 12:20:15 -07002240void TimestampMapper::QueueMessage(const Message *msg) {
Austin Schuhb5224ec2024-03-27 15:20:09 -07002241 matched_messages_.emplace_back(TimestampedMessage{
2242 .channel_index = msg->channel_index,
2243 .queue_index = msg->queue_index,
2244 .monotonic_event_time = msg->timestamp,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002245 .realtime_event_time = msg->header->realtime_sent_time,
Austin Schuhb5224ec2024-03-27 15:20:09 -07002246 .remote_queue_index = BootQueueIndex::Invalid(),
2247 .monotonic_remote_time = BootTimestamp::min_time(),
2248 .realtime_remote_time = realtime_clock::min_time,
2249 .monotonic_remote_transmit_time = BootTimestamp::min_time(),
2250 .monotonic_timestamp_time = BootTimestamp::min_time(),
2251 .data = std::move(msg->data)});
Austin Schuh63097262023-08-16 17:04:29 -07002252 VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
Austin Schuhd2f96102020-12-01 20:27:29 -08002253}
2254
2255TimestampedMessage *TimestampMapper::Front() {
2256 // No need to fetch anything new. A previous message still exists.
2257 switch (first_message_) {
2258 case FirstMessage::kNeedsUpdate:
2259 break;
2260 case FirstMessage::kInMessage:
Austin Schuh63097262023-08-16 17:04:29 -07002261 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2262 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002263 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002264 case FirstMessage::kNullptr:
Austin Schuh63097262023-08-16 17:04:29 -07002265 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2266 << " nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08002267 return nullptr;
2268 }
2269
Austin Schuh79b30942021-01-24 22:32:21 -08002270 if (matched_messages_.empty()) {
2271 if (!QueueMatched()) {
2272 first_message_ = FirstMessage::kNullptr;
Austin Schuh63097262023-08-16 17:04:29 -07002273 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2274 << " nullptr";
Austin Schuh79b30942021-01-24 22:32:21 -08002275 return nullptr;
2276 }
2277 }
2278 first_message_ = FirstMessage::kInMessage;
Austin Schuh63097262023-08-16 17:04:29 -07002279 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2280 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002281 return &matched_messages_.front();
2282}
2283
2284bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002285 MatchResult result = MatchResult::kEndOfFile;
2286 do {
2287 result = MaybeQueueMatched();
2288 } while (result == MatchResult::kSkipped);
2289 return result == MatchResult::kQueued;
2290}
2291
2292bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2293 const TimestampedMessage & /*message*/) {
2294 if (replay_channels_callback_ &&
2295 !replay_channels_callback_(matched_messages_.back())) {
Austin Schuh63097262023-08-16 17:04:29 -07002296 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002297 matched_messages_.pop_back();
2298 return true;
2299 }
2300 return false;
2301}
2302
2303TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002304 if (nodes_data_.empty()) {
2305 // Simple path. We are single node, so there are no timestamps to match!
2306 CHECK_EQ(messages_.size(), 0u);
Adam Snaider13d48d92023-08-03 12:20:15 -07002307 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002308 if (!msg) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002309 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002310 }
Austin Schuh79b30942021-01-24 22:32:21 -08002311 // Enqueue this message into matched_messages_ so we have a place to
2312 // associate remote timestamps, and return it.
Austin Schuh63097262023-08-16 17:04:29 -07002313 QueueMessage(msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002314
Austin Schuh63097262023-08-16 17:04:29 -07002315 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2316 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002317 last_message_time_ = matched_messages_.back().monotonic_event_time;
2318
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002319 // We are thin wrapper around parts_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002320 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002321 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002322 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2323 return MatchResult::kSkipped;
2324 }
2325 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002326 }
2327
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002328 // We need to only add messages to the list so they get processed for
2329 // messages which are delivered. Reuse the flow below which uses messages_
2330 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002331 if (messages_.empty()) {
2332 if (!Queue()) {
2333 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002334 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002335 }
2336
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002337 // Now that it has been added (and cannibalized), forget about it
2338 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002339 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002340 }
2341
Austin Schuh63097262023-08-16 17:04:29 -07002342 Message *msg = &(messages_.front());
Austin Schuhd2f96102020-12-01 20:27:29 -08002343
Austin Schuh63097262023-08-16 17:04:29 -07002344 if (source_node_[msg->channel_index] == node()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002345 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh63097262023-08-16 17:04:29 -07002346 QueueMessage(msg);
2347 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2348 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002349 last_message_time_ = matched_messages_.back().monotonic_event_time;
2350 messages_.pop_front();
2351 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002352 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2353 return MatchResult::kSkipped;
2354 }
2355 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002356 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002357 // Got a timestamp, find the matching remote data, match it, and return
2358 // it.
Austin Schuh63097262023-08-16 17:04:29 -07002359 Message data = MatchingMessageFor(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002360
2361 // Return the data from the remote. The local message only has timestamp
2362 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002363 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuh63097262023-08-16 17:04:29 -07002364 .channel_index = msg->channel_index,
2365 .queue_index = msg->queue_index,
2366 .monotonic_event_time = msg->timestamp,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002367 .realtime_event_time = msg->header->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002368 .remote_queue_index =
Austin Schuh63097262023-08-16 17:04:29 -07002369 BootQueueIndex{.boot = msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002370 .index = msg->header->remote_queue_index.value()},
Austin Schuh63097262023-08-16 17:04:29 -07002371 .monotonic_remote_time = {msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002372 msg->header->monotonic_remote_time.value()},
2373 .realtime_remote_time = msg->header->realtime_remote_time.value(),
Austin Schuhb5224ec2024-03-27 15:20:09 -07002374 .monotonic_remote_transmit_time =
2375 {msg->monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002376 msg->header->monotonic_remote_transmit_time},
Austin Schuh63097262023-08-16 17:04:29 -07002377 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002378 msg->header->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002379 .data = std::move(data.data)});
Austin Schuh63097262023-08-16 17:04:29 -07002380 VLOG(1) << node_name() << " Inserted timestamp "
2381 << matched_messages_.back();
2382 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2383 << " on " << node_name() << " " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002384 last_message_time_ = matched_messages_.back().monotonic_event_time;
2385 // Since messages_ holds the data, drop it.
2386 messages_.pop_front();
2387 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002388 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2389 return MatchResult::kSkipped;
2390 }
2391 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002392 }
2393}
2394
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002395void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002396 while (last_message_time_ <= queue_time) {
2397 if (!QueueMatched()) {
2398 return;
2399 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002400 }
2401}
2402
Austin Schuhe639ea12021-01-25 13:00:22 -08002403void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002404 // Note: queueing for time doesn't really work well across boots. So we
2405 // just assume that if you are using this, you only care about the current
2406 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002407 //
2408 // TODO(austin): Is that the right concept?
2409 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002410 // Make sure we have something queued first. This makes the end time
2411 // calculation simpler, and is typically what folks want regardless.
2412 if (matched_messages_.empty()) {
2413 if (!QueueMatched()) {
2414 return;
2415 }
2416 }
2417
2418 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002419 std::max(monotonic_start_time(
2420 matched_messages_.front().monotonic_event_time.boot),
2421 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002422 time_estimation_buffer;
2423
2424 // Place sorted messages on the list until we have
2425 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2426 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002427 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002428 if (!QueueMatched()) {
2429 return;
2430 }
2431 }
2432}
2433
Austin Schuhd2f96102020-12-01 20:27:29 -08002434void TimestampMapper::PopFront() {
2435 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002436 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002437 first_message_ = FirstMessage::kNeedsUpdate;
2438
Austin Schuh63097262023-08-16 17:04:29 -07002439 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002440 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002441}
2442
2443Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002444 // Figure out what queue index we are looking for.
Austin Schuh6bdcc372024-06-27 14:49:11 -07002445 CHECK(message.header != nullptr);
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002446 CHECK(message.header->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002447 const BootQueueIndex remote_queue_index =
2448 BootQueueIndex{.boot = message.monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002449 .index = *message.header->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002450
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002451 CHECK(message.header->monotonic_remote_time.has_value());
2452 CHECK(message.header->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002453
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002454 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002455 .boot = message.monotonic_remote_boot,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002456 .time = message.header->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002457 const realtime_clock::time_point realtime_remote_time =
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002458 *message.header->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002459
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002460 TimestampMapper *peer =
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002461 nodes_data_[source_node_[message.header->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002462
2463 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002464 // asked to pull a timestamp from a peer which doesn't exist, return an
2465 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002466 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002467 // TODO(austin): Make sure the tests hit all these paths with a boot count
2468 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002469 return Message{.channel_index = message.channel_index,
2470 .queue_index = remote_queue_index,
2471 .timestamp = monotonic_remote_time,
2472 .monotonic_remote_boot = 0xffffff,
2473 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002474 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002475 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002476 }
2477
2478 // The queue which will have the matching data, if available.
2479 std::deque<Message> *data_queue =
2480 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2481
Austin Schuh79b30942021-01-24 22:32:21 -08002482 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002483
2484 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002485 return Message{.channel_index = message.channel_index,
2486 .queue_index = remote_queue_index,
2487 .timestamp = monotonic_remote_time,
2488 .monotonic_remote_boot = 0xffffff,
2489 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002490 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002491 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002492 }
2493
Austin Schuhd2f96102020-12-01 20:27:29 -08002494 if (remote_queue_index < data_queue->front().queue_index ||
2495 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002496 return Message{.channel_index = message.channel_index,
2497 .queue_index = remote_queue_index,
2498 .timestamp = monotonic_remote_time,
2499 .monotonic_remote_boot = 0xffffff,
2500 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002501 .header = nullptr,
Austin Schuh60e77942022-05-16 17:48:24 -07002502 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002503 }
2504
Austin Schuh993ccb52020-12-12 15:59:32 -08002505 // The algorithm below is constant time with some assumptions. We need there
2506 // to be no missing messages in the data stream. This also assumes a queue
2507 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002508 if (data_queue->back().queue_index.boot ==
2509 data_queue->front().queue_index.boot &&
2510 (data_queue->back().queue_index.index -
2511 data_queue->front().queue_index.index + 1u ==
2512 data_queue->size())) {
2513 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002514 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002515 //
2516 // TODO(austin): Move if not reliable.
2517 Message result = (*data_queue)[remote_queue_index.index -
2518 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002519
2520 CHECK_EQ(result.timestamp, monotonic_remote_time)
2521 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002522 CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002523 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2524 // Now drop the data off the front. We have deduplicated timestamps, so we
2525 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002526 data_queue->erase(
2527 data_queue->begin(),
2528 data_queue->begin() +
2529 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002530 return result;
2531 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002532 // TODO(austin): Binary search.
2533 auto it = std::find_if(
2534 data_queue->begin(), data_queue->end(),
2535 [remote_queue_index,
Austin Schuh63097262023-08-16 17:04:29 -07002536 remote_boot = monotonic_remote_time.boot](const Message &msg) {
2537 return msg.queue_index == remote_queue_index &&
2538 msg.timestamp.boot == remote_boot;
Austin Schuh58646e22021-08-23 23:51:46 -07002539 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002540 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002541 return Message{.channel_index = message.channel_index,
2542 .queue_index = remote_queue_index,
2543 .timestamp = monotonic_remote_time,
2544 .monotonic_remote_boot = 0xffffff,
2545 .monotonic_timestamp_boot = 0xffffff,
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002546 .header = nullptr,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002547 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002548 }
2549
2550 Message result = std::move(*it);
2551
2552 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002553 << ": Queue index matches, but timestamp doesn't. Please "
2554 "investigate!";
Austin Schuh3c9f92c2024-04-30 17:56:42 -07002555 CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002556 << ": Queue index matches, but timestamp doesn't. Please "
2557 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002558
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002559 // Erase everything up to this message. We want to keep 1 message in the
2560 // queue so we can handle reliable messages forwarded across boots.
2561 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002562
2563 return result;
2564 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002565}
2566
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002567void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002568 if (queued_until_ > t) {
2569 return;
2570 }
2571 while (true) {
2572 if (!messages_.empty() && messages_.back().timestamp > t) {
2573 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2574 return;
2575 }
2576
2577 if (!Queue()) {
2578 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002579 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002580 return;
2581 }
2582
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002583 // Now that it has been added (and cannibalized), forget about it
2584 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002585 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002586 }
2587}
2588
2589bool TimestampMapper::Queue() {
Adam Snaider13d48d92023-08-03 12:20:15 -07002590 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002591 if (msg == nullptr) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002592 return false;
2593 }
2594 for (NodeData &node_data : nodes_data_) {
2595 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002596 if (!node_data.save_for_peer) continue;
Austin Schuh63097262023-08-16 17:04:29 -07002597 if (node_data.channels[msg->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002598 // If we have data but no timestamps (logs where the timestamps didn't get
2599 // logged are classic), we can grow this indefinitely. We don't need to
2600 // keep anything that is older than the last message returned.
2601
2602 // We have the time on the source node.
2603 // We care to wait until we have the time on the destination node.
2604 std::deque<Message> &messages =
Austin Schuh63097262023-08-16 17:04:29 -07002605 node_data.channels[msg->channel_index].messages;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002606 // Max delay over the network is the TTL, so let's take the queue time and
2607 // add TTL to it. Don't forget any messages which are reliable until
2608 // someone can come up with a good reason to forget those too.
Austin Schuh63097262023-08-16 17:04:29 -07002609 if (node_data.channels[msg->channel_index].time_to_live >
Austin Schuh6a7358f2021-11-18 22:40:40 -08002610 chrono::nanoseconds(0)) {
2611 // We need to make *some* assumptions about network delay for this to
2612 // work. We want to only look at the RX side. This means we need to
2613 // track the last time a message was popped from any channel from the
2614 // node sending this message, and compare that to the max time we expect
2615 // that a message will take to be delivered across the network. This
2616 // assumes that messages are popped in time order as a proxy for
2617 // measuring the distributed time at this layer.
2618 //
2619 // Leave at least 1 message in here so we can handle reboots and
2620 // messages getting sent twice.
2621 while (messages.size() > 1u &&
2622 messages.begin()->timestamp +
Austin Schuh63097262023-08-16 17:04:29 -07002623 node_data.channels[msg->channel_index].time_to_live +
Austin Schuh6a7358f2021-11-18 22:40:40 -08002624 chrono::duration_cast<chrono::nanoseconds>(
2625 chrono::duration<double>(FLAGS_max_network_delay)) <
2626 last_popped_message_time_) {
2627 messages.pop_front();
2628 }
2629 }
Austin Schuh63097262023-08-16 17:04:29 -07002630 node_data.channels[msg->channel_index].messages.emplace_back(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002631 }
2632 }
2633
Austin Schuh63097262023-08-16 17:04:29 -07002634 messages_.emplace_back(std::move(*msg));
Austin Schuhd2f96102020-12-01 20:27:29 -08002635 return true;
2636}
2637
Austin Schuh63097262023-08-16 17:04:29 -07002638void TimestampMapper::QueueTimestamps() {
2639 boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
2640}
2641
Austin Schuhd2f96102020-12-01 20:27:29 -08002642std::string TimestampMapper::DebugString() const {
2643 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002644 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002645 for (const Message &message : messages_) {
2646 ss << " " << message << "\n";
2647 }
2648 ss << "] queued_until " << queued_until_;
2649 for (const NodeData &ns : nodes_data_) {
2650 if (ns.peer == nullptr) continue;
2651 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2652 size_t channel_index = 0;
2653 for (const NodeData::ChannelData &channel_data :
2654 ns.peer->nodes_data_[node()].channels) {
2655 if (channel_data.messages.empty()) {
2656 continue;
2657 }
Austin Schuhb000de62020-12-03 22:00:40 -08002658
Austin Schuhd2f96102020-12-01 20:27:29 -08002659 ss << " channel " << channel_index << " [\n";
Austin Schuh63097262023-08-16 17:04:29 -07002660 for (const Message &msg : channel_data.messages) {
2661 ss << " " << msg << "\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002662 }
2663 ss << " ]\n";
2664 ++channel_index;
2665 }
2666 ss << "] queued_until " << ns.peer->queued_until_;
2667 }
2668 return ss.str();
2669}
2670
Brian Silvermanf51499a2020-09-21 12:49:08 -07002671} // namespace aos::logger