blob: 36bac42c6422ad3cd67e68122fb418a98990dd92 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "flatbuffers/flatbuffers.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070018#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080019#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080020#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070025#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070026#else
27#define ENABLE_LZMA 0
28#endif
29
30#if ENABLE_LZMA
31#include "aos/events/logging/lzma_encoder.h"
32#endif
Austin Schuh86110712022-09-16 15:40:54 -070033#if ENABLE_S3
34#include "aos/events/logging/s3_fetcher.h"
35#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070036
Austin Schuh48d10d62022-10-16 22:19:23 -070037DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080038 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070039DEFINE_double(
40 flush_period, 5.0,
41 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080042
Austin Schuha040c3f2021-02-13 16:09:07 -080043DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080044 max_network_delay, 1.0,
45 "Max time to assume a message takes to cross the network before we are "
46 "willing to drop it from our buffers and assume it didn't make it. "
47 "Increasing this number can increase memory usage depending on the packet "
48 "loss of your network or if the timestamps aren't logged for a message.");
49
50DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080051 max_out_of_order, -1,
52 "If set, this overrides the max out of order duration for a log file.");
53
Austin Schuh0e8db662021-07-06 10:43:47 -070054DEFINE_bool(workaround_double_headers, true,
55 "Some old log files have two headers at the beginning. Use the "
56 "last header as the actual header.");
57
Brian Smarttea913d42021-12-10 15:02:38 -080058DEFINE_bool(crash_on_corrupt_message, true,
59 "When true, MessageReader will crash the first time a message "
60 "with corrupted format is found. When false, the crash will be "
61 "suppressed, and any remaining readable messages will be "
62 "evaluated to present verified vs corrupted stats.");
63
64DEFINE_bool(ignore_corrupt_messages, false,
65 "When true, and crash_on_corrupt_message is false, then any "
66 "corrupt message found by MessageReader be silently ignored, "
67 "providing access to all uncorrupted messages in a logfile.");
68
Alexei Strotsa3194712023-04-21 23:30:50 -070069DECLARE_bool(quiet_sorting);
70
Brian Silvermanf51499a2020-09-21 12:49:08 -070071namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070072namespace {
Austin Schuh05b70472020-01-01 17:11:17 -080073namespace chrono = std::chrono;
74
Alexei Strotscee7b372023-04-21 11:57:54 -070075std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename,
76 bool quiet) {
77 static constexpr std::string_view kS3 = "s3:";
78
79 std::unique_ptr<DataDecoder> decoder;
80
81 if (filename.substr(0, kS3.size()) == kS3) {
82#if ENABLE_S3
83 decoder = std::make_unique<S3Fetcher>(filename);
84#else
85 LOG(FATAL) << "Reading files from S3 not supported on this platform";
86#endif
87 } else {
88 decoder = std::make_unique<DummyDecoder>(filename);
89 }
90
91 static constexpr std::string_view kXz = ".xz";
92 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
93 if (filename.substr(filename.size() - kXz.size()) == kXz) {
94#if ENABLE_LZMA
95 decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet);
96#else
97 (void)quiet;
98 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
99#endif
100 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
101 decoder = std::make_unique<SnappyDecoder>(std::move(decoder));
102 }
103 return decoder;
104}
105
// Streams the value held by `t` to `*os`, or the literal text "null" when `t`
// is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << *t;
}
114} // namespace
115
Alexei Strotsbc082d82023-05-03 08:43:42 -0700116DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink,
117 std::unique_ptr<DataEncoder> encoder)
118 : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) {
119 CHECK(log_sink_);
120 ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace;
Alexei Strots01395492023-03-20 13:59:56 -0700121 if (ran_out_of_space_) {
122 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800123 }
124}
125
Austin Schuha36c8902019-12-30 18:07:15 -0800126DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700127 Close();
128 if (ran_out_of_space_) {
129 CHECK(acknowledge_ran_out_of_space_)
130 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700131 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700132}
133
Brian Silvermand90905f2020-09-23 14:42:56 -0700134DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700135 *this = std::move(other);
136}
137
Brian Silverman87ac0402020-09-17 14:47:01 -0700138// When other is destroyed "soon" (which it should be because we're getting an
139// rvalue reference to it), it will flush etc all the data we have queued up
140// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700141DetachedBufferWriter &DetachedBufferWriter::operator=(
142 DetachedBufferWriter &&other) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700143 std::swap(log_sink_, other.log_sink_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700144 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700145 std::swap(ran_out_of_space_, other.ran_out_of_space_);
146 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800147 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700148 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800149}
150
Austin Schuh8bdfc492023-02-11 12:53:13 -0800151void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800152 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700153 if (ran_out_of_space_) {
154 // We don't want any later data to be written after space becomes
155 // available, so refuse to write anything more once we've dropped data
156 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700157 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800158 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700159
Austin Schuh8bdfc492023-02-11 12:53:13 -0800160 const size_t message_size = copier->size();
161 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700162
Austin Schuh8bdfc492023-02-11 12:53:13 -0800163 // Keep writing chunks until we've written it all. If we end up with a
164 // partial write, this means we need to flush to disk.
165 do {
Alexei Strots01395492023-03-20 13:59:56 -0700166 const size_t bytes_written =
167 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800168 CHECK(bytes_written != 0);
169
170 overall_bytes_written += bytes_written;
171 if (overall_bytes_written < message_size) {
172 VLOG(1) << "Flushing because of a partial write, tried to write "
173 << message_size << " wrote " << overall_bytes_written;
174 Flush(now);
175 }
176 } while (overall_bytes_written < message_size);
177
Austin Schuhbd06ae42021-03-31 22:48:21 -0700178 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800179}
180
Brian Silverman0465fcf2020-09-24 00:29:18 -0700181void DetachedBufferWriter::Close() {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700182 if (!log_sink_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700183 return;
184 }
185 encoder_->Finish();
186 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800187 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700188 }
Austin Schuhb2461652023-05-01 08:30:56 -0700189 encoder_.reset();
Alexei Strotsbc082d82023-05-03 08:43:42 -0700190 ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700191}
192
Austin Schuh8bdfc492023-02-11 12:53:13 -0800193void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
194 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700195 if (ran_out_of_space_) {
196 // We don't want any later data to be written after space becomes available,
197 // so refuse to write anything more once we've dropped data because we ran
198 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700199 if (encoder_) {
200 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
201 encoder_->Clear(encoder_->queue().size());
202 } else {
203 VLOG(1) << "No queue to ignore";
204 }
205 return;
206 }
207
208 const auto queue = encoder_->queue();
209 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700210 return;
211 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700212
Alexei Strotsbc082d82023-05-03 08:43:42 -0700213 const WriteResult result = log_sink_->Write(queue);
Alexei Strots01395492023-03-20 13:59:56 -0700214 encoder_->Clear(result.messages_written);
215 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700216}
217
Austin Schuhbd06ae42021-03-31 22:48:21 -0700218void DetachedBufferWriter::FlushAtThreshold(
219 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700220 if (ran_out_of_space_) {
221 // We don't want any later data to be written after space becomes available,
222 // so refuse to write anything more once we've dropped data because we ran
223 // out of space.
224 if (encoder_) {
225 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
226 encoder_->Clear(encoder_->queue().size());
227 } else {
228 VLOG(1) << "No queue to ignore";
229 }
230 return;
231 }
232
Austin Schuhbd06ae42021-03-31 22:48:21 -0700233 // We don't want to flush the first time through. Otherwise we will flush as
234 // the log file header might be compressing, defeating any parallelism and
235 // queueing there.
236 if (last_flush_time_ == aos::monotonic_clock::min_time) {
237 last_flush_time_ = now;
238 }
239
Brian Silvermanf51499a2020-09-21 12:49:08 -0700240 // Flush if we are at the max number of iovs per writev, because there's no
241 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700242 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800243 while (encoder_->space() == 0 ||
244 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700245 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700246 (now > last_flush_time_ +
247 chrono::duration_cast<chrono::nanoseconds>(
248 chrono::duration<double>(FLAGS_flush_period)) &&
249 encoder_->queued_bytes() != 0)) {
250 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
251 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800252 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700253 }
Austin Schuha36c8902019-12-30 18:07:15 -0800254}
255
Austin Schuhf2d0e682022-10-16 14:20:58 -0700256// Do the magic dance to convert the endianness of the data and append it to the
257// buffer.
258namespace {
259
260// TODO(austin): Look at the generated code to see if building the header is
261// efficient or not.
262template <typename T>
263uint8_t *Push(uint8_t *buffer, const T data) {
264 const T endian_data = flatbuffers::EndianScalar<T>(data);
265 std::memcpy(buffer, &endian_data, sizeof(T));
266 return buffer + sizeof(T);
267}
268
// Copies `size` raw bytes from `data` to `buffer` and returns the address just
// past the copied bytes.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
273
// Writes `padding` zero bytes at `buffer` and returns the address just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  return static_cast<uint8_t *>(std::memset(buffer, 0, padding)) + padding;
}
278} // namespace
279
280flatbuffers::Offset<MessageHeader> PackRemoteMessage(
281 flatbuffers::FlatBufferBuilder *fbb,
282 const message_bridge::RemoteMessage *msg, int channel_index,
283 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
284 logger::MessageHeader::Builder message_header_builder(*fbb);
285 // Note: this must match the same order as MessageBridgeServer and
286 // PackMessage. We want identical headers to have identical
287 // on-the-wire formats to make comparing them easier.
288
289 message_header_builder.add_channel_index(channel_index);
290
291 message_header_builder.add_queue_index(msg->queue_index());
292 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
293 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
294
295 message_header_builder.add_monotonic_remote_time(
296 msg->monotonic_remote_time());
297 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
298 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
299
300 message_header_builder.add_monotonic_timestamp_time(
301 monotonic_timestamp_time.time_since_epoch().count());
302
303 return message_header_builder.Finish();
304}
305
306size_t PackRemoteMessageInline(
307 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
308 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800309 const aos::monotonic_clock::time_point monotonic_timestamp_time,
310 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700311 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800312 DCHECK_EQ((start_byte % 8u), 0u);
313 DCHECK_EQ((end_byte % 8u), 0u);
314 DCHECK_LE(start_byte, end_byte);
315 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700316
Austin Schuh71a40d42023-02-04 21:22:22 -0800317 switch (start_byte) {
318 case 0x00u:
319 if ((end_byte) == 0x00u) {
320 break;
321 }
322 // clang-format off
323 // header:
324 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
325 buffer = Push<flatbuffers::uoffset_t>(
326 buffer, message_size - sizeof(flatbuffers::uoffset_t));
327 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
328 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
329 [[fallthrough]];
330 case 0x08u:
331 if ((end_byte) == 0x08u) {
332 break;
333 }
334 //
335 // padding:
336 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
337 buffer = Pad(buffer, 6);
338 //
339 // vtable (aos.logger.MessageHeader):
340 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
341 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
342 [[fallthrough]];
343 case 0x10u:
344 if ((end_byte) == 0x10u) {
345 break;
346 }
347 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
348 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
349 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
350 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
351 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
352 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
353 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
354 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
355 [[fallthrough]];
356 case 0x18u:
357 if ((end_byte) == 0x18u) {
358 break;
359 }
360 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
361 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
362 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
363 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
364 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
365 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
366 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
367 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
368 [[fallthrough]];
369 case 0x20u:
370 if ((end_byte) == 0x20u) {
371 break;
372 }
373 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
374 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
375 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
376 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
377 //
378 // root_table (aos.logger.MessageHeader):
379 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
380 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
381 [[fallthrough]];
382 case 0x28u:
383 if ((end_byte) == 0x28u) {
384 break;
385 }
386 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
387 buffer = Push<int64_t>(buffer,
388 monotonic_timestamp_time.time_since_epoch().count());
389 [[fallthrough]];
390 case 0x30u:
391 if ((end_byte) == 0x30u) {
392 break;
393 }
394 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
395 // TODO(austin): Can we re-arrange the order to ditch the padding?
396 // (Answer is yes, but what is the impact elsewhere? It will change the
397 // binary format)
398 buffer = Pad(buffer, 4);
399 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
400 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
401 [[fallthrough]];
402 case 0x38u:
403 if ((end_byte) == 0x38u) {
404 break;
405 }
406 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
407 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
408 [[fallthrough]];
409 case 0x40u:
410 if ((end_byte) == 0x40u) {
411 break;
412 }
413 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
414 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
415 [[fallthrough]];
416 case 0x48u:
417 if ((end_byte) == 0x48u) {
418 break;
419 }
420 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
421 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
422 [[fallthrough]];
423 case 0x50u:
424 if ((end_byte) == 0x50u) {
425 break;
426 }
427 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
428 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
429 [[fallthrough]];
430 case 0x58u:
431 if ((end_byte) == 0x58u) {
432 break;
433 }
434 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
435 buffer = Push<uint32_t>(buffer, msg->queue_index());
436 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
437 buffer = Push<uint32_t>(buffer, channel_index);
438 // clang-format on
439 [[fallthrough]];
440 case 0x60u:
441 if ((end_byte) == 0x60u) {
442 break;
443 }
444 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700445
Austin Schuh71a40d42023-02-04 21:22:22 -0800446 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700447}
448
Austin Schuha36c8902019-12-30 18:07:15 -0800449flatbuffers::Offset<MessageHeader> PackMessage(
450 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
451 int channel_index, LogType log_type) {
452 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
453
454 switch (log_type) {
455 case LogType::kLogMessage:
456 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800457 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700458 // Since the timestamps are 8 byte aligned, we are going to end up adding
459 // padding in the middle of the message to pad everything out to 8 byte
460 // alignment. That's rather wasteful. To make things efficient to mmap
461 // while reading uncompressed logs, we'd actually rather the message be
462 // aligned. So, force 8 byte alignment (enough to preserve alignment
463 // inside the nested message so that we can read it without moving it)
464 // here.
465 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700466 data_offset = fbb->CreateVector(
467 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800468 break;
469
470 case LogType::kLogDeliveryTimeOnly:
471 break;
472 }
473
474 MessageHeader::Builder message_header_builder(*fbb);
475 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800476
Austin Schuhfa30c352022-10-16 11:12:02 -0700477 // These are split out into very explicit serialization calls because the
478 // order here changes the order things are written out on the wire, and we
479 // want to control and understand it here. Changing the order can increase
480 // the amount of padding bytes in the middle.
481 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800482 // It is also easier to follow... And doesn't actually make things much
483 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800484 switch (log_type) {
485 case LogType::kLogRemoteMessage:
486 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700487 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800488 message_header_builder.add_monotonic_sent_time(
489 context.monotonic_remote_time.time_since_epoch().count());
490 message_header_builder.add_realtime_sent_time(
491 context.realtime_remote_time.time_since_epoch().count());
492 break;
493
Austin Schuh6f3babe2020-01-26 20:34:50 -0800494 case LogType::kLogDeliveryTimeOnly:
495 message_header_builder.add_queue_index(context.queue_index);
496 message_header_builder.add_monotonic_sent_time(
497 context.monotonic_event_time.time_since_epoch().count());
498 message_header_builder.add_realtime_sent_time(
499 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800500 message_header_builder.add_monotonic_remote_time(
501 context.monotonic_remote_time.time_since_epoch().count());
502 message_header_builder.add_realtime_remote_time(
503 context.realtime_remote_time.time_since_epoch().count());
504 message_header_builder.add_remote_queue_index(context.remote_queue_index);
505 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700506
507 case LogType::kLogMessage:
508 message_header_builder.add_queue_index(context.queue_index);
509 message_header_builder.add_data(data_offset);
510 message_header_builder.add_monotonic_sent_time(
511 context.monotonic_event_time.time_since_epoch().count());
512 message_header_builder.add_realtime_sent_time(
513 context.realtime_event_time.time_since_epoch().count());
514 break;
515
516 case LogType::kLogMessageAndDeliveryTime:
517 message_header_builder.add_queue_index(context.queue_index);
518 message_header_builder.add_remote_queue_index(context.remote_queue_index);
519 message_header_builder.add_monotonic_sent_time(
520 context.monotonic_event_time.time_since_epoch().count());
521 message_header_builder.add_realtime_sent_time(
522 context.realtime_event_time.time_since_epoch().count());
523 message_header_builder.add_monotonic_remote_time(
524 context.monotonic_remote_time.time_since_epoch().count());
525 message_header_builder.add_realtime_remote_time(
526 context.realtime_remote_time.time_since_epoch().count());
527 message_header_builder.add_data(data_offset);
528 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800529 }
530
531 return message_header_builder.Finish();
532}
533
Austin Schuhfa30c352022-10-16 11:12:02 -0700534flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
535 switch (log_type) {
536 case LogType::kLogMessage:
537 return
538 // Root table size + offset.
539 sizeof(flatbuffers::uoffset_t) * 2 +
540 // 6 padding bytes to pad the header out properly.
541 6 +
542 // vtable header (size + size of table)
543 sizeof(flatbuffers::voffset_t) * 2 +
544 // offsets to all the fields.
545 sizeof(flatbuffers::voffset_t) * 5 +
546 // pointer to vtable
547 sizeof(flatbuffers::soffset_t) +
548 // pointer to data
549 sizeof(flatbuffers::uoffset_t) +
550 // realtime_sent_time, monotonic_sent_time
551 sizeof(int64_t) * 2 +
552 // queue_index, channel_index
553 sizeof(uint32_t) * 2;
554
555 case LogType::kLogDeliveryTimeOnly:
556 return
557 // Root table size + offset.
558 sizeof(flatbuffers::uoffset_t) * 2 +
559 // 6 padding bytes to pad the header out properly.
560 4 +
561 // vtable header (size + size of table)
562 sizeof(flatbuffers::voffset_t) * 2 +
563 // offsets to all the fields.
564 sizeof(flatbuffers::voffset_t) * 8 +
565 // pointer to vtable
566 sizeof(flatbuffers::soffset_t) +
567 // remote_queue_index
568 sizeof(uint32_t) +
569 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
570 // monotonic_sent_time
571 sizeof(int64_t) * 4 +
572 // queue_index, channel_index
573 sizeof(uint32_t) * 2;
574
575 case LogType::kLogMessageAndDeliveryTime:
576 return
577 // Root table size + offset.
578 sizeof(flatbuffers::uoffset_t) * 2 +
579 // 4 padding bytes to pad the header out properly.
580 4 +
581 // vtable header (size + size of table)
582 sizeof(flatbuffers::voffset_t) * 2 +
583 // offsets to all the fields.
584 sizeof(flatbuffers::voffset_t) * 8 +
585 // pointer to vtable
586 sizeof(flatbuffers::soffset_t) +
587 // pointer to data
588 sizeof(flatbuffers::uoffset_t) +
589 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
590 // monotonic_sent_time
591 sizeof(int64_t) * 4 +
592 // remote_queue_index, queue_index, channel_index
593 sizeof(uint32_t) * 3;
594
595 case LogType::kLogRemoteMessage:
596 return
597 // Root table size + offset.
598 sizeof(flatbuffers::uoffset_t) * 2 +
599 // 6 padding bytes to pad the header out properly.
600 6 +
601 // vtable header (size + size of table)
602 sizeof(flatbuffers::voffset_t) * 2 +
603 // offsets to all the fields.
604 sizeof(flatbuffers::voffset_t) * 5 +
605 // pointer to vtable
606 sizeof(flatbuffers::soffset_t) +
607 // realtime_sent_time, monotonic_sent_time
608 sizeof(int64_t) * 2 +
609 // pointer to data
610 sizeof(flatbuffers::uoffset_t) +
611 // queue_index, channel_index
612 sizeof(uint32_t) * 2;
613 }
614 LOG(FATAL);
615}
616
James Kuszmaul9776b392023-01-14 14:08:08 -0800617flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700618 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
619 "Update size logic please.");
620 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700621 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700622 switch (log_type) {
623 case LogType::kLogDeliveryTimeOnly:
624 return PackMessageHeaderSize(log_type);
625
626 case LogType::kLogMessage:
627 case LogType::kLogMessageAndDeliveryTime:
628 case LogType::kLogRemoteMessage:
629 return PackMessageHeaderSize(log_type) +
630 // Vector...
631 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
632 }
633 LOG(FATAL);
634}
635
Austin Schuhfa30c352022-10-16 11:12:02 -0700636size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800637 int channel_index, LogType log_type, size_t start_byte,
638 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700639 // TODO(austin): Figure out how to copy directly from shared memory instead of
640 // first into the fetcher's memory and then into here. That would save a lot
641 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700642 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700643 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800644 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
645 DCHECK_EQ((start_byte % 8u), 0u);
646 DCHECK_EQ((end_byte % 8u), 0u);
647 DCHECK_LE(start_byte, end_byte);
648 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700649
650 // Pack all the data in. This is brittle but easy to change. Use the
651 // InlinePackMessage.Equivilent unit test to verify everything matches.
652 switch (log_type) {
653 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800654 switch (start_byte) {
655 case 0x00u:
656 if ((end_byte) == 0x00u) {
657 break;
658 }
659 // clang-format off
660 // header:
661 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
662 buffer = Push<flatbuffers::uoffset_t>(
663 buffer, message_size - sizeof(flatbuffers::uoffset_t));
664
665 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
666 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
667 [[fallthrough]];
668 case 0x08u:
669 if ((end_byte) == 0x08u) {
670 break;
671 }
672 //
673 // padding:
674 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
675 buffer = Pad(buffer, 6);
676 //
677 // vtable (aos.logger.MessageHeader):
678 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
679 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
680 [[fallthrough]];
681 case 0x10u:
682 if ((end_byte) == 0x10u) {
683 break;
684 }
685 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
686 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
687 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
688 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
689 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
690 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
691 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
692 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
693 [[fallthrough]];
694 case 0x18u:
695 if ((end_byte) == 0x18u) {
696 break;
697 }
698 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
699 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
700 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
701 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
702 //
703 // root_table (aos.logger.MessageHeader):
704 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
705 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
706 [[fallthrough]];
707 case 0x20u:
708 if ((end_byte) == 0x20u) {
709 break;
710 }
711 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
712 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
713 [[fallthrough]];
714 case 0x28u:
715 if ((end_byte) == 0x28u) {
716 break;
717 }
718 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
719 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
720 [[fallthrough]];
721 case 0x30u:
722 if ((end_byte) == 0x30u) {
723 break;
724 }
725 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
726 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
727 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
728 buffer = Push<uint32_t>(buffer, context.queue_index);
729 [[fallthrough]];
730 case 0x38u:
731 if ((end_byte) == 0x38u) {
732 break;
733 }
734 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
735 buffer = Push<uint32_t>(buffer, channel_index);
736 //
737 // vector (aos.logger.MessageHeader.data):
738 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
739 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
740 [[fallthrough]];
741 case 0x40u:
742 if ((end_byte) == 0x40u) {
743 break;
744 }
745 [[fallthrough]];
746 default:
747 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
748 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
749 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
750 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
751 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
752 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
753 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
754 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
755 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
756 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
757 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
758 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
759 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
760 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
761 //
762 // padding:
763 // +0x4E | 00 00 | uint8_t[2] | .. | padding
764 // clang-format on
765 if (start_byte <= 0x40 && end_byte == message_size) {
766 // The easy one, slap it all down.
767 buffer = PushBytes(buffer, context.data, context.size);
768 buffer =
769 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
770 } else {
771 const size_t data_start_byte =
772 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
773 const size_t data_end_byte = end_byte - 0x40;
774 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
775 if (data_start_byte < padded_size) {
776 buffer = PushBytes(
777 buffer,
778 reinterpret_cast<const uint8_t *>(context.data) +
779 data_start_byte,
780 std::min(context.size, data_end_byte) - data_start_byte);
781 if (data_end_byte == padded_size) {
782 // We can only pad the last 7 bytes, so this only gets written
783 // if we write the last byte.
784 buffer = Pad(buffer,
785 ((context.size + 7) & 0xfffffff8u) - context.size);
786 }
787 }
788 }
789 break;
790 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700791 break;
792
793 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800794 switch (start_byte) {
795 case 0x00u:
796 if ((end_byte) == 0x00u) {
797 break;
798 }
799 // clang-format off
800 // header:
801 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
802 buffer = Push<flatbuffers::uoffset_t>(
803 buffer, message_size - sizeof(flatbuffers::uoffset_t));
804 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
805 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700806
Austin Schuh71a40d42023-02-04 21:22:22 -0800807 [[fallthrough]];
808 case 0x08u:
809 if ((end_byte) == 0x08u) {
810 break;
811 }
812 //
813 // padding:
814 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
815 buffer = Pad(buffer, 4);
816 //
817 // vtable (aos.logger.MessageHeader):
818 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
819 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
820 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
821 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
822 [[fallthrough]];
823 case 0x10u:
824 if ((end_byte) == 0x10u) {
825 break;
826 }
827 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
828 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
829 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
830 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
831 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
832 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
833 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
834 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
835 [[fallthrough]];
836 case 0x18u:
837 if ((end_byte) == 0x18u) {
838 break;
839 }
840 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
841 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
842 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
843 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
844 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
845 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
846 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
847 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
848 [[fallthrough]];
849 case 0x20u:
850 if ((end_byte) == 0x20u) {
851 break;
852 }
853 //
854 // root_table (aos.logger.MessageHeader):
855 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
856 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
857 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
858 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
859 [[fallthrough]];
860 case 0x28u:
861 if ((end_byte) == 0x28u) {
862 break;
863 }
864 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
865 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
866 [[fallthrough]];
867 case 0x30u:
868 if ((end_byte) == 0x30u) {
869 break;
870 }
871 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
872 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
873 [[fallthrough]];
874 case 0x38u:
875 if ((end_byte) == 0x38u) {
876 break;
877 }
878 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
879 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
880 [[fallthrough]];
881 case 0x40u:
882 if ((end_byte) == 0x40u) {
883 break;
884 }
885 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
886 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
887 [[fallthrough]];
888 case 0x48u:
889 if ((end_byte) == 0x48u) {
890 break;
891 }
892 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
893 buffer = Push<uint32_t>(buffer, context.queue_index);
894 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
895 buffer = Push<uint32_t>(buffer, channel_index);
896
897 // clang-format on
898 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700899 break;
900
901 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800902 switch (start_byte) {
903 case 0x00u:
904 if ((end_byte) == 0x00u) {
905 break;
906 }
907 // clang-format off
908 // header:
909 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
910 buffer = Push<flatbuffers::uoffset_t>(
911 buffer, message_size - sizeof(flatbuffers::uoffset_t));
912 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
913 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
914 [[fallthrough]];
915 case 0x08u:
916 if ((end_byte) == 0x08u) {
917 break;
918 }
919 //
920 // padding:
921 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
922 buffer = Pad(buffer, 4);
923 //
924 // vtable (aos.logger.MessageHeader):
925 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
926 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
927 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
928 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
929 [[fallthrough]];
930 case 0x10u:
931 if ((end_byte) == 0x10u) {
932 break;
933 }
934 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
935 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
936 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
937 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
938 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
939 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
940 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
941 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
942 [[fallthrough]];
943 case 0x18u:
944 if ((end_byte) == 0x18u) {
945 break;
946 }
947 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
948 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
949 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
950 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
951 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
952 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
953 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
954 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
955 [[fallthrough]];
956 case 0x20u:
957 if ((end_byte) == 0x20u) {
958 break;
959 }
960 //
961 // root_table (aos.logger.MessageHeader):
962 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
963 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
964 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
965 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
966 [[fallthrough]];
967 case 0x28u:
968 if ((end_byte) == 0x28u) {
969 break;
970 }
971 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
972 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
973 [[fallthrough]];
974 case 0x30u:
975 if ((end_byte) == 0x30u) {
976 break;
977 }
978 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
979 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
980 [[fallthrough]];
981 case 0x38u:
982 if ((end_byte) == 0x38u) {
983 break;
984 }
985 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
986 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
987 [[fallthrough]];
988 case 0x40u:
989 if ((end_byte) == 0x40u) {
990 break;
991 }
992 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
993 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
994 [[fallthrough]];
995 case 0x48u:
996 if ((end_byte) == 0x48u) {
997 break;
998 }
999 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
1000 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1001 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1002 buffer = Push<uint32_t>(buffer, context.queue_index);
1003 [[fallthrough]];
1004 case 0x50u:
1005 if ((end_byte) == 0x50u) {
1006 break;
1007 }
1008 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1009 buffer = Push<uint32_t>(buffer, channel_index);
1010 //
1011 // vector (aos.logger.MessageHeader.data):
1012 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1013 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1014 [[fallthrough]];
1015 case 0x58u:
1016 if ((end_byte) == 0x58u) {
1017 break;
1018 }
1019 [[fallthrough]];
1020 default:
1021 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1022 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1023 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1024 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1025 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1026 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1027 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1028 //
1029 // padding:
1030 // +0x5F | 00 | uint8_t[1] | . | padding
1031 // clang-format on
1032
1033 if (start_byte <= 0x58 && end_byte == message_size) {
1034 // The easy one, slap it all down.
1035 buffer = PushBytes(buffer, context.data, context.size);
1036 buffer =
1037 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1038 } else {
1039 const size_t data_start_byte =
1040 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1041 const size_t data_end_byte = end_byte - 0x58;
1042 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1043 if (data_start_byte < padded_size) {
1044 buffer = PushBytes(
1045 buffer,
1046 reinterpret_cast<const uint8_t *>(context.data) +
1047 data_start_byte,
1048 std::min(context.size, data_end_byte) - data_start_byte);
1049 if (data_end_byte == padded_size) {
1050 // We can only pad the last 7 bytes, so this only gets written
1051 // if we write the last byte.
1052 buffer = Pad(buffer,
1053 ((context.size + 7) & 0xfffffff8u) - context.size);
1054 }
1055 }
1056 }
1057
1058 break;
1059 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001060
1061 break;
1062
1063 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001064 switch (start_byte) {
1065 case 0x00u:
1066 if ((end_byte) == 0x00u) {
1067 break;
1068 }
1069 // This is the message we need to recreate.
1070 //
1071 // clang-format off
1072 // header:
1073 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1074 buffer = Push<flatbuffers::uoffset_t>(
1075 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1076 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1077 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1078 [[fallthrough]];
1079 case 0x08u:
1080 if ((end_byte) == 0x08u) {
1081 break;
1082 }
1083 //
1084 // padding:
1085 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1086 buffer = Pad(buffer, 6);
1087 //
1088 // vtable (aos.logger.MessageHeader):
1089 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1090 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1091 [[fallthrough]];
1092 case 0x10u:
1093 if ((end_byte) == 0x10u) {
1094 break;
1095 }
1096 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1097 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1098 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1099 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1100 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1101 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1102 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1103 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1104 [[fallthrough]];
1105 case 0x18u:
1106 if ((end_byte) == 0x18u) {
1107 break;
1108 }
1109 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1110 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1111 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1112 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1113 //
1114 // root_table (aos.logger.MessageHeader):
1115 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1116 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1117 [[fallthrough]];
1118 case 0x20u:
1119 if ((end_byte) == 0x20u) {
1120 break;
1121 }
1122 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1123 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1124 [[fallthrough]];
1125 case 0x28u:
1126 if ((end_byte) == 0x28u) {
1127 break;
1128 }
1129 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1130 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1131 [[fallthrough]];
1132 case 0x30u:
1133 if ((end_byte) == 0x30u) {
1134 break;
1135 }
1136 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1137 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1138 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1139 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1140 [[fallthrough]];
1141 case 0x38u:
1142 if ((end_byte) == 0x38u) {
1143 break;
1144 }
1145 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1146 buffer = Push<uint32_t>(buffer, channel_index);
1147 //
1148 // vector (aos.logger.MessageHeader.data):
1149 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1150 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1151 [[fallthrough]];
1152 case 0x40u:
1153 if ((end_byte) == 0x40u) {
1154 break;
1155 }
1156 [[fallthrough]];
1157 default:
1158 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1159 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1160 // ...
1161 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1162 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1163 //
1164 // padding:
1165 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1166 // clang-format on
1167 if (start_byte <= 0x40 && end_byte == message_size) {
1168 // The easy one, slap it all down.
1169 buffer = PushBytes(buffer, context.data, context.size);
1170 buffer =
1171 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1172 } else {
1173 const size_t data_start_byte =
1174 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1175 const size_t data_end_byte = end_byte - 0x40;
1176 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1177 if (data_start_byte < padded_size) {
1178 buffer = PushBytes(
1179 buffer,
1180 reinterpret_cast<const uint8_t *>(context.data) +
1181 data_start_byte,
1182 std::min(context.size, data_end_byte) - data_start_byte);
1183 if (data_end_byte == padded_size) {
1184 // We can only pad the last 7 bytes, so this only gets written
1185 // if we write the last byte.
1186 buffer = Pad(buffer,
1187 ((context.size + 7) & 0xfffffff8u) - context.size);
1188 }
1189 }
1190 }
1191 break;
1192 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001193 }
1194
Austin Schuh71a40d42023-02-04 21:22:22 -08001195 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001196}
1197
Austin Schuhcd368422021-11-22 21:23:29 -08001198SpanReader::SpanReader(std::string_view filename, bool quiet)
Alexei Strotscee7b372023-04-21 11:57:54 -07001199 : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
Tyler Chatow2015bc62021-08-04 21:15:09 -07001200
Alexei Strotscee7b372023-04-21 11:57:54 -07001201SpanReader::SpanReader(std::string_view filename,
1202 std::unique_ptr<DataDecoder> decoder)
1203 : filename_(filename), decoder_(std::move(decoder)) {}
Austin Schuh05b70472020-01-01 17:11:17 -08001204
Austin Schuhcf5f6442021-07-06 10:43:28 -07001205absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001206 // Make sure we have enough for the size.
1207 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1208 if (!ReadBlock()) {
1209 return absl::Span<const uint8_t>();
1210 }
1211 }
1212
1213 // Now make sure we have enough for the message.
1214 const size_t data_size =
1215 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1216 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001217 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1218 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1219 LOG(ERROR) << " Rest of log file is "
1220 << absl::BytesToHexString(std::string_view(
1221 reinterpret_cast<const char *>(data_.data() +
1222 consumed_data_),
1223 data_.size() - consumed_data_));
1224 return absl::Span<const uint8_t>();
1225 }
Austin Schuh05b70472020-01-01 17:11:17 -08001226 while (data_.size() < consumed_data_ + data_size) {
1227 if (!ReadBlock()) {
1228 return absl::Span<const uint8_t>();
1229 }
1230 }
1231
1232 // And return it, consuming the data.
1233 const uint8_t *data_ptr = data_.data() + consumed_data_;
1234
Austin Schuh05b70472020-01-01 17:11:17 -08001235 return absl::Span<const uint8_t>(data_ptr, data_size);
1236}
1237
Austin Schuhcf5f6442021-07-06 10:43:28 -07001238void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001239 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001240 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1241 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001242 consumed_data_ += consumed_size;
1243 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001244}
1245
1246absl::Span<const uint8_t> SpanReader::ReadMessage() {
1247 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001248 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001249 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001250 } else {
1251 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001252 }
1253 return result;
1254}
1255
Austin Schuh05b70472020-01-01 17:11:17 -08001256bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001257 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1258 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001259 constexpr size_t kReadSize = 256 * 1024;
1260
1261 // Strip off any unused data at the front.
1262 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001263 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001264 consumed_data_ = 0;
1265 }
1266
1267 const size_t starting_size = data_.size();
1268
1269 // This should automatically grow the backing store. It won't shrink if we
1270 // get a small chunk later. This reduces allocations when we want to append
1271 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001272 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001273
Brian Silvermanf51499a2020-09-21 12:49:08 -07001274 const size_t count =
1275 decoder_->Read(data_.begin() + starting_size, data_.end());
1276 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001277 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001278 return false;
1279 }
Austin Schuh05b70472020-01-01 17:11:17 -08001280
Brian Smarttea913d42021-12-10 15:02:38 -08001281 total_read_ += count;
1282
Austin Schuh05b70472020-01-01 17:11:17 -08001283 return true;
1284}
1285
Alexei Strotsa3194712023-04-21 23:30:50 -07001286LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
1287 : log_source_(log_source), pool_size_(pool_size) {}
1288
1289SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
1290 if (part_readers_.size() > pool_size_) {
1291 // Don't leave arbitrary numbers of readers open, because they each take
1292 // resources, so close a big batch at once periodically.
1293 part_readers_.clear();
1294 }
1295 if (log_source_ == nullptr) {
1296 part_readers_.emplace_back(id, FLAGS_quiet_sorting);
1297 } else {
1298 part_readers_.emplace_back(id, log_source_->GetDecoder(id));
1299 }
1300 return &part_readers_.back();
1301}
1302
Austin Schuhadd6eb32020-11-09 21:24:26 -08001303std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001304 SpanReader *span_reader) {
1305 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001306
1307 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001308 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001309 return std::nullopt;
1310 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001311
Austin Schuh5212cad2020-09-09 23:12:09 -07001312 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001313 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001314 if (!result.Verify()) {
1315 return std::nullopt;
1316 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001317
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001318 // We only know of busted headers in the versions of the log file header
1319 // *before* the logger_sha1 field was added. At some point before that point,
1320 // the logic to track when a header has been written was rewritten in such a
1321 // way that it can't happen anymore. We've seen some logs where the body
1322 // parses as a header recently, so the simple solution of always looking is
1323 // failing us.
1324 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001325 while (true) {
1326 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001327 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001328 break;
1329 }
1330
1331 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1332 maybe_header_data);
1333 if (maybe_header.Verify()) {
1334 LOG(WARNING) << "Found duplicate LogFileHeader in "
1335 << span_reader->filename();
1336 ResizeableBuffer header_data_copy;
1337 header_data_copy.resize(maybe_header_data.size());
1338 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1339 header_data_copy.size());
1340 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1341 std::move(header_data_copy));
1342
1343 span_reader->ConsumeMessage();
1344 } else {
1345 break;
1346 }
1347 }
1348 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001349 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001350}
1351
Austin Schuh0e8db662021-07-06 10:43:47 -07001352std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1353 std::string_view filename) {
1354 SpanReader span_reader(filename);
1355 return ReadHeader(&span_reader);
1356}
1357
Austin Schuhadd6eb32020-11-09 21:24:26 -08001358std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001359 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001360 SpanReader span_reader(filename);
1361 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1362 for (size_t i = 0; i < n + 1; ++i) {
1363 data_span = span_reader.ReadMessage();
1364
1365 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001366 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001367 return std::nullopt;
1368 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001369 }
1370
Brian Silverman354697a2020-09-22 21:06:32 -07001371 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001372 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001373 if (!result.Verify()) {
1374 return std::nullopt;
1375 }
1376 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001377}
1378
Alexei Strots58017402023-05-03 22:05:06 -07001379MessageReader::MessageReader(SpanReader span_reader)
1380 : span_reader_(std::move(span_reader)),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001381 raw_log_file_header_(
1382 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001383 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1384 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1385
Austin Schuh0e8db662021-07-06 10:43:47 -07001386 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1387 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001388
1389 // Make sure something was read.
Alexei Strots58017402023-05-03 22:05:06 -07001390 CHECK(raw_log_file_header)
1391 << ": Failed to read header from: " << span_reader_.filename();
Austin Schuh05b70472020-01-01 17:11:17 -08001392
Austin Schuh0e8db662021-07-06 10:43:47 -07001393 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001394
Austin Schuh5b728b72021-06-16 14:57:15 -07001395 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1396
Brian Smarttea913d42021-12-10 15:02:38 -08001397 total_verified_before_ = span_reader_.TotalConsumed();
1398
Austin Schuhcde938c2020-02-02 17:30:07 -08001399 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001400 FLAGS_max_out_of_order > 0
1401 ? chrono::duration_cast<chrono::nanoseconds>(
1402 chrono::duration<double>(FLAGS_max_out_of_order))
1403 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001404
Alexei Strots58017402023-05-03 22:05:06 -07001405 VLOG(1) << "Opened " << span_reader_.filename() << " as node "
Austin Schuhcde938c2020-02-02 17:30:07 -08001406 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001407}
1408
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001409std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001410 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001411 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001412 if (is_corrupted()) {
1413 LOG(ERROR) << "Total corrupted volumes: before = "
1414 << total_verified_before_
1415 << " | corrupted = " << total_corrupted_
1416 << " | during = " << total_verified_during_
1417 << " | after = " << total_verified_after_ << std::endl;
1418 }
1419
1420 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001421 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1422 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001423 << span_reader_.TotalConsumed() << " bytes usable."
1424 << std::endl;
1425 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001426 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001427 }
1428
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001429 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001430
1431 if (crash_on_corrupt_message_flag_) {
1432 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001433 << total_verified_before_ << " found within "
1434 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001435 << "; set --nocrash_on_corrupt_message to see summary;"
1436 << " also set --ignore_corrupt_messages to process"
1437 << " anyway";
1438
1439 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001440 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001441 << " from " << filename() << std::endl;
1442
1443 total_corrupted_ += msg_data.size();
1444
1445 while (true) {
1446 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1447
James Kuszmaul9776b392023-01-14 14:08:08 -08001448 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001449 if (!ignore_corrupt_messages_flag_) {
1450 LOG(ERROR) << "Total corrupted volumes: before = "
1451 << total_verified_before_
1452 << " | corrupted = " << total_corrupted_
1453 << " | during = " << total_verified_during_
1454 << " | after = " << total_verified_after_ << std::endl;
1455
1456 if (span_reader_.IsIncomplete()) {
1457 LOG(ERROR) << "Unable to access some messages in " << filename()
1458 << " : " << span_reader_.TotalRead() << " bytes read, "
1459 << span_reader_.TotalConsumed() << " bytes usable."
1460 << std::endl;
1461 }
1462 return nullptr;
1463 }
1464 break;
1465 }
1466
1467 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1468
1469 if (!next_msg.Verify()) {
1470 total_corrupted_ += msg_data.size();
1471 total_verified_during_ += total_verified_after_;
1472 total_verified_after_ = 0;
1473
1474 } else {
1475 total_verified_after_ += msg_data.size();
1476 if (ignore_corrupt_messages_flag_) {
1477 msg = next_msg;
1478 break;
1479 }
1480 }
1481 }
1482 }
1483
1484 if (is_corrupted()) {
1485 total_verified_after_ += msg_data.size();
1486 } else {
1487 total_verified_before_ += msg_data.size();
1488 }
Austin Schuh05b70472020-01-01 17:11:17 -08001489
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001490 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001491
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001492 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001493
1494 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001495
1496 if (VLOG_IS_ON(3)) {
1497 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1498 } else if (VLOG_IS_ON(2)) {
1499 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1500 msg_copy.mutable_message()->clear_data();
1501 VLOG(2) << "Read from " << filename() << " data "
1502 << FlatbufferToJson(msg_copy);
1503 }
1504
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001505 return result;
1506}
1507
1508std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1509 const MessageHeader &message) {
1510 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1511
1512 UnpackedMessageHeader *const unpacked_message =
1513 reinterpret_cast<UnpackedMessageHeader *>(
1514 malloc(sizeof(UnpackedMessageHeader) + data_size +
1515 kChannelDataAlignment - 1));
1516
1517 CHECK(message.has_channel_index());
1518 CHECK(message.has_monotonic_sent_time());
1519
1520 absl::Span<uint8_t> span;
1521 if (data_size > 0) {
1522 span =
1523 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1524 &unpacked_message->actual_data[0], data_size)),
1525 data_size);
1526 }
1527
Austin Schuh826e6ce2021-11-18 20:33:10 -08001528 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001529 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001530 monotonic_remote_time = aos::monotonic_clock::time_point(
1531 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001532 }
1533 std::optional<realtime_clock::time_point> realtime_remote_time;
1534 if (message.has_realtime_remote_time()) {
1535 realtime_remote_time = realtime_clock::time_point(
1536 chrono::nanoseconds(message.realtime_remote_time()));
1537 }
1538
1539 std::optional<uint32_t> remote_queue_index;
1540 if (message.has_remote_queue_index()) {
1541 remote_queue_index = message.remote_queue_index();
1542 }
1543
James Kuszmaul9776b392023-01-14 14:08:08 -08001544 new (unpacked_message) UnpackedMessageHeader(
1545 message.channel_index(),
1546 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001547 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001548 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001549 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001550 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1551 remote_queue_index,
1552 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001553 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001554 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001555
1556 if (data_size > 0) {
1557 memcpy(span.data(), message.data()->data(), data_size);
1558 }
1559
1560 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1561 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001562}
1563
Alexei Strots58017402023-05-03 22:05:06 -07001564SpanReader PartsMessageReader::MakeSpanReader(
1565 const LogPartsAccess &log_parts_access, size_t part_number) {
1566 const auto part = log_parts_access.GetPartAt(part_number);
1567 if (log_parts_access.log_source().has_value()) {
1568 return SpanReader(part,
1569 log_parts_access.log_source().value()->GetDecoder(part));
1570 } else {
1571 return SpanReader(part);
1572 }
1573}
1574
1575PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
1576 : log_parts_access_(std::move(log_parts_access)),
1577 message_reader_(MakeSpanReader(log_parts_access_, 0)) {
1578 if (log_parts_access_.size() >= 2) {
1579 next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001580 }
Austin Schuh48507722021-07-17 17:29:24 -07001581 ComputeBootCounts();
1582}
1583
1584void PartsMessageReader::ComputeBootCounts() {
Alexei Strots58017402023-05-03 22:05:06 -07001585 boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
Austin Schuh48507722021-07-17 17:29:24 -07001586 std::nullopt);
1587
Alexei Strots58017402023-05-03 22:05:06 -07001588 const auto boots = log_parts_access_.parts().boots;
1589
Austin Schuh48507722021-07-17 17:29:24 -07001590 // We have 3 vintages of log files with different amounts of information.
1591 if (log_file_header()->has_boot_uuids()) {
1592 // The new hotness with the boots explicitly listed out. We can use the log
1593 // file header to compute the boot count of all relevant nodes.
1594 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1595 size_t node_index = 0;
1596 for (const flatbuffers::String *boot_uuid :
1597 *log_file_header()->boot_uuids()) {
Alexei Strots58017402023-05-03 22:05:06 -07001598 CHECK(boots);
Austin Schuh48507722021-07-17 17:29:24 -07001599 if (boot_uuid->size() != 0) {
Alexei Strots58017402023-05-03 22:05:06 -07001600 auto it = boots->boot_count_map.find(boot_uuid->str());
1601 if (it != boots->boot_count_map.end()) {
Austin Schuh48507722021-07-17 17:29:24 -07001602 boot_counts_[node_index] = it->second;
1603 }
1604 } else if (parts().boots->boots[node_index].size() == 1u) {
1605 boot_counts_[node_index] = 0;
1606 }
1607 ++node_index;
1608 }
1609 } else {
1610 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1611 // single node log files with boot UUIDs in the header. We only know how to
1612 // order certain boots in certain circumstances.
Alexei Strots58017402023-05-03 22:05:06 -07001613 if (configuration::MultiNode(log_parts_access_.config()) || boots) {
Austin Schuh48507722021-07-17 17:29:24 -07001614 for (size_t node_index = 0; node_index < boot_counts_.size();
1615 ++node_index) {
Alexei Strots58017402023-05-03 22:05:06 -07001616 if (boots->boots[node_index].size() == 1u) {
Austin Schuh48507722021-07-17 17:29:24 -07001617 boot_counts_[node_index] = 0;
1618 }
1619 }
1620 } else {
1621 // Really old single node logs without any UUIDs. They can't reboot.
1622 CHECK_EQ(boot_counts_.size(), 1u);
1623 boot_counts_[0] = 0u;
1624 }
1625 }
1626}
Austin Schuhc41603c2020-10-11 16:17:37 -07001627
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001628std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001629 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001630 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001631 message_reader_.ReadMessage();
1632 if (message) {
1633 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001634 const monotonic_clock::time_point monotonic_sent_time =
1635 message->monotonic_sent_time;
1636
1637 // TODO(austin): Does this work with startup? Might need to use the
1638 // start time.
1639 // TODO(austin): Does this work with startup when we don't know the
1640 // remote start time too? Look at one of those logs to compare.
Alexei Strots58017402023-05-03 22:05:06 -07001641 if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
1642 max_out_of_order_duration()) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001643 after_start_ = true;
1644 }
1645 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001646 CHECK_GE(monotonic_sent_time,
1647 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001648 << ": Max out of order of " << max_out_of_order_duration().count()
Alexei Strots58017402023-05-03 22:05:06 -07001649 << "ns exceeded. " << log_parts_access_.parts()
1650 << ", start time is "
1651 << log_parts_access_.parts().monotonic_start_time
1652 << " currently reading " << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001653 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001654 return message;
1655 }
1656 NextLog();
1657 }
Austin Schuh32f68492020-11-08 21:45:51 -08001658 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001659 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001660}
1661
1662void PartsMessageReader::NextLog() {
Alexei Strots58017402023-05-03 22:05:06 -07001663 if (next_part_index_ == log_parts_access_.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001664 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001665 done_ = true;
1666 return;
1667 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001668 CHECK(next_message_reader_);
1669 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001670 ComputeBootCounts();
Alexei Strots58017402023-05-03 22:05:06 -07001671 if (next_part_index_ + 1 < log_parts_access_.size()) {
1672 next_message_reader_.emplace(
1673 MakeSpanReader(log_parts_access_, next_part_index_ + 1));
Brian Silvermanfee16972021-09-14 12:06:38 -07001674 } else {
1675 next_message_reader_.reset();
1676 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001677 ++next_part_index_;
1678}
1679
Austin Schuh1be0ce42020-11-29 22:43:26 -08001680bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001681 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001682
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001683 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001684 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001685 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001686 return false;
1687 }
1688
1689 if (this->channel_index < m2.channel_index) {
1690 return true;
1691 } else if (this->channel_index > m2.channel_index) {
1692 return false;
1693 }
1694
1695 return this->queue_index < m2.queue_index;
1696}
1697
1698bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001699bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001700 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001701
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001702 return timestamp.time == m2.timestamp.time &&
1703 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001704}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001705
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001706std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1707 os << "{.channel_index=" << m.channel_index
1708 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1709 << ", .realtime_sent_time=" << m.realtime_sent_time
1710 << ", .queue_index=" << m.queue_index;
1711 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001712 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001713 }
1714 os << ", .realtime_remote_time=";
1715 PrintOptionalOrNull(&os, m.realtime_remote_time);
1716 os << ", .remote_queue_index=";
1717 PrintOptionalOrNull(&os, m.remote_queue_index);
1718 if (m.has_monotonic_timestamp_time) {
1719 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1720 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001721 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001722 return os;
1723}
1724
Austin Schuh1be0ce42020-11-29 22:43:26 -08001725std::ostream &operator<<(std::ostream &os, const Message &m) {
1726 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001727 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001728 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001729 if (m.data->remote_queue_index.has_value()) {
1730 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1731 }
1732 if (m.data->monotonic_remote_time.has_value()) {
1733 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1734 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001735 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001736 }
1737 os << "}";
1738 return os;
1739}
1740
1741std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1742 os << "{.channel_index=" << m.channel_index
1743 << ", .queue_index=" << m.queue_index
1744 << ", .monotonic_event_time=" << m.monotonic_event_time
1745 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001746 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001747 os << ", .remote_queue_index=" << m.remote_queue_index;
1748 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001749 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001750 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1751 }
1752 if (m.realtime_remote_time != realtime_clock::min_time) {
1753 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1754 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001755 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001756 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1757 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001758 if (m.data != nullptr) {
1759 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001760 } else {
1761 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001762 }
1763 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001764 return os;
1765}
1766
Alexei Strots58017402023-05-03 22:05:06 -07001767MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
1768 : parts_message_reader_(log_parts_access),
Austin Schuh48507722021-07-17 17:29:24 -07001769 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1770}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001771
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001772Message *MessageSorter::Front() {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001773 // Queue up data until enough data has been queued that the front message is
1774 // sorted enough to be safe to pop. This may do nothing, so we should make
1775 // sure the nothing path is checked quickly.
1776 if (sorted_until() != monotonic_clock::max_time) {
1777 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001778 if (!messages_.empty() &&
1779 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001780 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001781 break;
1782 }
1783
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001784 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001785 parts_message_reader_.ReadMessage();
1786 // No data left, sorted forever, work through what is left.
1787 if (!m) {
1788 sorted_until_ = monotonic_clock::max_time;
1789 break;
1790 }
1791
Austin Schuh48507722021-07-17 17:29:24 -07001792 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001793 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001794 monotonic_timestamp_boot = parts().logger_boot_count;
1795 }
1796 size_t monotonic_remote_boot = 0xffffff;
1797
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001798 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001799 const Node *node =
1800 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001801
Austin Schuh48507722021-07-17 17:29:24 -07001802 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001803 source_node_index_[m->channel_index]);
Alexei Strots036d84e2023-05-03 16:05:12 -07001804 CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
1805 << "', with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001806 monotonic_remote_boot = *boot;
1807 }
1808
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001809 messages_.insert(
1810 Message{.channel_index = m->channel_index,
1811 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1812 .index = m->queue_index},
1813 .timestamp = BootTimestamp{.boot = parts().boot_count,
1814 .time = m->monotonic_sent_time},
1815 .monotonic_remote_boot = monotonic_remote_boot,
1816 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1817 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001818
1819 // Now, update sorted_until_ to match the new message.
1820 if (parts_message_reader_.newest_timestamp() >
1821 monotonic_clock::min_time +
1822 parts_message_reader_.max_out_of_order_duration()) {
1823 sorted_until_ = parts_message_reader_.newest_timestamp() -
1824 parts_message_reader_.max_out_of_order_duration();
1825 } else {
1826 sorted_until_ = monotonic_clock::min_time;
1827 }
1828 }
1829 }
1830
1831 // Now that we have enough data queued, return a pointer to the oldest piece
1832 // of data if it exists.
1833 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001834 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001835 return nullptr;
1836 }
1837
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001838 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001839 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001840 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001841 return &(*messages_.begin());
1842}
1843
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001844void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001845
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001846std::string MessageSorter::DebugString() const {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001847 std::stringstream ss;
1848 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001849 int count = 0;
1850 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001851 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001852 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1853 ss << m << "\n";
1854 } else if (no_dots) {
1855 ss << "...\n";
1856 no_dots = false;
1857 }
1858 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001859 }
1860 ss << "] <- " << parts_message_reader_.filename();
1861 return ss.str();
1862}
1863
Alexei Strots1f51ac72023-05-15 10:14:54 -07001864PartsMerger::PartsMerger(std::string_view node_name, size_t boot_count,
1865 const LogFilesContainer &log_files) {
1866 const auto parts = log_files.SelectParts(node_name, boot_count);
1867 node_ = configuration::GetNodeIndex(parts.config(), node_name);
Austin Schuh715adc12021-06-29 22:07:39 -07001868
Alexei Strots58017402023-05-03 22:05:06 -07001869 for (LogPartsAccess part : parts) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001870 message_sorters_.emplace_back(std::move(part));
Austin Schuhd2f96102020-12-01 20:27:29 -08001871 }
1872
Austin Schuhd2f96102020-12-01 20:27:29 -08001873 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001874 realtime_start_time_ = realtime_clock::min_time;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001875 for (const MessageSorter &message_sorter : message_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001876 // We want to capture the earliest meaningful start time here. The start
1877 // time defaults to min_time when there's no meaningful value to report, so
1878 // let's ignore those.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001879 if (message_sorter.monotonic_start_time() != monotonic_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001880 bool accept = false;
1881 // We want to prioritize start times from the logger node. Really, we
1882 // want to prioritize start times with a valid realtime_clock time. So,
1883 // if we have a start time without a RT clock, prefer a start time with a
1884 // RT clock, even it if is later.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001885 if (message_sorter.realtime_start_time() != realtime_clock::min_time) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001886 // We've got a good one. See if the current start time has a good RT
1887 // clock, or if we should use this one instead.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001888 if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001889 accept = true;
1890 } else if (realtime_start_time_ == realtime_clock::min_time) {
1891 // The previous start time doesn't have a good RT time, so it is very
1892 // likely the start time from a remote part file. We just found a
1893 // better start time with a real RT time, so switch to that instead.
1894 accept = true;
1895 }
1896 } else if (realtime_start_time_ == realtime_clock::min_time) {
1897 // We don't have a RT time, so take the oldest.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001898 if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
Austin Schuh9dc42612021-09-20 20:41:29 -07001899 accept = true;
1900 }
1901 }
1902
1903 if (accept) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001904 monotonic_start_time_ = message_sorter.monotonic_start_time();
1905 realtime_start_time_ = message_sorter.realtime_start_time();
Austin Schuh9dc42612021-09-20 20:41:29 -07001906 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001907 }
1908 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001909
1910 // If there was no meaningful start time reported, just use min_time.
1911 if (monotonic_start_time_ == monotonic_clock::max_time) {
1912 monotonic_start_time_ = monotonic_clock::min_time;
1913 realtime_start_time_ = realtime_clock::min_time;
1914 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001915}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001916
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001917std::vector<const LogParts *> PartsMerger::Parts() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001918 std::vector<const LogParts *> p;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001919 p.reserve(message_sorters_.size());
1920 for (const MessageSorter &message_sorter : message_sorters_) {
1921 p.emplace_back(&message_sorter.parts());
Austin Schuh0ca51f32020-12-25 21:51:45 -08001922 }
1923 return p;
1924}
1925
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001926Message *PartsMerger::Front() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001927 // Return the current Front if we have one, otherwise go compute one.
1928 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001929 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001930 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001931 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001932 }
1933
1934 // Otherwise, do a simple search for the oldest message, deduplicating any
1935 // duplicates.
1936 Message *oldest = nullptr;
1937 sorted_until_ = monotonic_clock::max_time;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001938 for (MessageSorter &message_sorter : message_sorters_) {
1939 Message *m = message_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001940 if (!m) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001941 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001942 continue;
1943 }
1944 if (oldest == nullptr || *m < *oldest) {
1945 oldest = m;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001946 current_ = &message_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001947 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001948 // Found a duplicate. If there is a choice, we want the one which has
1949 // the timestamp time.
1950 if (!m->data->has_monotonic_timestamp_time) {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001951 message_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001952 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001953 current_->PopFront();
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001954 current_ = &message_sorter;
Austin Schuh8bf1e632021-01-02 22:41:04 -08001955 oldest = m;
1956 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001957 CHECK_EQ(m->data->monotonic_timestamp_time,
1958 oldest->data->monotonic_timestamp_time);
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001959 message_sorter.PopFront();
Austin Schuh8bf1e632021-01-02 22:41:04 -08001960 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001961 }
1962
1963 // PopFront may change this, so compute it down here.
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001964 sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001965 }
1966
Austin Schuhb000de62020-12-03 22:00:40 -08001967 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001968 CHECK_GE(oldest->timestamp.time, last_message_time_);
1969 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001970 monotonic_oldest_time_ =
1971 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001972 } else {
1973 last_message_time_ = monotonic_clock::max_time;
1974 }
1975
Austin Schuh8f52ed52020-11-30 23:12:39 -08001976 // Return the oldest message found. This will be nullptr if nothing was
1977 // found, indicating there is nothing left.
1978 return oldest;
1979}
1980
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001981void PartsMerger::PopFront() {
Austin Schuh8f52ed52020-11-30 23:12:39 -08001982 CHECK(current_ != nullptr) << "Popping before calling Front()";
1983 current_->PopFront();
1984 current_ = nullptr;
1985}
1986
Alexei Strots1f51ac72023-05-15 10:14:54 -07001987BootMerger::BootMerger(std::string_view node_name,
1988 const LogFilesContainer &log_files) {
1989 size_t number_of_boots = log_files.BootsForNode(node_name);
1990 parts_mergers_.reserve(number_of_boots);
1991 for (size_t i = 0; i < number_of_boots; ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001992 VLOG(2) << "Boot " << i;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001993 parts_mergers_.emplace_back(
Alexei Strots1f51ac72023-05-15 10:14:54 -07001994 std::make_unique<PartsMerger>(node_name, i, log_files));
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001995 }
1996}
1997
1998Message *BootMerger::Front() {
Alexei Strotsa8dadd12023-04-28 15:19:23 -07001999 Message *result = parts_mergers_[index_]->Front();
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002000
2001 if (result != nullptr) {
2002 return result;
2003 }
2004
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002005 if (index_ + 1u == parts_mergers_.size()) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002006 // At the end of the last node merger, just return.
2007 return nullptr;
2008 } else {
2009 ++index_;
2010 return Front();
2011 }
2012}
2013
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002014void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); }
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002015
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002016std::vector<const LogParts *> BootMerger::Parts() const {
2017 std::vector<const LogParts *> results;
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002018 for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
2019 std::vector<const LogParts *> node_parts = parts_merger->Parts();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002020
2021 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2022 std::make_move_iterator(node_parts.end()));
2023 }
2024
2025 return results;
2026}
2027
Alexei Strots1f51ac72023-05-15 10:14:54 -07002028TimestampMapper::TimestampMapper(std::string_view node_name,
2029 const LogFilesContainer &log_files)
2030 : boot_merger_(node_name, log_files),
Austin Schuh79b30942021-01-24 22:32:21 -08002031 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002032 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002033 if (!configuration_) {
2034 configuration_ = part->config;
2035 } else {
2036 CHECK_EQ(configuration_.get(), part->config.get());
2037 }
2038 }
2039 const Configuration *config = configuration_.get();
Alexei Strots1f51ac72023-05-15 10:14:54 -07002040 // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
Austin Schuhd2f96102020-12-01 20:27:29 -08002041 // pretty simple.
2042 if (configuration::MultiNode(config)) {
2043 nodes_data_.resize(config->nodes()->size());
2044 const Node *my_node = config->nodes()->Get(node());
2045 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2046 const Node *node = config->nodes()->Get(node_index);
2047 NodeData *node_data = &nodes_data_[node_index];
2048 node_data->channels.resize(config->channels()->size());
2049 // We should save the channel if it is delivered to the node represented
2050 // by the NodeData, but not sent by that node. That combo means it is
2051 // forwarded.
2052 size_t channel_index = 0;
2053 node_data->any_delivered = false;
2054 for (const Channel *channel : *config->channels()) {
2055 node_data->channels[channel_index].delivered =
2056 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002057 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2058 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002059 node_data->any_delivered = node_data->any_delivered ||
2060 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002061 if (node_data->channels[channel_index].delivered) {
2062 const Connection *connection =
2063 configuration::ConnectionToNode(channel, node);
2064 node_data->channels[channel_index].time_to_live =
2065 chrono::nanoseconds(connection->time_to_live());
2066 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002067 ++channel_index;
2068 }
2069 }
2070
2071 for (const Channel *channel : *config->channels()) {
2072 source_node_.emplace_back(configuration::GetNodeIndex(
2073 config, channel->source_node()->string_view()));
2074 }
2075 }
2076}
2077
2078void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002079 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002080 CHECK_NE(timestamp_mapper->node(), node());
2081 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2082
2083 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002084 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002085 // we could needlessly save data.
2086 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002087 VLOG(1) << "Registering on node " << node() << " for peer node "
2088 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002089 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2090
2091 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002092
2093 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002094 }
2095}
2096
Austin Schuh79b30942021-01-24 22:32:21 -08002097void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002098 matched_messages_.emplace_back(
2099 TimestampedMessage{.channel_index = m->channel_index,
2100 .queue_index = m->queue_index,
2101 .monotonic_event_time = m->timestamp,
2102 .realtime_event_time = m->data->realtime_sent_time,
2103 .remote_queue_index = BootQueueIndex::Invalid(),
2104 .monotonic_remote_time = BootTimestamp::min_time(),
2105 .realtime_remote_time = realtime_clock::min_time,
2106 .monotonic_timestamp_time = BootTimestamp::min_time(),
2107 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002108}
2109
2110TimestampedMessage *TimestampMapper::Front() {
2111 // No need to fetch anything new. A previous message still exists.
2112 switch (first_message_) {
2113 case FirstMessage::kNeedsUpdate:
2114 break;
2115 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002116 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002117 case FirstMessage::kNullptr:
2118 return nullptr;
2119 }
2120
Austin Schuh79b30942021-01-24 22:32:21 -08002121 if (matched_messages_.empty()) {
2122 if (!QueueMatched()) {
2123 first_message_ = FirstMessage::kNullptr;
2124 return nullptr;
2125 }
2126 }
2127 first_message_ = FirstMessage::kInMessage;
2128 return &matched_messages_.front();
2129}
2130
2131bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002132 MatchResult result = MatchResult::kEndOfFile;
2133 do {
2134 result = MaybeQueueMatched();
2135 } while (result == MatchResult::kSkipped);
2136 return result == MatchResult::kQueued;
2137}
2138
2139bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2140 const TimestampedMessage & /*message*/) {
2141 if (replay_channels_callback_ &&
2142 !replay_channels_callback_(matched_messages_.back())) {
2143 matched_messages_.pop_back();
2144 return true;
2145 }
2146 return false;
2147}
2148
2149TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002150 if (nodes_data_.empty()) {
2151 // Simple path. We are single node, so there are no timestamps to match!
2152 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002153 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002154 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002155 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002156 }
Austin Schuh79b30942021-01-24 22:32:21 -08002157 // Enqueue this message into matched_messages_ so we have a place to
2158 // associate remote timestamps, and return it.
2159 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002160
Austin Schuh79b30942021-01-24 22:32:21 -08002161 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2162 last_message_time_ = matched_messages_.back().monotonic_event_time;
2163
Alexei Strotsa8dadd12023-04-28 15:19:23 -07002164 // We are thin wrapper around parts_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002165 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002166 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002167 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2168 return MatchResult::kSkipped;
2169 }
2170 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002171 }
2172
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002173 // We need to only add messages to the list so they get processed for
2174 // messages which are delivered. Reuse the flow below which uses messages_
2175 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002176 if (messages_.empty()) {
2177 if (!Queue()) {
2178 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002179 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002180 }
2181
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002182 // Now that it has been added (and cannibalized), forget about it
2183 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002184 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002185 }
2186
2187 Message *m = &(messages_.front());
2188
2189 if (source_node_[m->channel_index] == node()) {
2190 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002191 QueueMessage(m);
2192 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2193 last_message_time_ = matched_messages_.back().monotonic_event_time;
2194 messages_.pop_front();
2195 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002196 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2197 return MatchResult::kSkipped;
2198 }
2199 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002200 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002201 // Got a timestamp, find the matching remote data, match it, and return
2202 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002203 Message data = MatchingMessageFor(*m);
2204
2205 // Return the data from the remote. The local message only has timestamp
2206 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002207 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002208 .channel_index = m->channel_index,
2209 .queue_index = m->queue_index,
2210 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002211 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002212 .remote_queue_index =
2213 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002214 .index = m->data->remote_queue_index.value()},
2215 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002216 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002217 .realtime_remote_time = m->data->realtime_remote_time.value(),
2218 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2219 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002220 .data = std::move(data.data)});
2221 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2222 last_message_time_ = matched_messages_.back().monotonic_event_time;
2223 // Since messages_ holds the data, drop it.
2224 messages_.pop_front();
2225 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002226 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2227 return MatchResult::kSkipped;
2228 }
2229 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002230 }
2231}
2232
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002233void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002234 while (last_message_time_ <= queue_time) {
2235 if (!QueueMatched()) {
2236 return;
2237 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002238 }
2239}
2240
Austin Schuhe639ea12021-01-25 13:00:22 -08002241void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002242 // Note: queueing for time doesn't really work well across boots. So we
2243 // just assume that if you are using this, you only care about the current
2244 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002245 //
2246 // TODO(austin): Is that the right concept?
2247 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002248 // Make sure we have something queued first. This makes the end time
2249 // calculation simpler, and is typically what folks want regardless.
2250 if (matched_messages_.empty()) {
2251 if (!QueueMatched()) {
2252 return;
2253 }
2254 }
2255
2256 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002257 std::max(monotonic_start_time(
2258 matched_messages_.front().monotonic_event_time.boot),
2259 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002260 time_estimation_buffer;
2261
2262 // Place sorted messages on the list until we have
2263 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2264 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002265 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002266 if (!QueueMatched()) {
2267 return;
2268 }
2269 }
2270}
2271
Austin Schuhd2f96102020-12-01 20:27:29 -08002272void TimestampMapper::PopFront() {
2273 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002274 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002275 first_message_ = FirstMessage::kNeedsUpdate;
2276
Austin Schuh79b30942021-01-24 22:32:21 -08002277 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002278}
2279
2280Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002281 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002282 CHECK_NOTNULL(message.data);
2283 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002284 const BootQueueIndex remote_queue_index =
2285 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002286 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002287
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002288 CHECK(message.data->monotonic_remote_time.has_value());
2289 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002290
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002291 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002292 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002293 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002294 const realtime_clock::time_point realtime_remote_time =
2295 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002296
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002297 TimestampMapper *peer =
2298 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002299
2300 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002301 // asked to pull a timestamp from a peer which doesn't exist, return an
2302 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002303 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002304 // TODO(austin): Make sure the tests hit all these paths with a boot count
2305 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002306 return Message{.channel_index = message.channel_index,
2307 .queue_index = remote_queue_index,
2308 .timestamp = monotonic_remote_time,
2309 .monotonic_remote_boot = 0xffffff,
2310 .monotonic_timestamp_boot = 0xffffff,
2311 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002312 }
2313
2314 // The queue which will have the matching data, if available.
2315 std::deque<Message> *data_queue =
2316 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2317
Austin Schuh79b30942021-01-24 22:32:21 -08002318 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002319
2320 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002321 return Message{.channel_index = message.channel_index,
2322 .queue_index = remote_queue_index,
2323 .timestamp = monotonic_remote_time,
2324 .monotonic_remote_boot = 0xffffff,
2325 .monotonic_timestamp_boot = 0xffffff,
2326 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002327 }
2328
Austin Schuhd2f96102020-12-01 20:27:29 -08002329 if (remote_queue_index < data_queue->front().queue_index ||
2330 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002331 return Message{.channel_index = message.channel_index,
2332 .queue_index = remote_queue_index,
2333 .timestamp = monotonic_remote_time,
2334 .monotonic_remote_boot = 0xffffff,
2335 .monotonic_timestamp_boot = 0xffffff,
2336 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002337 }
2338
Austin Schuh993ccb52020-12-12 15:59:32 -08002339 // The algorithm below is constant time with some assumptions. We need there
2340 // to be no missing messages in the data stream. This also assumes a queue
2341 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002342 if (data_queue->back().queue_index.boot ==
2343 data_queue->front().queue_index.boot &&
2344 (data_queue->back().queue_index.index -
2345 data_queue->front().queue_index.index + 1u ==
2346 data_queue->size())) {
2347 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002348 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002349 //
2350 // TODO(austin): Move if not reliable.
2351 Message result = (*data_queue)[remote_queue_index.index -
2352 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002353
2354 CHECK_EQ(result.timestamp, monotonic_remote_time)
2355 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002356 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002357 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2358 // Now drop the data off the front. We have deduplicated timestamps, so we
2359 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002360 data_queue->erase(
2361 data_queue->begin(),
2362 data_queue->begin() +
2363 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002364 return result;
2365 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002366 // TODO(austin): Binary search.
2367 auto it = std::find_if(
2368 data_queue->begin(), data_queue->end(),
2369 [remote_queue_index,
2370 remote_boot = monotonic_remote_time.boot](const Message &m) {
2371 return m.queue_index == remote_queue_index &&
2372 m.timestamp.boot == remote_boot;
2373 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002374 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002375 return Message{.channel_index = message.channel_index,
2376 .queue_index = remote_queue_index,
2377 .timestamp = monotonic_remote_time,
2378 .monotonic_remote_boot = 0xffffff,
2379 .monotonic_timestamp_boot = 0xffffff,
2380 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002381 }
2382
2383 Message result = std::move(*it);
2384
2385 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002386 << ": Queue index matches, but timestamp doesn't. Please "
2387 "investigate!";
2388 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2389 << ": Queue index matches, but timestamp doesn't. Please "
2390 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002391
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002392 // Erase everything up to this message. We want to keep 1 message in the
2393 // queue so we can handle reliable messages forwarded across boots.
2394 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002395
2396 return result;
2397 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002398}
2399
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002400void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002401 if (queued_until_ > t) {
2402 return;
2403 }
2404 while (true) {
2405 if (!messages_.empty() && messages_.back().timestamp > t) {
2406 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2407 return;
2408 }
2409
2410 if (!Queue()) {
2411 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002412 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002413 return;
2414 }
2415
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002416 // Now that it has been added (and cannibalized), forget about it
2417 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002418 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002419 }
2420}
2421
2422bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002423 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002424 if (m == nullptr) {
2425 return false;
2426 }
2427 for (NodeData &node_data : nodes_data_) {
2428 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002429 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002430 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002431 // If we have data but no timestamps (logs where the timestamps didn't get
2432 // logged are classic), we can grow this indefinitely. We don't need to
2433 // keep anything that is older than the last message returned.
2434
2435 // We have the time on the source node.
2436 // We care to wait until we have the time on the destination node.
2437 std::deque<Message> &messages =
2438 node_data.channels[m->channel_index].messages;
2439 // Max delay over the network is the TTL, so let's take the queue time and
2440 // add TTL to it. Don't forget any messages which are reliable until
2441 // someone can come up with a good reason to forget those too.
2442 if (node_data.channels[m->channel_index].time_to_live >
2443 chrono::nanoseconds(0)) {
2444 // We need to make *some* assumptions about network delay for this to
2445 // work. We want to only look at the RX side. This means we need to
2446 // track the last time a message was popped from any channel from the
2447 // node sending this message, and compare that to the max time we expect
2448 // that a message will take to be delivered across the network. This
2449 // assumes that messages are popped in time order as a proxy for
2450 // measuring the distributed time at this layer.
2451 //
2452 // Leave at least 1 message in here so we can handle reboots and
2453 // messages getting sent twice.
2454 while (messages.size() > 1u &&
2455 messages.begin()->timestamp +
2456 node_data.channels[m->channel_index].time_to_live +
2457 chrono::duration_cast<chrono::nanoseconds>(
2458 chrono::duration<double>(FLAGS_max_network_delay)) <
2459 last_popped_message_time_) {
2460 messages.pop_front();
2461 }
2462 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002463 node_data.channels[m->channel_index].messages.emplace_back(*m);
2464 }
2465 }
2466
2467 messages_.emplace_back(std::move(*m));
2468 return true;
2469}
2470
2471std::string TimestampMapper::DebugString() const {
2472 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002473 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002474 for (const Message &message : messages_) {
2475 ss << " " << message << "\n";
2476 }
2477 ss << "] queued_until " << queued_until_;
2478 for (const NodeData &ns : nodes_data_) {
2479 if (ns.peer == nullptr) continue;
2480 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2481 size_t channel_index = 0;
2482 for (const NodeData::ChannelData &channel_data :
2483 ns.peer->nodes_data_[node()].channels) {
2484 if (channel_data.messages.empty()) {
2485 continue;
2486 }
Austin Schuhb000de62020-12-03 22:00:40 -08002487
Austin Schuhd2f96102020-12-01 20:27:29 -08002488 ss << " channel " << channel_index << " [\n";
2489 for (const Message &m : channel_data.messages) {
2490 ss << " " << m << "\n";
2491 }
2492 ss << " ]\n";
2493 ++channel_index;
2494 }
2495 ss << "] queued_until " << ns.peer->queued_until_;
2496 }
2497 return ss.str();
2498}
2499
Brian Silvermanf51499a2020-09-21 12:49:08 -07002500} // namespace aos::logger