blob: 326fa6d780b1dbcbf58a191d6434e5873276ec53 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "flatbuffers/flatbuffers.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070018#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080019#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080020#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070025#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070026#else
27#define ENABLE_LZMA 0
28#endif
29
30#if ENABLE_LZMA
31#include "aos/events/logging/lzma_encoder.h"
32#endif
Austin Schuh86110712022-09-16 15:40:54 -070033#if ENABLE_S3
34#include "aos/events/logging/s3_fetcher.h"
35#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070036
Austin Schuh48d10d62022-10-16 22:19:23 -070037DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080038 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070039DEFINE_double(
40 flush_period, 5.0,
41 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080042
Austin Schuha040c3f2021-02-13 16:09:07 -080043DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080044 max_network_delay, 1.0,
45 "Max time to assume a message takes to cross the network before we are "
46 "willing to drop it from our buffers and assume it didn't make it. "
47 "Increasing this number can increase memory usage depending on the packet "
48 "loss of your network or if the timestamps aren't logged for a message.");
49
50DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080051 max_out_of_order, -1,
52 "If set, this overrides the max out of order duration for a log file.");
53
Austin Schuh0e8db662021-07-06 10:43:47 -070054DEFINE_bool(workaround_double_headers, true,
55 "Some old log files have two headers at the beginning. Use the "
56 "last header as the actual header.");
57
Brian Smarttea913d42021-12-10 15:02:38 -080058DEFINE_bool(crash_on_corrupt_message, true,
59 "When true, MessageReader will crash the first time a message "
60 "with corrupted format is found. When false, the crash will be "
61 "suppressed, and any remaining readable messages will be "
62 "evaluated to present verified vs corrupted stats.");
63
64DEFINE_bool(ignore_corrupt_messages, false,
65 "When true, and crash_on_corrupt_message is false, then any "
66 "corrupt message found by MessageReader be silently ignored, "
67 "providing access to all uncorrupted messages in a logfile.");
68
Alexei Strotsa3194712023-04-21 23:30:50 -070069DECLARE_bool(quiet_sorting);
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070072namespace {
Austin Schuh05b70472020-01-01 17:11:17 -080073namespace chrono = std::chrono;
74
Alexei Strotscee7b372023-04-21 11:57:54 -070075std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename,
76 bool quiet) {
77 static constexpr std::string_view kS3 = "s3:";
78
79 std::unique_ptr<DataDecoder> decoder;
80
81 if (filename.substr(0, kS3.size()) == kS3) {
82#if ENABLE_S3
83 decoder = std::make_unique<S3Fetcher>(filename);
84#else
85 LOG(FATAL) << "Reading files from S3 not supported on this platform";
86#endif
87 } else {
88 decoder = std::make_unique<DummyDecoder>(filename);
89 }
90
91 static constexpr std::string_view kXz = ".xz";
92 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
93 if (filename.substr(filename.size() - kXz.size()) == kXz) {
94#if ENABLE_LZMA
95 decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet);
96#else
97 (void)quiet;
98 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
99#endif
100 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
101 decoder = std::make_unique<SnappyDecoder>(std::move(decoder));
102 }
103 return decoder;
104}
105
// Streams the value held by `t` to `*os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}
Philipp Schrader10397952023-06-15 11:43:07 -0700114
115// A dummy LogSink implementation that handles the special case when we create
116// a DetachedBufferWriter when there's no space left on the system. The
117// DetachedBufferWriter frequently dereferences log_sink_, so we want a class
118// here that effectively refuses to do anything meaningful.
119class OutOfDiskSpaceLogSink : public LogSink {
120 public:
121 WriteCode OpenForWrite() override { return WriteCode::kOutOfSpace; }
122 WriteCode Close() override { return WriteCode::kOk; }
123 bool is_open() const override { return false; }
124 WriteResult Write(
125 const absl::Span<const absl::Span<const uint8_t>> &) override {
126 return WriteResult{
127 .code = WriteCode::kOutOfSpace,
128 .messages_written = 0,
129 };
130 }
131 std::string_view name() const override { return "<out_of_disk_space>"; }
132};
133
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700134} // namespace
135
Alexei Strotsbc082d82023-05-03 08:43:42 -0700136DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink,
137 std::unique_ptr<DataEncoder> encoder)
138 : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) {
139 CHECK(log_sink_);
140 ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace;
Alexei Strots01395492023-03-20 13:59:56 -0700141 if (ran_out_of_space_) {
142 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800143 }
144}
145
Philipp Schrader10397952023-06-15 11:43:07 -0700146DetachedBufferWriter::DetachedBufferWriter(already_out_of_space_t)
147 : DetachedBufferWriter(std::make_unique<OutOfDiskSpaceLogSink>(), nullptr) {
148}
149
Austin Schuha36c8902019-12-30 18:07:15 -0800150DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700151 Close();
152 if (ran_out_of_space_) {
153 CHECK(acknowledge_ran_out_of_space_)
154 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700155 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700156}
157
Brian Silvermand90905f2020-09-23 14:42:56 -0700158DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700159 *this = std::move(other);
160}
161
Brian Silverman87ac0402020-09-17 14:47:01 -0700162// When other is destroyed "soon" (which it should be because we're getting an
163// rvalue reference to it), it will flush etc all the data we have queued up
164// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700165DetachedBufferWriter &DetachedBufferWriter::operator=(
166 DetachedBufferWriter &&other) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700167 std::swap(log_sink_, other.log_sink_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700168 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700169 std::swap(ran_out_of_space_, other.ran_out_of_space_);
170 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800171 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700172 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800173}
174
Austin Schuh8bdfc492023-02-11 12:53:13 -0800175void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800176 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700177 if (ran_out_of_space_) {
178 // We don't want any later data to be written after space becomes
179 // available, so refuse to write anything more once we've dropped data
180 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700181 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800182 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700183
Austin Schuh8bdfc492023-02-11 12:53:13 -0800184 const size_t message_size = copier->size();
185 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700186
Austin Schuh8bdfc492023-02-11 12:53:13 -0800187 // Keep writing chunks until we've written it all. If we end up with a
188 // partial write, this means we need to flush to disk.
189 do {
Alexei Strots01395492023-03-20 13:59:56 -0700190 const size_t bytes_written =
191 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800192 CHECK(bytes_written != 0);
193
194 overall_bytes_written += bytes_written;
195 if (overall_bytes_written < message_size) {
196 VLOG(1) << "Flushing because of a partial write, tried to write "
197 << message_size << " wrote " << overall_bytes_written;
198 Flush(now);
199 }
200 } while (overall_bytes_written < message_size);
201
Austin Schuhbd06ae42021-03-31 22:48:21 -0700202 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800203}
204
Brian Silverman0465fcf2020-09-24 00:29:18 -0700205void DetachedBufferWriter::Close() {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700206 if (!log_sink_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700207 return;
208 }
209 encoder_->Finish();
210 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800211 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700212 }
Austin Schuhb2461652023-05-01 08:30:56 -0700213 encoder_.reset();
Alexei Strotsbc082d82023-05-03 08:43:42 -0700214 ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700215}
216
Austin Schuh8bdfc492023-02-11 12:53:13 -0800217void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
218 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700219 if (ran_out_of_space_) {
220 // We don't want any later data to be written after space becomes available,
221 // so refuse to write anything more once we've dropped data because we ran
222 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700223 if (encoder_) {
224 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
225 encoder_->Clear(encoder_->queue().size());
226 } else {
227 VLOG(1) << "No queue to ignore";
228 }
229 return;
230 }
231
232 const auto queue = encoder_->queue();
233 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700234 return;
235 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700236
Alexei Strotsbc082d82023-05-03 08:43:42 -0700237 const WriteResult result = log_sink_->Write(queue);
Alexei Strots01395492023-03-20 13:59:56 -0700238 encoder_->Clear(result.messages_written);
239 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240}
241
Austin Schuhbd06ae42021-03-31 22:48:21 -0700242void DetachedBufferWriter::FlushAtThreshold(
243 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700244 if (ran_out_of_space_) {
245 // We don't want any later data to be written after space becomes available,
246 // so refuse to write anything more once we've dropped data because we ran
247 // out of space.
248 if (encoder_) {
249 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
250 encoder_->Clear(encoder_->queue().size());
251 } else {
252 VLOG(1) << "No queue to ignore";
253 }
254 return;
255 }
256
Austin Schuhbd06ae42021-03-31 22:48:21 -0700257 // We don't want to flush the first time through. Otherwise we will flush as
258 // the log file header might be compressing, defeating any parallelism and
259 // queueing there.
260 if (last_flush_time_ == aos::monotonic_clock::min_time) {
261 last_flush_time_ = now;
262 }
263
Brian Silvermanf51499a2020-09-21 12:49:08 -0700264 // Flush if we are at the max number of iovs per writev, because there's no
265 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700266 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800267 while (encoder_->space() == 0 ||
268 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700269 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700270 (now > last_flush_time_ +
271 chrono::duration_cast<chrono::nanoseconds>(
272 chrono::duration<double>(FLAGS_flush_period)) &&
273 encoder_->queued_bytes() != 0)) {
274 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
275 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800276 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700277 }
Austin Schuha36c8902019-12-30 18:07:15 -0800278}
279
Austin Schuhf2d0e682022-10-16 14:20:58 -0700280// Do the magic dance to convert the endianness of the data and append it to the
281// buffer.
282namespace {
283
284// TODO(austin): Look at the generated code to see if building the header is
285// efficient or not.
286template <typename T>
287uint8_t *Push(uint8_t *buffer, const T data) {
288 const T endian_data = flatbuffers::EndianScalar<T>(data);
289 std::memcpy(buffer, &endian_data, sizeof(T));
290 return buffer + sizeof(T);
291}
292
// Copies `size` raw bytes from `data` to `buffer` and returns the position
// just past them.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  // memcpy returns its destination, so the end pointer follows directly.
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
297
// Writes `padding` zero bytes at `buffer` and returns the position just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  // memset returns its destination, so the end pointer follows directly.
  return static_cast<uint8_t *>(std::memset(buffer, 0, padding)) + padding;
}
302} // namespace
303
304flatbuffers::Offset<MessageHeader> PackRemoteMessage(
305 flatbuffers::FlatBufferBuilder *fbb,
306 const message_bridge::RemoteMessage *msg, int channel_index,
307 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
308 logger::MessageHeader::Builder message_header_builder(*fbb);
309 // Note: this must match the same order as MessageBridgeServer and
310 // PackMessage. We want identical headers to have identical
311 // on-the-wire formats to make comparing them easier.
312
313 message_header_builder.add_channel_index(channel_index);
314
315 message_header_builder.add_queue_index(msg->queue_index());
316 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
317 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
318
319 message_header_builder.add_monotonic_remote_time(
320 msg->monotonic_remote_time());
321 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
322 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
323
324 message_header_builder.add_monotonic_timestamp_time(
325 monotonic_timestamp_time.time_since_epoch().count());
326
327 return message_header_builder.Finish();
328}
329
330size_t PackRemoteMessageInline(
331 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
332 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800333 const aos::monotonic_clock::time_point monotonic_timestamp_time,
334 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700335 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800336 DCHECK_EQ((start_byte % 8u), 0u);
337 DCHECK_EQ((end_byte % 8u), 0u);
338 DCHECK_LE(start_byte, end_byte);
339 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700340
Austin Schuh71a40d42023-02-04 21:22:22 -0800341 switch (start_byte) {
342 case 0x00u:
343 if ((end_byte) == 0x00u) {
344 break;
345 }
346 // clang-format off
347 // header:
348 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
349 buffer = Push<flatbuffers::uoffset_t>(
350 buffer, message_size - sizeof(flatbuffers::uoffset_t));
351 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
352 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
353 [[fallthrough]];
354 case 0x08u:
355 if ((end_byte) == 0x08u) {
356 break;
357 }
358 //
359 // padding:
360 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
361 buffer = Pad(buffer, 6);
362 //
363 // vtable (aos.logger.MessageHeader):
364 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
365 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
366 [[fallthrough]];
367 case 0x10u:
368 if ((end_byte) == 0x10u) {
369 break;
370 }
371 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
372 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
373 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
374 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
375 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
376 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
377 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
378 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
379 [[fallthrough]];
380 case 0x18u:
381 if ((end_byte) == 0x18u) {
382 break;
383 }
384 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
385 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
386 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
387 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
388 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
389 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
390 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
391 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
392 [[fallthrough]];
393 case 0x20u:
394 if ((end_byte) == 0x20u) {
395 break;
396 }
397 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
398 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
399 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
400 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
401 //
402 // root_table (aos.logger.MessageHeader):
403 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
404 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
405 [[fallthrough]];
406 case 0x28u:
407 if ((end_byte) == 0x28u) {
408 break;
409 }
410 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
411 buffer = Push<int64_t>(buffer,
412 monotonic_timestamp_time.time_since_epoch().count());
413 [[fallthrough]];
414 case 0x30u:
415 if ((end_byte) == 0x30u) {
416 break;
417 }
418 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
419 // TODO(austin): Can we re-arrange the order to ditch the padding?
420 // (Answer is yes, but what is the impact elsewhere? It will change the
421 // binary format)
422 buffer = Pad(buffer, 4);
423 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
424 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
425 [[fallthrough]];
426 case 0x38u:
427 if ((end_byte) == 0x38u) {
428 break;
429 }
430 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
431 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
432 [[fallthrough]];
433 case 0x40u:
434 if ((end_byte) == 0x40u) {
435 break;
436 }
437 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
438 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
439 [[fallthrough]];
440 case 0x48u:
441 if ((end_byte) == 0x48u) {
442 break;
443 }
444 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
445 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
446 [[fallthrough]];
447 case 0x50u:
448 if ((end_byte) == 0x50u) {
449 break;
450 }
451 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
452 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
453 [[fallthrough]];
454 case 0x58u:
455 if ((end_byte) == 0x58u) {
456 break;
457 }
458 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
459 buffer = Push<uint32_t>(buffer, msg->queue_index());
460 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
461 buffer = Push<uint32_t>(buffer, channel_index);
462 // clang-format on
463 [[fallthrough]];
464 case 0x60u:
465 if ((end_byte) == 0x60u) {
466 break;
467 }
468 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700469
Austin Schuh71a40d42023-02-04 21:22:22 -0800470 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700471}
472
Austin Schuha36c8902019-12-30 18:07:15 -0800473flatbuffers::Offset<MessageHeader> PackMessage(
474 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
475 int channel_index, LogType log_type) {
476 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
477
478 switch (log_type) {
479 case LogType::kLogMessage:
480 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800481 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700482 // Since the timestamps are 8 byte aligned, we are going to end up adding
483 // padding in the middle of the message to pad everything out to 8 byte
484 // alignment. That's rather wasteful. To make things efficient to mmap
485 // while reading uncompressed logs, we'd actually rather the message be
486 // aligned. So, force 8 byte alignment (enough to preserve alignment
487 // inside the nested message so that we can read it without moving it)
488 // here.
489 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700490 data_offset = fbb->CreateVector(
491 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800492 break;
493
494 case LogType::kLogDeliveryTimeOnly:
495 break;
496 }
497
498 MessageHeader::Builder message_header_builder(*fbb);
499 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800500
Austin Schuhfa30c352022-10-16 11:12:02 -0700501 // These are split out into very explicit serialization calls because the
502 // order here changes the order things are written out on the wire, and we
503 // want to control and understand it here. Changing the order can increase
504 // the amount of padding bytes in the middle.
505 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800506 // It is also easier to follow... And doesn't actually make things much
507 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800508 switch (log_type) {
509 case LogType::kLogRemoteMessage:
510 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700511 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800512 message_header_builder.add_monotonic_sent_time(
513 context.monotonic_remote_time.time_since_epoch().count());
514 message_header_builder.add_realtime_sent_time(
515 context.realtime_remote_time.time_since_epoch().count());
516 break;
517
Austin Schuh6f3babe2020-01-26 20:34:50 -0800518 case LogType::kLogDeliveryTimeOnly:
519 message_header_builder.add_queue_index(context.queue_index);
520 message_header_builder.add_monotonic_sent_time(
521 context.monotonic_event_time.time_since_epoch().count());
522 message_header_builder.add_realtime_sent_time(
523 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800524 message_header_builder.add_monotonic_remote_time(
525 context.monotonic_remote_time.time_since_epoch().count());
526 message_header_builder.add_realtime_remote_time(
527 context.realtime_remote_time.time_since_epoch().count());
528 message_header_builder.add_remote_queue_index(context.remote_queue_index);
529 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700530
531 case LogType::kLogMessage:
532 message_header_builder.add_queue_index(context.queue_index);
533 message_header_builder.add_data(data_offset);
534 message_header_builder.add_monotonic_sent_time(
535 context.monotonic_event_time.time_since_epoch().count());
536 message_header_builder.add_realtime_sent_time(
537 context.realtime_event_time.time_since_epoch().count());
538 break;
539
540 case LogType::kLogMessageAndDeliveryTime:
541 message_header_builder.add_queue_index(context.queue_index);
542 message_header_builder.add_remote_queue_index(context.remote_queue_index);
543 message_header_builder.add_monotonic_sent_time(
544 context.monotonic_event_time.time_since_epoch().count());
545 message_header_builder.add_realtime_sent_time(
546 context.realtime_event_time.time_since_epoch().count());
547 message_header_builder.add_monotonic_remote_time(
548 context.monotonic_remote_time.time_since_epoch().count());
549 message_header_builder.add_realtime_remote_time(
550 context.realtime_remote_time.time_since_epoch().count());
551 message_header_builder.add_data(data_offset);
552 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800553 }
554
555 return message_header_builder.Finish();
556}
557
Austin Schuhfa30c352022-10-16 11:12:02 -0700558flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
559 switch (log_type) {
560 case LogType::kLogMessage:
561 return
562 // Root table size + offset.
563 sizeof(flatbuffers::uoffset_t) * 2 +
564 // 6 padding bytes to pad the header out properly.
565 6 +
566 // vtable header (size + size of table)
567 sizeof(flatbuffers::voffset_t) * 2 +
568 // offsets to all the fields.
569 sizeof(flatbuffers::voffset_t) * 5 +
570 // pointer to vtable
571 sizeof(flatbuffers::soffset_t) +
572 // pointer to data
573 sizeof(flatbuffers::uoffset_t) +
574 // realtime_sent_time, monotonic_sent_time
575 sizeof(int64_t) * 2 +
576 // queue_index, channel_index
577 sizeof(uint32_t) * 2;
578
579 case LogType::kLogDeliveryTimeOnly:
580 return
581 // Root table size + offset.
582 sizeof(flatbuffers::uoffset_t) * 2 +
583 // 6 padding bytes to pad the header out properly.
584 4 +
585 // vtable header (size + size of table)
586 sizeof(flatbuffers::voffset_t) * 2 +
587 // offsets to all the fields.
588 sizeof(flatbuffers::voffset_t) * 8 +
589 // pointer to vtable
590 sizeof(flatbuffers::soffset_t) +
591 // remote_queue_index
592 sizeof(uint32_t) +
593 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
594 // monotonic_sent_time
595 sizeof(int64_t) * 4 +
596 // queue_index, channel_index
597 sizeof(uint32_t) * 2;
598
599 case LogType::kLogMessageAndDeliveryTime:
600 return
601 // Root table size + offset.
602 sizeof(flatbuffers::uoffset_t) * 2 +
603 // 4 padding bytes to pad the header out properly.
604 4 +
605 // vtable header (size + size of table)
606 sizeof(flatbuffers::voffset_t) * 2 +
607 // offsets to all the fields.
608 sizeof(flatbuffers::voffset_t) * 8 +
609 // pointer to vtable
610 sizeof(flatbuffers::soffset_t) +
611 // pointer to data
612 sizeof(flatbuffers::uoffset_t) +
613 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
614 // monotonic_sent_time
615 sizeof(int64_t) * 4 +
616 // remote_queue_index, queue_index, channel_index
617 sizeof(uint32_t) * 3;
618
619 case LogType::kLogRemoteMessage:
620 return
621 // Root table size + offset.
622 sizeof(flatbuffers::uoffset_t) * 2 +
623 // 6 padding bytes to pad the header out properly.
624 6 +
625 // vtable header (size + size of table)
626 sizeof(flatbuffers::voffset_t) * 2 +
627 // offsets to all the fields.
628 sizeof(flatbuffers::voffset_t) * 5 +
629 // pointer to vtable
630 sizeof(flatbuffers::soffset_t) +
631 // realtime_sent_time, monotonic_sent_time
632 sizeof(int64_t) * 2 +
633 // pointer to data
634 sizeof(flatbuffers::uoffset_t) +
635 // queue_index, channel_index
636 sizeof(uint32_t) * 2;
637 }
638 LOG(FATAL);
639}
640
James Kuszmaul9776b392023-01-14 14:08:08 -0800641flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700642 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
643 "Update size logic please.");
644 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700645 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700646 switch (log_type) {
647 case LogType::kLogDeliveryTimeOnly:
648 return PackMessageHeaderSize(log_type);
649
650 case LogType::kLogMessage:
651 case LogType::kLogMessageAndDeliveryTime:
652 case LogType::kLogRemoteMessage:
653 return PackMessageHeaderSize(log_type) +
654 // Vector...
655 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
656 }
657 LOG(FATAL);
658}
659
Austin Schuhfa30c352022-10-16 11:12:02 -0700660size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800661 int channel_index, LogType log_type, size_t start_byte,
662 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700663 // TODO(austin): Figure out how to copy directly from shared memory instead of
664 // first into the fetcher's memory and then into here. That would save a lot
665 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700666 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700667 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800668 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
669 DCHECK_EQ((start_byte % 8u), 0u);
670 DCHECK_EQ((end_byte % 8u), 0u);
671 DCHECK_LE(start_byte, end_byte);
672 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700673
674 // Pack all the data in. This is brittle but easy to change. Use the
675 // InlinePackMessage.Equivilent unit test to verify everything matches.
676 switch (log_type) {
677 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800678 switch (start_byte) {
679 case 0x00u:
680 if ((end_byte) == 0x00u) {
681 break;
682 }
683 // clang-format off
684 // header:
685 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
686 buffer = Push<flatbuffers::uoffset_t>(
687 buffer, message_size - sizeof(flatbuffers::uoffset_t));
688
689 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
690 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
691 [[fallthrough]];
692 case 0x08u:
693 if ((end_byte) == 0x08u) {
694 break;
695 }
696 //
697 // padding:
698 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
699 buffer = Pad(buffer, 6);
700 //
701 // vtable (aos.logger.MessageHeader):
702 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
703 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
704 [[fallthrough]];
705 case 0x10u:
706 if ((end_byte) == 0x10u) {
707 break;
708 }
709 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
710 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
711 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
712 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
713 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
714 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
715 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
716 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
717 [[fallthrough]];
718 case 0x18u:
719 if ((end_byte) == 0x18u) {
720 break;
721 }
722 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
723 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
724 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
725 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
726 //
727 // root_table (aos.logger.MessageHeader):
728 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
729 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
730 [[fallthrough]];
731 case 0x20u:
732 if ((end_byte) == 0x20u) {
733 break;
734 }
735 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
736 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
737 [[fallthrough]];
738 case 0x28u:
739 if ((end_byte) == 0x28u) {
740 break;
741 }
742 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
743 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
744 [[fallthrough]];
745 case 0x30u:
746 if ((end_byte) == 0x30u) {
747 break;
748 }
749 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
750 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
751 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
752 buffer = Push<uint32_t>(buffer, context.queue_index);
753 [[fallthrough]];
754 case 0x38u:
755 if ((end_byte) == 0x38u) {
756 break;
757 }
758 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
759 buffer = Push<uint32_t>(buffer, channel_index);
760 //
761 // vector (aos.logger.MessageHeader.data):
762 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
763 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
764 [[fallthrough]];
765 case 0x40u:
766 if ((end_byte) == 0x40u) {
767 break;
768 }
769 [[fallthrough]];
770 default:
771 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
772 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
773 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
774 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
775 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
776 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
777 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
778 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
779 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
780 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
781 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
782 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
783 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
784 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
785 //
786 // padding:
787 // +0x4E | 00 00 | uint8_t[2] | .. | padding
788 // clang-format on
789 if (start_byte <= 0x40 && end_byte == message_size) {
790 // The easy one, slap it all down.
791 buffer = PushBytes(buffer, context.data, context.size);
792 buffer =
793 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
794 } else {
795 const size_t data_start_byte =
796 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
797 const size_t data_end_byte = end_byte - 0x40;
798 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
799 if (data_start_byte < padded_size) {
800 buffer = PushBytes(
801 buffer,
802 reinterpret_cast<const uint8_t *>(context.data) +
803 data_start_byte,
804 std::min(context.size, data_end_byte) - data_start_byte);
805 if (data_end_byte == padded_size) {
806 // We can only pad the last 7 bytes, so this only gets written
807 // if we write the last byte.
808 buffer = Pad(buffer,
809 ((context.size + 7) & 0xfffffff8u) - context.size);
810 }
811 }
812 }
813 break;
814 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700815 break;
816
817 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800818 switch (start_byte) {
819 case 0x00u:
820 if ((end_byte) == 0x00u) {
821 break;
822 }
823 // clang-format off
824 // header:
825 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
826 buffer = Push<flatbuffers::uoffset_t>(
827 buffer, message_size - sizeof(flatbuffers::uoffset_t));
828 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
829 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700830
Austin Schuh71a40d42023-02-04 21:22:22 -0800831 [[fallthrough]];
832 case 0x08u:
833 if ((end_byte) == 0x08u) {
834 break;
835 }
836 //
837 // padding:
838 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
839 buffer = Pad(buffer, 4);
840 //
841 // vtable (aos.logger.MessageHeader):
842 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
843 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
844 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
845 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
846 [[fallthrough]];
847 case 0x10u:
848 if ((end_byte) == 0x10u) {
849 break;
850 }
851 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
852 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
853 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
854 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
855 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
856 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
857 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
858 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
859 [[fallthrough]];
860 case 0x18u:
861 if ((end_byte) == 0x18u) {
862 break;
863 }
864 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
865 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
866 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
867 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
868 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
869 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
870 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
871 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
872 [[fallthrough]];
873 case 0x20u:
874 if ((end_byte) == 0x20u) {
875 break;
876 }
877 //
878 // root_table (aos.logger.MessageHeader):
879 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
880 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
881 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
882 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
883 [[fallthrough]];
884 case 0x28u:
885 if ((end_byte) == 0x28u) {
886 break;
887 }
888 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
889 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
890 [[fallthrough]];
891 case 0x30u:
892 if ((end_byte) == 0x30u) {
893 break;
894 }
895 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
896 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
897 [[fallthrough]];
898 case 0x38u:
899 if ((end_byte) == 0x38u) {
900 break;
901 }
902 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
903 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
904 [[fallthrough]];
905 case 0x40u:
906 if ((end_byte) == 0x40u) {
907 break;
908 }
909 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
910 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
911 [[fallthrough]];
912 case 0x48u:
913 if ((end_byte) == 0x48u) {
914 break;
915 }
916 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
917 buffer = Push<uint32_t>(buffer, context.queue_index);
918 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
919 buffer = Push<uint32_t>(buffer, channel_index);
920
921 // clang-format on
922 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700923 break;
924
925 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800926 switch (start_byte) {
927 case 0x00u:
928 if ((end_byte) == 0x00u) {
929 break;
930 }
931 // clang-format off
932 // header:
933 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
934 buffer = Push<flatbuffers::uoffset_t>(
935 buffer, message_size - sizeof(flatbuffers::uoffset_t));
936 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
937 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
938 [[fallthrough]];
939 case 0x08u:
940 if ((end_byte) == 0x08u) {
941 break;
942 }
943 //
944 // padding:
945 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
946 buffer = Pad(buffer, 4);
947 //
948 // vtable (aos.logger.MessageHeader):
949 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
950 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
951 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
952 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
953 [[fallthrough]];
954 case 0x10u:
955 if ((end_byte) == 0x10u) {
956 break;
957 }
958 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
959 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
960 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
961 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
962 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
963 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
964 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
965 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
966 [[fallthrough]];
967 case 0x18u:
968 if ((end_byte) == 0x18u) {
969 break;
970 }
971 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
972 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
973 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
974 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
975 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
976 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
977 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
978 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
979 [[fallthrough]];
980 case 0x20u:
981 if ((end_byte) == 0x20u) {
982 break;
983 }
984 //
985 // root_table (aos.logger.MessageHeader):
986 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
987 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
988 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
989 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
990 [[fallthrough]];
991 case 0x28u:
992 if ((end_byte) == 0x28u) {
993 break;
994 }
995 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
996 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
997 [[fallthrough]];
998 case 0x30u:
999 if ((end_byte) == 0x30u) {
1000 break;
1001 }
1002 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
1003 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1004 [[fallthrough]];
1005 case 0x38u:
1006 if ((end_byte) == 0x38u) {
1007 break;
1008 }
1009 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
1010 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1011 [[fallthrough]];
1012 case 0x40u:
1013 if ((end_byte) == 0x40u) {
1014 break;
1015 }
1016 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
1017 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1018 [[fallthrough]];
1019 case 0x48u:
1020 if ((end_byte) == 0x48u) {
1021 break;
1022 }
1023 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
1024 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1025 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1026 buffer = Push<uint32_t>(buffer, context.queue_index);
1027 [[fallthrough]];
1028 case 0x50u:
1029 if ((end_byte) == 0x50u) {
1030 break;
1031 }
1032 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1033 buffer = Push<uint32_t>(buffer, channel_index);
1034 //
1035 // vector (aos.logger.MessageHeader.data):
1036 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1037 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1038 [[fallthrough]];
1039 case 0x58u:
1040 if ((end_byte) == 0x58u) {
1041 break;
1042 }
1043 [[fallthrough]];
1044 default:
1045 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1046 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1047 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1048 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1049 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1050 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1051 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1052 //
1053 // padding:
1054 // +0x5F | 00 | uint8_t[1] | . | padding
1055 // clang-format on
1056
1057 if (start_byte <= 0x58 && end_byte == message_size) {
1058 // The easy one, slap it all down.
1059 buffer = PushBytes(buffer, context.data, context.size);
1060 buffer =
1061 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1062 } else {
1063 const size_t data_start_byte =
1064 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1065 const size_t data_end_byte = end_byte - 0x58;
1066 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1067 if (data_start_byte < padded_size) {
1068 buffer = PushBytes(
1069 buffer,
1070 reinterpret_cast<const uint8_t *>(context.data) +
1071 data_start_byte,
1072 std::min(context.size, data_end_byte) - data_start_byte);
1073 if (data_end_byte == padded_size) {
1074 // We can only pad the last 7 bytes, so this only gets written
1075 // if we write the last byte.
1076 buffer = Pad(buffer,
1077 ((context.size + 7) & 0xfffffff8u) - context.size);
1078 }
1079 }
1080 }
1081
1082 break;
1083 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001084
1085 break;
1086
1087 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001088 switch (start_byte) {
1089 case 0x00u:
1090 if ((end_byte) == 0x00u) {
1091 break;
1092 }
1093 // This is the message we need to recreate.
1094 //
1095 // clang-format off
1096 // header:
1097 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1098 buffer = Push<flatbuffers::uoffset_t>(
1099 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1100 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1101 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1102 [[fallthrough]];
1103 case 0x08u:
1104 if ((end_byte) == 0x08u) {
1105 break;
1106 }
1107 //
1108 // padding:
1109 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1110 buffer = Pad(buffer, 6);
1111 //
1112 // vtable (aos.logger.MessageHeader):
1113 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1114 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1115 [[fallthrough]];
1116 case 0x10u:
1117 if ((end_byte) == 0x10u) {
1118 break;
1119 }
1120 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1121 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1122 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1123 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1124 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1125 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1126 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1127 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1128 [[fallthrough]];
1129 case 0x18u:
1130 if ((end_byte) == 0x18u) {
1131 break;
1132 }
1133 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1134 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1135 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1136 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1137 //
1138 // root_table (aos.logger.MessageHeader):
1139 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1140 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1141 [[fallthrough]];
1142 case 0x20u:
1143 if ((end_byte) == 0x20u) {
1144 break;
1145 }
1146 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1147 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1148 [[fallthrough]];
1149 case 0x28u:
1150 if ((end_byte) == 0x28u) {
1151 break;
1152 }
1153 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1154 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1155 [[fallthrough]];
1156 case 0x30u:
1157 if ((end_byte) == 0x30u) {
1158 break;
1159 }
1160 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1161 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1162 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1163 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1164 [[fallthrough]];
1165 case 0x38u:
1166 if ((end_byte) == 0x38u) {
1167 break;
1168 }
1169 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1170 buffer = Push<uint32_t>(buffer, channel_index);
1171 //
1172 // vector (aos.logger.MessageHeader.data):
1173 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1174 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1175 [[fallthrough]];
1176 case 0x40u:
1177 if ((end_byte) == 0x40u) {
1178 break;
1179 }
1180 [[fallthrough]];
1181 default:
1182 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1183 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1184 // ...
1185 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1186 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1187 //
1188 // padding:
1189 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1190 // clang-format on
1191 if (start_byte <= 0x40 && end_byte == message_size) {
1192 // The easy one, slap it all down.
1193 buffer = PushBytes(buffer, context.data, context.size);
1194 buffer =
1195 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1196 } else {
1197 const size_t data_start_byte =
1198 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1199 const size_t data_end_byte = end_byte - 0x40;
1200 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1201 if (data_start_byte < padded_size) {
1202 buffer = PushBytes(
1203 buffer,
1204 reinterpret_cast<const uint8_t *>(context.data) +
1205 data_start_byte,
1206 std::min(context.size, data_end_byte) - data_start_byte);
1207 if (data_end_byte == padded_size) {
1208 // We can only pad the last 7 bytes, so this only gets written
1209 // if we write the last byte.
1210 buffer = Pad(buffer,
1211 ((context.size + 7) & 0xfffffff8u) - context.size);
1212 }
1213 }
1214 }
1215 break;
1216 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001217 }
1218
Austin Schuh71a40d42023-02-04 21:22:22 -08001219 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001220}
1221
// Constructs a reader for filename, delegating to the decoder-taking
// constructor with the decoder returned by ResolveDecoder(filename, quiet).
SpanReader::SpanReader(std::string_view filename, bool quiet)
    : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
Tyler Chatow2015bc62021-08-04 21:15:09 -07001224
// Constructs a reader which pulls its bytes from decoder.  filename is stored
// in filename_ and the reader takes ownership of decoder.
SpanReader::SpanReader(std::string_view filename,
                       std::unique_ptr<DataDecoder> decoder)
    : filename_(filename), decoder_(std::move(decoder)) {}
Austin Schuh05b70472020-01-01 17:11:17 -08001228
Austin Schuhcf5f6442021-07-06 10:43:28 -07001229absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001230 // Make sure we have enough for the size.
1231 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1232 if (!ReadBlock()) {
1233 return absl::Span<const uint8_t>();
1234 }
1235 }
1236
1237 // Now make sure we have enough for the message.
1238 const size_t data_size =
1239 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1240 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001241 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1242 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1243 LOG(ERROR) << " Rest of log file is "
1244 << absl::BytesToHexString(std::string_view(
1245 reinterpret_cast<const char *>(data_.data() +
1246 consumed_data_),
1247 data_.size() - consumed_data_));
1248 return absl::Span<const uint8_t>();
1249 }
Austin Schuh05b70472020-01-01 17:11:17 -08001250 while (data_.size() < consumed_data_ + data_size) {
1251 if (!ReadBlock()) {
1252 return absl::Span<const uint8_t>();
1253 }
1254 }
1255
1256 // And return it, consuming the data.
1257 const uint8_t *data_ptr = data_.data() + consumed_data_;
1258
Austin Schuh05b70472020-01-01 17:11:17 -08001259 return absl::Span<const uint8_t>(data_ptr, data_size);
1260}
1261
Austin Schuhcf5f6442021-07-06 10:43:28 -07001262void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001263 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001264 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1265 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001266 consumed_data_ += consumed_size;
1267 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001268}
1269
1270absl::Span<const uint8_t> SpanReader::ReadMessage() {
1271 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001272 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001273 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001274 } else {
1275 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001276 }
1277 return result;
1278}
1279
Austin Schuh05b70472020-01-01 17:11:17 -08001280bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001281 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1282 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001283 constexpr size_t kReadSize = 256 * 1024;
1284
1285 // Strip off any unused data at the front.
1286 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001287 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001288 consumed_data_ = 0;
1289 }
1290
1291 const size_t starting_size = data_.size();
1292
1293 // This should automatically grow the backing store. It won't shrink if we
1294 // get a small chunk later. This reduces allocations when we want to append
1295 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001296 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001297
Brian Silvermanf51499a2020-09-21 12:49:08 -07001298 const size_t count =
1299 decoder_->Read(data_.begin() + starting_size, data_.end());
1300 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001301 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001302 return false;
1303 }
Austin Schuh05b70472020-01-01 17:11:17 -08001304
Brian Smarttea913d42021-12-10 15:02:38 -08001305 total_read_ += count;
1306
Austin Schuh05b70472020-01-01 17:11:17 -08001307 return true;
1308}
1309
// Stores the (possibly null) log source used to create decoders and the
// reader count above which BorrowReader() clears the pool.
LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
    : log_source_(log_source), pool_size_(pool_size) {}
1312
1313SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
1314 if (part_readers_.size() > pool_size_) {
1315 // Don't leave arbitrary numbers of readers open, because they each take
1316 // resources, so close a big batch at once periodically.
1317 part_readers_.clear();
1318 }
1319 if (log_source_ == nullptr) {
1320 part_readers_.emplace_back(id, FLAGS_quiet_sorting);
1321 } else {
1322 part_readers_.emplace_back(id, log_source_->GetDecoder(id));
1323 }
1324 return &part_readers_.back();
1325}
1326
Austin Schuhadd6eb32020-11-09 21:24:26 -08001327std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001328 SpanReader *span_reader) {
1329 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001330
1331 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001332 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001333 return std::nullopt;
1334 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001335
Austin Schuh5212cad2020-09-09 23:12:09 -07001336 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001337 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001338 if (!result.Verify()) {
1339 return std::nullopt;
1340 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001341
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001342 // We only know of busted headers in the versions of the log file header
1343 // *before* the logger_sha1 field was added. At some point before that point,
1344 // the logic to track when a header has been written was rewritten in such a
1345 // way that it can't happen anymore. We've seen some logs where the body
1346 // parses as a header recently, so the simple solution of always looking is
1347 // failing us.
1348 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001349 while (true) {
1350 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001351 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001352 break;
1353 }
1354
1355 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1356 maybe_header_data);
1357 if (maybe_header.Verify()) {
1358 LOG(WARNING) << "Found duplicate LogFileHeader in "
1359 << span_reader->filename();
1360 ResizeableBuffer header_data_copy;
1361 header_data_copy.resize(maybe_header_data.size());
1362 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1363 header_data_copy.size());
1364 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1365 std::move(header_data_copy));
1366
1367 span_reader->ConsumeMessage();
1368 } else {
1369 break;
1370 }
1371 }
1372 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001373 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001374}
1375
// Convenience overload: constructs a temporary SpanReader from filename and
// returns whatever ReadHeader(SpanReader *) produces for it.
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
    std::string_view filename) {
  SpanReader span_reader(filename);
  return ReadHeader(&span_reader);
}
1381
Austin Schuhadd6eb32020-11-09 21:24:26 -08001382std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001383 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001384 SpanReader span_reader(filename);
1385 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1386 for (size_t i = 0; i < n + 1; ++i) {
1387 data_span = span_reader.ReadMessage();
1388
1389 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001390 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001391 return std::nullopt;
1392 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001393 }
1394
Brian Silverman354697a2020-09-22 21:06:32 -07001395 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001396 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001397 if (!result.Verify()) {
1398 return std::nullopt;
1399 }
1400 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001401}
1402
Alexei Strots58017402023-05-03 22:05:06 -07001403MessageReader::MessageReader(SpanReader span_reader)
1404 : span_reader_(std::move(span_reader)),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001405 raw_log_file_header_(
1406 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001407 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1408 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1409
Austin Schuh0e8db662021-07-06 10:43:47 -07001410 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1411 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001412
1413 // Make sure something was read.
Alexei Strots58017402023-05-03 22:05:06 -07001414 CHECK(raw_log_file_header)
1415 << ": Failed to read header from: " << span_reader_.filename();
Austin Schuh05b70472020-01-01 17:11:17 -08001416
Austin Schuh0e8db662021-07-06 10:43:47 -07001417 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001418
Austin Schuh5b728b72021-06-16 14:57:15 -07001419 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1420
Brian Smarttea913d42021-12-10 15:02:38 -08001421 total_verified_before_ = span_reader_.TotalConsumed();
1422
Austin Schuhcde938c2020-02-02 17:30:07 -08001423 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001424 FLAGS_max_out_of_order > 0
1425 ? chrono::duration_cast<chrono::nanoseconds>(
1426 chrono::duration<double>(FLAGS_max_out_of_order))
1427 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001428
Alexei Strots58017402023-05-03 22:05:06 -07001429 VLOG(1) << "Opened " << span_reader_.filename() << " as node "
Austin Schuhcde938c2020-02-02 17:30:07 -08001430 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001431}
1432
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001433std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001434 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001435 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001436 if (is_corrupted()) {
1437 LOG(ERROR) << "Total corrupted volumes: before = "
1438 << total_verified_before_
1439 << " | corrupted = " << total_corrupted_
1440 << " | during = " << total_verified_during_
1441 << " | after = " << total_verified_after_ << std::endl;
1442 }
1443
1444 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001445 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1446 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001447 << span_reader_.TotalConsumed() << " bytes usable."
1448 << std::endl;
1449 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001450 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001451 }
1452
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001453 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001454
1455 if (crash_on_corrupt_message_flag_) {
1456 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001457 << total_verified_before_ << " found within "
1458 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001459 << "; set --nocrash_on_corrupt_message to see summary;"
1460 << " also set --ignore_corrupt_messages to process"
1461 << " anyway";
1462
1463 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001464 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001465 << " from " << filename() << std::endl;
1466
1467 total_corrupted_ += msg_data.size();
1468
1469 while (true) {
1470 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1471
James Kuszmaul9776b392023-01-14 14:08:08 -08001472 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001473 if (!ignore_corrupt_messages_flag_) {
1474 LOG(ERROR) << "Total corrupted volumes: before = "
1475 << total_verified_before_
1476 << " | corrupted = " << total_corrupted_
1477 << " | during = " << total_verified_during_
1478 << " | after = " << total_verified_after_ << std::endl;
1479
1480 if (span_reader_.IsIncomplete()) {
1481 LOG(ERROR) << "Unable to access some messages in " << filename()
1482 << " : " << span_reader_.TotalRead() << " bytes read, "
1483 << span_reader_.TotalConsumed() << " bytes usable."
1484 << std::endl;
1485 }
1486 return nullptr;
1487 }
1488 break;
1489 }
1490
1491 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1492
1493 if (!next_msg.Verify()) {
1494 total_corrupted_ += msg_data.size();
1495 total_verified_during_ += total_verified_after_;
1496 total_verified_after_ = 0;
1497
1498 } else {
1499 total_verified_after_ += msg_data.size();
1500 if (ignore_corrupt_messages_flag_) {
1501 msg = next_msg;
1502 break;
1503 }
1504 }
1505 }
1506 }
1507
1508 if (is_corrupted()) {
1509 total_verified_after_ += msg_data.size();
1510 } else {
1511 total_verified_before_ += msg_data.size();
1512 }
Austin Schuh05b70472020-01-01 17:11:17 -08001513
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001514 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001515
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001516 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001517
1518 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001519
1520 if (VLOG_IS_ON(3)) {
1521 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1522 } else if (VLOG_IS_ON(2)) {
1523 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1524 msg_copy.mutable_message()->clear_data();
1525 VLOG(2) << "Read from " << filename() << " data "
1526 << FlatbufferToJson(msg_copy);
1527 }
1528
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001529 return result;
1530}
1531
1532std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1533 const MessageHeader &message) {
1534 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1535
1536 UnpackedMessageHeader *const unpacked_message =
1537 reinterpret_cast<UnpackedMessageHeader *>(
1538 malloc(sizeof(UnpackedMessageHeader) + data_size +
1539 kChannelDataAlignment - 1));
1540
1541 CHECK(message.has_channel_index());
1542 CHECK(message.has_monotonic_sent_time());
1543
1544 absl::Span<uint8_t> span;
1545 if (data_size > 0) {
1546 span =
1547 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1548 &unpacked_message->actual_data[0], data_size)),
1549 data_size);
1550 }
1551
Austin Schuh826e6ce2021-11-18 20:33:10 -08001552 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001553 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001554 monotonic_remote_time = aos::monotonic_clock::time_point(
1555 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001556 }
1557 std::optional<realtime_clock::time_point> realtime_remote_time;
1558 if (message.has_realtime_remote_time()) {
1559 realtime_remote_time = realtime_clock::time_point(
1560 chrono::nanoseconds(message.realtime_remote_time()));
1561 }
1562
1563 std::optional<uint32_t> remote_queue_index;
1564 if (message.has_remote_queue_index()) {
1565 remote_queue_index = message.remote_queue_index();
1566 }
1567
James Kuszmaul9776b392023-01-14 14:08:08 -08001568 new (unpacked_message) UnpackedMessageHeader(
1569 message.channel_index(),
1570 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001571 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001572 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001573 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001574 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1575 remote_queue_index,
1576 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001577 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001578 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001579
1580 if (data_size > 0) {
1581 memcpy(span.data(), message.data()->data(), data_size);
1582 }
1583
1584 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1585 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001586}
1587
Alexei Strots58017402023-05-03 22:05:06 -07001588SpanReader PartsMessageReader::MakeSpanReader(
1589 const LogPartsAccess &log_parts_access, size_t part_number) {
1590 const auto part = log_parts_access.GetPartAt(part_number);
1591 if (log_parts_access.log_source().has_value()) {
1592 return SpanReader(part,
1593 log_parts_access.log_source().value()->GetDecoder(part));
1594 } else {
1595 return SpanReader(part);
1596 }
1597}
1598
1599PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
1600 : log_parts_access_(std::move(log_parts_access)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -07001601 message_reader_(MakeSpanReader(log_parts_access_, 0)),
1602 max_out_of_order_duration_(
1603 log_parts_access_.max_out_of_order_duration()) {
Alexei Strots58017402023-05-03 22:05:06 -07001604 if (log_parts_access_.size() >= 2) {
1605 next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001606 }
Austin Schuh48507722021-07-17 17:29:24 -07001607 ComputeBootCounts();
1608}
1609
1610void PartsMessageReader::ComputeBootCounts() {
Austin Schuh63097262023-08-16 17:04:29 -07001611 boot_counts_.assign(
1612 configuration::NodesCount(log_parts_access_.config().get()),
1613 std::nullopt);
Austin Schuh48507722021-07-17 17:29:24 -07001614
Alexei Strots58017402023-05-03 22:05:06 -07001615 const auto boots = log_parts_access_.parts().boots;
1616
Austin Schuh48507722021-07-17 17:29:24 -07001617 // We have 3 vintages of log files with different amounts of information.
1618 if (log_file_header()->has_boot_uuids()) {
1619 // The new hotness with the boots explicitly listed out. We can use the log
1620 // file header to compute the boot count of all relevant nodes.
1621 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1622 size_t node_index = 0;
1623 for (const flatbuffers::String *boot_uuid :
1624 *log_file_header()->boot_uuids()) {
Alexei Strots58017402023-05-03 22:05:06 -07001625 CHECK(boots);
Austin Schuh48507722021-07-17 17:29:24 -07001626 if (boot_uuid->size() != 0) {
Alexei Strots58017402023-05-03 22:05:06 -07001627 auto it = boots->boot_count_map.find(boot_uuid->str());
1628 if (it != boots->boot_count_map.end()) {
Austin Schuh48507722021-07-17 17:29:24 -07001629 boot_counts_[node_index] = it->second;
1630 }
1631 } else if (parts().boots->boots[node_index].size() == 1u) {
1632 boot_counts_[node_index] = 0;
1633 }
1634 ++node_index;
1635 }
1636 } else {
1637 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1638 // single node log files with boot UUIDs in the header. We only know how to
1639 // order certain boots in certain circumstances.
Austin Schuh63097262023-08-16 17:04:29 -07001640 if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
Austin Schuh48507722021-07-17 17:29:24 -07001641 for (size_t node_index = 0; node_index < boot_counts_.size();
1642 ++node_index) {
Alexei Strots58017402023-05-03 22:05:06 -07001643 if (boots->boots[node_index].size() == 1u) {
Austin Schuh48507722021-07-17 17:29:24 -07001644 boot_counts_[node_index] = 0;
1645 }
1646 }
1647 } else {
1648 // Really old single node logs without any UUIDs. They can't reboot.
1649 CHECK_EQ(boot_counts_.size(), 1u);
1650 boot_counts_[0] = 0u;
1651 }
1652 }
1653}
Austin Schuhc41603c2020-10-11 16:17:37 -07001654
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001655std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001656 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001657 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001658 message_reader_.ReadMessage();
1659 if (message) {
1660 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001661 const monotonic_clock::time_point monotonic_sent_time =
1662 message->monotonic_sent_time;
1663
1664 // TODO(austin): Does this work with startup? Might need to use the
1665 // start time.
1666 // TODO(austin): Does this work with startup when we don't know the
1667 // remote start time too? Look at one of those logs to compare.
Alexei Strots58017402023-05-03 22:05:06 -07001668 if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
1669 max_out_of_order_duration()) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001670 after_start_ = true;
1671 }
1672 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001673 CHECK_GE(monotonic_sent_time,
1674 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001675 << ": Max out of order of " << max_out_of_order_duration().count()
Alexei Strots58017402023-05-03 22:05:06 -07001676 << "ns exceeded. " << log_parts_access_.parts()
1677 << ", start time is "
1678 << log_parts_access_.parts().monotonic_start_time
1679 << " currently reading " << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001680 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001681 return message;
1682 }
1683 NextLog();
1684 }
Austin Schuh32f68492020-11-08 21:45:51 -08001685 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001686 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001687}
1688
1689void PartsMessageReader::NextLog() {
Alexei Strots58017402023-05-03 22:05:06 -07001690 if (next_part_index_ == log_parts_access_.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001691 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001692 done_ = true;
1693 return;
1694 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001695 CHECK(next_message_reader_);
1696 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001697 ComputeBootCounts();
Alexei Strots58017402023-05-03 22:05:06 -07001698 if (next_part_index_ + 1 < log_parts_access_.size()) {
1699 next_message_reader_.emplace(
1700 MakeSpanReader(log_parts_access_, next_part_index_ + 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001701 } else {
1702 next_message_reader_.reset();
1703 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001704 ++next_part_index_;
1705}
1706
Austin Schuh1be0ce42020-11-29 22:43:26 -08001707bool Message::operator<(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001708 if (this->timestamp < m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001709 return true;
Austin Schuh63097262023-08-16 17:04:29 -07001710 } else if (this->timestamp > m2.timestamp) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001711 return false;
1712 }
1713
1714 if (this->channel_index < m2.channel_index) {
1715 return true;
1716 } else if (this->channel_index > m2.channel_index) {
1717 return false;
1718 }
1719
1720 return this->queue_index < m2.queue_index;
1721}
1722
1723bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001724bool Message::operator==(const Message &m2) const {
Austin Schuh63097262023-08-16 17:04:29 -07001725 return timestamp == m2.timestamp && channel_index == m2.channel_index &&
1726 queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001727}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001728
Austin Schuh63097262023-08-16 17:04:29 -07001729bool Message::operator<=(const Message &m2) const {
1730 return *this == m2 || *this < m2;
1731}
1732
1733std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
1734 os << "{.channel_index=" << msg.channel_index
1735 << ", .monotonic_sent_time=" << msg.monotonic_sent_time
1736 << ", .realtime_sent_time=" << msg.realtime_sent_time
1737 << ", .queue_index=" << msg.queue_index;
1738 if (msg.monotonic_remote_time) {
1739 os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001740 }
1741 os << ", .realtime_remote_time=";
Austin Schuh63097262023-08-16 17:04:29 -07001742 PrintOptionalOrNull(&os, msg.realtime_remote_time);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001743 os << ", .remote_queue_index=";
Austin Schuh63097262023-08-16 17:04:29 -07001744 PrintOptionalOrNull(&os, msg.remote_queue_index);
1745 if (msg.has_monotonic_timestamp_time) {
1746 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001747 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001748 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001749 return os;
1750}
1751
Austin Schuh63097262023-08-16 17:04:29 -07001752std::ostream &operator<<(std::ostream &os, const Message &msg) {
1753 os << "{.channel_index=" << msg.channel_index
1754 << ", .queue_index=" << msg.queue_index
1755 << ", .timestamp=" << msg.timestamp;
1756 if (msg.data != nullptr) {
1757 if (msg.data->remote_queue_index.has_value()) {
1758 os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001759 }
Austin Schuh63097262023-08-16 17:04:29 -07001760 if (msg.data->monotonic_remote_time.has_value()) {
1761 os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
Austin Schuh826e6ce2021-11-18 20:33:10 -08001762 }
Austin Schuh63097262023-08-16 17:04:29 -07001763 os << ", .data=" << msg.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001764 }
1765 os << "}";
1766 return os;
1767}
1768
Austin Schuh63097262023-08-16 17:04:29 -07001769std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
1770 os << "{.channel_index=" << msg.channel_index
1771 << ", .queue_index=" << msg.queue_index
1772 << ", .monotonic_event_time=" << msg.monotonic_event_time
1773 << ", .realtime_event_time=" << msg.realtime_event_time;
1774 if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
1775 os << ", .remote_queue_index=" << msg.remote_queue_index;
Austin Schuhd2f96102020-12-01 20:27:29 -08001776 }
Austin Schuh63097262023-08-16 17:04:29 -07001777 if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
1778 os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001779 }
Austin Schuh63097262023-08-16 17:04:29 -07001780 if (msg.realtime_remote_time != realtime_clock::min_time) {
1781 os << ", .realtime_remote_time=" << msg.realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001782 }
Austin Schuh63097262023-08-16 17:04:29 -07001783 if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
1784 os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001785 }
Austin Schuh63097262023-08-16 17:04:29 -07001786 if (msg.data != nullptr) {
1787 os << ", .data=" << *msg.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001788 } else {
1789 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001790 }
1791 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001792 return os;
1793}
1794
Alexei Strots58017402023-05-03 22:05:06 -07001795MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
1796 : parts_message_reader_(log_parts_access),
Austin Schuh48507722021-07-17 17:29:24 -07001797 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1798}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001799
Adam Snaider13d48d92023-08-03 12:20:15 -07001800const Message *MessageSorter::Front() {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001801 // Queue up data until enough data has been queued that the front message is
1802 // sorted enough to be safe to pop. This may do nothing, so we should make
1803 // sure the nothing path is checked quickly.
1804 if (sorted_until() != monotonic_clock::max_time) {
1805 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001806 if (!messages_.empty() &&
1807 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001808 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001809 break;
1810 }
1811
Austin Schuh63097262023-08-16 17:04:29 -07001812 std::shared_ptr<UnpackedMessageHeader> msg =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001813 parts_message_reader_.ReadMessage();
1814 // No data left, sorted forever, work through what is left.
Austin Schuh63097262023-08-16 17:04:29 -07001815 if (!msg) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001816 sorted_until_ = monotonic_clock::max_time;
1817 break;
1818 }
1819
Austin Schuh48507722021-07-17 17:29:24 -07001820 size_t monotonic_timestamp_boot = 0;
Austin Schuh63097262023-08-16 17:04:29 -07001821 if (msg->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001822 monotonic_timestamp_boot = parts().logger_boot_count;
1823 }
1824 size_t monotonic_remote_boot = 0xffffff;
1825
Austin Schuh63097262023-08-16 17:04:29 -07001826 if (msg->monotonic_remote_time.has_value()) {
Maxwell Gumley8c1b87f2024-02-13 17:54:52 -07001827 CHECK_LT(msg->channel_index, source_node_index_.size());
Austin Schuh63097262023-08-16 17:04:29 -07001828 const Node *node = parts().config->nodes()->Get(
1829 source_node_index_[msg->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001830
Austin Schuh48507722021-07-17 17:29:24 -07001831 std::optional<size_t> boot = parts_message_reader_.boot_count(
Austin Schuh63097262023-08-16 17:04:29 -07001832 source_node_index_[msg->channel_index]);
Alexei Strots036d84e2023-05-03 16:05:12 -07001833 CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
Austin Schuh63097262023-08-16 17:04:29 -07001834 << "', with index "
1835 << source_node_index_[msg->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001836 monotonic_remote_boot = *boot;
1837 }
1838
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001839 messages_.insert(
Austin Schuh63097262023-08-16 17:04:29 -07001840 Message{.channel_index = msg->channel_index,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001841 .queue_index = BootQueueIndex{.boot = parts().boot_count,
Austin Schuh63097262023-08-16 17:04:29 -07001842 .index = msg->queue_index},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001843 .timestamp = BootTimestamp{.boot = parts().boot_count,
Austin Schuh63097262023-08-16 17:04:29 -07001844 .time = msg->monotonic_sent_time},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001845 .monotonic_remote_boot = monotonic_remote_boot,
1846 .monotonic_timestamp_boot = monotonic_timestamp_boot,
Austin Schuh63097262023-08-16 17:04:29 -07001847 .data = std::move(msg)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001848
1849 // Now, update sorted_until_ to match the new message.
1850 if (parts_message_reader_.newest_timestamp() >
1851 monotonic_clock::min_time +
1852 parts_message_reader_.max_out_of_order_duration()) {
1853 sorted_until_ = parts_message_reader_.newest_timestamp() -
1854 parts_message_reader_.max_out_of_order_duration();
1855 } else {
1856 sorted_until_ = monotonic_clock::min_time;
1857 }
1858 }
1859 }
1860
1861 // Now that we have enough data queued, return a pointer to the oldest piece
1862 // of data if it exists.
1863 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001864 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001865 return nullptr;
1866 }
1867
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001868 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001869 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001870 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh63097262023-08-16 17:04:29 -07001871 VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
1872 << (*messages_.begin()) << " on " << parts_message_reader_.filename();
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001873 return &(*messages_.begin());
1874}
1875
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001876void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001877
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001878std::string MessageSorter::DebugString() const {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001879 std::stringstream ss;
1880 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001881 int count = 0;
1882 bool no_dots = true;
Austin Schuh63097262023-08-16 17:04:29 -07001883 for (const Message &msg : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001884 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
Austin Schuh63097262023-08-16 17:04:29 -07001885 ss << msg << "\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001886 } else if (no_dots) {
1887 ss << "...\n";
1888 no_dots = false;
1889 }
1890 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001891 }
1892 ss << "] <- " << parts_message_reader_.filename();
1893 return ss.str();
1894}
1895
Austin Schuh63097262023-08-16 17:04:29 -07001896// Class to merge start times cleanly, reusably, and incrementally.
1897class StartTimes {
1898 public:
1899 void Update(monotonic_clock::time_point new_monotonic_start_time,
1900 realtime_clock::time_point new_realtime_start_time) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001901 // We want to capture the earliest meaningful start time here. The start
1902 // time defaults to min_time when there's no meaningful value to report, so
1903 // let's ignore those.
Austin Schuh63097262023-08-16 17:04:29 -07001904 if (new_monotonic_start_time != monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001905 bool accept = false;
1906 // We want to prioritize start times from the logger node. Really, we
1907 // want to prioritize start times with a valid realtime_clock time. So,
1908 // if we have a start time without a RT clock, prefer a start time with a
1909 // RT clock, even it if is later.
Austin Schuh63097262023-08-16 17:04:29 -07001910 if (new_realtime_start_time != realtime_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001911 // We've got a good one. See if the current start time has a good RT
1912 // clock, or if we should use this one instead.
Austin Schuh63097262023-08-16 17:04:29 -07001913 if (new_monotonic_start_time < monotonic_start_time_ ||
1914 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001915 accept = true;
1916 } else if (realtime_start_time_ == realtime_clock::min_time) {
1917 // The previous start time doesn't have a good RT time, so it is very
1918 // likely the start time from a remote part file. We just found a
1919 // better start time with a real RT time, so switch to that instead.
1920 accept = true;
1921 }
1922 } else if (realtime_start_time_ == realtime_clock::min_time) {
1923 // We don't have a RT time, so take the oldest.
Austin Schuh63097262023-08-16 17:04:29 -07001924 if (new_monotonic_start_time < monotonic_start_time_ ||
1925 monotonic_start_time_ == monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001926 accept = true;
1927 }
1928 }
1929
1930 if (accept) {
Austin Schuh63097262023-08-16 17:04:29 -07001931 monotonic_start_time_ = new_monotonic_start_time;
1932 realtime_start_time_ = new_realtime_start_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001933 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001934 }
1935 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001936
Austin Schuh63097262023-08-16 17:04:29 -07001937 monotonic_clock::time_point monotonic_start_time() const {
1938 return monotonic_start_time_;
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001939 }
Austin Schuh63097262023-08-16 17:04:29 -07001940 realtime_clock::time_point realtime_start_time() const {
1941 return realtime_start_time_;
1942 }
1943
1944 private:
1945 monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time;
1946 realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time;
1947};
1948
1949PartsMerger::PartsMerger(SelectedLogParts &&parts) {
1950 node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name());
1951
1952 for (LogPartsAccess part : parts) {
1953 message_sorters_.emplace_back(std::move(part));
1954 }
1955
1956 StartTimes start_times;
1957 for (const MessageSorter &message_sorter : message_sorters_) {
1958 start_times.Update(message_sorter.monotonic_start_time(),
1959 message_sorter.realtime_start_time());
1960 }
1961 monotonic_start_time_ = start_times.monotonic_start_time();
1962 realtime_start_time_ = start_times.realtime_start_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08001963}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001964
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001965std::vector<const LogParts *> PartsMerger::Parts() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001966 std::vector<const LogParts *> p;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001967 p.reserve(message_sorters_.size());
1968 for (const MessageSorter &message_sorter : message_sorters_) {
1969 p.emplace_back(&message_sorter.parts());
Austin Schuh0ca51f32020-12-25 21:51:45 -08001970 }
1971 return p;
1972}
1973
Adam Snaider13d48d92023-08-03 12:20:15 -07001974const Message *PartsMerger::Front() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001975 // Return the current Front if we have one, otherwise go compute one.
1976 if (current_ != nullptr) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001977 const Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001978 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuh63097262023-08-16 17:04:29 -07001979 VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
1980 << *result;
Austin Schuhb000de62020-12-03 22:00:40 -08001981 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001982 }
1983
1984 // Otherwise, do a simple search for the oldest message, deduplicating any
1985 // duplicates.
Adam Snaider13d48d92023-08-03 12:20:15 -07001986 const Message *oldest = nullptr;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001987 sorted_until_ = monotonic_clock::max_time;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001988 for (MessageSorter &message_sorter : message_sorters_) {
Adam Snaider13d48d92023-08-03 12:20:15 -07001989 const Message *msg = message_sorter.Front();
Austin Schuh63097262023-08-16 17:04:29 -07001990 if (!msg) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001991 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001992 continue;
1993 }
Austin Schuh63097262023-08-16 17:04:29 -07001994 if (oldest == nullptr || *msg < *oldest) {
1995 oldest = msg;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001996 current_ = &message_sorter;
Austin Schuh63097262023-08-16 17:04:29 -07001997 } else if (*msg == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001998 // Found a duplicate. If there is a choice, we want the one which has
1999 // the timestamp time.
Austin Schuh63097262023-08-16 17:04:29 -07002000 if (!msg->data->has_monotonic_timestamp_time) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002001 message_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002002 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08002003 current_->PopFront();
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002004 current_ = &message_sorter;
Austin Schuh63097262023-08-16 17:04:29 -07002005 oldest = msg;
Austin Schuh8bf1e632021-01-02 22:41:04 -08002006 } else {
Austin Schuh63097262023-08-16 17:04:29 -07002007 CHECK_EQ(msg->data->monotonic_timestamp_time,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002008 oldest->data->monotonic_timestamp_time);
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002009 message_sorter.PopFront();
Austin Schuh8bf1e632021-01-02 22:41:04 -08002010 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08002011 }
2012
2013 // PopFront may change this, so compute it down here.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002014 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08002015 }
2016
Austin Schuhb000de62020-12-03 22:00:40 -08002017 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002018 CHECK_GE(oldest->timestamp.time, last_message_time_);
2019 last_message_time_ = oldest->timestamp.time;
Austin Schuh63097262023-08-16 17:04:29 -07002020 if (monotonic_oldest_time_ > oldest->timestamp.time) {
2021 VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
2022 << " for node " << node_name() << " with a start time of "
2023 << monotonic_start_time_ << " " << *oldest;
2024 }
Austin Schuh5dd22842021-11-17 16:09:39 -08002025 monotonic_oldest_time_ =
2026 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08002027 } else {
2028 last_message_time_ = monotonic_clock::max_time;
2029 }
2030
Austin Schuh8f52ed52020-11-30 23:12:39 -08002031 // Return the oldest message found. This will be nullptr if nothing was
2032 // found, indicating there is nothing left.
Austin Schuh63097262023-08-16 17:04:29 -07002033 if (oldest) {
2034 VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
2035 << *oldest;
2036 } else {
2037 VLOG(1) << this << " PartsMerger::Front for node " << node_name();
2038 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08002039 return oldest;
2040}
2041
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002042void PartsMerger::PopFront() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08002043 CHECK(current_ != nullptr) << "Popping before calling Front()";
2044 current_->PopFront();
2045 current_ = nullptr;
2046}
2047
Alexei Strots1f51ac72023-05-15 10:14:54 -07002048BootMerger::BootMerger(std::string_view node_name,
Austin Schuh63097262023-08-16 17:04:29 -07002049 const LogFilesContainer &log_files,
2050 const std::vector<StoredDataType> &types)
2051 : configuration_(log_files.config()),
2052 node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
Alexei Strots1f51ac72023-05-15 10:14:54 -07002053 size_t number_of_boots = log_files.BootsForNode(node_name);
2054 parts_mergers_.reserve(number_of_boots);
2055 for (size_t i = 0; i < number_of_boots; ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07002056 VLOG(2) << "Boot " << i;
Austin Schuh63097262023-08-16 17:04:29 -07002057 SelectedLogParts selected_parts =
2058 log_files.SelectParts(node_name, i, types);
2059 // We are guarenteed to have something each boot, but not guarenteed to have
2060 // both timestamps and data for each boot. If we don't have anything, don't
2061 // create a parts merger. The rest of this class will detect that and
2062 // ignore it as required.
2063 if (selected_parts.empty()) {
2064 parts_mergers_.emplace_back(nullptr);
2065 } else {
2066 parts_mergers_.emplace_back(
2067 std::make_unique<PartsMerger>(std::move(selected_parts)));
2068 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002069 }
2070}
2071
Austin Schuh63097262023-08-16 17:04:29 -07002072std::string_view BootMerger::node_name() const {
2073 return configuration::NodeName(configuration().get(), node());
2074}
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002075
Adam Snaider13d48d92023-08-03 12:20:15 -07002076const Message *BootMerger::Front() {
Austin Schuh63097262023-08-16 17:04:29 -07002077 if (parts_mergers_[index_].get() != nullptr) {
Adam Snaider13d48d92023-08-03 12:20:15 -07002078 const Message *result = parts_mergers_[index_]->Front();
Austin Schuh63097262023-08-16 17:04:29 -07002079
2080 if (result != nullptr) {
2081 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
2082 return result;
2083 }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002084 }
2085
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002086 if (index_ + 1u == parts_mergers_.size()) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002087 // At the end of the last node merger, just return.
Austin Schuh63097262023-08-16 17:04:29 -07002088 VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr";
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002089 return nullptr;
2090 } else {
2091 ++index_;
Adam Snaider13d48d92023-08-03 12:20:15 -07002092 const Message *result = Front();
Austin Schuh63097262023-08-16 17:04:29 -07002093 VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result;
2094 return result;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002095 }
2096}
2097
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002098void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002099
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002100std::vector<const LogParts *> BootMerger::Parts() const {
2101 std::vector<const LogParts *> results;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002102 for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
Austin Schuh63097262023-08-16 17:04:29 -07002103 if (!parts_merger) continue;
2104
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002105 std::vector<const LogParts *> node_parts = parts_merger->Parts();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002106
2107 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2108 std::make_move_iterator(node_parts.end()));
2109 }
2110
2111 return results;
2112}
2113
Austin Schuh63097262023-08-16 17:04:29 -07002114monotonic_clock::time_point BootMerger::monotonic_start_time(
2115 size_t boot) const {
2116 CHECK_LT(boot, parts_mergers_.size());
2117 if (parts_mergers_[boot]) {
2118 return parts_mergers_[boot]->monotonic_start_time();
Austin Schuh0ca51f32020-12-25 21:51:45 -08002119 }
Austin Schuh63097262023-08-16 17:04:29 -07002120 return monotonic_clock::min_time;
2121}
2122
2123realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const {
2124 CHECK_LT(boot, parts_mergers_.size());
2125 if (parts_mergers_[boot]) {
2126 return parts_mergers_[boot]->realtime_start_time();
2127 }
2128 return realtime_clock::min_time;
2129}
2130
2131monotonic_clock::time_point BootMerger::monotonic_oldest_time(
2132 size_t boot) const {
2133 CHECK_LT(boot, parts_mergers_.size());
2134 if (parts_mergers_[boot]) {
2135 return parts_mergers_[boot]->monotonic_oldest_time();
2136 }
2137 return monotonic_clock::max_time;
2138}
2139
2140bool BootMerger::started() const {
2141 if (index_ == 0) {
2142 if (!parts_mergers_[0]) {
2143 return false;
2144 }
2145 return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time;
2146 }
2147 return true;
2148}
2149
2150SplitTimestampBootMerger::SplitTimestampBootMerger(
2151 std::string_view node_name, const LogFilesContainer &log_files,
2152 TimestampQueueStrategy timestamp_queue_strategy)
2153 : boot_merger_(node_name, log_files,
2154 (timestamp_queue_strategy ==
2155 TimestampQueueStrategy::kQueueTimestampsAtStartup)
2156 ? std::vector<StoredDataType>{StoredDataType::DATA}
2157 : std::vector<StoredDataType>{
2158 StoredDataType::DATA, StoredDataType::TIMESTAMPS,
2159 StoredDataType::REMOTE_TIMESTAMPS}) {
2160 // Make the timestamp_boot_merger_ only if we are asked to, and if there are
2161 // files to put in it. We don't need it for a data only log.
2162 if (timestamp_queue_strategy ==
2163 TimestampQueueStrategy::kQueueTimestampsAtStartup &&
2164 log_files.HasTimestamps(node_name)) {
2165 timestamp_boot_merger_ = std::make_unique<BootMerger>(
2166 node_name, log_files,
2167 std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
2168 StoredDataType::REMOTE_TIMESTAMPS});
2169 }
2170
2171 size_t number_of_boots = log_files.BootsForNode(node_name);
2172 monotonic_start_time_.reserve(number_of_boots);
2173 realtime_start_time_.reserve(number_of_boots);
2174
2175 // Start times are split across the timestamp boot merger, and data boot
2176 // merger. Pull from both and combine them to get the same answer as before.
2177 for (size_t i = 0u; i < number_of_boots; ++i) {
2178 StartTimes start_times;
2179
2180 if (timestamp_boot_merger_) {
2181 start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
2182 timestamp_boot_merger_->realtime_start_time(i));
2183 }
2184
2185 start_times.Update(boot_merger_.monotonic_start_time(i),
2186 boot_merger_.realtime_start_time(i));
2187
2188 monotonic_start_time_.push_back(start_times.monotonic_start_time());
2189 realtime_start_time_.push_back(start_times.realtime_start_time());
2190 }
2191}
2192
2193void SplitTimestampBootMerger::QueueTimestamps(
2194 std::function<void(TimestampedMessage *)> fn,
2195 const std::vector<size_t> &source_node) {
2196 if (!timestamp_boot_merger_) {
2197 return;
2198 }
2199
2200 while (true) {
2201 // Load all the timestamps. If we find data, ignore it and drop it on the
2202 // floor. It will be read when boot_merger_ is used.
Adam Snaider13d48d92023-08-03 12:20:15 -07002203 const Message *msg = timestamp_boot_merger_->Front();
Austin Schuh63097262023-08-16 17:04:29 -07002204 if (!msg) {
2205 queue_timestamps_ran_ = true;
2206 return;
2207 }
2208 if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
2209 timestamp_messages_.emplace_back(TimestampedMessage{
2210 .channel_index = msg->channel_index,
2211 .queue_index = msg->queue_index,
2212 .monotonic_event_time = msg->timestamp,
2213 .realtime_event_time = msg->data->realtime_sent_time,
2214 .remote_queue_index =
2215 BootQueueIndex{.boot = msg->monotonic_remote_boot,
2216 .index = msg->data->remote_queue_index.value()},
2217 .monotonic_remote_time = {msg->monotonic_remote_boot,
2218 msg->data->monotonic_remote_time.value()},
2219 .realtime_remote_time = msg->data->realtime_remote_time.value(),
2220 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
2221 msg->data->monotonic_timestamp_time},
2222 .data = std::move(msg->data)});
2223
2224 VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
2225 fn(&timestamp_messages_.back());
2226 } else {
2227 VLOG(2) << this << " Dropped data";
2228 }
2229 timestamp_boot_merger_->PopFront();
2230 }
2231
2232 // TODO(austin): Push the queue into TimestampMapper instead. Have it pull
2233 // all the timestamps. That will also make it so we don't have to clear the
2234 // function.
2235}
2236
2237std::string_view SplitTimestampBootMerger::node_name() const {
2238 return configuration::NodeName(configuration().get(), node());
2239}
2240
2241monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
2242 size_t boot) const {
2243 CHECK_LT(boot, monotonic_start_time_.size());
2244 return monotonic_start_time_[boot];
2245}
2246
2247realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
2248 size_t boot) const {
2249 CHECK_LT(boot, realtime_start_time_.size());
2250 return realtime_start_time_[boot];
2251}
2252
2253monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time(
2254 size_t boot) const {
2255 if (!timestamp_boot_merger_) {
2256 return boot_merger_.monotonic_oldest_time(boot);
2257 }
2258 return std::min(boot_merger_.monotonic_oldest_time(boot),
2259 timestamp_boot_merger_->monotonic_oldest_time(boot));
2260}
2261
Adam Snaider13d48d92023-08-03 12:20:15 -07002262const Message *SplitTimestampBootMerger::Front() {
2263 const Message *boot_merger_front = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002264
2265 if (timestamp_boot_merger_) {
2266 CHECK(queue_timestamps_ran_);
2267 }
2268
2269 // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
2270 // to return a Message. We need to convert the first message in the list
2271 // before returning it (and comparing, honestly). Fill next_timestamp_ in if
2272 // it is empty so the rest of the logic here can just look at next_timestamp_
2273 // and use that instead.
2274 if (!next_timestamp_ && !timestamp_messages_.empty()) {
2275 auto &front = timestamp_messages_.front();
2276 next_timestamp_ = Message{
2277 .channel_index = front.channel_index,
2278 .queue_index = front.queue_index,
2279 .timestamp = front.monotonic_event_time,
2280 .monotonic_remote_boot = front.remote_queue_index.boot,
2281 .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
2282 .data = std::move(front.data),
2283 };
2284 timestamp_messages_.pop_front();
2285 }
2286
2287 if (!next_timestamp_) {
2288 message_source_ = MessageSource::kBootMerger;
2289 if (boot_merger_front != nullptr) {
2290 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2291 << " " << *boot_merger_front;
2292 } else {
2293 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2294 << " nullptr";
2295 }
2296 return boot_merger_front;
2297 }
2298
2299 if (boot_merger_front == nullptr) {
2300 message_source_ = MessageSource::kTimestampMessage;
2301
2302 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
2303 << next_timestamp_.value();
2304 return &next_timestamp_.value();
2305 }
2306
2307 if (*boot_merger_front <= next_timestamp_.value()) {
2308 if (*boot_merger_front == next_timestamp_.value()) {
2309 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2310 << " Dropping duplicate timestamp.";
2311 next_timestamp_.reset();
2312 }
2313 message_source_ = MessageSource::kBootMerger;
2314 if (boot_merger_front != nullptr) {
2315 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2316 << " " << *boot_merger_front;
2317 } else {
2318 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
2319 << " nullptr";
2320 }
2321 return boot_merger_front;
2322 } else {
2323 message_source_ = MessageSource::kTimestampMessage;
2324 VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
2325 << next_timestamp_.value();
2326 return &next_timestamp_.value();
2327 }
2328}
2329
2330void SplitTimestampBootMerger::PopFront() {
2331 switch (message_source_) {
2332 case MessageSource::kTimestampMessage:
2333 CHECK(next_timestamp_.has_value());
2334 next_timestamp_.reset();
2335 break;
2336 case MessageSource::kBootMerger:
2337 boot_merger_.PopFront();
2338 break;
2339 }
2340}
2341
2342TimestampMapper::TimestampMapper(
2343 std::string_view node_name, const LogFilesContainer &log_files,
2344 TimestampQueueStrategy timestamp_queue_strategy)
2345 : boot_merger_(node_name, log_files, timestamp_queue_strategy),
2346 timestamp_callback_([](TimestampedMessage *) {}) {
2347 configuration_ = boot_merger_.configuration();
2348
Austin Schuh0ca51f32020-12-25 21:51:45 -08002349 const Configuration *config = configuration_.get();
Alexei Strots1f51ac72023-05-15 10:14:54 -07002350 // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
Austin Schuhd2f96102020-12-01 20:27:29 -08002351 // pretty simple.
2352 if (configuration::MultiNode(config)) {
2353 nodes_data_.resize(config->nodes()->size());
2354 const Node *my_node = config->nodes()->Get(node());
2355 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2356 const Node *node = config->nodes()->Get(node_index);
2357 NodeData *node_data = &nodes_data_[node_index];
2358 node_data->channels.resize(config->channels()->size());
2359 // We should save the channel if it is delivered to the node represented
2360 // by the NodeData, but not sent by that node. That combo means it is
2361 // forwarded.
2362 size_t channel_index = 0;
2363 node_data->any_delivered = false;
2364 for (const Channel *channel : *config->channels()) {
2365 node_data->channels[channel_index].delivered =
2366 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002367 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2368 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002369 node_data->any_delivered = node_data->any_delivered ||
2370 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002371 if (node_data->channels[channel_index].delivered) {
2372 const Connection *connection =
2373 configuration::ConnectionToNode(channel, node);
2374 node_data->channels[channel_index].time_to_live =
2375 chrono::nanoseconds(connection->time_to_live());
2376 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002377 ++channel_index;
2378 }
2379 }
2380
2381 for (const Channel *channel : *config->channels()) {
2382 source_node_.emplace_back(configuration::GetNodeIndex(
2383 config, channel->source_node()->string_view()));
2384 }
2385 }
2386}
2387
2388void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002389 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002390 CHECK_NE(timestamp_mapper->node(), node());
2391 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2392
2393 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002394 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002395 // we could needlessly save data.
2396 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002397 VLOG(1) << "Registering on node " << node() << " for peer node "
2398 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002399 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2400
2401 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002402
2403 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002404 }
2405}
2406
Adam Snaider13d48d92023-08-03 12:20:15 -07002407void TimestampMapper::QueueMessage(const Message *msg) {
Austin Schuh60e77942022-05-16 17:48:24 -07002408 matched_messages_.emplace_back(
Austin Schuh63097262023-08-16 17:04:29 -07002409 TimestampedMessage{.channel_index = msg->channel_index,
2410 .queue_index = msg->queue_index,
2411 .monotonic_event_time = msg->timestamp,
2412 .realtime_event_time = msg->data->realtime_sent_time,
Austin Schuh60e77942022-05-16 17:48:24 -07002413 .remote_queue_index = BootQueueIndex::Invalid(),
2414 .monotonic_remote_time = BootTimestamp::min_time(),
2415 .realtime_remote_time = realtime_clock::min_time,
2416 .monotonic_timestamp_time = BootTimestamp::min_time(),
Austin Schuh63097262023-08-16 17:04:29 -07002417 .data = std::move(msg->data)});
2418 VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
Austin Schuhd2f96102020-12-01 20:27:29 -08002419}
2420
2421TimestampedMessage *TimestampMapper::Front() {
2422 // No need to fetch anything new. A previous message still exists.
2423 switch (first_message_) {
2424 case FirstMessage::kNeedsUpdate:
2425 break;
2426 case FirstMessage::kInMessage:
Austin Schuh63097262023-08-16 17:04:29 -07002427 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2428 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002429 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002430 case FirstMessage::kNullptr:
Austin Schuh63097262023-08-16 17:04:29 -07002431 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2432 << " nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08002433 return nullptr;
2434 }
2435
Austin Schuh79b30942021-01-24 22:32:21 -08002436 if (matched_messages_.empty()) {
2437 if (!QueueMatched()) {
2438 first_message_ = FirstMessage::kNullptr;
Austin Schuh63097262023-08-16 17:04:29 -07002439 VLOG(1) << this << " TimestampMapper::Front " << node_name()
2440 << " nullptr";
Austin Schuh79b30942021-01-24 22:32:21 -08002441 return nullptr;
2442 }
2443 }
2444 first_message_ = FirstMessage::kInMessage;
Austin Schuh63097262023-08-16 17:04:29 -07002445 VLOG(1) << this << " TimestampMapper::Front " << node_name() << " "
2446 << matched_messages_.front();
Austin Schuh79b30942021-01-24 22:32:21 -08002447 return &matched_messages_.front();
2448}
2449
2450bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002451 MatchResult result = MatchResult::kEndOfFile;
2452 do {
2453 result = MaybeQueueMatched();
2454 } while (result == MatchResult::kSkipped);
2455 return result == MatchResult::kQueued;
2456}
2457
2458bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2459 const TimestampedMessage & /*message*/) {
2460 if (replay_channels_callback_ &&
2461 !replay_channels_callback_(matched_messages_.back())) {
Austin Schuh63097262023-08-16 17:04:29 -07002462 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002463 matched_messages_.pop_back();
2464 return true;
2465 }
2466 return false;
2467}
2468
2469TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002470 if (nodes_data_.empty()) {
2471 // Simple path. We are single node, so there are no timestamps to match!
2472 CHECK_EQ(messages_.size(), 0u);
Adam Snaider13d48d92023-08-03 12:20:15 -07002473 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002474 if (!msg) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002475 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002476 }
Austin Schuh79b30942021-01-24 22:32:21 -08002477 // Enqueue this message into matched_messages_ so we have a place to
2478 // associate remote timestamps, and return it.
Austin Schuh63097262023-08-16 17:04:29 -07002479 QueueMessage(msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002480
Austin Schuh63097262023-08-16 17:04:29 -07002481 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2482 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002483 last_message_time_ = matched_messages_.back().monotonic_event_time;
2484
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002485 // We are thin wrapper around parts_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002486 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002487 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002488 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2489 return MatchResult::kSkipped;
2490 }
2491 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002492 }
2493
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002494 // We need to only add messages to the list so they get processed for
2495 // messages which are delivered. Reuse the flow below which uses messages_
2496 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002497 if (messages_.empty()) {
2498 if (!Queue()) {
2499 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002500 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002501 }
2502
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002503 // Now that it has been added (and cannibalized), forget about it
2504 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002505 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002506 }
2507
Austin Schuh63097262023-08-16 17:04:29 -07002508 Message *msg = &(messages_.front());
Austin Schuhd2f96102020-12-01 20:27:29 -08002509
Austin Schuh63097262023-08-16 17:04:29 -07002510 if (source_node_[msg->channel_index] == node()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002511 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh63097262023-08-16 17:04:29 -07002512 QueueMessage(msg);
2513 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2514 << " on " << node_name();
Austin Schuh79b30942021-01-24 22:32:21 -08002515 last_message_time_ = matched_messages_.back().monotonic_event_time;
2516 messages_.pop_front();
2517 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002518 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2519 return MatchResult::kSkipped;
2520 }
2521 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002522 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002523 // Got a timestamp, find the matching remote data, match it, and return
2524 // it.
Austin Schuh63097262023-08-16 17:04:29 -07002525 Message data = MatchingMessageFor(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002526
2527 // Return the data from the remote. The local message only has timestamp
2528 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002529 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuh63097262023-08-16 17:04:29 -07002530 .channel_index = msg->channel_index,
2531 .queue_index = msg->queue_index,
2532 .monotonic_event_time = msg->timestamp,
2533 .realtime_event_time = msg->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002534 .remote_queue_index =
Austin Schuh63097262023-08-16 17:04:29 -07002535 BootQueueIndex{.boot = msg->monotonic_remote_boot,
2536 .index = msg->data->remote_queue_index.value()},
2537 .monotonic_remote_time = {msg->monotonic_remote_boot,
2538 msg->data->monotonic_remote_time.value()},
2539 .realtime_remote_time = msg->data->realtime_remote_time.value(),
2540 .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
2541 msg->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002542 .data = std::move(data.data)});
Austin Schuh63097262023-08-16 17:04:29 -07002543 VLOG(1) << node_name() << " Inserted timestamp "
2544 << matched_messages_.back();
2545 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
2546 << " on " << node_name() << " " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002547 last_message_time_ = matched_messages_.back().monotonic_event_time;
2548 // Since messages_ holds the data, drop it.
2549 messages_.pop_front();
2550 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002551 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2552 return MatchResult::kSkipped;
2553 }
2554 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002555 }
2556}
2557
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002558void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002559 while (last_message_time_ <= queue_time) {
2560 if (!QueueMatched()) {
2561 return;
2562 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002563 }
2564}
2565
Austin Schuhe639ea12021-01-25 13:00:22 -08002566void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002567 // Note: queueing for time doesn't really work well across boots. So we
2568 // just assume that if you are using this, you only care about the current
2569 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002570 //
2571 // TODO(austin): Is that the right concept?
2572 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002573 // Make sure we have something queued first. This makes the end time
2574 // calculation simpler, and is typically what folks want regardless.
2575 if (matched_messages_.empty()) {
2576 if (!QueueMatched()) {
2577 return;
2578 }
2579 }
2580
2581 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002582 std::max(monotonic_start_time(
2583 matched_messages_.front().monotonic_event_time.boot),
2584 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002585 time_estimation_buffer;
2586
2587 // Place sorted messages on the list until we have
2588 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2589 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002590 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002591 if (!QueueMatched()) {
2592 return;
2593 }
2594 }
2595}
2596
Austin Schuhd2f96102020-12-01 20:27:29 -08002597void TimestampMapper::PopFront() {
2598 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002599 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002600 first_message_ = FirstMessage::kNeedsUpdate;
2601
Austin Schuh63097262023-08-16 17:04:29 -07002602 VLOG(1) << node_name() << " Popped " << matched_messages_.back();
Austin Schuh79b30942021-01-24 22:32:21 -08002603 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002604}
2605
2606Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002607 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002608 CHECK_NOTNULL(message.data);
2609 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002610 const BootQueueIndex remote_queue_index =
2611 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002612 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002613
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002614 CHECK(message.data->monotonic_remote_time.has_value());
2615 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002616
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002617 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002618 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002619 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002620 const realtime_clock::time_point realtime_remote_time =
2621 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002622
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002623 TimestampMapper *peer =
2624 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002625
2626 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002627 // asked to pull a timestamp from a peer which doesn't exist, return an
2628 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002629 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002630 // TODO(austin): Make sure the tests hit all these paths with a boot count
2631 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002632 return Message{.channel_index = message.channel_index,
2633 .queue_index = remote_queue_index,
2634 .timestamp = monotonic_remote_time,
2635 .monotonic_remote_boot = 0xffffff,
2636 .monotonic_timestamp_boot = 0xffffff,
2637 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002638 }
2639
2640 // The queue which will have the matching data, if available.
2641 std::deque<Message> *data_queue =
2642 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2643
Austin Schuh79b30942021-01-24 22:32:21 -08002644 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002645
2646 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002647 return Message{.channel_index = message.channel_index,
2648 .queue_index = remote_queue_index,
2649 .timestamp = monotonic_remote_time,
2650 .monotonic_remote_boot = 0xffffff,
2651 .monotonic_timestamp_boot = 0xffffff,
2652 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002653 }
2654
Austin Schuhd2f96102020-12-01 20:27:29 -08002655 if (remote_queue_index < data_queue->front().queue_index ||
2656 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002657 return Message{.channel_index = message.channel_index,
2658 .queue_index = remote_queue_index,
2659 .timestamp = monotonic_remote_time,
2660 .monotonic_remote_boot = 0xffffff,
2661 .monotonic_timestamp_boot = 0xffffff,
2662 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002663 }
2664
Austin Schuh993ccb52020-12-12 15:59:32 -08002665 // The algorithm below is constant time with some assumptions. We need there
2666 // to be no missing messages in the data stream. This also assumes a queue
2667 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002668 if (data_queue->back().queue_index.boot ==
2669 data_queue->front().queue_index.boot &&
2670 (data_queue->back().queue_index.index -
2671 data_queue->front().queue_index.index + 1u ==
2672 data_queue->size())) {
2673 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002674 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002675 //
2676 // TODO(austin): Move if not reliable.
2677 Message result = (*data_queue)[remote_queue_index.index -
2678 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002679
2680 CHECK_EQ(result.timestamp, monotonic_remote_time)
2681 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002682 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002683 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2684 // Now drop the data off the front. We have deduplicated timestamps, so we
2685 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002686 data_queue->erase(
2687 data_queue->begin(),
2688 data_queue->begin() +
2689 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002690 return result;
2691 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002692 // TODO(austin): Binary search.
2693 auto it = std::find_if(
2694 data_queue->begin(), data_queue->end(),
2695 [remote_queue_index,
Austin Schuh63097262023-08-16 17:04:29 -07002696 remote_boot = monotonic_remote_time.boot](const Message &msg) {
2697 return msg.queue_index == remote_queue_index &&
2698 msg.timestamp.boot == remote_boot;
Austin Schuh58646e22021-08-23 23:51:46 -07002699 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002700 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002701 return Message{.channel_index = message.channel_index,
2702 .queue_index = remote_queue_index,
2703 .timestamp = monotonic_remote_time,
2704 .monotonic_remote_boot = 0xffffff,
2705 .monotonic_timestamp_boot = 0xffffff,
2706 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002707 }
2708
2709 Message result = std::move(*it);
2710
2711 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002712 << ": Queue index matches, but timestamp doesn't. Please "
2713 "investigate!";
2714 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2715 << ": Queue index matches, but timestamp doesn't. Please "
2716 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002717
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002718 // Erase everything up to this message. We want to keep 1 message in the
2719 // queue so we can handle reliable messages forwarded across boots.
2720 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002721
2722 return result;
2723 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002724}
2725
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002726void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002727 if (queued_until_ > t) {
2728 return;
2729 }
2730 while (true) {
2731 if (!messages_.empty() && messages_.back().timestamp > t) {
2732 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2733 return;
2734 }
2735
2736 if (!Queue()) {
2737 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002738 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002739 return;
2740 }
2741
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002742 // Now that it has been added (and cannibalized), forget about it
2743 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002744 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002745 }
2746}
2747
2748bool TimestampMapper::Queue() {
Adam Snaider13d48d92023-08-03 12:20:15 -07002749 const Message *msg = boot_merger_.Front();
Austin Schuh63097262023-08-16 17:04:29 -07002750 if (msg == nullptr) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002751 return false;
2752 }
2753 for (NodeData &node_data : nodes_data_) {
2754 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002755 if (!node_data.save_for_peer) continue;
Austin Schuh63097262023-08-16 17:04:29 -07002756 if (node_data.channels[msg->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002757 // If we have data but no timestamps (logs where the timestamps didn't get
2758 // logged are classic), we can grow this indefinitely. We don't need to
2759 // keep anything that is older than the last message returned.
2760
2761 // We have the time on the source node.
2762 // We care to wait until we have the time on the destination node.
2763 std::deque<Message> &messages =
Austin Schuh63097262023-08-16 17:04:29 -07002764 node_data.channels[msg->channel_index].messages;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002765 // Max delay over the network is the TTL, so let's take the queue time and
2766 // add TTL to it. Don't forget any messages which are reliable until
2767 // someone can come up with a good reason to forget those too.
Austin Schuh63097262023-08-16 17:04:29 -07002768 if (node_data.channels[msg->channel_index].time_to_live >
Austin Schuh6a7358f2021-11-18 22:40:40 -08002769 chrono::nanoseconds(0)) {
2770 // We need to make *some* assumptions about network delay for this to
2771 // work. We want to only look at the RX side. This means we need to
2772 // track the last time a message was popped from any channel from the
2773 // node sending this message, and compare that to the max time we expect
2774 // that a message will take to be delivered across the network. This
2775 // assumes that messages are popped in time order as a proxy for
2776 // measuring the distributed time at this layer.
2777 //
2778 // Leave at least 1 message in here so we can handle reboots and
2779 // messages getting sent twice.
2780 while (messages.size() > 1u &&
2781 messages.begin()->timestamp +
Austin Schuh63097262023-08-16 17:04:29 -07002782 node_data.channels[msg->channel_index].time_to_live +
Austin Schuh6a7358f2021-11-18 22:40:40 -08002783 chrono::duration_cast<chrono::nanoseconds>(
2784 chrono::duration<double>(FLAGS_max_network_delay)) <
2785 last_popped_message_time_) {
2786 messages.pop_front();
2787 }
2788 }
Austin Schuh63097262023-08-16 17:04:29 -07002789 node_data.channels[msg->channel_index].messages.emplace_back(*msg);
Austin Schuhd2f96102020-12-01 20:27:29 -08002790 }
2791 }
2792
Austin Schuh63097262023-08-16 17:04:29 -07002793 messages_.emplace_back(std::move(*msg));
Austin Schuhd2f96102020-12-01 20:27:29 -08002794 return true;
2795}
2796
Austin Schuh63097262023-08-16 17:04:29 -07002797void TimestampMapper::QueueTimestamps() {
2798 boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
2799}
2800
Austin Schuhd2f96102020-12-01 20:27:29 -08002801std::string TimestampMapper::DebugString() const {
2802 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002803 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002804 for (const Message &message : messages_) {
2805 ss << " " << message << "\n";
2806 }
2807 ss << "] queued_until " << queued_until_;
2808 for (const NodeData &ns : nodes_data_) {
2809 if (ns.peer == nullptr) continue;
2810 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2811 size_t channel_index = 0;
2812 for (const NodeData::ChannelData &channel_data :
2813 ns.peer->nodes_data_[node()].channels) {
2814 if (channel_data.messages.empty()) {
2815 continue;
2816 }
Austin Schuhb000de62020-12-03 22:00:40 -08002817
Austin Schuhd2f96102020-12-01 20:27:29 -08002818 ss << " channel " << channel_index << " [\n";
Austin Schuh63097262023-08-16 17:04:29 -07002819 for (const Message &msg : channel_data.messages) {
2820 ss << " " << msg << "\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002821 }
2822 ss << " ]\n";
2823 ++channel_index;
2824 }
2825 ss << "] queued_until " << ns.peer->queued_until_;
2826 }
2827 return ss.str();
2828}
2829
Brian Silvermanf51499a2020-09-21 12:49:08 -07002830} // namespace aos::logger