blob: 2e19f3c9a1b603d6246f8e1d9ef9446ee3471d86 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "flatbuffers/flatbuffers.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070018#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080019#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080020#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021
// LZMA (xz) support is only built for 64-bit x86 and ARM targets, and is
// disabled there when compiling with MemorySanitizer.
#if defined(__x86_64__) || defined(__aarch64__)
#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
#else
#define ENABLE_LZMA 0
#endif
29
30#if ENABLE_LZMA
31#include "aos/events/logging/lzma_encoder.h"
32#endif
Austin Schuh86110712022-09-16 15:40:54 -070033#if ENABLE_S3
34#include "aos/events/logging/s3_fetcher.h"
35#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070036
Austin Schuh48d10d62022-10-16 22:19:23 -070037DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080038 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070039DEFINE_double(
40 flush_period, 5.0,
41 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080042
Austin Schuha040c3f2021-02-13 16:09:07 -080043DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080044 max_network_delay, 1.0,
45 "Max time to assume a message takes to cross the network before we are "
46 "willing to drop it from our buffers and assume it didn't make it. "
47 "Increasing this number can increase memory usage depending on the packet "
48 "loss of your network or if the timestamps aren't logged for a message.");
49
50DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080051 max_out_of_order, -1,
52 "If set, this overrides the max out of order duration for a log file.");
53
Austin Schuh0e8db662021-07-06 10:43:47 -070054DEFINE_bool(workaround_double_headers, true,
55 "Some old log files have two headers at the beginning. Use the "
56 "last header as the actual header.");
57
Brian Smarttea913d42021-12-10 15:02:38 -080058DEFINE_bool(crash_on_corrupt_message, true,
59 "When true, MessageReader will crash the first time a message "
60 "with corrupted format is found. When false, the crash will be "
61 "suppressed, and any remaining readable messages will be "
62 "evaluated to present verified vs corrupted stats.");
63
64DEFINE_bool(ignore_corrupt_messages, false,
65 "When true, and crash_on_corrupt_message is false, then any "
66 "corrupt message found by MessageReader be silently ignored, "
67 "providing access to all uncorrupted messages in a logfile.");
68
Alexei Strotsa3194712023-04-21 23:30:50 -070069DECLARE_bool(quiet_sorting);
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070072namespace {
Austin Schuh05b70472020-01-01 17:11:17 -080073namespace chrono = std::chrono;
74
Alexei Strotscee7b372023-04-21 11:57:54 -070075std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename,
76 bool quiet) {
77 static constexpr std::string_view kS3 = "s3:";
78
79 std::unique_ptr<DataDecoder> decoder;
80
81 if (filename.substr(0, kS3.size()) == kS3) {
82#if ENABLE_S3
83 decoder = std::make_unique<S3Fetcher>(filename);
84#else
85 LOG(FATAL) << "Reading files from S3 not supported on this platform";
86#endif
87 } else {
88 decoder = std::make_unique<DummyDecoder>(filename);
89 }
90
91 static constexpr std::string_view kXz = ".xz";
92 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
93 if (filename.substr(filename.size() - kXz.size()) == kXz) {
94#if ENABLE_LZMA
95 decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet);
96#else
97 (void)quiet;
98 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
99#endif
100 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
101 decoder = std::make_unique<SnappyDecoder>(std::move(decoder));
102 }
103 return decoder;
104}
105
// Streams the value held by `t` to `os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}
Philipp Schrader10397952023-06-15 11:43:07 -0700114
115// A dummy LogSink implementation that handles the special case when we create
116// a DetachedBufferWriter when there's no space left on the system. The
117// DetachedBufferWriter frequently dereferences log_sink_, so we want a class
118// here that effectively refuses to do anything meaningful.
119class OutOfDiskSpaceLogSink : public LogSink {
120 public:
121 WriteCode OpenForWrite() override { return WriteCode::kOutOfSpace; }
122 WriteCode Close() override { return WriteCode::kOk; }
123 bool is_open() const override { return false; }
124 WriteResult Write(
125 const absl::Span<const absl::Span<const uint8_t>> &) override {
126 return WriteResult{
127 .code = WriteCode::kOutOfSpace,
128 .messages_written = 0,
129 };
130 }
131 std::string_view name() const override { return "<out_of_disk_space>"; }
132};
133
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700134} // namespace
135
Alexei Strotsbc082d82023-05-03 08:43:42 -0700136DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink,
137 std::unique_ptr<DataEncoder> encoder)
138 : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) {
139 CHECK(log_sink_);
140 ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace;
Alexei Strots01395492023-03-20 13:59:56 -0700141 if (ran_out_of_space_) {
142 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800143 }
144}
145
Philipp Schrader10397952023-06-15 11:43:07 -0700146DetachedBufferWriter::DetachedBufferWriter(already_out_of_space_t)
147 : DetachedBufferWriter(std::make_unique<OutOfDiskSpaceLogSink>(), nullptr) {
148}
149
Austin Schuha36c8902019-12-30 18:07:15 -0800150DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700151 Close();
152 if (ran_out_of_space_) {
153 CHECK(acknowledge_ran_out_of_space_)
154 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700156}
157
Brian Silvermand90905f2020-09-23 14:42:56 -0700158DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700159 *this = std::move(other);
160}
161
Brian Silverman87ac0402020-09-17 14:47:01 -0700162// When other is destroyed "soon" (which it should be because we're getting an
163// rvalue reference to it), it will flush etc all the data we have queued up
164// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700165DetachedBufferWriter &DetachedBufferWriter::operator=(
166 DetachedBufferWriter &&other) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700167 std::swap(log_sink_, other.log_sink_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700168 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700169 std::swap(ran_out_of_space_, other.ran_out_of_space_);
170 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800171 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700172 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800173}
174
Austin Schuh8bdfc492023-02-11 12:53:13 -0800175void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800176 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700177 if (ran_out_of_space_) {
178 // We don't want any later data to be written after space becomes
179 // available, so refuse to write anything more once we've dropped data
180 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700181 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800182 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700183
Austin Schuh8bdfc492023-02-11 12:53:13 -0800184 const size_t message_size = copier->size();
185 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700186
Austin Schuh8bdfc492023-02-11 12:53:13 -0800187 // Keep writing chunks until we've written it all. If we end up with a
188 // partial write, this means we need to flush to disk.
189 do {
Alexei Strots01395492023-03-20 13:59:56 -0700190 const size_t bytes_written =
191 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800192 CHECK(bytes_written != 0);
193
194 overall_bytes_written += bytes_written;
195 if (overall_bytes_written < message_size) {
196 VLOG(1) << "Flushing because of a partial write, tried to write "
197 << message_size << " wrote " << overall_bytes_written;
198 Flush(now);
199 }
200 } while (overall_bytes_written < message_size);
201
Austin Schuhbd06ae42021-03-31 22:48:21 -0700202 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800203}
204
Brian Silverman0465fcf2020-09-24 00:29:18 -0700205void DetachedBufferWriter::Close() {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700206 if (!log_sink_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700207 return;
208 }
209 encoder_->Finish();
210 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800211 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700212 }
Austin Schuhb2461652023-05-01 08:30:56 -0700213 encoder_.reset();
Alexei Strotsbc082d82023-05-03 08:43:42 -0700214 ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700215}
216
Austin Schuh8bdfc492023-02-11 12:53:13 -0800217void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
218 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700219 if (ran_out_of_space_) {
220 // We don't want any later data to be written after space becomes available,
221 // so refuse to write anything more once we've dropped data because we ran
222 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700223 if (encoder_) {
224 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
225 encoder_->Clear(encoder_->queue().size());
226 } else {
227 VLOG(1) << "No queue to ignore";
228 }
229 return;
230 }
231
232 const auto queue = encoder_->queue();
233 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700234 return;
235 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700236
Alexei Strotsbc082d82023-05-03 08:43:42 -0700237 const WriteResult result = log_sink_->Write(queue);
Alexei Strots01395492023-03-20 13:59:56 -0700238 encoder_->Clear(result.messages_written);
239 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240}
241
Austin Schuhbd06ae42021-03-31 22:48:21 -0700242void DetachedBufferWriter::FlushAtThreshold(
243 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700244 if (ran_out_of_space_) {
245 // We don't want any later data to be written after space becomes available,
246 // so refuse to write anything more once we've dropped data because we ran
247 // out of space.
248 if (encoder_) {
249 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
250 encoder_->Clear(encoder_->queue().size());
251 } else {
252 VLOG(1) << "No queue to ignore";
253 }
254 return;
255 }
256
Austin Schuhbd06ae42021-03-31 22:48:21 -0700257 // We don't want to flush the first time through. Otherwise we will flush as
258 // the log file header might be compressing, defeating any parallelism and
259 // queueing there.
260 if (last_flush_time_ == aos::monotonic_clock::min_time) {
261 last_flush_time_ = now;
262 }
263
Brian Silvermanf51499a2020-09-21 12:49:08 -0700264 // Flush if we are at the max number of iovs per writev, because there's no
265 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700266 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800267 while (encoder_->space() == 0 ||
268 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700269 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700270 (now > last_flush_time_ +
271 chrono::duration_cast<chrono::nanoseconds>(
272 chrono::duration<double>(FLAGS_flush_period)) &&
273 encoder_->queued_bytes() != 0)) {
274 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
275 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800276 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700277 }
Austin Schuha36c8902019-12-30 18:07:15 -0800278}
279
Austin Schuhf2d0e682022-10-16 14:20:58 -0700280// Do the magic dance to convert the endianness of the data and append it to the
281// buffer.
282namespace {
283
284// TODO(austin): Look at the generated code to see if building the header is
285// efficient or not.
286template <typename T>
287uint8_t *Push(uint8_t *buffer, const T data) {
288 const T endian_data = flatbuffers::EndianScalar<T>(data);
289 std::memcpy(buffer, &endian_data, sizeof(T));
290 return buffer + sizeof(T);
291}
292
// Copies `size` raw bytes from `data` to `buffer` and returns the position
// just past them.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  // memcpy returns its destination, i.e. `buffer`.
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
297
// Writes `padding` zero bytes at `buffer` and returns the position just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  // fill_n returns the iterator one past the last element written.
  return std::fill_n(buffer, padding, uint8_t{0});
}
302} // namespace
303
304flatbuffers::Offset<MessageHeader> PackRemoteMessage(
305 flatbuffers::FlatBufferBuilder *fbb,
306 const message_bridge::RemoteMessage *msg, int channel_index,
307 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
308 logger::MessageHeader::Builder message_header_builder(*fbb);
309 // Note: this must match the same order as MessageBridgeServer and
310 // PackMessage. We want identical headers to have identical
311 // on-the-wire formats to make comparing them easier.
312
313 message_header_builder.add_channel_index(channel_index);
314
315 message_header_builder.add_queue_index(msg->queue_index());
316 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
317 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
318
Austin Schuhb5224ec2024-03-27 15:20:09 -0700319 message_header_builder.add_monotonic_timestamp_time(
320 monotonic_timestamp_time.time_since_epoch().count());
321
322 message_header_builder.add_monotonic_remote_transmit_time(
323 msg->monotonic_remote_transmit_time());
324
Austin Schuhf2d0e682022-10-16 14:20:58 -0700325 message_header_builder.add_monotonic_remote_time(
326 msg->monotonic_remote_time());
327 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
328 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
329
Austin Schuhf2d0e682022-10-16 14:20:58 -0700330 return message_header_builder.Finish();
331}
332
333size_t PackRemoteMessageInline(
334 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
335 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800336 const aos::monotonic_clock::time_point monotonic_timestamp_time,
337 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700338 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800339 DCHECK_EQ((start_byte % 8u), 0u);
340 DCHECK_EQ((end_byte % 8u), 0u);
341 DCHECK_LE(start_byte, end_byte);
342 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700343
Austin Schuh71a40d42023-02-04 21:22:22 -0800344 switch (start_byte) {
345 case 0x00u:
346 if ((end_byte) == 0x00u) {
347 break;
348 }
349 // clang-format off
350 // header:
Austin Schuhb5224ec2024-03-27 15:20:09 -0700351 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
352
Austin Schuh71a40d42023-02-04 21:22:22 -0800353 buffer = Push<flatbuffers::uoffset_t>(
354 buffer, message_size - sizeof(flatbuffers::uoffset_t));
Austin Schuhb5224ec2024-03-27 15:20:09 -0700355 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
356 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1C);
Austin Schuh71a40d42023-02-04 21:22:22 -0800357 [[fallthrough]];
358 case 0x08u:
359 if ((end_byte) == 0x08u) {
360 break;
361 }
362 //
Austin Schuh71a40d42023-02-04 21:22:22 -0800363 // vtable (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700364 // +0x08 | 18 00 | uint16_t | 0x0018 (24) | size of this vtable
365 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
366 // +0x0A | 40 00 | uint16_t | 0x0040 (64) | size of referring table
367 buffer = Push<flatbuffers::voffset_t>(buffer, 0x40);
368 // +0x0C | 3C 00 | VOffset16 | 0x003C (60) | offset to field `channel_index` (id: 0)
369 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
370 // +0x0E | 30 00 | VOffset16 | 0x0030 (48) | offset to field `monotonic_sent_time` (id: 1)
371 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
Austin Schuh71a40d42023-02-04 21:22:22 -0800372 [[fallthrough]];
373 case 0x10u:
374 if ((end_byte) == 0x10u) {
375 break;
376 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700377 // +0x10 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `realtime_sent_time` (id: 2)
378 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
379 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `queue_index` (id: 3)
Austin Schuh71a40d42023-02-04 21:22:22 -0800380 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700381 // +0x14 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
382 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
383 // +0x16 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
384 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
Austin Schuh71a40d42023-02-04 21:22:22 -0800385 [[fallthrough]];
386 case 0x18u:
387 if ((end_byte) == 0x18u) {
388 break;
389 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700390 // +0x18 | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
391 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
392 // +0x1A | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
393 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
394 // +0x1C | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_timestamp_time` (id: 8)
395 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
396 // +0x1E | 18 00 | VOffset16 | 0x0018 (24) | offset to field `monotonic_remote_transmit_time` (id: 9)
397 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
Austin Schuh71a40d42023-02-04 21:22:22 -0800398 [[fallthrough]];
399 case 0x20u:
400 if ((end_byte) == 0x20u) {
401 break;
402 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700403
404
Austin Schuh71a40d42023-02-04 21:22:22 -0800405 // root_table (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700406 // +0x20 | 18 00 00 00 | SOffset32 | 0x00000018 (24) Loc: +0x08 | offset to vtable
407 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
408 // +0x24 | 8B 00 00 00 | uint32_t | 0x0000008B (139) | table field `remote_queue_index` (UInt)
409 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
Austin Schuh71a40d42023-02-04 21:22:22 -0800410 [[fallthrough]];
411 case 0x28u:
412 if ((end_byte) == 0x28u) {
413 break;
414 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700415 // +0x28 | D4 C9 48 86 92 8B 6A AF | int64_t | 0xAF6A8B928648C9D4 (-5806675308106429996) | table field `realtime_remote_time` (Long)
416 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800417 [[fallthrough]];
418 case 0x30u:
419 if ((end_byte) == 0x30u) {
420 break;
421 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700422 // +0x30 | 65 B1 32 50 FE 54 50 6B | int64_t | 0x6B5054FE5032B165 (7732774011439067493) | table field `monotonic_remote_time` (Long)
423 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800424 [[fallthrough]];
425 case 0x38u:
426 if ((end_byte) == 0x38u) {
427 break;
428 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700429 // +0x38 | EA 4D CC E0 FC 20 86 71 | int64_t | 0x718620FCE0CC4DEA (8180262043640417770) | table field `monotonic_remote_transmit_time` (Long)
430 buffer = Push<int64_t>(buffer, msg->monotonic_remote_transmit_time());
Austin Schuh71a40d42023-02-04 21:22:22 -0800431 [[fallthrough]];
432 case 0x40u:
433 if ((end_byte) == 0x40u) {
434 break;
435 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700436 // +0x40 | 8E 59 CF 88 9D DF 02 07 | int64_t | 0x0702DF9D88CF598E (505211975917066638) | table field `monotonic_timestamp_time` (Long)
437 buffer = Push<int64_t>(buffer,
438 monotonic_timestamp_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800439 [[fallthrough]];
440 case 0x48u:
441 if ((end_byte) == 0x48u) {
442 break;
443 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700444 // +0x48 | 14 D5 A7 D8 B2 E4 EF 89 | int64_t | 0x89EFE4B2D8A7D514 (-8507329714289388268) | table field `realtime_sent_time` (Long)
Austin Schuh71a40d42023-02-04 21:22:22 -0800445 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
446 [[fallthrough]];
447 case 0x50u:
448 if ((end_byte) == 0x50u) {
449 break;
450 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700451 // +0x50 | 19 7D 7F EF 86 8D 92 65 | int64_t | 0x65928D86EF7F7D19 (7319067955113721113) | table field `monotonic_sent_time` (Long)
Austin Schuh71a40d42023-02-04 21:22:22 -0800452 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
453 [[fallthrough]];
454 case 0x58u:
455 if ((end_byte) == 0x58u) {
456 break;
457 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700458 // +0x58 | FC 00 00 00 | uint32_t | 0x000000FC (252) | table field `queue_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800459 buffer = Push<uint32_t>(buffer, msg->queue_index());
Austin Schuhb5224ec2024-03-27 15:20:09 -0700460 // +0x5C | 9C 00 00 00 | uint32_t | 0x0000009C (156) | table field `channel_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800461 buffer = Push<uint32_t>(buffer, channel_index);
462 // clang-format on
463 [[fallthrough]];
464 case 0x60u:
465 if ((end_byte) == 0x60u) {
466 break;
467 }
468 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700469
Austin Schuh71a40d42023-02-04 21:22:22 -0800470 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700471}
472
Austin Schuha36c8902019-12-30 18:07:15 -0800473flatbuffers::Offset<MessageHeader> PackMessage(
474 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
475 int channel_index, LogType log_type) {
476 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
477
478 switch (log_type) {
479 case LogType::kLogMessage:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800480 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700481 // Since the timestamps are 8 byte aligned, we are going to end up adding
482 // padding in the middle of the message to pad everything out to 8 byte
483 // alignment. That's rather wasteful. To make things efficient to mmap
484 // while reading uncompressed logs, we'd actually rather the message be
485 // aligned. So, force 8 byte alignment (enough to preserve alignment
486 // inside the nested message so that we can read it without moving it)
487 // here.
488 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700489 data_offset = fbb->CreateVector(
490 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800491 break;
492
493 case LogType::kLogDeliveryTimeOnly:
494 break;
495 }
496
497 MessageHeader::Builder message_header_builder(*fbb);
498 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800499
Austin Schuhfa30c352022-10-16 11:12:02 -0700500 // These are split out into very explicit serialization calls because the
501 // order here changes the order things are written out on the wire, and we
502 // want to control and understand it here. Changing the order can increase
503 // the amount of padding bytes in the middle.
504 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800505 // It is also easier to follow... And doesn't actually make things much
506 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800507 switch (log_type) {
508 case LogType::kLogRemoteMessage:
509 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700510 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800511 message_header_builder.add_monotonic_sent_time(
512 context.monotonic_remote_time.time_since_epoch().count());
513 message_header_builder.add_realtime_sent_time(
514 context.realtime_remote_time.time_since_epoch().count());
515 break;
516
Austin Schuh6f3babe2020-01-26 20:34:50 -0800517 case LogType::kLogDeliveryTimeOnly:
518 message_header_builder.add_queue_index(context.queue_index);
519 message_header_builder.add_monotonic_sent_time(
520 context.monotonic_event_time.time_since_epoch().count());
521 message_header_builder.add_realtime_sent_time(
522 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800523 message_header_builder.add_monotonic_remote_time(
524 context.monotonic_remote_time.time_since_epoch().count());
525 message_header_builder.add_realtime_remote_time(
526 context.realtime_remote_time.time_since_epoch().count());
Austin Schuhb5224ec2024-03-27 15:20:09 -0700527 message_header_builder.add_monotonic_remote_transmit_time(
528 context.monotonic_remote_transmit_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800529 message_header_builder.add_remote_queue_index(context.remote_queue_index);
530 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700531
532 case LogType::kLogMessage:
533 message_header_builder.add_queue_index(context.queue_index);
534 message_header_builder.add_data(data_offset);
535 message_header_builder.add_monotonic_sent_time(
536 context.monotonic_event_time.time_since_epoch().count());
537 message_header_builder.add_realtime_sent_time(
538 context.realtime_event_time.time_since_epoch().count());
539 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800540 }
541
542 return message_header_builder.Finish();
543}
544
Austin Schuhfa30c352022-10-16 11:12:02 -0700545flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
546 switch (log_type) {
547 case LogType::kLogMessage:
548 return
549 // Root table size + offset.
550 sizeof(flatbuffers::uoffset_t) * 2 +
551 // 6 padding bytes to pad the header out properly.
552 6 +
553 // vtable header (size + size of table)
554 sizeof(flatbuffers::voffset_t) * 2 +
555 // offsets to all the fields.
556 sizeof(flatbuffers::voffset_t) * 5 +
557 // pointer to vtable
558 sizeof(flatbuffers::soffset_t) +
559 // pointer to data
560 sizeof(flatbuffers::uoffset_t) +
561 // realtime_sent_time, monotonic_sent_time
562 sizeof(int64_t) * 2 +
563 // queue_index, channel_index
564 sizeof(uint32_t) * 2;
565
566 case LogType::kLogDeliveryTimeOnly:
567 return
568 // Root table size + offset.
569 sizeof(flatbuffers::uoffset_t) * 2 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700570 // vtable header (size + size of table)
571 sizeof(flatbuffers::voffset_t) * 2 +
572 // offsets to all the fields.
Austin Schuhb5224ec2024-03-27 15:20:09 -0700573 sizeof(flatbuffers::voffset_t) * 10 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700574 // pointer to vtable
575 sizeof(flatbuffers::soffset_t) +
576 // remote_queue_index
577 sizeof(uint32_t) +
578 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
579 // monotonic_sent_time
Austin Schuhb5224ec2024-03-27 15:20:09 -0700580 sizeof(int64_t) * 5 +
Austin Schuhfa30c352022-10-16 11:12:02 -0700581 // queue_index, channel_index
582 sizeof(uint32_t) * 2;
583
Austin Schuhfa30c352022-10-16 11:12:02 -0700584 case LogType::kLogRemoteMessage:
585 return
586 // Root table size + offset.
587 sizeof(flatbuffers::uoffset_t) * 2 +
588 // 6 padding bytes to pad the header out properly.
589 6 +
590 // vtable header (size + size of table)
591 sizeof(flatbuffers::voffset_t) * 2 +
592 // offsets to all the fields.
593 sizeof(flatbuffers::voffset_t) * 5 +
594 // pointer to vtable
595 sizeof(flatbuffers::soffset_t) +
596 // realtime_sent_time, monotonic_sent_time
597 sizeof(int64_t) * 2 +
598 // pointer to data
599 sizeof(flatbuffers::uoffset_t) +
600 // queue_index, channel_index
601 sizeof(uint32_t) * 2;
602 }
603 LOG(FATAL);
604}
605
James Kuszmaul9776b392023-01-14 14:08:08 -0800606flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700607 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
608 "Update size logic please.");
609 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700610 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700611 switch (log_type) {
612 case LogType::kLogDeliveryTimeOnly:
613 return PackMessageHeaderSize(log_type);
614
615 case LogType::kLogMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700616 case LogType::kLogRemoteMessage:
617 return PackMessageHeaderSize(log_type) +
618 // Vector...
619 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
620 }
621 LOG(FATAL);
622}
623
// Writes bytes [start_byte, end_byte) of the size-prefixed
// aos.logger.MessageHeader flatbuffer describing `context` into `buffer`, and
// returns the number of bytes in that range (end_byte - start_byte).
//
// The message is emitted by hand instead of through a FlatBufferBuilder; the
// annotated hex dumps in the comments below show the layout being reproduced
// for each LogType.
//
// Parameters:
//   buffer:        destination; the helpers Push/Pad/PushBytes return the
//                  advanced write pointer, which is threaded through.
//   context:       source of the timestamps, queue indices, data pointer and
//                  data size.
//   channel_index: value stored in the `channel_index` field.
//   log_type:      selects which of the three layouts is written.
//   start_byte, end_byte: slice of the full message to write.  Both must be
//                  multiples of 8 with start_byte <= end_byte <= the size
//                  returned by PackMessageSize(log_type, context.size); this
//                  is only enforced through DCHECKs.
//
// Each inner switch jumps to the 8-byte row at `start_byte` and falls through
// row by row, breaking out as soon as the row offset equals `end_byte`.
Austin Schuhfa30c352022-10-16 11:12:02 -0700624size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800625 int channel_index, LogType log_type, size_t start_byte,
626 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700627 // TODO(austin): Figure out how to copy directly from shared memory instead of
628 // first into the fetcher's memory and then into here. That would save a lot
629 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700630 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700631 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800632 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
633 DCHECK_EQ((start_byte % 8u), 0u);
634 DCHECK_EQ((end_byte % 8u), 0u);
635 DCHECK_LE(start_byte, end_byte);
636 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700637
638 // Pack all the data in. This is brittle but easy to change. Use the
639 // InlinePackMessage.Equivilent unit test to verify everything matches.
640 switch (log_type) {
641 case LogType::kLogMessage:
// Layout: 0x40 bytes of header/vtable/table fields taken from the local
// (event) timestamps and queue_index, followed by context.data padded to a
// multiple of 8 bytes.
Austin Schuh71a40d42023-02-04 21:22:22 -0800642 switch (start_byte) {
643 case 0x00u:
644 if ((end_byte) == 0x00u) {
645 break;
646 }
647 // clang-format off
648 // header:
649 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
650 buffer = Push<flatbuffers::uoffset_t>(
651 buffer, message_size - sizeof(flatbuffers::uoffset_t));
652
653 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
654 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
655 [[fallthrough]];
656 case 0x08u:
657 if ((end_byte) == 0x08u) {
658 break;
659 }
660 //
661 // padding:
662 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
663 buffer = Pad(buffer, 6);
664 //
665 // vtable (aos.logger.MessageHeader):
666 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
667 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
668 [[fallthrough]];
669 case 0x10u:
670 if ((end_byte) == 0x10u) {
671 break;
672 }
673 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
674 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
675 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
676 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
677 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
678 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
679 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
680 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
681 [[fallthrough]];
682 case 0x18u:
683 if ((end_byte) == 0x18u) {
684 break;
685 }
686 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
687 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
688 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
689 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
690 //
691 // root_table (aos.logger.MessageHeader):
692 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
693 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
694 [[fallthrough]];
695 case 0x20u:
696 if ((end_byte) == 0x20u) {
697 break;
698 }
699 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
700 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
701 [[fallthrough]];
702 case 0x28u:
703 if ((end_byte) == 0x28u) {
704 break;
705 }
706 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
707 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
708 [[fallthrough]];
709 case 0x30u:
710 if ((end_byte) == 0x30u) {
711 break;
712 }
713 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
714 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
715 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
716 buffer = Push<uint32_t>(buffer, context.queue_index);
717 [[fallthrough]];
718 case 0x38u:
719 if ((end_byte) == 0x38u) {
720 break;
721 }
722 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
723 buffer = Push<uint32_t>(buffer, channel_index);
724 //
725 // vector (aos.logger.MessageHeader.data):
726 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
727 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
728 [[fallthrough]];
729 case 0x40u:
730 if ((end_byte) == 0x40u) {
731 break;
732 }
733 [[fallthrough]];
// Reached either by falling through from the header rows or directly when
// start_byte is past 0x40, i.e. the slice starts inside the data vector.
734 default:
735 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
736 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
737 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
738 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
739 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
740 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
741 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
742 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
743 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
744 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
745 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
746 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
747 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
748 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
749 //
750 // padding:
751 // +0x4E | 00 00 | uint8_t[2] | .. | padding
752 // clang-format on
753 if (start_byte <= 0x40 && end_byte == message_size) {
754 // The easy one, slap it all down.
755 buffer = PushBytes(buffer, context.data, context.size);
756 buffer =
757 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
758 } else {
// Partial copy: translate the message offsets into offsets within the data
// vector (which begins at +0x40) and copy only the requested part of it.
759 const size_t data_start_byte =
760 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
761 const size_t data_end_byte = end_byte - 0x40;
762 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
763 if (data_start_byte < padded_size) {
764 buffer = PushBytes(
765 buffer,
766 reinterpret_cast<const uint8_t *>(context.data) +
767 data_start_byte,
768 std::min(context.size, data_end_byte) - data_start_byte);
769 if (data_end_byte == padded_size) {
770 // We can only pad the last 7 bytes, so this only gets written
771 // if we write the last byte.
772 buffer = Pad(buffer,
773 ((context.size + 7) & 0xfffffff8u) - context.size);
774 }
775 }
776 }
777 break;
778 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700779 break;
780
781 case LogType::kLogDeliveryTimeOnly:
// Layout: no data vector; the table carries both the remote and the local
// (event) timestamps and queue indices.  There is no `default:` label here, so
// a start_byte that matches none of the rows writes nothing.
Austin Schuh71a40d42023-02-04 21:22:22 -0800782 switch (start_byte) {
783 case 0x00u:
784 if ((end_byte) == 0x00u) {
785 break;
786 }
787 // clang-format off
788 // header:
Austin Schuhb5224ec2024-03-27 15:20:09 -0700789 // +0x00 | 54 00 00 00 | UOffset32 | 0x00000054 (84) Loc: +0x54 | size prefix
Austin Schuh71a40d42023-02-04 21:22:22 -0800790 buffer = Push<flatbuffers::uoffset_t>(
791 buffer, message_size - sizeof(flatbuffers::uoffset_t));
Austin Schuhb5224ec2024-03-27 15:20:09 -0700792 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
Austin Schuh71a40d42023-02-04 21:22:22 -0800793 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700794
Austin Schuh71a40d42023-02-04 21:22:22 -0800795 [[fallthrough]];
796 case 0x08u:
797 if ((end_byte) == 0x08u) {
798 break;
799 }
800 //
Austin Schuh71a40d42023-02-04 21:22:22 -0800801 // vtable (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700802 // +0x08 | 18 00 | uint16_t | 0x0018 (24) | size of this vtable
803 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
804 // +0x0A | 38 00 | uint16_t | 0x0038 (56) | size of referring table
805 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
806 // +0x0C | 34 00 | VOffset16 | 0x0034 (52) | offset to field `channel_index` (id: 0)
807 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
808 // +0x0E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `monotonic_sent_time` (id: 1)
809 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
Austin Schuh71a40d42023-02-04 21:22:22 -0800810 [[fallthrough]];
811 case 0x10u:
812 if ((end_byte) == 0x10u) {
813 break;
814 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700815 // +0x10 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `realtime_sent_time` (id: 2)
Austin Schuh71a40d42023-02-04 21:22:22 -0800816 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700817 // +0x12 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `queue_index` (id: 3)
818 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
819 // +0x14 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
820 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
821 // +0x16 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `monotonic_remote_time` (id: 5)
Austin Schuh71a40d42023-02-04 21:22:22 -0800822 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
Austin Schuh71a40d42023-02-04 21:22:22 -0800823 [[fallthrough]];
824 case 0x18u:
825 if ((end_byte) == 0x18u) {
826 break;
827 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700828 // +0x18 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `realtime_remote_time` (id: 6)
Austin Schuh71a40d42023-02-04 21:22:22 -0800829 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700830 // +0x1A | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
Austin Schuh71a40d42023-02-04 21:22:22 -0800831 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700832 // +0x1C | 00 00 | VOffset16 | 0x0000 (0) | offset to field `monotonic_timestamp_time` (id: 8) <defaults to -9223372036854775808> (Long)
833 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
834 // +0x1E | 08 00 | VOffset16 | 0x0008 (8) | offset to field `monotonic_remote_transmit_time` (id: 9)
835 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
Austin Schuh71a40d42023-02-04 21:22:22 -0800836 [[fallthrough]];
837 case 0x20u:
838 if ((end_byte) == 0x20u) {
839 break;
840 }
Austin Schuh71a40d42023-02-04 21:22:22 -0800841 // root_table (aos.logger.MessageHeader):
Austin Schuhb5224ec2024-03-27 15:20:09 -0700842 // +0x20 | 18 00 00 00 | SOffset32 | 0x00000018 (24) Loc: +0x08 | offset to vtable
843 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
844 // +0x24 | 3F 9A 69 37 | uint32_t | 0x37699A3F (929667647) | table field `remote_queue_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800845 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
846 [[fallthrough]];
847 case 0x28u:
848 if ((end_byte) == 0x28u) {
849 break;
850 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700851 // +0x28 | 00 00 00 00 00 00 00 80 | int64_t | 0x8000000000000000 (-9223372036854775808) | table field `monotonic_remote_transmit_time` (Long)
852 buffer = Push<int64_t>(buffer, context.monotonic_remote_transmit_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800853 [[fallthrough]];
854 case 0x30u:
855 if ((end_byte) == 0x30u) {
856 break;
857 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700858 // +0x30 | 1D CE 4A 38 54 33 C9 F8 | int64_t | 0xF8C93354384ACE1D (-519827845169885667) | table field `realtime_remote_time` (Long)
859 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800860 [[fallthrough]];
861 case 0x38u:
862 if ((end_byte) == 0x38u) {
863 break;
864 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700865 // +0x38 | FE EA DF 1D C7 3F C6 03 | int64_t | 0x03C63FC71DDFEAFE (271974951934749438) | table field `monotonic_remote_time` (Long)
866 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800867 [[fallthrough]];
868 case 0x40u:
869 if ((end_byte) == 0x40u) {
870 break;
871 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700872 // +0x40 | 4E 0C 96 6E FB B5 CE 12 | int64_t | 0x12CEB5FB6E960C4E (1355220629381844046) | table field `realtime_sent_time` (Long)
873 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
Austin Schuh71a40d42023-02-04 21:22:22 -0800874 [[fallthrough]];
875 case 0x48u:
876 if ((end_byte) == 0x48u) {
877 break;
878 }
Austin Schuhb5224ec2024-03-27 15:20:09 -0700879 // +0x48 | 51 56 56 F9 0A 0B 0F 12 | int64_t | 0x120F0B0AF9565651 (1301270959094126161) | table field `monotonic_sent_time` (Long)
880 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
881 [[fallthrough]];
882 case 0x50u:
883 if ((end_byte) == 0x50u) {
884 break;
885 }
886 // +0x50 | 0C A5 42 18 | uint32_t | 0x1842A50C (407020812) | table field `queue_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800887 buffer = Push<uint32_t>(buffer, context.queue_index);
Austin Schuhb5224ec2024-03-27 15:20:09 -0700888 // +0x54 | 87 10 7C D7 | uint32_t | 0xD77C1087 (3615232135) | table field `channel_index` (UInt)
Austin Schuh71a40d42023-02-04 21:22:22 -0800889 buffer = Push<uint32_t>(buffer, channel_index);
890
891 // clang-format on
892 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700893 break;
894
Austin Schuhfa30c352022-10-16 11:12:02 -0700895 case LogType::kLogRemoteMessage:
// Same byte layout as kLogMessage, but the `*_sent_time` and `queue_index`
// fields are filled from the remote timestamps and remote_queue_index in
// `context`.
Austin Schuh71a40d42023-02-04 21:22:22 -0800896 switch (start_byte) {
897 case 0x00u:
898 if ((end_byte) == 0x00u) {
899 break;
900 }
901 // This is the message we need to recreate.
902 //
903 // clang-format off
904 // header:
905 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
906 buffer = Push<flatbuffers::uoffset_t>(
907 buffer, message_size - sizeof(flatbuffers::uoffset_t));
908 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
909 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
910 [[fallthrough]];
911 case 0x08u:
912 if ((end_byte) == 0x08u) {
913 break;
914 }
915 //
916 // padding:
917 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
918 buffer = Pad(buffer, 6);
919 //
920 // vtable (aos.logger.MessageHeader):
921 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
922 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
923 [[fallthrough]];
924 case 0x10u:
925 if ((end_byte) == 0x10u) {
926 break;
927 }
928 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
929 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
930 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
931 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
932 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
933 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
934 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
935 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
936 [[fallthrough]];
937 case 0x18u:
938 if ((end_byte) == 0x18u) {
939 break;
940 }
941 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
942 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
943 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
944 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
945 //
946 // root_table (aos.logger.MessageHeader):
947 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
948 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
949 [[fallthrough]];
950 case 0x20u:
951 if ((end_byte) == 0x20u) {
952 break;
953 }
954 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
955 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
956 [[fallthrough]];
957 case 0x28u:
958 if ((end_byte) == 0x28u) {
959 break;
960 }
961 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
962 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
963 [[fallthrough]];
964 case 0x30u:
965 if ((end_byte) == 0x30u) {
966 break;
967 }
968 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
969 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
970 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
971 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
972 [[fallthrough]];
973 case 0x38u:
974 if ((end_byte) == 0x38u) {
975 break;
976 }
977 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
978 buffer = Push<uint32_t>(buffer, channel_index);
979 //
980 // vector (aos.logger.MessageHeader.data):
981 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
982 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
983 [[fallthrough]];
984 case 0x40u:
985 if ((end_byte) == 0x40u) {
986 break;
987 }
988 [[fallthrough]];
989 default:
990 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
991 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
992 // ...
993 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
994 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
995 //
996 // padding:
997 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
998 // clang-format on
999 if (start_byte <= 0x40 && end_byte == message_size) {
1000 // The easy one, slap it all down.
1001 buffer = PushBytes(buffer, context.data, context.size);
1002 buffer =
1003 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1004 } else {
// Partial copy of the data vector; same slicing logic as the kLogMessage case.
1005 const size_t data_start_byte =
1006 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1007 const size_t data_end_byte = end_byte - 0x40;
1008 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1009 if (data_start_byte < padded_size) {
1010 buffer = PushBytes(
1011 buffer,
1012 reinterpret_cast<const uint8_t *>(context.data) +
1013 data_start_byte,
1014 std::min(context.size, data_end_byte) - data_start_byte);
1015 if (data_end_byte == padded_size) {
1016 // We can only pad the last 7 bytes, so this only gets written
1017 // if we write the last byte.
1018 buffer = Pad(buffer,
1019 ((context.size + 7) & 0xfffffff8u) - context.size);
1020 }
1021 }
1022 }
1023 break;
1024 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001025 }
1026
// The slice length is reported regardless of which rows were actually
// written above.
Austin Schuh71a40d42023-02-04 21:22:22 -08001027 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001028}
1029
Austin Schuhcd368422021-11-22 21:23:29 -08001030SpanReader::SpanReader(std::string_view filename, bool quiet)
Alexei Strotscee7b372023-04-21 11:57:54 -07001031 : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
Tyler Chatow2015bc62021-08-04 21:15:09 -07001032
Alexei Strotscee7b372023-04-21 11:57:54 -07001033SpanReader::SpanReader(std::string_view filename,
1034 std::unique_ptr<DataDecoder> decoder)
1035 : filename_(filename), decoder_(std::move(decoder)) {}
Austin Schuh05b70472020-01-01 17:11:17 -08001036
Austin Schuhcf5f6442021-07-06 10:43:28 -07001037absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001038 // Make sure we have enough for the size.
1039 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1040 if (!ReadBlock()) {
1041 return absl::Span<const uint8_t>();
1042 }
1043 }
1044
1045 // Now make sure we have enough for the message.
1046 const size_t data_size =
1047 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1048 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001049 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1050 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1051 LOG(ERROR) << " Rest of log file is "
1052 << absl::BytesToHexString(std::string_view(
1053 reinterpret_cast<const char *>(data_.data() +
1054 consumed_data_),
1055 data_.size() - consumed_data_));
1056 return absl::Span<const uint8_t>();
1057 }
Austin Schuh05b70472020-01-01 17:11:17 -08001058 while (data_.size() < consumed_data_ + data_size) {
1059 if (!ReadBlock()) {
1060 return absl::Span<const uint8_t>();
1061 }
1062 }
1063
1064 // And return it, consuming the data.
1065 const uint8_t *data_ptr = data_.data() + consumed_data_;
1066
Austin Schuh05b70472020-01-01 17:11:17 -08001067 return absl::Span<const uint8_t>(data_ptr, data_size);
1068}
1069
Austin Schuhcf5f6442021-07-06 10:43:28 -07001070void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001071 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001072 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1073 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001074 consumed_data_ += consumed_size;
1075 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001076}
1077
1078absl::Span<const uint8_t> SpanReader::ReadMessage() {
1079 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001080 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001081 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001082 } else {
1083 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001084 }
1085 return result;
1086}
1087
Austin Schuh05b70472020-01-01 17:11:17 -08001088bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001089 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1090 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001091 constexpr size_t kReadSize = 256 * 1024;
1092
1093 // Strip off any unused data at the front.
1094 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001095 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001096 consumed_data_ = 0;
1097 }
1098
1099 const size_t starting_size = data_.size();
1100
1101 // This should automatically grow the backing store. It won't shrink if we
1102 // get a small chunk later. This reduces allocations when we want to append
1103 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001104 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001105
Brian Silvermanf51499a2020-09-21 12:49:08 -07001106 const size_t count =
1107 decoder_->Read(data_.begin() + starting_size, data_.end());
1108 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001109 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001110 return false;
1111 }
Austin Schuh05b70472020-01-01 17:11:17 -08001112
Brian Smarttea913d42021-12-10 15:02:38 -08001113 total_read_ += count;
1114
Austin Schuh05b70472020-01-01 17:11:17 -08001115 return true;
1116}
1117
Alexei Strotsa3194712023-04-21 23:30:50 -07001118LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
1119 : log_source_(log_source), pool_size_(pool_size) {}
1120
1121SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
1122 if (part_readers_.size() > pool_size_) {
1123 // Don't leave arbitrary numbers of readers open, because they each take
1124 // resources, so close a big batch at once periodically.
1125 part_readers_.clear();
1126 }
1127 if (log_source_ == nullptr) {
1128 part_readers_.emplace_back(id, FLAGS_quiet_sorting);
1129 } else {
1130 part_readers_.emplace_back(id, log_source_->GetDecoder(id));
1131 }
1132 return &part_readers_.back();
1133}
1134
Austin Schuhadd6eb32020-11-09 21:24:26 -08001135std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001136 SpanReader *span_reader) {
1137 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001138
1139 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001140 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001141 return std::nullopt;
1142 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001143
Austin Schuh5212cad2020-09-09 23:12:09 -07001144 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001145 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001146 if (!result.Verify()) {
1147 return std::nullopt;
1148 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001149
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001150 // We only know of busted headers in the versions of the log file header
1151 // *before* the logger_sha1 field was added. At some point before that point,
1152 // the logic to track when a header has been written was rewritten in such a
1153 // way that it can't happen anymore. We've seen some logs where the body
1154 // parses as a header recently, so the simple solution of always looking is
1155 // failing us.
1156 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001157 while (true) {
1158 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001159 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001160 break;
1161 }
1162
1163 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1164 maybe_header_data);
1165 if (maybe_header.Verify()) {
1166 LOG(WARNING) << "Found duplicate LogFileHeader in "
1167 << span_reader->filename();
1168 ResizeableBuffer header_data_copy;
1169 header_data_copy.resize(maybe_header_data.size());
1170 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1171 header_data_copy.size());
1172 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1173 std::move(header_data_copy));
1174
1175 span_reader->ConsumeMessage();
1176 } else {
1177 break;
1178 }
1179 }
1180 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001181 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001182}
1183
Austin Schuh0e8db662021-07-06 10:43:47 -07001184std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1185 std::string_view filename) {
1186 SpanReader span_reader(filename);
1187 return ReadHeader(&span_reader);
1188}
1189
Austin Schuhadd6eb32020-11-09 21:24:26 -08001190std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001191 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001192 SpanReader span_reader(filename);
1193 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1194 for (size_t i = 0; i < n + 1; ++i) {
1195 data_span = span_reader.ReadMessage();
1196
1197 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001198 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001199 return std::nullopt;
1200 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001201 }
1202
Brian Silverman354697a2020-09-22 21:06:32 -07001203 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001204 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001205 if (!result.Verify()) {
1206 return std::nullopt;
1207 }
1208 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001209}
1210
Alexei Strots58017402023-05-03 22:05:06 -07001211MessageReader::MessageReader(SpanReader span_reader)
1212 : span_reader_(std::move(span_reader)),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001213 raw_log_file_header_(
1214 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001215 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1216 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1217
Austin Schuh0e8db662021-07-06 10:43:47 -07001218 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1219 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001220
1221 // Make sure something was read.
Alexei Strots58017402023-05-03 22:05:06 -07001222 CHECK(raw_log_file_header)
1223 << ": Failed to read header from: " << span_reader_.filename();
Austin Schuh05b70472020-01-01 17:11:17 -08001224
Austin Schuh0e8db662021-07-06 10:43:47 -07001225 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001226
Austin Schuh5b728b72021-06-16 14:57:15 -07001227 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1228
Brian Smarttea913d42021-12-10 15:02:38 -08001229 total_verified_before_ = span_reader_.TotalConsumed();
1230
Austin Schuhcde938c2020-02-02 17:30:07 -08001231 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001232 FLAGS_max_out_of_order > 0
1233 ? chrono::duration_cast<chrono::nanoseconds>(
1234 chrono::duration<double>(FLAGS_max_out_of_order))
1235 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001236
Alexei Strots58017402023-05-03 22:05:06 -07001237 VLOG(1) << "Opened " << span_reader_.filename() << " as node "
Austin Schuhcde938c2020-02-02 17:30:07 -08001238 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001239}
1240
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001241std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001242 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001243 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001244 if (is_corrupted()) {
1245 LOG(ERROR) << "Total corrupted volumes: before = "
1246 << total_verified_before_
1247 << " | corrupted = " << total_corrupted_
1248 << " | during = " << total_verified_during_
1249 << " | after = " << total_verified_after_ << std::endl;
1250 }
1251
1252 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001253 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1254 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001255 << span_reader_.TotalConsumed() << " bytes usable."
1256 << std::endl;
1257 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001258 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001259 }
1260
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001261 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001262
1263 if (crash_on_corrupt_message_flag_) {
1264 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001265 << total_verified_before_ << " found within "
1266 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001267 << "; set --nocrash_on_corrupt_message to see summary;"
1268 << " also set --ignore_corrupt_messages to process"
1269 << " anyway";
1270
1271 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001272 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001273 << " from " << filename() << std::endl;
1274
1275 total_corrupted_ += msg_data.size();
1276
1277 while (true) {
1278 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1279
James Kuszmaul9776b392023-01-14 14:08:08 -08001280 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001281 if (!ignore_corrupt_messages_flag_) {
1282 LOG(ERROR) << "Total corrupted volumes: before = "
1283 << total_verified_before_
1284 << " | corrupted = " << total_corrupted_
1285 << " | during = " << total_verified_during_
1286 << " | after = " << total_verified_after_ << std::endl;
1287
1288 if (span_reader_.IsIncomplete()) {
1289 LOG(ERROR) << "Unable to access some messages in " << filename()
1290 << " : " << span_reader_.TotalRead() << " bytes read, "
1291 << span_reader_.TotalConsumed() << " bytes usable."
1292 << std::endl;
1293 }
1294 return nullptr;
1295 }
1296 break;
1297 }
1298
1299 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1300
1301 if (!next_msg.Verify()) {
1302 total_corrupted_ += msg_data.size();
1303 total_verified_during_ += total_verified_after_;
1304 total_verified_after_ = 0;
1305
1306 } else {
1307 total_verified_after_ += msg_data.size();
1308 if (ignore_corrupt_messages_flag_) {
1309 msg = next_msg;
1310 break;
1311 }
1312 }
1313 }
1314 }
1315
1316 if (is_corrupted()) {
1317 total_verified_after_ += msg_data.size();
1318 } else {
1319 total_verified_before_ += msg_data.size();
1320 }
Austin Schuh05b70472020-01-01 17:11:17 -08001321
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001322 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001323
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001324 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001325
1326 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001327
1328 if (VLOG_IS_ON(3)) {
1329 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1330 } else if (VLOG_IS_ON(2)) {
1331 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1332 msg_copy.mutable_message()->clear_data();
1333 VLOG(2) << "Read from " << filename() << " data "
1334 << FlatbufferToJson(msg_copy);
1335 }
1336
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001337 return result;
1338}
1339
1340std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1341 const MessageHeader &message) {
1342 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1343
1344 UnpackedMessageHeader *const unpacked_message =
1345 reinterpret_cast<UnpackedMessageHeader *>(
1346 malloc(sizeof(UnpackedMessageHeader) + data_size +
1347 kChannelDataAlignment - 1));
1348
1349 CHECK(message.has_channel_index());
1350 CHECK(message.has_monotonic_sent_time());
1351
1352 absl::Span<uint8_t> span;
1353 if (data_size > 0) {
1354 span =
1355 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1356 &unpacked_message->actual_data[0], data_size)),
1357 data_size);
1358 }
1359
Austin Schuh826e6ce2021-11-18 20:33:10 -08001360 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001361 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001362 monotonic_remote_time = aos::monotonic_clock::time_point(
1363 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001364 }
1365 std::optional<realtime_clock::time_point> realtime_remote_time;
1366 if (message.has_realtime_remote_time()) {
1367 realtime_remote_time = realtime_clock::time_point(
1368 chrono::nanoseconds(message.realtime_remote_time()));
1369 }
Austin Schuhb5224ec2024-03-27 15:20:09 -07001370 aos::monotonic_clock::time_point monotonic_remote_transmit_time =
1371 aos::monotonic_clock::time_point(
1372 std::chrono::nanoseconds(message.monotonic_remote_transmit_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001373
1374 std::optional<uint32_t> remote_queue_index;
1375 if (message.has_remote_queue_index()) {
1376 remote_queue_index = message.remote_queue_index();
1377 }
1378
James Kuszmaul9776b392023-01-14 14:08:08 -08001379 new (unpacked_message) UnpackedMessageHeader(
1380 message.channel_index(),
1381 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001382 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001383 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001384 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001385 message.queue_index(), monotonic_remote_time, realtime_remote_time,
Austin Schuhb5224ec2024-03-27 15:20:09 -07001386 monotonic_remote_transmit_time, remote_queue_index,
James Kuszmaul9776b392023-01-14 14:08:08 -08001387 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001388 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001389 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001390
1391 if (data_size > 0) {
1392 memcpy(span.data(), message.data()->data(), data_size);
1393 }
1394
1395 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1396 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001397}
1398
Alexei Strots58017402023-05-03 22:05:06 -07001399SpanReader PartsMessageReader::MakeSpanReader(
1400 const LogPartsAccess &log_parts_access, size_t part_number) {
1401 const auto part = log_parts_access.GetPartAt(part_number);
1402 if (log_parts_access.log_source().has_value()) {
1403 return SpanReader(part,
1404 log_parts_access.log_source().value()->GetDecoder(part));
1405 } else {
1406 return SpanReader(part);
1407 }
1408}
1409
1410PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
1411 : log_parts_access_(std::move(log_parts_access)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001412 message_reader_(MakeSpanReader(log_parts_access_, 0)),
1413 max_out_of_order_duration_(
1414 log_parts_access_.max_out_of_order_duration()) {
Alexei Strots58017402023-05-03 22:05:06 -07001415 if (log_parts_access_.size() >= 2) {
1416 next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001417 }
Austin Schuh48507722021-07-17 17:29:24 -07001418 ComputeBootCounts();
1419}
1420
1421void PartsMessageReader::ComputeBootCounts() {
Austin Schuh63097262023-08-16 17:04:29 -07001422 boot_counts_.assign(
1423 configuration::NodesCount(log_parts_access_.config().get()),
1424 std::nullopt);
Austin Schuh48507722021-07-17 17:29:24 -07001425
Alexei Strots58017402023-05-03 22:05:06 -07001426 const auto boots = log_parts_access_.parts().boots;
1427
Austin Schuh48507722021-07-17 17:29:24 -07001428 // We have 3 vintages of log files with different amounts of information.
1429 if (log_file_header()->has_boot_uuids()) {
1430 // The new hotness with the boots explicitly listed out. We can use the log
1431 // file header to compute the boot count of all relevant nodes.
1432 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1433 size_t node_index = 0;
1434 for (const flatbuffers::String *boot_uuid :
1435 *log_file_header()->boot_uuids()) {
Alexei Strots58017402023-05-03 22:05:06 -07001436 CHECK(boots);
Austin Schuh48507722021-07-17 17:29:24 -07001437 if (boot_uuid->size() != 0) {
Alexei Strots58017402023-05-03 22:05:06 -07001438 auto it = boots->boot_count_map.find(boot_uuid->str());
1439 if (it != boots->boot_count_map.end()) {
Austin Schuh48507722021-07-17 17:29:24 -07001440 boot_counts_[node_index] = it->second;
1441 }
1442 } else if (parts().boots->boots[node_index].size() == 1u) {
1443 boot_counts_[node_index] = 0;
1444 }
1445 ++node_index;
1446 }
1447 } else {
1448 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1449 // single node log files with boot UUIDs in the header. We only know how to
1450 // order certain boots in certain circumstances.
Austin Schuh63097262023-08-16 17:04:29 -07001451 if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
Austin Schuh48507722021-07-17 17:29:24 -07001452 for (size_t node_index = 0; node_index < boot_counts_.size();
1453 ++node_index) {
Alexei Strots58017402023-05-03 22:05:06 -07001454 if (boots->boots[node_index].size() == 1u) {
Austin Schuh48507722021-07-17 17:29:24 -07001455 boot_counts_[node_index] = 0;
1456 }
1457 }
1458 } else {
1459 // Really old single node logs without any UUIDs. They can't reboot.
1460 CHECK_EQ(boot_counts_.size(), 1u);
1461 boot_counts_[0] = 0u;
1462 }
1463 }
1464}
Austin Schuhc41603c2020-10-11 16:17:37 -07001465
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001466std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001467 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001468 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001469 message_reader_.ReadMessage();
1470 if (message) {
1471 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001472 const monotonic_clock::time_point monotonic_sent_time =
1473 message->monotonic_sent_time;
1474
1475 // TODO(austin): Does this work with startup? Might need to use the
1476 // start time.
1477 // TODO(austin): Does this work with startup when we don't know the
1478 // remote start time too? Look at one of those logs to compare.
Alexei Strots58017402023-05-03 22:05:06 -07001479 if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
1480 max_out_of_order_duration()) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001481 after_start_ = true;
1482 }
1483 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001484 CHECK_GE(monotonic_sent_time,
1485 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001486 << ": Max out of order of " << max_out_of_order_duration().count()
Alexei Strots58017402023-05-03 22:05:06 -07001487 << "ns exceeded. " << log_parts_access_.parts()
1488 << ", start time is "
1489 << log_parts_access_.parts().monotonic_start_time
1490 << " currently reading " << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001491 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001492 return message;
1493 }
1494 NextLog();
1495 }
Austin Schuh32f68492020-11-08 21:45:51 -08001496 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001497 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001498}
1499
1500void PartsMessageReader::NextLog() {
Alexei Strots58017402023-05-03 22:05:06 -07001501 if (next_part_index_ == log_parts_access_.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001502 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001503 done_ = true;
1504 return;
1505 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001506 CHECK(next_message_reader_);
1507 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001508 ComputeBootCounts();
Alexei Strots58017402023-05-03 22:05:06 -07001509 if (next_part_index_ + 1 < log_parts_access_.size()) {
1510 next_message_reader_.emplace(
1511 MakeSpanReader(log_parts_access_, next_part_index_ + 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001512 } else {
1513 next_message_reader_.reset();
1514 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001515 ++next_part_index_;
1516}
1517
Austin Schuh1be0ce42020-11-29 22:43:26 -08001518bool Message::operator<(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001519 if (this->timestamp < m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001520 return true;
Austin Schuh63097262023-08-16 17:04:29 -07001521 } else if (this->timestamp > m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001522 return false;
1523 }
1524
1525 if (this->channel_index < m2.channel_index) {
1526 return true;
1527 } else if (this->channel_index > m2.channel_index) {
1528 return false;
1529 }
1530
1531 return this->queue_index < m2.queue_index;
1532}
1533
1534bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001535bool Message::operator==(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001536 return timestamp == m2.timestamp && channel_index == m2.channel_index &&
1537 queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001538}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001539
Austin Schuh63097262023-08-16 17:04:29 -07001540bool Message::operator<=(const Message &m2) const {
1541 return *this == m2 || *this < m2;
1542}
1543
1544std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
1545 os << "{.channel_index=" << msg.channel_index
1546 << ", .monotonic_sent_time=" << msg.monotonic_sent_time
1547 << ", .realtime_sent_time=" << msg.realtime_sent_time
1548 << ", .queue_index=" << msg.queue_index;
1549 if (msg.monotonic_remote_time) {
1550 os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001551 }
1552 os << ", .realtime_remote_time=";
Austin Schuh63097262023-08-16 17:04:29 -07001553 PrintOptionalOrNull(&os, msg.realtime_remote_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001554 os << ", .remote_queue_index=";
Austin Schuh63097262023-08-16 17:04:29 -07001555 PrintOptionalOrNull(&os, msg.remote_queue_index);
1556 if (msg.has_monotonic_timestamp_time) {
1557 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001558 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001559 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001560 return os;
1561}
1562
Austin Schuh63097262023-08-16 17:04:29 -07001563std::ostream &operator<<(std::ostream &os, const Message &msg) {
1564 os << "{.channel_index=" << msg.channel_index
1565 << ", .queue_index=" << msg.queue_index
1566 << ", .timestamp=" << msg.timestamp;
1567 if (msg.data != nullptr) {
1568 if (msg.data->remote_queue_index.has_value()) {
1569 os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001570 }
Austin Schuh63097262023-08-16 17:04:29 -07001571 if (msg.data->monotonic_remote_time.has_value()) {
1572 os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001573 }
Austin Schuh63097262023-08-16 17:04:29 -07001574 os << ", .data=" << msg.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001575 }
1576 os << "}";
1577 return os;
1578}
1579
Austin Schuh63097262023-08-16 17:04:29 -07001580std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
1581 os << "{.channel_index=" << msg.channel_index
1582 << ", .queue_index=" << msg.queue_index
1583 << ", .monotonic_event_time=" << msg.monotonic_event_time
1584 << ", .realtime_event_time=" << msg.realtime_event_time;
1585 if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
1586 os << ", .remote_queue_index=" << msg.remote_queue_index;
Austin Schuhd2f96102020-12-01 20:27:29 -08001587 }
Austin Schuh63097262023-08-16 17:04:29 -07001588 if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
1589 os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001590 }
Austin Schuh63097262023-08-16 17:04:29 -07001591 if (msg.realtime_remote_time != realtime_clock::min_time) {
1592 os << ", .realtime_remote_time=" << msg.realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001593 }
Austin Schuhb5224ec2024-03-27 15:20:09 -07001594 if (msg.monotonic_remote_transmit_time != BootTimestamp::min_time()) {
1595 os << ", .monotonic_remote_transmit_time="
1596 << msg.monotonic_remote_transmit_time;
1597 }
Austin Schuh63097262023-08-16 17:04:29 -07001598 if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
1599 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001600 }
Austin Schuh63097262023-08-16 17:04:29 -07001601 if (msg.data != nullptr) {
1602 os << ", .data=" << *msg.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001603 } else {
1604 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001605 }
1606 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001607 return os;
1608}
1609
Alexei Strots58017402023-05-03 22:05:06 -07001610MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
1611 : parts_message_reader_(log_parts_access),
Austin Schuh48507722021-07-17 17:29:24 -07001612 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1613}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001614
Adam Snaider13d48d92023-08-03 12:20:15 -07001615const Message *MessageSorter::Front() {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001616 // Queue up data until enough data has been queued that the front message is
1617 // sorted enough to be safe to pop. This may do nothing, so we should make
1618 // sure the nothing path is checked quickly.
1619 if (sorted_until() != monotonic_clock::max_time) {
1620 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001621 if (!messages_.empty() &&
1622 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001623 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001624 break;
1625 }
1626
Austin Schuh63097262023-08-16 17:04:29 -07001627 std::shared_ptr<UnpackedMessageHeader> msg =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001628 parts_message_reader_.ReadMessage();
1629 // No data left, sorted forever, work through what is left.
Austin Schuh63097262023-08-16 17:04:29 -07001630 if (!msg) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001631 sorted_until_ = monotonic_clock::max_time;
1632 break;
1633 }
1634
Austin Schuh48507722021-07-17 17:29:24 -07001635 size_t monotonic_timestamp_boot = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001636 if (msg->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001637 monotonic_timestamp_boot = parts().logger_boot_count;
1638 }
1639 size_t monotonic_remote_boot = 0xffffff;
1640
Austin Schuh63097262023-08-16 17:04:29 -07001641 if (msg->monotonic_remote_time.has_value()) {
Maxwell Gumley8c1b87f2024-02-13 17:54:52 -07001642 CHECK_LT(msg->channel_index, source_node_index_.size());
Austin Schuh63097262023-08-16 17:04:29 -07001643 const Node *node = parts().config->nodes()->Get(
1644 source_node_index_[msg->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001645
Austin Schuh48507722021-07-17 17:29:24 -07001646 std::optional<size_t> boot = parts_message_reader_.boot_count(
Austin Schuh63097262023-08-16 17:04:29 -07001647 source_node_index_[msg->channel_index]);
Alexei Strots036d84e2023-05-03 16:05:12 -07001648 CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
Austin Schuh63097262023-08-16 17:04:29 -07001649 << "', with index "
1650 << source_node_index_[msg->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001651 monotonic_remote_boot = *boot;
1652 }
1653
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001654 messages_.insert(
Austin Schuh63097262023-08-16 17:04:29 -07001655 Message{.channel_index = msg->channel_index,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001656 .queue_index = BootQueueIndex{.boot = parts().boot_count,
Austin Schuh63097262023-08-16 17:04:29 -07001657 .index = msg->queue_index},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001658 .timestamp = BootTimestamp{.boot = parts().boot_count,
Austin Schuh63097262023-08-16 17:04:29 -07001659 .time = msg->monotonic_sent_time},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001660 .monotonic_remote_boot = monotonic_remote_boot,
1661 .monotonic_timestamp_boot = monotonic_timestamp_boot,
Austin Schuh63097262023-08-16 17:04:29 -07001662 .data = std::move(msg)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001663
1664 // Now, update sorted_until_ to match the new message.
1665 if (parts_message_reader_.newest_timestamp() >
1666 monotonic_clock::min_time +
1667 parts_message_reader_.max_out_of_order_duration()) {
1668 sorted_until_ = parts_message_reader_.newest_timestamp() -
1669 parts_message_reader_.max_out_of_order_duration();
1670 } else {
1671 sorted_until_ = monotonic_clock::min_time;
1672 }
1673 }
1674 }
1675
1676 // Now that we have enough data queued, return a pointer to the oldest piece
1677 // of data if it exists.
1678 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001679 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001680 return nullptr;
1681 }
1682
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001683 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001684 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001685 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh63097262023-08-16 17:04:29 -07001686 VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
1687 << (*messages_.begin()) << " on " << parts_message_reader_.filename();
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001688 return &(*messages_.begin());
1689}
1690
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001691void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001692
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001693std::string MessageSorter::DebugString() const {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001694 std::stringstream ss;
1695 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001696 int count = 0;
1697 bool no_dots = true;
Austin Schuh63097262023-08-16 17:04:29 -07001698 for (const Message &msg : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001699 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
Austin Schuh63097262023-08-16 17:04:29 -07001700 ss << msg << "\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001701 } else if (no_dots) {
1702 ss << "...\n";
1703 no_dots = false;
1704 }
1705 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001706 }
1707 ss << "] <- " << parts_message_reader_.filename();
1708 return ss.str();
1709}
1710
Austin Schuh63097262023-08-16 17:04:29 -07001711// Class to merge start times cleanly, reusably, and incrementally.
1712class StartTimes {
1713 public:
1714 void Update(monotonic_clock::time_point new_monotonic_start_time,
1715 realtime_clock::time_point new_realtime_start_time) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001716 // We want to capture the earliest meaningful start time here. The start
1717 // time defaults to min_time when there's no meaningful value to report, so
1718 // let's ignore those.
Austin Schuh63097262023-08-16 17:04:29 -07001719 if (new_monotonic_start_time != monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001720 bool accept = false;
1721 // We want to prioritize start times from the logger node. Really, we
1722 // want to prioritize start times with a valid realtime_clock time. So,
1723 // if we have a start time without a RT clock, prefer a start time with a
1724 // RT clock, even it if is later.
Austin Schuh63097262023-08-16 17:04:29 -07001725 if (new_realtime_start_time != realtime_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001726 // We've got a good one. See if the current start time has a good RT
1727 // clock, or if we should use this one instead.
Austin Schuh63097262023-08-16 17:04:29 -07001728 if (new_monotonic_start_time < monotonic_start_time_ ||
1729 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001730 accept = true;
1731 } else if (realtime_start_time_ == realtime_clock::min_time) {
1732 // The previous start time doesn't have a good RT time, so it is very
1733 // likely the start time from a remote part file. We just found a
1734 // better start time with a real RT time, so switch to that instead.
1735 accept = true;
1736 }
1737 } else if (realtime_start_time_ == realtime_clock::min_time) {
1738 // We don't have a RT time, so take the oldest.
Austin Schuh63097262023-08-16 17:04:29 -07001739 if (new_monotonic_start_time < monotonic_start_time_ ||
1740 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001741 accept = true;
1742 }
1743 }
1744
1745 if (accept) {
Austin Schuh63097262023-08-16 17:04:29 -07001746 monotonic_start_time_ = new_monotonic_start_time;
1747 realtime_start_time_ = new_realtime_start_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001748 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001749 }
1750 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001751
Austin Schuh63097262023-08-16 17:04:29 -07001752 monotonic_clock::time_point monotonic_start_time() const {
1753 return monotonic_start_time_;
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001754 }
Austin Schuh63097262023-08-16 17:04:29 -07001755 realtime_clock::time_point realtime_start_time() const {
1756 return realtime_start_time_;
1757 }
1758
1759 private:
1760 monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
1761 realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time;
1762};
1763
1764PartsMerger::PartsMerger(SelectedLogParts &&parts) {
1765 node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name());
1766
1767 for (LogPartsAccess part : parts) {
1768 message_sorters_.emplace_back(std::move(part));
1769 }
1770
1771 StartTimes start_times;
1772 for (const MessageSorter &message_sorter : message_sorters_) {
1773 start_times.Update(message_sorter.monotonic_start_time(),
1774 message_sorter.realtime_start_time());
1775 }
1776 monotonic_start_time_ = start_times.monotonic_start_time();
1777 realtime_start_time_ = start_times.realtime_start_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001778}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001779
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001780std::vector<const LogParts *> PartsMerger::Parts() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001781 std::vector<const LogParts *> p;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001782 p.reserve(message_sorters_.size());
1783 for (const MessageSorter &message_sorter : message_sorters_) {
1784 p.emplace_back(&message_sorter.parts());
Austin Schuh0ca51f32020-12-25 21:51:45 -08001785 }
1786 return p;
1787}
1788
Adam Snaider13d48d92023-08-03 12:20:15 -07001789const Message *PartsMerger::Front() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001790 // Return the current Front if we have one, otherwise go compute one.
1791 if (current_ != nullptr) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001792 const Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001793 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuh63097262023-08-16 17:04:29 -07001794 VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
1795 << *result;
Austin Schuhb000de62020-12-03 22:00:40 -08001796 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001797 }
1798
1799 // Otherwise, do a simple search for the oldest message, deduplicating any
1800 // duplicates.
Adam Snaider13d48d92023-08-03 12:20:15 -07001801 const Message *oldest = nullptr;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001802 sorted_until_ = monotonic_clock::max_time;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001803 for (MessageSorter &message_sorter : message_sorters_) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001804 const Message *msg = message_sorter.Front();
Austin Schuh63097262023-08-16 17:04:29 -07001805 if (!msg) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001806 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001807 continue;
1808 }
Austin Schuh63097262023-08-16 17:04:29 -07001809 if (oldest == nullptr || *msg < *oldest) {
1810 oldest = msg;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001811 current_ = &message_sorter;
Austin Schuh63097262023-08-16 17:04:29 -07001812 } else if (*msg == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001813 // Found a duplicate. If there is a choice, we want the one which has
1814 // the timestamp time.
Austin Schuh63097262023-08-16 17:04:29 -07001815 if (!msg->data->has_monotonic_timestamp_time) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001816 message_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001817 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001818 current_->PopFront();
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001819 current_ = &message_sorter;
Austin Schuh63097262023-08-16 17:04:29 -07001820 oldest = msg;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001821 } else {
Austin Schuh63097262023-08-16 17:04:29 -07001822 CHECK_EQ(msg->data->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001823 oldest->data->monotonic_timestamp_time);
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001824 message_sorter.PopFront();
Austin Schuh8bf1e632021-01-02 22:41:04 -08001825 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001826 }
1827
1828 // PopFront may change this, so compute it down here.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001829 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001830 }
1831
Austin Schuhb000de62020-12-03 22:00:40 -08001832 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001833 CHECK_GE(oldest->timestamp.time, last_message_time_);
1834 last_message_time_ = oldest->timestamp.time;
Austin Schuh63097262023-08-16 17:04:29 -07001835 if (monotonic_oldest_time_ > oldest->timestamp.time) {
1836 VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
1837 << " for node " << node_name() << " with a start time of "
1838 << monotonic_start_time_ << " " << *oldest;
1839 }
Austin Schuh5dd22842021-11-17 16:09:39 -08001840 monotonic_oldest_time_ =
1841 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001842 } else {
1843 last_message_time_ = monotonic_clock::max_time;
1844 }
1845
Austin Schuh8f52ed52020-11-30 23:12:39 -08001846 // Return the oldest message found. This will be nullptr if nothing was
1847 // found, indicating there is nothing left.
Austin Schuh63097262023-08-16 17:04:29 -07001848 if (oldest) {
1849 VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
1850 << *oldest;
1851 } else {
1852 VLOG(1) << this << " PartsMerger::Front for node " << node_name();
1853 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001854 return oldest;
1855}
1856
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001857void PartsMerger::PopFront() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001858 CHECK(current_ != nullptr) << "Popping before calling Front()";
1859 current_->PopFront();
1860 current_ = nullptr;
1861}
1862
Alexei Strots1f51ac72023-05-15 10:14:54 -07001863BootMerger::BootMerger(std::string_view node_name,
Austin Schuh63097262023-08-16 17:04:29 -07001864 const LogFilesContainer &log_files,
1865 const std::vector<StoredDataType> &types)
1866 : configuration_(log_files.config()),
1867 node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
Alexei Strots1f51ac72023-05-15 10:14:54 -07001868 size_t number_of_boots = log_files.BootsForNode(node_name);
1869 parts_mergers_.reserve(number_of_boots);
1870 for (size_t i = 0; i < number_of_boots; ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001871 VLOG(2) << "Boot " << i;
Austin Schuh63097262023-08-16 17:04:29 -07001872 SelectedLogParts selected_parts =
1873 log_files.SelectParts(node_name, i, types);
1874 // We are guarenteed to have something each boot, but not guarenteed to have
1875 // both timestamps and data for each boot. If we don't have anything, don't
1876 // create a parts merger. The rest of this class will detect that and
1877 // ignore it as required.
1878 if (selected_parts.empty()) {
1879 parts_mergers_.emplace_back(nullptr);
1880 } else {
1881 parts_mergers_.emplace_back(
1882 std::make_unique<PartsMerger>(std::move(selected_parts)));
1883 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001884 }
1885}
1886
Austin Schuh63097262023-08-16 17:04:29 -07001887std::string_view BootMerger::node_name() const {
1888 return configuration::NodeName(configuration().get(), node());
1889}
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001890
Adam Snaider13d48d92023-08-03 12:20:15 -07001891const Message *BootMerger::Front() {
Austin Schuh63097262023-08-16 17:04:29 -07001892 if (parts_mergers_[index_].get() != nullptr) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001893 const Message *result = parts_mergers_[index_]->Front();
Austin Schuh63097262023-08-16 17:04:29 -07001894
1895 if (result != nullptr) {
1896 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
1897 return result;
1898 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001899 }
1900
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001901 if (index_ + 1u == parts_mergers_.size()) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001902 // At the end of the last node merger, just return.
Austin Schuh63097262023-08-16 17:04:29 -07001903 VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr";
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001904 return nullptr;
1905 } else {
1906 ++index_;
Adam Snaider13d48d92023-08-03 12:20:15 -07001907 const Message *result = Front();
Austin Schuh63097262023-08-16 17:04:29 -07001908 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
1909 return result;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001910 }
1911}
1912
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001913void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001914
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001915std::vector<const LogParts *> BootMerger::Parts() const {
1916 std::vector<const LogParts *> results;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001917 for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
Austin Schuh63097262023-08-16 17:04:29 -07001918 if (!parts_merger) continue;
1919
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001920 std::vector<const LogParts *> node_parts = parts_merger->Parts();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001921
1922 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1923 std::make_move_iterator(node_parts.end()));
1924 }
1925
1926 return results;
1927}
1928
Austin Schuh63097262023-08-16 17:04:29 -07001929monotonic_clock::time_point BootMerger::monotonic_start_time(
1930 size_t boot) const {
1931 CHECK_LT(boot, parts_mergers_.size());
1932 if (parts_mergers_[boot]) {
1933 return parts_mergers_[boot]->monotonic_start_time();
Austin Schuh0ca51f32020-12-25 21:51:45 -08001934 }
Austin Schuh63097262023-08-16 17:04:29 -07001935 return monotonic_clock::min_time;
1936}
1937
1938realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const {
1939 CHECK_LT(boot, parts_mergers_.size());
1940 if (parts_mergers_[boot]) {
1941 return parts_mergers_[boot]->realtime_start_time();
1942 }
1943 return realtime_clock::min_time;
1944}
1945
1946monotonic_clock::time_point BootMerger::monotonic_oldest_time(
1947 size_t boot) const {
1948 CHECK_LT(boot, parts_mergers_.size());
1949 if (parts_mergers_[boot]) {
1950 return parts_mergers_[boot]->monotonic_oldest_time();
1951 }
1952 return monotonic_clock::max_time;
1953}
1954
1955bool BootMerger::started() const {
1956 if (index_ == 0) {
1957 if (!parts_mergers_[0]) {
1958 return false;
1959 }
1960 return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time;
1961 }
1962 return true;
1963}
1964
1965SplitTimestampBootMerger::SplitTimestampBootMerger(
1966 std::string_view node_name, const LogFilesContainer &log_files,
1967 TimestampQueueStrategy timestamp_queue_strategy)
1968 : boot_merger_(node_name, log_files,
1969 (timestamp_queue_strategy ==
1970 TimestampQueueStrategy::kQueueTimestampsAtStartup)
1971 ? std::vector<StoredDataType>{StoredDataType::DATA}
1972 : std::vector<StoredDataType>{
1973 StoredDataType::DATA, StoredDataType::TIMESTAMPS,
1974 StoredDataType::REMOTE_TIMESTAMPS}) {
1975 // Make the timestamp_boot_merger_ only if we are asked to, and if there are
1976 // files to put in it. We don't need it for a data only log.
1977 if (timestamp_queue_strategy ==
1978 TimestampQueueStrategy::kQueueTimestampsAtStartup &&
1979 log_files.HasTimestamps(node_name)) {
1980 timestamp_boot_merger_ = std::make_unique<BootMerger>(
1981 node_name, log_files,
1982 std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
1983 StoredDataType::REMOTE_TIMESTAMPS});
1984 }
1985
1986 size_t number_of_boots = log_files.BootsForNode(node_name);
1987 monotonic_start_time_.reserve(number_of_boots);
1988 realtime_start_time_.reserve(number_of_boots);
1989
1990 // Start times are split across the timestamp boot merger, and data boot
1991 // merger. Pull from both and combine them to get the same answer as before.
1992 for (size_t i = 0u; i < number_of_boots; ++i) {
1993 StartTimes start_times;
1994
1995 if (timestamp_boot_merger_) {
1996 start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
1997 timestamp_boot_merger_->realtime_start_time(i));
1998 }
1999
2000 start_times.Update(boot_merger_.monotonic_start_time(i),
2001 boot_merger_.realtime_start_time(i));
2002
2003 monotonic_start_time_.push_back(start_times.monotonic_start_time());
2004 realtime_start_time_.push_back(start_times.realtime_start_time());
2005 }
2006}
2007
2008void SplitTimestampBootMerger::QueueTimestamps(
2009 std::function<void(TimestampedMessage *)> fn,
2010 const std::vector<size_t> &source_node) {
2011 if (!timestamp_boot_merger_) {
2012 return;
2013 }
2014
2015 while (true) {
2016 // Load all the timestamps. If we find data, ignore it and drop it on the
2017 // floor. It will be read when boot_merger_ is used.
Adam Snaider13d48d92023-08-03 12:20:15 -07002018 const Message *msg = timestamp_boot_merger_->Front();
Austin Schuh63097262023-08-16 17:04:29 -07002019 if (!msg) {
2020 queue_timestamps_ran_ = true;
2021 return;
2022 }
2023 if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
2024 timestamp_messages_.emplace_back(TimestampedMessage{
2025 .channel_index = msg->channel_index,
2026 .queue_index = msg->queue_index,
2027 .monotonic_event_time = msg->timestamp,
2028 .realtime_event_time = msg->data->realtime_sent_time,
2029 .remote_queue_index =
2030 BootQueueIndex{.boot = msg->monotonic_remote_boot,
2031 .index = msg->data->remote_queue_index.value()},
2032 .monotonic_remote_time = {msg->monotonic_remote_boot,
2033 msg->data->monotonic_remote_time.value()},
2034 .realtime_remote_time = msg->data->realtime_remote_time.value(),
Austin Schuhb5224ec2024-03-27 15:20:09 -07002035 .monotonic_remote_transmit_time =
2036 {msg->monotonic_remote_boot,
2037 msg->data->monotonic_remote_transmit_time},
Austin Schuh63097262023-08-16 17:04:29 -07002038 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
2039 msg->data->monotonic_timestamp_time},
2040 .data = std::move(msg->data)});
2041
2042 VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
2043 fn(&timestamp_messages_.back());
2044 } else {
2045 VLOG(2) << this << " Dropped data";
2046 }
2047 timestamp_boot_merger_->PopFront();
2048 }
2049
2050 // TODO(austin): Push the queue into TimestampMapper instead. Have it pull
2051 // all the timestamps. That will also make it so we don't have to clear the
2052 // function.
2053}
2054
2055std::string_view SplitTimestampBootMerger::node_name() const {
2056 return configuration::NodeName(configuration().get(), node());
2057}
2058
2059monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
2060 size_t boot) const {
2061 CHECK_LT(boot, monotonic_start_time_.size());
2062 return monotonic_start_time_[boot];
2063}
2064
2065realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
2066 size_t boot) const {
2067 CHECK_LT(boot, realtime_start_time_.size());
2068 return realtime_start_time_[boot];
2069}
2070
2071monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time(
2072 size_t boot) const {
2073 if (!timestamp_boot_merger_) {
2074 return boot_merger_.monotonic_oldest_time(boot);
2075 }
2076 return std::min(boot_merger_.monotonic_oldest_time(boot),
2077 timestamp_boot_merger_->monotonic_oldest_time(boot));
2078}
2079
Adam Snaider13d48d92023-08-03 12:20:15 -07002080const Message *SplitTimestampBootMerger::Front() {
2081 const Message *boot_merger_front = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002082
2083 if (timestamp_boot_merger_) {
2084 CHECK(queue_timestamps_ran_);
2085 }
2086
2087 // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
2088 // to return a Message. We need to convert the first message in the list
2089 // before returning it (and comparing, honestly). Fill next_timestamp_ in if
2090 // it is empty so the rest of the logic here can just look at next_timestamp_
2091 // and use that instead.
2092 if (!next_timestamp_ && !timestamp_messages_.empty()) {
2093 auto &front = timestamp_messages_.front();
2094 next_timestamp_ = Message{
2095 .channel_index = front.channel_index,
2096 .queue_index = front.queue_index,
2097 .timestamp = front.monotonic_event_time,
2098 .monotonic_remote_boot = front.remote_queue_index.boot,
2099 .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
2100 .data = std::move(front.data),
2101 };
2102 timestamp_messages_.pop_front();
2103 }
2104
2105 if (!next_timestamp_) {
2106 message_source_ = MessageSource::kBootMerger;
2107 if (boot_merger_front != nullptr) {
2108 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2109 << " " << *boot_merger_front;
2110 } else {
2111 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2112 << " nullptr";
2113 }
2114 return boot_merger_front;
2115 }
2116
2117 if (boot_merger_front == nullptr) {
2118 message_source_ = MessageSource::kTimestampMessage;
2119
2120 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
2121 << next_timestamp_.value();
2122 return &next_timestamp_.value();
2123 }
2124
2125 if (*boot_merger_front <= next_timestamp_.value()) {
2126 if (*boot_merger_front == next_timestamp_.value()) {
2127 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2128 << " Dropping duplicate timestamp.";
2129 next_timestamp_.reset();
2130 }
2131 message_source_ = MessageSource::kBootMerger;
2132 if (boot_merger_front != nullptr) {
2133 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2134 << " " << *boot_merger_front;
2135 } else {
2136 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2137 << " nullptr";
2138 }
2139 return boot_merger_front;
2140 } else {
2141 message_source_ = MessageSource::kTimestampMessage;
2142 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
2143 << next_timestamp_.value();
2144 return &next_timestamp_.value();
2145 }
2146}
2147
2148void SplitTimestampBootMerger::PopFront() {
2149 switch (message_source_) {
2150 case MessageSource::kTimestampMessage:
2151 CHECK(next_timestamp_.has_value());
2152 next_timestamp_.reset();
2153 break;
2154 case MessageSource::kBootMerger:
2155 boot_merger_.PopFront();
2156 break;
2157 }
2158}
2159
2160TimestampMapper::TimestampMapper(
2161 std::string_view node_name, const LogFilesContainer &log_files,
2162 TimestampQueueStrategy timestamp_queue_strategy)
2163 : boot_merger_(node_name, log_files, timestamp_queue_strategy),
2164 timestamp_callback_([](TimestampedMessage *) {}) {
2165 configuration_ = boot_merger_.configuration();
2166
Austin Schuh0ca51f32020-12-25 21:51:45 -08002167 const Configuration *config = configuration_.get();
Alexei Strots1f51ac72023-05-15 10:14:54 -07002168 // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
Austin Schuhd2f96102020-12-01 20:27:29 -08002169 // pretty simple.
2170 if (configuration::MultiNode(config)) {
2171 nodes_data_.resize(config->nodes()->size());
2172 const Node *my_node = config->nodes()->Get(node());
2173 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2174 const Node *node = config->nodes()->Get(node_index);
2175 NodeData *node_data = &nodes_data_[node_index];
2176 node_data->channels.resize(config->channels()->size());
2177 // We should save the channel if it is delivered to the node represented
2178 // by the NodeData, but not sent by that node. That combo means it is
2179 // forwarded.
2180 size_t channel_index = 0;
2181 node_data->any_delivered = false;
2182 for (const Channel *channel : *config->channels()) {
2183 node_data->channels[channel_index].delivered =
2184 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002185 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2186 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002187 node_data->any_delivered = node_data->any_delivered ||
2188 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002189 if (node_data->channels[channel_index].delivered) {
2190 const Connection *connection =
2191 configuration::ConnectionToNode(channel, node);
2192 node_data->channels[channel_index].time_to_live =
2193 chrono::nanoseconds(connection->time_to_live());
2194 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002195 ++channel_index;
2196 }
2197 }
2198
2199 for (const Channel *channel : *config->channels()) {
2200 source_node_.emplace_back(configuration::GetNodeIndex(
2201 config, channel->source_node()->string_view()));
2202 }
2203 }
2204}
2205
2206void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002207 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002208 CHECK_NE(timestamp_mapper->node(), node());
2209 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2210
2211 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002212 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002213 // we could needlessly save data.
2214 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002215 VLOG(1) << "Registering on node " << node() << " for peer node "
2216 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002217 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2218
2219 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002220
2221 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002222 }
2223}
2224
Adam Snaider13d48d92023-08-03 12:20:15 -07002225void TimestampMapper::QueueMessage(const Message *msg) {
Austin Schuhb5224ec2024-03-27 15:20:09 -07002226 matched_messages_.emplace_back(TimestampedMessage{
2227 .channel_index = msg->channel_index,
2228 .queue_index = msg->queue_index,
2229 .monotonic_event_time = msg->timestamp,
2230 .realtime_event_time = msg->data->realtime_sent_time,
2231 .remote_queue_index = BootQueueIndex::Invalid(),
2232 .monotonic_remote_time = BootTimestamp::min_time(),
2233 .realtime_remote_time = realtime_clock::min_time,
2234 .monotonic_remote_transmit_time = BootTimestamp::min_time(),
2235 .monotonic_timestamp_time = BootTimestamp::min_time(),
2236 .data = std::move(msg->data)});
Austin Schuh63097262023-08-16 17:04:29 -07002237 VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
Austin Schuhd2f96102020-12-01 20:27:29 -08002238}
2239
2240TimestampedMessage *TimestampMapper::Front() {
2241 // No need to fetch anything new. A previous message still exists.
2242 switch (first_message_) {
2243 case FirstMessage::kNeedsUpdate:
2244 break;
2245 case FirstMessage::kInMessage:
Austin Schuh63097262023-08-16 17:04:29 -07002246 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2247 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002248 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002249 case FirstMessage::kNullptr:
Austin Schuh63097262023-08-16 17:04:29 -07002250 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2251 << " nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08002252 return nullptr;
2253 }
2254
Austin Schuh79b30942021-01-24 22:32:21 -08002255 if (matched_messages_.empty()) {
2256 if (!QueueMatched()) {
2257 first_message_ = FirstMessage::kNullptr;
Austin Schuh63097262023-08-16 17:04:29 -07002258 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2259 << " nullptr";
Austin Schuh79b30942021-01-24 22:32:21 -08002260 return nullptr;
2261 }
2262 }
2263 first_message_ = FirstMessage::kInMessage;
Austin Schuh63097262023-08-16 17:04:29 -07002264 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2265 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002266 return &matched_messages_.front();
2267}
2268
2269bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002270 MatchResult result = MatchResult::kEndOfFile;
2271 do {
2272 result = MaybeQueueMatched();
2273 } while (result == MatchResult::kSkipped);
2274 return result == MatchResult::kQueued;
2275}
2276
2277bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2278 const TimestampedMessage & /*message*/) {
2279 if (replay_channels_callback_ &&
2280 !replay_channels_callback_(matched_messages_.back())) {
Austin Schuh63097262023-08-16 17:04:29 -07002281 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002282 matched_messages_.pop_back();
2283 return true;
2284 }
2285 return false;
2286}
2287
2288TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002289 if (nodes_data_.empty()) {
2290 // Simple path. We are single node, so there are no timestamps to match!
2291 CHECK_EQ(messages_.size(), 0u);
Adam Snaider13d48d92023-08-03 12:20:15 -07002292 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002293 if (!msg) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002294 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002295 }
Austin Schuh79b30942021-01-24 22:32:21 -08002296 // Enqueue this message into matched_messages_ so we have a place to
2297 // associate remote timestamps, and return it.
Austin Schuh63097262023-08-16 17:04:29 -07002298 QueueMessage(msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002299
Austin Schuh63097262023-08-16 17:04:29 -07002300 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2301 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002302 last_message_time_ = matched_messages_.back().monotonic_event_time;
2303
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002304 // We are thin wrapper around parts_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002305 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002306 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002307 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2308 return MatchResult::kSkipped;
2309 }
2310 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002311 }
2312
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002313 // We need to only add messages to the list so they get processed for
2314 // messages which are delivered. Reuse the flow below which uses messages_
2315 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002316 if (messages_.empty()) {
2317 if (!Queue()) {
2318 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002319 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002320 }
2321
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002322 // Now that it has been added (and cannibalized), forget about it
2323 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002324 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002325 }
2326
Austin Schuh63097262023-08-16 17:04:29 -07002327 Message *msg = &(messages_.front());
Austin Schuhd2f96102020-12-01 20:27:29 -08002328
Austin Schuh63097262023-08-16 17:04:29 -07002329 if (source_node_[msg->channel_index] == node()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002330 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh63097262023-08-16 17:04:29 -07002331 QueueMessage(msg);
2332 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2333 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002334 last_message_time_ = matched_messages_.back().monotonic_event_time;
2335 messages_.pop_front();
2336 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002337 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2338 return MatchResult::kSkipped;
2339 }
2340 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002341 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002342 // Got a timestamp, find the matching remote data, match it, and return
2343 // it.
Austin Schuh63097262023-08-16 17:04:29 -07002344 Message data = MatchingMessageFor(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002345
2346 // Return the data from the remote. The local message only has timestamp
2347 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002348 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuh63097262023-08-16 17:04:29 -07002349 .channel_index = msg->channel_index,
2350 .queue_index = msg->queue_index,
2351 .monotonic_event_time = msg->timestamp,
2352 .realtime_event_time = msg->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002353 .remote_queue_index =
Austin Schuh63097262023-08-16 17:04:29 -07002354 BootQueueIndex{.boot = msg->monotonic_remote_boot,
2355 .index = msg->data->remote_queue_index.value()},
2356 .monotonic_remote_time = {msg->monotonic_remote_boot,
2357 msg->data->monotonic_remote_time.value()},
2358 .realtime_remote_time = msg->data->realtime_remote_time.value(),
Austin Schuhb5224ec2024-03-27 15:20:09 -07002359 .monotonic_remote_transmit_time =
2360 {msg->monotonic_remote_boot,
2361 msg->data->monotonic_remote_transmit_time},
Austin Schuh63097262023-08-16 17:04:29 -07002362 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
2363 msg->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002364 .data = std::move(data.data)});
Austin Schuh63097262023-08-16 17:04:29 -07002365 VLOG(1) << node_name() << " Inserted timestamp "
2366 << matched_messages_.back();
2367 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2368 << " on " << node_name() << " " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002369 last_message_time_ = matched_messages_.back().monotonic_event_time;
2370 // Since messages_ holds the data, drop it.
2371 messages_.pop_front();
2372 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002373 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2374 return MatchResult::kSkipped;
2375 }
2376 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002377 }
2378}
2379
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002380void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002381 while (last_message_time_ <= queue_time) {
2382 if (!QueueMatched()) {
2383 return;
2384 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002385 }
2386}
2387
Austin Schuhe639ea12021-01-25 13:00:22 -08002388void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002389 // Note: queueing for time doesn't really work well across boots. So we
2390 // just assume that if you are using this, you only care about the current
2391 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002392 //
2393 // TODO(austin): Is that the right concept?
2394 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002395 // Make sure we have something queued first. This makes the end time
2396 // calculation simpler, and is typically what folks want regardless.
2397 if (matched_messages_.empty()) {
2398 if (!QueueMatched()) {
2399 return;
2400 }
2401 }
2402
2403 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002404 std::max(monotonic_start_time(
2405 matched_messages_.front().monotonic_event_time.boot),
2406 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002407 time_estimation_buffer;
2408
2409 // Place sorted messages on the list until we have
2410 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2411 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002412 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002413 if (!QueueMatched()) {
2414 return;
2415 }
2416 }
2417}
2418
Austin Schuhd2f96102020-12-01 20:27:29 -08002419void TimestampMapper::PopFront() {
2420 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002421 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002422 first_message_ = FirstMessage::kNeedsUpdate;
2423
Austin Schuh63097262023-08-16 17:04:29 -07002424 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002425 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002426}
2427
2428Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002429 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002430 CHECK_NOTNULL(message.data);
2431 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002432 const BootQueueIndex remote_queue_index =
2433 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002434 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002435
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002436 CHECK(message.data->monotonic_remote_time.has_value());
2437 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002438
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002439 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002440 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002441 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002442 const realtime_clock::time_point realtime_remote_time =
2443 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002444
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002445 TimestampMapper *peer =
2446 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002447
2448 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002449 // asked to pull a timestamp from a peer which doesn't exist, return an
2450 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002451 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002452 // TODO(austin): Make sure the tests hit all these paths with a boot count
2453 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002454 return Message{.channel_index = message.channel_index,
2455 .queue_index = remote_queue_index,
2456 .timestamp = monotonic_remote_time,
2457 .monotonic_remote_boot = 0xffffff,
2458 .monotonic_timestamp_boot = 0xffffff,
2459 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002460 }
2461
2462 // The queue which will have the matching data, if available.
2463 std::deque<Message> *data_queue =
2464 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2465
Austin Schuh79b30942021-01-24 22:32:21 -08002466 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002467
2468 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002469 return Message{.channel_index = message.channel_index,
2470 .queue_index = remote_queue_index,
2471 .timestamp = monotonic_remote_time,
2472 .monotonic_remote_boot = 0xffffff,
2473 .monotonic_timestamp_boot = 0xffffff,
2474 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002475 }
2476
Austin Schuhd2f96102020-12-01 20:27:29 -08002477 if (remote_queue_index < data_queue->front().queue_index ||
2478 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002479 return Message{.channel_index = message.channel_index,
2480 .queue_index = remote_queue_index,
2481 .timestamp = monotonic_remote_time,
2482 .monotonic_remote_boot = 0xffffff,
2483 .monotonic_timestamp_boot = 0xffffff,
2484 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002485 }
2486
Austin Schuh993ccb52020-12-12 15:59:32 -08002487 // The algorithm below is constant time with some assumptions. We need there
2488 // to be no missing messages in the data stream. This also assumes a queue
2489 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002490 if (data_queue->back().queue_index.boot ==
2491 data_queue->front().queue_index.boot &&
2492 (data_queue->back().queue_index.index -
2493 data_queue->front().queue_index.index + 1u ==
2494 data_queue->size())) {
2495 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002496 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002497 //
2498 // TODO(austin): Move if not reliable.
2499 Message result = (*data_queue)[remote_queue_index.index -
2500 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002501
2502 CHECK_EQ(result.timestamp, monotonic_remote_time)
2503 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002504 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002505 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2506 // Now drop the data off the front. We have deduplicated timestamps, so we
2507 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002508 data_queue->erase(
2509 data_queue->begin(),
2510 data_queue->begin() +
2511 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002512 return result;
2513 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002514 // TODO(austin): Binary search.
2515 auto it = std::find_if(
2516 data_queue->begin(), data_queue->end(),
2517 [remote_queue_index,
Austin Schuh63097262023-08-16 17:04:29 -07002518 remote_boot = monotonic_remote_time.boot](const Message &msg) {
2519 return msg.queue_index == remote_queue_index &&
2520 msg.timestamp.boot == remote_boot;
Austin Schuh58646e22021-08-23 23:51:46 -07002521 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002522 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002523 return Message{.channel_index = message.channel_index,
2524 .queue_index = remote_queue_index,
2525 .timestamp = monotonic_remote_time,
2526 .monotonic_remote_boot = 0xffffff,
2527 .monotonic_timestamp_boot = 0xffffff,
2528 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002529 }
2530
2531 Message result = std::move(*it);
2532
2533 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002534 << ": Queue index matches, but timestamp doesn't. Please "
2535 "investigate!";
2536 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2537 << ": Queue index matches, but timestamp doesn't. Please "
2538 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002539
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002540 // Erase everything up to this message. We want to keep 1 message in the
2541 // queue so we can handle reliable messages forwarded across boots.
2542 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002543
2544 return result;
2545 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002546}
2547
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002548void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002549 if (queued_until_ > t) {
2550 return;
2551 }
2552 while (true) {
2553 if (!messages_.empty() && messages_.back().timestamp > t) {
2554 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2555 return;
2556 }
2557
2558 if (!Queue()) {
2559 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002560 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002561 return;
2562 }
2563
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002564 // Now that it has been added (and cannibalized), forget about it
2565 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002566 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002567 }
2568}
2569
2570bool TimestampMapper::Queue() {
Adam Snaider13d48d92023-08-03 12:20:15 -07002571 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002572 if (msg == nullptr) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002573 return false;
2574 }
2575 for (NodeData &node_data : nodes_data_) {
2576 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002577 if (!node_data.save_for_peer) continue;
Austin Schuh63097262023-08-16 17:04:29 -07002578 if (node_data.channels[msg->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002579 // If we have data but no timestamps (logs where the timestamps didn't get
2580 // logged are classic), we can grow this indefinitely. We don't need to
2581 // keep anything that is older than the last message returned.
2582
2583 // We have the time on the source node.
2584 // We care to wait until we have the time on the destination node.
2585 std::deque<Message> &messages =
Austin Schuh63097262023-08-16 17:04:29 -07002586 node_data.channels[msg->channel_index].messages;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002587 // Max delay over the network is the TTL, so let's take the queue time and
2588 // add TTL to it. Don't forget any messages which are reliable until
2589 // someone can come up with a good reason to forget those too.
Austin Schuh63097262023-08-16 17:04:29 -07002590 if (node_data.channels[msg->channel_index].time_to_live >
Austin Schuh6a7358f2021-11-18 22:40:40 -08002591 chrono::nanoseconds(0)) {
2592 // We need to make *some* assumptions about network delay for this to
2593 // work. We want to only look at the RX side. This means we need to
2594 // track the last time a message was popped from any channel from the
2595 // node sending this message, and compare that to the max time we expect
2596 // that a message will take to be delivered across the network. This
2597 // assumes that messages are popped in time order as a proxy for
2598 // measuring the distributed time at this layer.
2599 //
2600 // Leave at least 1 message in here so we can handle reboots and
2601 // messages getting sent twice.
2602 while (messages.size() > 1u &&
2603 messages.begin()->timestamp +
Austin Schuh63097262023-08-16 17:04:29 -07002604 node_data.channels[msg->channel_index].time_to_live +
Austin Schuh6a7358f2021-11-18 22:40:40 -08002605 chrono::duration_cast<chrono::nanoseconds>(
2606 chrono::duration<double>(FLAGS_max_network_delay)) <
2607 last_popped_message_time_) {
2608 messages.pop_front();
2609 }
2610 }
Austin Schuh63097262023-08-16 17:04:29 -07002611 node_data.channels[msg->channel_index].messages.emplace_back(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002612 }
2613 }
2614
Austin Schuh63097262023-08-16 17:04:29 -07002615 messages_.emplace_back(std::move(*msg));
Austin Schuhd2f96102020-12-01 20:27:29 -08002616 return true;
2617}
2618
Austin Schuh63097262023-08-16 17:04:29 -07002619void TimestampMapper::QueueTimestamps() {
2620 boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
2621}
2622
Austin Schuhd2f96102020-12-01 20:27:29 -08002623std::string TimestampMapper::DebugString() const {
2624 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002625 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002626 for (const Message &message : messages_) {
2627 ss << " " << message << "\n";
2628 }
2629 ss << "] queued_until " << queued_until_;
2630 for (const NodeData &ns : nodes_data_) {
2631 if (ns.peer == nullptr) continue;
2632 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2633 size_t channel_index = 0;
2634 for (const NodeData::ChannelData &channel_data :
2635 ns.peer->nodes_data_[node()].channels) {
2636 if (channel_data.messages.empty()) {
2637 continue;
2638 }
Austin Schuhb000de62020-12-03 22:00:40 -08002639
Austin Schuhd2f96102020-12-01 20:27:29 -08002640 ss << " channel " << channel_index << " [\n";
Austin Schuh63097262023-08-16 17:04:29 -07002641 for (const Message &msg : channel_data.messages) {
2642 ss << " " << msg << "\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002643 }
2644 ss << " ]\n";
2645 ++channel_index;
2646 }
2647 ss << "] queued_until " << ns.peer->queued_until_;
2648 }
2649 return ss.str();
2650}
2651
Brian Silvermanf51499a2020-09-21 12:49:08 -07002652} // namespace aos::logger