blob: 3c8c50d662615de0c751f51d2160b8dc5282a1b7 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070013#include "flatbuffers/flatbuffers.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070018#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080019#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080020#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080021
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070025#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070026#else
27#define ENABLE_LZMA 0
28#endif
29
30#if ENABLE_LZMA
31#include "aos/events/logging/lzma_encoder.h"
32#endif
Austin Schuh86110712022-09-16 15:40:54 -070033#if ENABLE_S3
34#include "aos/events/logging/s3_fetcher.h"
35#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070036
Austin Schuh48d10d62022-10-16 22:19:23 -070037DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080038 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070039DEFINE_double(
40 flush_period, 5.0,
41 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080042
Austin Schuha040c3f2021-02-13 16:09:07 -080043DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080044 max_network_delay, 1.0,
45 "Max time to assume a message takes to cross the network before we are "
46 "willing to drop it from our buffers and assume it didn't make it. "
47 "Increasing this number can increase memory usage depending on the packet "
48 "loss of your network or if the timestamps aren't logged for a message.");
49
50DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080051 max_out_of_order, -1,
52 "If set, this overrides the max out of order duration for a log file.");
53
Austin Schuh0e8db662021-07-06 10:43:47 -070054DEFINE_bool(workaround_double_headers, true,
55 "Some old log files have two headers at the beginning. Use the "
56 "last header as the actual header.");
57
Brian Smarttea913d42021-12-10 15:02:38 -080058DEFINE_bool(crash_on_corrupt_message, true,
59 "When true, MessageReader will crash the first time a message "
60 "with corrupted format is found. When false, the crash will be "
61 "suppressed, and any remaining readable messages will be "
62 "evaluated to present verified vs corrupted stats.");
63
64DEFINE_bool(ignore_corrupt_messages, false,
65 "When true, and crash_on_corrupt_message is false, then any "
66 "corrupt message found by MessageReader be silently ignored, "
67 "providing access to all uncorrupted messages in a logfile.");
68
Brian Silvermanf51499a2020-09-21 12:49:08 -070069namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070070namespace {
Austin Schuh05b70472020-01-01 17:11:17 -080071namespace chrono = std::chrono;
72
Alexei Strotscee7b372023-04-21 11:57:54 -070073std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename,
74 bool quiet) {
75 static constexpr std::string_view kS3 = "s3:";
76
77 std::unique_ptr<DataDecoder> decoder;
78
79 if (filename.substr(0, kS3.size()) == kS3) {
80#if ENABLE_S3
81 decoder = std::make_unique<S3Fetcher>(filename);
82#else
83 LOG(FATAL) << "Reading files from S3 not supported on this platform";
84#endif
85 } else {
86 decoder = std::make_unique<DummyDecoder>(filename);
87 }
88
89 static constexpr std::string_view kXz = ".xz";
90 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
91 if (filename.substr(filename.size() - kXz.size()) == kXz) {
92#if ENABLE_LZMA
93 decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet);
94#else
95 (void)quiet;
96 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
97#endif
98 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
99 decoder = std::make_unique<SnappyDecoder>(std::move(decoder));
100 }
101 return decoder;
102}
103
// Streams the value held by `t` to `*os`, or the text "null" when `t` is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << t.value();
}
112} // namespace
113
Alexei Strotsbc082d82023-05-03 08:43:42 -0700114DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink,
115 std::unique_ptr<DataEncoder> encoder)
116 : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) {
117 CHECK(log_sink_);
118 ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace;
Alexei Strots01395492023-03-20 13:59:56 -0700119 if (ran_out_of_space_) {
120 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800121 }
122}
123
Austin Schuha36c8902019-12-30 18:07:15 -0800124DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700125 Close();
126 if (ran_out_of_space_) {
127 CHECK(acknowledge_ran_out_of_space_)
128 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700129 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130}
131
Brian Silvermand90905f2020-09-23 14:42:56 -0700132DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700133 *this = std::move(other);
134}
135
Brian Silverman87ac0402020-09-17 14:47:01 -0700136// When other is destroyed "soon" (which it should be because we're getting an
137// rvalue reference to it), it will flush etc all the data we have queued up
138// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700139DetachedBufferWriter &DetachedBufferWriter::operator=(
140 DetachedBufferWriter &&other) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700141 std::swap(log_sink_, other.log_sink_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700142 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700143 std::swap(ran_out_of_space_, other.ran_out_of_space_);
144 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800145 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700146 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800147}
148
Austin Schuh8bdfc492023-02-11 12:53:13 -0800149void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800150 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700151 if (ran_out_of_space_) {
152 // We don't want any later data to be written after space becomes
153 // available, so refuse to write anything more once we've dropped data
154 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700155 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800156 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700157
Austin Schuh8bdfc492023-02-11 12:53:13 -0800158 const size_t message_size = copier->size();
159 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700160
Austin Schuh8bdfc492023-02-11 12:53:13 -0800161 // Keep writing chunks until we've written it all. If we end up with a
162 // partial write, this means we need to flush to disk.
163 do {
Alexei Strots01395492023-03-20 13:59:56 -0700164 const size_t bytes_written =
165 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800166 CHECK(bytes_written != 0);
167
168 overall_bytes_written += bytes_written;
169 if (overall_bytes_written < message_size) {
170 VLOG(1) << "Flushing because of a partial write, tried to write "
171 << message_size << " wrote " << overall_bytes_written;
172 Flush(now);
173 }
174 } while (overall_bytes_written < message_size);
175
Austin Schuhbd06ae42021-03-31 22:48:21 -0700176 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800177}
178
Brian Silverman0465fcf2020-09-24 00:29:18 -0700179void DetachedBufferWriter::Close() {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700180 if (!log_sink_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700181 return;
182 }
183 encoder_->Finish();
184 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800185 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700186 }
Austin Schuhb2461652023-05-01 08:30:56 -0700187 encoder_.reset();
Alexei Strotsbc082d82023-05-03 08:43:42 -0700188 ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700189}
190
Austin Schuh8bdfc492023-02-11 12:53:13 -0800191void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
192 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700193 if (ran_out_of_space_) {
194 // We don't want any later data to be written after space becomes available,
195 // so refuse to write anything more once we've dropped data because we ran
196 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700197 if (encoder_) {
198 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
199 encoder_->Clear(encoder_->queue().size());
200 } else {
201 VLOG(1) << "No queue to ignore";
202 }
203 return;
204 }
205
206 const auto queue = encoder_->queue();
207 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700208 return;
209 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700210
Alexei Strotsbc082d82023-05-03 08:43:42 -0700211 const WriteResult result = log_sink_->Write(queue);
Alexei Strots01395492023-03-20 13:59:56 -0700212 encoder_->Clear(result.messages_written);
213 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700214}
215
Austin Schuhbd06ae42021-03-31 22:48:21 -0700216void DetachedBufferWriter::FlushAtThreshold(
217 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700218 if (ran_out_of_space_) {
219 // We don't want any later data to be written after space becomes available,
220 // so refuse to write anything more once we've dropped data because we ran
221 // out of space.
222 if (encoder_) {
223 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
224 encoder_->Clear(encoder_->queue().size());
225 } else {
226 VLOG(1) << "No queue to ignore";
227 }
228 return;
229 }
230
Austin Schuhbd06ae42021-03-31 22:48:21 -0700231 // We don't want to flush the first time through. Otherwise we will flush as
232 // the log file header might be compressing, defeating any parallelism and
233 // queueing there.
234 if (last_flush_time_ == aos::monotonic_clock::min_time) {
235 last_flush_time_ = now;
236 }
237
Brian Silvermanf51499a2020-09-21 12:49:08 -0700238 // Flush if we are at the max number of iovs per writev, because there's no
239 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700240 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800241 while (encoder_->space() == 0 ||
242 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700243 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700244 (now > last_flush_time_ +
245 chrono::duration_cast<chrono::nanoseconds>(
246 chrono::duration<double>(FLAGS_flush_period)) &&
247 encoder_->queued_bytes() != 0)) {
248 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
249 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800250 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700251 }
Austin Schuha36c8902019-12-30 18:07:15 -0800252}
253
Austin Schuhf2d0e682022-10-16 14:20:58 -0700254// Do the magic dance to convert the endianness of the data and append it to the
255// buffer.
256namespace {
257
258// TODO(austin): Look at the generated code to see if building the header is
259// efficient or not.
260template <typename T>
261uint8_t *Push(uint8_t *buffer, const T data) {
262 const T endian_data = flatbuffers::EndianScalar<T>(data);
263 std::memcpy(buffer, &endian_data, sizeof(T));
264 return buffer + sizeof(T);
265}
266
// Copies `size` raw bytes from `data` to `buffer` and returns the position
// just past the bytes written.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
271
// Writes `padding` zero bytes at `buffer` and returns the position just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  return static_cast<uint8_t *>(std::memset(buffer, 0, padding)) + padding;
}
276} // namespace
277
278flatbuffers::Offset<MessageHeader> PackRemoteMessage(
279 flatbuffers::FlatBufferBuilder *fbb,
280 const message_bridge::RemoteMessage *msg, int channel_index,
281 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
282 logger::MessageHeader::Builder message_header_builder(*fbb);
283 // Note: this must match the same order as MessageBridgeServer and
284 // PackMessage. We want identical headers to have identical
285 // on-the-wire formats to make comparing them easier.
286
287 message_header_builder.add_channel_index(channel_index);
288
289 message_header_builder.add_queue_index(msg->queue_index());
290 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
291 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
292
293 message_header_builder.add_monotonic_remote_time(
294 msg->monotonic_remote_time());
295 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
296 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
297
298 message_header_builder.add_monotonic_timestamp_time(
299 monotonic_timestamp_time.time_since_epoch().count());
300
301 return message_header_builder.Finish();
302}
303
304size_t PackRemoteMessageInline(
305 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
306 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800307 const aos::monotonic_clock::time_point monotonic_timestamp_time,
308 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700309 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800310 DCHECK_EQ((start_byte % 8u), 0u);
311 DCHECK_EQ((end_byte % 8u), 0u);
312 DCHECK_LE(start_byte, end_byte);
313 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700314
Austin Schuh71a40d42023-02-04 21:22:22 -0800315 switch (start_byte) {
316 case 0x00u:
317 if ((end_byte) == 0x00u) {
318 break;
319 }
320 // clang-format off
321 // header:
322 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
323 buffer = Push<flatbuffers::uoffset_t>(
324 buffer, message_size - sizeof(flatbuffers::uoffset_t));
325 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
326 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
327 [[fallthrough]];
328 case 0x08u:
329 if ((end_byte) == 0x08u) {
330 break;
331 }
332 //
333 // padding:
334 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
335 buffer = Pad(buffer, 6);
336 //
337 // vtable (aos.logger.MessageHeader):
338 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
339 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
340 [[fallthrough]];
341 case 0x10u:
342 if ((end_byte) == 0x10u) {
343 break;
344 }
345 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
346 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
347 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
348 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
349 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
350 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
351 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
352 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
353 [[fallthrough]];
354 case 0x18u:
355 if ((end_byte) == 0x18u) {
356 break;
357 }
358 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
359 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
360 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
361 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
362 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
363 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
364 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
365 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
366 [[fallthrough]];
367 case 0x20u:
368 if ((end_byte) == 0x20u) {
369 break;
370 }
371 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
372 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
373 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
374 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
375 //
376 // root_table (aos.logger.MessageHeader):
377 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
378 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
379 [[fallthrough]];
380 case 0x28u:
381 if ((end_byte) == 0x28u) {
382 break;
383 }
384 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
385 buffer = Push<int64_t>(buffer,
386 monotonic_timestamp_time.time_since_epoch().count());
387 [[fallthrough]];
388 case 0x30u:
389 if ((end_byte) == 0x30u) {
390 break;
391 }
392 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
393 // TODO(austin): Can we re-arrange the order to ditch the padding?
394 // (Answer is yes, but what is the impact elsewhere? It will change the
395 // binary format)
396 buffer = Pad(buffer, 4);
397 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
398 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
399 [[fallthrough]];
400 case 0x38u:
401 if ((end_byte) == 0x38u) {
402 break;
403 }
404 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
405 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
406 [[fallthrough]];
407 case 0x40u:
408 if ((end_byte) == 0x40u) {
409 break;
410 }
411 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
412 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
413 [[fallthrough]];
414 case 0x48u:
415 if ((end_byte) == 0x48u) {
416 break;
417 }
418 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
419 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
420 [[fallthrough]];
421 case 0x50u:
422 if ((end_byte) == 0x50u) {
423 break;
424 }
425 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
426 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
427 [[fallthrough]];
428 case 0x58u:
429 if ((end_byte) == 0x58u) {
430 break;
431 }
432 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
433 buffer = Push<uint32_t>(buffer, msg->queue_index());
434 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
435 buffer = Push<uint32_t>(buffer, channel_index);
436 // clang-format on
437 [[fallthrough]];
438 case 0x60u:
439 if ((end_byte) == 0x60u) {
440 break;
441 }
442 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700443
Austin Schuh71a40d42023-02-04 21:22:22 -0800444 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700445}
446
Austin Schuha36c8902019-12-30 18:07:15 -0800447flatbuffers::Offset<MessageHeader> PackMessage(
448 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
449 int channel_index, LogType log_type) {
450 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
451
452 switch (log_type) {
453 case LogType::kLogMessage:
454 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800455 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700456 // Since the timestamps are 8 byte aligned, we are going to end up adding
457 // padding in the middle of the message to pad everything out to 8 byte
458 // alignment. That's rather wasteful. To make things efficient to mmap
459 // while reading uncompressed logs, we'd actually rather the message be
460 // aligned. So, force 8 byte alignment (enough to preserve alignment
461 // inside the nested message so that we can read it without moving it)
462 // here.
463 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700464 data_offset = fbb->CreateVector(
465 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800466 break;
467
468 case LogType::kLogDeliveryTimeOnly:
469 break;
470 }
471
472 MessageHeader::Builder message_header_builder(*fbb);
473 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800474
Austin Schuhfa30c352022-10-16 11:12:02 -0700475 // These are split out into very explicit serialization calls because the
476 // order here changes the order things are written out on the wire, and we
477 // want to control and understand it here. Changing the order can increase
478 // the amount of padding bytes in the middle.
479 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800480 // It is also easier to follow... And doesn't actually make things much
481 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800482 switch (log_type) {
483 case LogType::kLogRemoteMessage:
484 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700485 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800486 message_header_builder.add_monotonic_sent_time(
487 context.monotonic_remote_time.time_since_epoch().count());
488 message_header_builder.add_realtime_sent_time(
489 context.realtime_remote_time.time_since_epoch().count());
490 break;
491
Austin Schuh6f3babe2020-01-26 20:34:50 -0800492 case LogType::kLogDeliveryTimeOnly:
493 message_header_builder.add_queue_index(context.queue_index);
494 message_header_builder.add_monotonic_sent_time(
495 context.monotonic_event_time.time_since_epoch().count());
496 message_header_builder.add_realtime_sent_time(
497 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800498 message_header_builder.add_monotonic_remote_time(
499 context.monotonic_remote_time.time_since_epoch().count());
500 message_header_builder.add_realtime_remote_time(
501 context.realtime_remote_time.time_since_epoch().count());
502 message_header_builder.add_remote_queue_index(context.remote_queue_index);
503 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700504
505 case LogType::kLogMessage:
506 message_header_builder.add_queue_index(context.queue_index);
507 message_header_builder.add_data(data_offset);
508 message_header_builder.add_monotonic_sent_time(
509 context.monotonic_event_time.time_since_epoch().count());
510 message_header_builder.add_realtime_sent_time(
511 context.realtime_event_time.time_since_epoch().count());
512 break;
513
514 case LogType::kLogMessageAndDeliveryTime:
515 message_header_builder.add_queue_index(context.queue_index);
516 message_header_builder.add_remote_queue_index(context.remote_queue_index);
517 message_header_builder.add_monotonic_sent_time(
518 context.monotonic_event_time.time_since_epoch().count());
519 message_header_builder.add_realtime_sent_time(
520 context.realtime_event_time.time_since_epoch().count());
521 message_header_builder.add_monotonic_remote_time(
522 context.monotonic_remote_time.time_since_epoch().count());
523 message_header_builder.add_realtime_remote_time(
524 context.realtime_remote_time.time_since_epoch().count());
525 message_header_builder.add_data(data_offset);
526 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800527 }
528
529 return message_header_builder.Finish();
530}
531
Austin Schuhfa30c352022-10-16 11:12:02 -0700532flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
533 switch (log_type) {
534 case LogType::kLogMessage:
535 return
536 // Root table size + offset.
537 sizeof(flatbuffers::uoffset_t) * 2 +
538 // 6 padding bytes to pad the header out properly.
539 6 +
540 // vtable header (size + size of table)
541 sizeof(flatbuffers::voffset_t) * 2 +
542 // offsets to all the fields.
543 sizeof(flatbuffers::voffset_t) * 5 +
544 // pointer to vtable
545 sizeof(flatbuffers::soffset_t) +
546 // pointer to data
547 sizeof(flatbuffers::uoffset_t) +
548 // realtime_sent_time, monotonic_sent_time
549 sizeof(int64_t) * 2 +
550 // queue_index, channel_index
551 sizeof(uint32_t) * 2;
552
553 case LogType::kLogDeliveryTimeOnly:
554 return
555 // Root table size + offset.
556 sizeof(flatbuffers::uoffset_t) * 2 +
557 // 6 padding bytes to pad the header out properly.
558 4 +
559 // vtable header (size + size of table)
560 sizeof(flatbuffers::voffset_t) * 2 +
561 // offsets to all the fields.
562 sizeof(flatbuffers::voffset_t) * 8 +
563 // pointer to vtable
564 sizeof(flatbuffers::soffset_t) +
565 // remote_queue_index
566 sizeof(uint32_t) +
567 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
568 // monotonic_sent_time
569 sizeof(int64_t) * 4 +
570 // queue_index, channel_index
571 sizeof(uint32_t) * 2;
572
573 case LogType::kLogMessageAndDeliveryTime:
574 return
575 // Root table size + offset.
576 sizeof(flatbuffers::uoffset_t) * 2 +
577 // 4 padding bytes to pad the header out properly.
578 4 +
579 // vtable header (size + size of table)
580 sizeof(flatbuffers::voffset_t) * 2 +
581 // offsets to all the fields.
582 sizeof(flatbuffers::voffset_t) * 8 +
583 // pointer to vtable
584 sizeof(flatbuffers::soffset_t) +
585 // pointer to data
586 sizeof(flatbuffers::uoffset_t) +
587 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
588 // monotonic_sent_time
589 sizeof(int64_t) * 4 +
590 // remote_queue_index, queue_index, channel_index
591 sizeof(uint32_t) * 3;
592
593 case LogType::kLogRemoteMessage:
594 return
595 // Root table size + offset.
596 sizeof(flatbuffers::uoffset_t) * 2 +
597 // 6 padding bytes to pad the header out properly.
598 6 +
599 // vtable header (size + size of table)
600 sizeof(flatbuffers::voffset_t) * 2 +
601 // offsets to all the fields.
602 sizeof(flatbuffers::voffset_t) * 5 +
603 // pointer to vtable
604 sizeof(flatbuffers::soffset_t) +
605 // realtime_sent_time, monotonic_sent_time
606 sizeof(int64_t) * 2 +
607 // pointer to data
608 sizeof(flatbuffers::uoffset_t) +
609 // queue_index, channel_index
610 sizeof(uint32_t) * 2;
611 }
612 LOG(FATAL);
613}
614
James Kuszmaul9776b392023-01-14 14:08:08 -0800615flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700616 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
617 "Update size logic please.");
618 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700619 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700620 switch (log_type) {
621 case LogType::kLogDeliveryTimeOnly:
622 return PackMessageHeaderSize(log_type);
623
624 case LogType::kLogMessage:
625 case LogType::kLogMessageAndDeliveryTime:
626 case LogType::kLogRemoteMessage:
627 return PackMessageHeaderSize(log_type) +
628 // Vector...
629 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
630 }
631 LOG(FATAL);
632}
633
Austin Schuhfa30c352022-10-16 11:12:02 -0700634size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800635 int channel_index, LogType log_type, size_t start_byte,
636 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700637 // TODO(austin): Figure out how to copy directly from shared memory instead of
638 // first into the fetcher's memory and then into here. That would save a lot
639 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700640 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700641 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800642 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
643 DCHECK_EQ((start_byte % 8u), 0u);
644 DCHECK_EQ((end_byte % 8u), 0u);
645 DCHECK_LE(start_byte, end_byte);
646 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700647
648 // Pack all the data in. This is brittle but easy to change. Use the
649 // InlinePackMessage.Equivilent unit test to verify everything matches.
650 switch (log_type) {
651 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800652 switch (start_byte) {
653 case 0x00u:
654 if ((end_byte) == 0x00u) {
655 break;
656 }
657 // clang-format off
658 // header:
659 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
660 buffer = Push<flatbuffers::uoffset_t>(
661 buffer, message_size - sizeof(flatbuffers::uoffset_t));
662
663 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
664 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
665 [[fallthrough]];
666 case 0x08u:
667 if ((end_byte) == 0x08u) {
668 break;
669 }
670 //
671 // padding:
672 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
673 buffer = Pad(buffer, 6);
674 //
675 // vtable (aos.logger.MessageHeader):
676 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
677 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
678 [[fallthrough]];
679 case 0x10u:
680 if ((end_byte) == 0x10u) {
681 break;
682 }
683 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
684 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
685 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
686 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
687 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
688 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
689 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
690 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
691 [[fallthrough]];
692 case 0x18u:
693 if ((end_byte) == 0x18u) {
694 break;
695 }
696 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
697 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
698 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
699 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
700 //
701 // root_table (aos.logger.MessageHeader):
702 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
703 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
704 [[fallthrough]];
705 case 0x20u:
706 if ((end_byte) == 0x20u) {
707 break;
708 }
709 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
710 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
711 [[fallthrough]];
712 case 0x28u:
713 if ((end_byte) == 0x28u) {
714 break;
715 }
716 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
717 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
718 [[fallthrough]];
719 case 0x30u:
720 if ((end_byte) == 0x30u) {
721 break;
722 }
723 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
724 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
725 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
726 buffer = Push<uint32_t>(buffer, context.queue_index);
727 [[fallthrough]];
728 case 0x38u:
729 if ((end_byte) == 0x38u) {
730 break;
731 }
732 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
733 buffer = Push<uint32_t>(buffer, channel_index);
734 //
735 // vector (aos.logger.MessageHeader.data):
736 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
737 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
738 [[fallthrough]];
739 case 0x40u:
740 if ((end_byte) == 0x40u) {
741 break;
742 }
743 [[fallthrough]];
744 default:
745 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
746 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
747 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
748 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
749 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
750 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
751 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
752 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
753 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
754 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
755 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
756 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
757 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
758 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
759 //
760 // padding:
761 // +0x4E | 00 00 | uint8_t[2] | .. | padding
762 // clang-format on
763 if (start_byte <= 0x40 && end_byte == message_size) {
764 // The easy one, slap it all down.
765 buffer = PushBytes(buffer, context.data, context.size);
766 buffer =
767 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
768 } else {
769 const size_t data_start_byte =
770 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
771 const size_t data_end_byte = end_byte - 0x40;
772 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
773 if (data_start_byte < padded_size) {
774 buffer = PushBytes(
775 buffer,
776 reinterpret_cast<const uint8_t *>(context.data) +
777 data_start_byte,
778 std::min(context.size, data_end_byte) - data_start_byte);
779 if (data_end_byte == padded_size) {
780 // We can only pad the last 7 bytes, so this only gets written
781 // if we write the last byte.
782 buffer = Pad(buffer,
783 ((context.size + 7) & 0xfffffff8u) - context.size);
784 }
785 }
786 }
787 break;
788 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700789 break;
790
791 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800792 switch (start_byte) {
793 case 0x00u:
794 if ((end_byte) == 0x00u) {
795 break;
796 }
797 // clang-format off
798 // header:
799 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
800 buffer = Push<flatbuffers::uoffset_t>(
801 buffer, message_size - sizeof(flatbuffers::uoffset_t));
802 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
803 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700804
Austin Schuh71a40d42023-02-04 21:22:22 -0800805 [[fallthrough]];
806 case 0x08u:
807 if ((end_byte) == 0x08u) {
808 break;
809 }
810 //
811 // padding:
812 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
813 buffer = Pad(buffer, 4);
814 //
815 // vtable (aos.logger.MessageHeader):
816 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
817 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
818 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
819 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
820 [[fallthrough]];
821 case 0x10u:
822 if ((end_byte) == 0x10u) {
823 break;
824 }
825 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
826 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
827 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
828 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
829 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
830 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
831 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
832 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
833 [[fallthrough]];
834 case 0x18u:
835 if ((end_byte) == 0x18u) {
836 break;
837 }
838 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
839 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
840 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
841 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
842 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
843 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
844 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
845 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
846 [[fallthrough]];
847 case 0x20u:
848 if ((end_byte) == 0x20u) {
849 break;
850 }
851 //
852 // root_table (aos.logger.MessageHeader):
853 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
854 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
855 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
856 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
857 [[fallthrough]];
858 case 0x28u:
859 if ((end_byte) == 0x28u) {
860 break;
861 }
862 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
863 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
864 [[fallthrough]];
865 case 0x30u:
866 if ((end_byte) == 0x30u) {
867 break;
868 }
869 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
870 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
871 [[fallthrough]];
872 case 0x38u:
873 if ((end_byte) == 0x38u) {
874 break;
875 }
876 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
877 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
878 [[fallthrough]];
879 case 0x40u:
880 if ((end_byte) == 0x40u) {
881 break;
882 }
883 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
884 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
885 [[fallthrough]];
886 case 0x48u:
887 if ((end_byte) == 0x48u) {
888 break;
889 }
890 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
891 buffer = Push<uint32_t>(buffer, context.queue_index);
892 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
893 buffer = Push<uint32_t>(buffer, channel_index);
894
895 // clang-format on
896 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700897 break;
898
899 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800900 switch (start_byte) {
901 case 0x00u:
902 if ((end_byte) == 0x00u) {
903 break;
904 }
905 // clang-format off
906 // header:
907 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
908 buffer = Push<flatbuffers::uoffset_t>(
909 buffer, message_size - sizeof(flatbuffers::uoffset_t));
910 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
911 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
912 [[fallthrough]];
913 case 0x08u:
914 if ((end_byte) == 0x08u) {
915 break;
916 }
917 //
918 // padding:
919 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
920 buffer = Pad(buffer, 4);
921 //
922 // vtable (aos.logger.MessageHeader):
923 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
924 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
925 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
926 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
927 [[fallthrough]];
928 case 0x10u:
929 if ((end_byte) == 0x10u) {
930 break;
931 }
932 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
933 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
934 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
935 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
936 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
937 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
938 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
939 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
940 [[fallthrough]];
941 case 0x18u:
942 if ((end_byte) == 0x18u) {
943 break;
944 }
945 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
946 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
947 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
948 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
949 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
950 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
951 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
952 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
953 [[fallthrough]];
954 case 0x20u:
955 if ((end_byte) == 0x20u) {
956 break;
957 }
958 //
959 // root_table (aos.logger.MessageHeader):
960 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
961 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
962 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
963 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
964 [[fallthrough]];
965 case 0x28u:
966 if ((end_byte) == 0x28u) {
967 break;
968 }
969 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
970 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
971 [[fallthrough]];
972 case 0x30u:
973 if ((end_byte) == 0x30u) {
974 break;
975 }
976 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
977 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
978 [[fallthrough]];
979 case 0x38u:
980 if ((end_byte) == 0x38u) {
981 break;
982 }
983 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
984 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
985 [[fallthrough]];
986 case 0x40u:
987 if ((end_byte) == 0x40u) {
988 break;
989 }
990 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
991 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
992 [[fallthrough]];
993 case 0x48u:
994 if ((end_byte) == 0x48u) {
995 break;
996 }
997 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
998 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
999 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1000 buffer = Push<uint32_t>(buffer, context.queue_index);
1001 [[fallthrough]];
1002 case 0x50u:
1003 if ((end_byte) == 0x50u) {
1004 break;
1005 }
1006 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1007 buffer = Push<uint32_t>(buffer, channel_index);
1008 //
1009 // vector (aos.logger.MessageHeader.data):
1010 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1011 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1012 [[fallthrough]];
1013 case 0x58u:
1014 if ((end_byte) == 0x58u) {
1015 break;
1016 }
1017 [[fallthrough]];
1018 default:
1019 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1020 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1021 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1022 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1023 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1024 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1025 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1026 //
1027 // padding:
1028 // +0x5F | 00 | uint8_t[1] | . | padding
1029 // clang-format on
1030
1031 if (start_byte <= 0x58 && end_byte == message_size) {
1032 // The easy one, slap it all down.
1033 buffer = PushBytes(buffer, context.data, context.size);
1034 buffer =
1035 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1036 } else {
1037 const size_t data_start_byte =
1038 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1039 const size_t data_end_byte = end_byte - 0x58;
1040 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1041 if (data_start_byte < padded_size) {
1042 buffer = PushBytes(
1043 buffer,
1044 reinterpret_cast<const uint8_t *>(context.data) +
1045 data_start_byte,
1046 std::min(context.size, data_end_byte) - data_start_byte);
1047 if (data_end_byte == padded_size) {
1048 // We can only pad the last 7 bytes, so this only gets written
1049 // if we write the last byte.
1050 buffer = Pad(buffer,
1051 ((context.size + 7) & 0xfffffff8u) - context.size);
1052 }
1053 }
1054 }
1055
1056 break;
1057 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001058
1059 break;
1060
1061 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001062 switch (start_byte) {
1063 case 0x00u:
1064 if ((end_byte) == 0x00u) {
1065 break;
1066 }
1067 // This is the message we need to recreate.
1068 //
1069 // clang-format off
1070 // header:
1071 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1072 buffer = Push<flatbuffers::uoffset_t>(
1073 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1074 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1075 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1076 [[fallthrough]];
1077 case 0x08u:
1078 if ((end_byte) == 0x08u) {
1079 break;
1080 }
1081 //
1082 // padding:
1083 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1084 buffer = Pad(buffer, 6);
1085 //
1086 // vtable (aos.logger.MessageHeader):
1087 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1088 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1089 [[fallthrough]];
1090 case 0x10u:
1091 if ((end_byte) == 0x10u) {
1092 break;
1093 }
1094 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1095 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1096 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1097 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1098 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1099 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1100 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1101 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1102 [[fallthrough]];
1103 case 0x18u:
1104 if ((end_byte) == 0x18u) {
1105 break;
1106 }
1107 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1108 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1109 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1110 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1111 //
1112 // root_table (aos.logger.MessageHeader):
1113 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1114 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1115 [[fallthrough]];
1116 case 0x20u:
1117 if ((end_byte) == 0x20u) {
1118 break;
1119 }
1120 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1121 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1122 [[fallthrough]];
1123 case 0x28u:
1124 if ((end_byte) == 0x28u) {
1125 break;
1126 }
1127 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1128 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1129 [[fallthrough]];
1130 case 0x30u:
1131 if ((end_byte) == 0x30u) {
1132 break;
1133 }
1134 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1135 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1136 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1137 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1138 [[fallthrough]];
1139 case 0x38u:
1140 if ((end_byte) == 0x38u) {
1141 break;
1142 }
1143 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1144 buffer = Push<uint32_t>(buffer, channel_index);
1145 //
1146 // vector (aos.logger.MessageHeader.data):
1147 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1148 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1149 [[fallthrough]];
1150 case 0x40u:
1151 if ((end_byte) == 0x40u) {
1152 break;
1153 }
1154 [[fallthrough]];
1155 default:
1156 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1157 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1158 // ...
1159 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1160 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1161 //
1162 // padding:
1163 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1164 // clang-format on
1165 if (start_byte <= 0x40 && end_byte == message_size) {
1166 // The easy one, slap it all down.
1167 buffer = PushBytes(buffer, context.data, context.size);
1168 buffer =
1169 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1170 } else {
1171 const size_t data_start_byte =
1172 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1173 const size_t data_end_byte = end_byte - 0x40;
1174 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1175 if (data_start_byte < padded_size) {
1176 buffer = PushBytes(
1177 buffer,
1178 reinterpret_cast<const uint8_t *>(context.data) +
1179 data_start_byte,
1180 std::min(context.size, data_end_byte) - data_start_byte);
1181 if (data_end_byte == padded_size) {
1182 // We can only pad the last 7 bytes, so this only gets written
1183 // if we write the last byte.
1184 buffer = Pad(buffer,
1185 ((context.size + 7) & 0xfffffff8u) - context.size);
1186 }
1187 }
1188 }
1189 break;
1190 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001191 }
1192
Austin Schuh71a40d42023-02-04 21:22:22 -08001193 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001194}
1195
Austin Schuhcd368422021-11-22 21:23:29 -08001196SpanReader::SpanReader(std::string_view filename, bool quiet)
Alexei Strotscee7b372023-04-21 11:57:54 -07001197 : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
Tyler Chatow2015bc62021-08-04 21:15:09 -07001198
Alexei Strotscee7b372023-04-21 11:57:54 -07001199SpanReader::SpanReader(std::string_view filename,
1200 std::unique_ptr<DataDecoder> decoder)
1201 : filename_(filename), decoder_(std::move(decoder)) {}
Austin Schuh05b70472020-01-01 17:11:17 -08001202
Austin Schuhcf5f6442021-07-06 10:43:28 -07001203absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001204 // Make sure we have enough for the size.
1205 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1206 if (!ReadBlock()) {
1207 return absl::Span<const uint8_t>();
1208 }
1209 }
1210
1211 // Now make sure we have enough for the message.
1212 const size_t data_size =
1213 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1214 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001215 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1216 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1217 LOG(ERROR) << " Rest of log file is "
1218 << absl::BytesToHexString(std::string_view(
1219 reinterpret_cast<const char *>(data_.data() +
1220 consumed_data_),
1221 data_.size() - consumed_data_));
1222 return absl::Span<const uint8_t>();
1223 }
Austin Schuh05b70472020-01-01 17:11:17 -08001224 while (data_.size() < consumed_data_ + data_size) {
1225 if (!ReadBlock()) {
1226 return absl::Span<const uint8_t>();
1227 }
1228 }
1229
1230 // And return it, consuming the data.
1231 const uint8_t *data_ptr = data_.data() + consumed_data_;
1232
Austin Schuh05b70472020-01-01 17:11:17 -08001233 return absl::Span<const uint8_t>(data_ptr, data_size);
1234}
1235
Austin Schuhcf5f6442021-07-06 10:43:28 -07001236void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001237 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001238 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1239 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001240 consumed_data_ += consumed_size;
1241 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001242}
1243
1244absl::Span<const uint8_t> SpanReader::ReadMessage() {
1245 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001246 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001247 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001248 } else {
1249 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001250 }
1251 return result;
1252}
1253
Austin Schuh05b70472020-01-01 17:11:17 -08001254bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001255 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1256 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001257 constexpr size_t kReadSize = 256 * 1024;
1258
1259 // Strip off any unused data at the front.
1260 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001261 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001262 consumed_data_ = 0;
1263 }
1264
1265 const size_t starting_size = data_.size();
1266
1267 // This should automatically grow the backing store. It won't shrink if we
1268 // get a small chunk later. This reduces allocations when we want to append
1269 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001270 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001271
Brian Silvermanf51499a2020-09-21 12:49:08 -07001272 const size_t count =
1273 decoder_->Read(data_.begin() + starting_size, data_.end());
1274 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001275 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001276 return false;
1277 }
Austin Schuh05b70472020-01-01 17:11:17 -08001278
Brian Smarttea913d42021-12-10 15:02:38 -08001279 total_read_ += count;
1280
Austin Schuh05b70472020-01-01 17:11:17 -08001281 return true;
1282}
1283
Austin Schuhadd6eb32020-11-09 21:24:26 -08001284std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001285 SpanReader *span_reader) {
1286 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001287
1288 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001289 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001290 return std::nullopt;
1291 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001292
Austin Schuh5212cad2020-09-09 23:12:09 -07001293 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001294 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001295 if (!result.Verify()) {
1296 return std::nullopt;
1297 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001298
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001299 // We only know of busted headers in the versions of the log file header
1300 // *before* the logger_sha1 field was added. At some point before that point,
1301 // the logic to track when a header has been written was rewritten in such a
1302 // way that it can't happen anymore. We've seen some logs where the body
1303 // parses as a header recently, so the simple solution of always looking is
1304 // failing us.
1305 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001306 while (true) {
1307 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001308 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001309 break;
1310 }
1311
1312 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1313 maybe_header_data);
1314 if (maybe_header.Verify()) {
1315 LOG(WARNING) << "Found duplicate LogFileHeader in "
1316 << span_reader->filename();
1317 ResizeableBuffer header_data_copy;
1318 header_data_copy.resize(maybe_header_data.size());
1319 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1320 header_data_copy.size());
1321 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1322 std::move(header_data_copy));
1323
1324 span_reader->ConsumeMessage();
1325 } else {
1326 break;
1327 }
1328 }
1329 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001330 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001331}
1332
Austin Schuh0e8db662021-07-06 10:43:47 -07001333std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1334 std::string_view filename) {
1335 SpanReader span_reader(filename);
1336 return ReadHeader(&span_reader);
1337}
1338
Austin Schuhadd6eb32020-11-09 21:24:26 -08001339std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001340 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001341 SpanReader span_reader(filename);
1342 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1343 for (size_t i = 0; i < n + 1; ++i) {
1344 data_span = span_reader.ReadMessage();
1345
1346 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001347 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001348 return std::nullopt;
1349 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001350 }
1351
Brian Silverman354697a2020-09-22 21:06:32 -07001352 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001353 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001354 if (!result.Verify()) {
1355 return std::nullopt;
1356 }
1357 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001358}
1359
Austin Schuh05b70472020-01-01 17:11:17 -08001360MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001361 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001362 raw_log_file_header_(
1363 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001364 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1365 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1366
Austin Schuh0e8db662021-07-06 10:43:47 -07001367 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1368 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001369
1370 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001371 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001372
Austin Schuh0e8db662021-07-06 10:43:47 -07001373 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001374
Austin Schuh5b728b72021-06-16 14:57:15 -07001375 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1376
Brian Smarttea913d42021-12-10 15:02:38 -08001377 total_verified_before_ = span_reader_.TotalConsumed();
1378
Austin Schuhcde938c2020-02-02 17:30:07 -08001379 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001380 FLAGS_max_out_of_order > 0
1381 ? chrono::duration_cast<chrono::nanoseconds>(
1382 chrono::duration<double>(FLAGS_max_out_of_order))
1383 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001384
1385 VLOG(1) << "Opened " << filename << " as node "
1386 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001387}
1388
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001389std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001390 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001391 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001392 if (is_corrupted()) {
1393 LOG(ERROR) << "Total corrupted volumes: before = "
1394 << total_verified_before_
1395 << " | corrupted = " << total_corrupted_
1396 << " | during = " << total_verified_during_
1397 << " | after = " << total_verified_after_ << std::endl;
1398 }
1399
1400 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001401 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1402 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001403 << span_reader_.TotalConsumed() << " bytes usable."
1404 << std::endl;
1405 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001406 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001407 }
1408
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001409 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001410
1411 if (crash_on_corrupt_message_flag_) {
1412 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001413 << total_verified_before_ << " found within "
1414 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001415 << "; set --nocrash_on_corrupt_message to see summary;"
1416 << " also set --ignore_corrupt_messages to process"
1417 << " anyway";
1418
1419 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001420 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001421 << " from " << filename() << std::endl;
1422
1423 total_corrupted_ += msg_data.size();
1424
1425 while (true) {
1426 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1427
James Kuszmaul9776b392023-01-14 14:08:08 -08001428 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001429 if (!ignore_corrupt_messages_flag_) {
1430 LOG(ERROR) << "Total corrupted volumes: before = "
1431 << total_verified_before_
1432 << " | corrupted = " << total_corrupted_
1433 << " | during = " << total_verified_during_
1434 << " | after = " << total_verified_after_ << std::endl;
1435
1436 if (span_reader_.IsIncomplete()) {
1437 LOG(ERROR) << "Unable to access some messages in " << filename()
1438 << " : " << span_reader_.TotalRead() << " bytes read, "
1439 << span_reader_.TotalConsumed() << " bytes usable."
1440 << std::endl;
1441 }
1442 return nullptr;
1443 }
1444 break;
1445 }
1446
1447 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1448
1449 if (!next_msg.Verify()) {
1450 total_corrupted_ += msg_data.size();
1451 total_verified_during_ += total_verified_after_;
1452 total_verified_after_ = 0;
1453
1454 } else {
1455 total_verified_after_ += msg_data.size();
1456 if (ignore_corrupt_messages_flag_) {
1457 msg = next_msg;
1458 break;
1459 }
1460 }
1461 }
1462 }
1463
1464 if (is_corrupted()) {
1465 total_verified_after_ += msg_data.size();
1466 } else {
1467 total_verified_before_ += msg_data.size();
1468 }
Austin Schuh05b70472020-01-01 17:11:17 -08001469
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001470 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001471
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001472 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001473
1474 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001475
1476 if (VLOG_IS_ON(3)) {
1477 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1478 } else if (VLOG_IS_ON(2)) {
1479 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1480 msg_copy.mutable_message()->clear_data();
1481 VLOG(2) << "Read from " << filename() << " data "
1482 << FlatbufferToJson(msg_copy);
1483 }
1484
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001485 return result;
1486}
1487
1488std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1489 const MessageHeader &message) {
1490 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1491
1492 UnpackedMessageHeader *const unpacked_message =
1493 reinterpret_cast<UnpackedMessageHeader *>(
1494 malloc(sizeof(UnpackedMessageHeader) + data_size +
1495 kChannelDataAlignment - 1));
1496
1497 CHECK(message.has_channel_index());
1498 CHECK(message.has_monotonic_sent_time());
1499
1500 absl::Span<uint8_t> span;
1501 if (data_size > 0) {
1502 span =
1503 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1504 &unpacked_message->actual_data[0], data_size)),
1505 data_size);
1506 }
1507
Austin Schuh826e6ce2021-11-18 20:33:10 -08001508 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001509 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001510 monotonic_remote_time = aos::monotonic_clock::time_point(
1511 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001512 }
1513 std::optional<realtime_clock::time_point> realtime_remote_time;
1514 if (message.has_realtime_remote_time()) {
1515 realtime_remote_time = realtime_clock::time_point(
1516 chrono::nanoseconds(message.realtime_remote_time()));
1517 }
1518
1519 std::optional<uint32_t> remote_queue_index;
1520 if (message.has_remote_queue_index()) {
1521 remote_queue_index = message.remote_queue_index();
1522 }
1523
James Kuszmaul9776b392023-01-14 14:08:08 -08001524 new (unpacked_message) UnpackedMessageHeader(
1525 message.channel_index(),
1526 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001527 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001528 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001529 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001530 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1531 remote_queue_index,
1532 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001533 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001534 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001535
1536 if (data_size > 0) {
1537 memcpy(span.data(), message.data()->data(), data_size);
1538 }
1539
1540 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1541 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001542}
1543
Austin Schuhc41603c2020-10-11 16:17:37 -07001544PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001545 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001546 if (parts_.parts.size() >= 2) {
1547 next_message_reader_.emplace(parts_.parts[1]);
1548 }
Austin Schuh48507722021-07-17 17:29:24 -07001549 ComputeBootCounts();
1550}
1551
1552void PartsMessageReader::ComputeBootCounts() {
1553 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1554 std::nullopt);
1555
1556 // We have 3 vintages of log files with different amounts of information.
1557 if (log_file_header()->has_boot_uuids()) {
1558 // The new hotness with the boots explicitly listed out. We can use the log
1559 // file header to compute the boot count of all relevant nodes.
1560 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1561 size_t node_index = 0;
1562 for (const flatbuffers::String *boot_uuid :
1563 *log_file_header()->boot_uuids()) {
1564 CHECK(parts_.boots);
1565 if (boot_uuid->size() != 0) {
1566 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1567 if (it != parts_.boots->boot_count_map.end()) {
1568 boot_counts_[node_index] = it->second;
1569 }
1570 } else if (parts().boots->boots[node_index].size() == 1u) {
1571 boot_counts_[node_index] = 0;
1572 }
1573 ++node_index;
1574 }
1575 } else {
1576 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1577 // single node log files with boot UUIDs in the header. We only know how to
1578 // order certain boots in certain circumstances.
1579 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1580 for (size_t node_index = 0; node_index < boot_counts_.size();
1581 ++node_index) {
1582 CHECK(parts_.boots);
1583 if (parts().boots->boots[node_index].size() == 1u) {
1584 boot_counts_[node_index] = 0;
1585 }
1586 }
1587 } else {
1588 // Really old single node logs without any UUIDs. They can't reboot.
1589 CHECK_EQ(boot_counts_.size(), 1u);
1590 boot_counts_[0] = 0u;
1591 }
1592 }
1593}
Austin Schuhc41603c2020-10-11 16:17:37 -07001594
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001595std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001596 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001597 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001598 message_reader_.ReadMessage();
1599 if (message) {
1600 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001601 const monotonic_clock::time_point monotonic_sent_time =
1602 message->monotonic_sent_time;
1603
1604 // TODO(austin): Does this work with startup? Might need to use the
1605 // start time.
1606 // TODO(austin): Does this work with startup when we don't know the
1607 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001608 if (monotonic_sent_time >
1609 parts_.monotonic_start_time + max_out_of_order_duration()) {
1610 after_start_ = true;
1611 }
1612 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001613 CHECK_GE(monotonic_sent_time,
1614 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001615 << ": Max out of order of " << max_out_of_order_duration().count()
1616 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001617 << parts_.monotonic_start_time << " currently reading "
1618 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001619 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001620 return message;
1621 }
1622 NextLog();
1623 }
Austin Schuh32f68492020-11-08 21:45:51 -08001624 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001625 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001626}
1627
1628void PartsMessageReader::NextLog() {
1629 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001630 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001631 done_ = true;
1632 return;
1633 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001634 CHECK(next_message_reader_);
1635 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001636 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001637 if (next_part_index_ + 1 < parts_.parts.size()) {
1638 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1639 } else {
1640 next_message_reader_.reset();
1641 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001642 ++next_part_index_;
1643}
1644
Austin Schuh1be0ce42020-11-29 22:43:26 -08001645bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001646 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001647
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001648 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001649 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001650 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001651 return false;
1652 }
1653
1654 if (this->channel_index < m2.channel_index) {
1655 return true;
1656 } else if (this->channel_index > m2.channel_index) {
1657 return false;
1658 }
1659
1660 return this->queue_index < m2.queue_index;
1661}
1662
1663bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001664bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001665 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001666
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001667 return timestamp.time == m2.timestamp.time &&
1668 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001669}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001670
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001671std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1672 os << "{.channel_index=" << m.channel_index
1673 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1674 << ", .realtime_sent_time=" << m.realtime_sent_time
1675 << ", .queue_index=" << m.queue_index;
1676 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001677 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001678 }
1679 os << ", .realtime_remote_time=";
1680 PrintOptionalOrNull(&os, m.realtime_remote_time);
1681 os << ", .remote_queue_index=";
1682 PrintOptionalOrNull(&os, m.remote_queue_index);
1683 if (m.has_monotonic_timestamp_time) {
1684 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1685 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001686 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001687 return os;
1688}
1689
Austin Schuh1be0ce42020-11-29 22:43:26 -08001690std::ostream &operator<<(std::ostream &os, const Message &m) {
1691 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001692 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001693 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001694 if (m.data->remote_queue_index.has_value()) {
1695 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1696 }
1697 if (m.data->monotonic_remote_time.has_value()) {
1698 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1699 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001700 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001701 }
1702 os << "}";
1703 return os;
1704}
1705
1706std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1707 os << "{.channel_index=" << m.channel_index
1708 << ", .queue_index=" << m.queue_index
1709 << ", .monotonic_event_time=" << m.monotonic_event_time
1710 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001711 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001712 os << ", .remote_queue_index=" << m.remote_queue_index;
1713 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001714 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001715 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1716 }
1717 if (m.realtime_remote_time != realtime_clock::min_time) {
1718 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1719 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001720 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001721 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1722 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001723 if (m.data != nullptr) {
1724 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001725 } else {
1726 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001727 }
1728 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001729 return os;
1730}
1731
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001732LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001733 : parts_message_reader_(log_parts),
1734 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1735}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001736
1737Message *LogPartsSorter::Front() {
1738 // Queue up data until enough data has been queued that the front message is
1739 // sorted enough to be safe to pop. This may do nothing, so we should make
1740 // sure the nothing path is checked quickly.
1741 if (sorted_until() != monotonic_clock::max_time) {
1742 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001743 if (!messages_.empty() &&
1744 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001745 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001746 break;
1747 }
1748
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001749 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001750 parts_message_reader_.ReadMessage();
1751 // No data left, sorted forever, work through what is left.
1752 if (!m) {
1753 sorted_until_ = monotonic_clock::max_time;
1754 break;
1755 }
1756
Austin Schuh48507722021-07-17 17:29:24 -07001757 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001758 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001759 monotonic_timestamp_boot = parts().logger_boot_count;
1760 }
1761 size_t monotonic_remote_boot = 0xffffff;
1762
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001763 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001764 const Node *node =
1765 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001766
Austin Schuh48507722021-07-17 17:29:24 -07001767 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001768 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001769 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001770 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001771 monotonic_remote_boot = *boot;
1772 }
1773
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001774 messages_.insert(
1775 Message{.channel_index = m->channel_index,
1776 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1777 .index = m->queue_index},
1778 .timestamp = BootTimestamp{.boot = parts().boot_count,
1779 .time = m->monotonic_sent_time},
1780 .monotonic_remote_boot = monotonic_remote_boot,
1781 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1782 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001783
1784 // Now, update sorted_until_ to match the new message.
1785 if (parts_message_reader_.newest_timestamp() >
1786 monotonic_clock::min_time +
1787 parts_message_reader_.max_out_of_order_duration()) {
1788 sorted_until_ = parts_message_reader_.newest_timestamp() -
1789 parts_message_reader_.max_out_of_order_duration();
1790 } else {
1791 sorted_until_ = monotonic_clock::min_time;
1792 }
1793 }
1794 }
1795
1796 // Now that we have enough data queued, return a pointer to the oldest piece
1797 // of data if it exists.
1798 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001799 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001800 return nullptr;
1801 }
1802
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001803 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001804 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001805 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001806 return &(*messages_.begin());
1807}
1808
1809void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1810
1811std::string LogPartsSorter::DebugString() const {
1812 std::stringstream ss;
1813 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001814 int count = 0;
1815 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001816 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001817 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1818 ss << m << "\n";
1819 } else if (no_dots) {
1820 ss << "...\n";
1821 no_dots = false;
1822 }
1823 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001824 }
1825 ss << "] <- " << parts_message_reader_.filename();
1826 return ss.str();
1827}
1828
Austin Schuhd2f96102020-12-01 20:27:29 -08001829NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1830 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001831 // Enforce that we are sorting things only from a single node from a single
1832 // boot.
1833 const std::string_view part0_node = parts[0].node;
1834 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001835 for (size_t i = 1; i < parts.size(); ++i) {
1836 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001837 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1838 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001839 }
Austin Schuh715adc12021-06-29 22:07:39 -07001840
1841 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1842
Austin Schuhd2f96102020-12-01 20:27:29 -08001843 for (LogParts &part : parts) {
1844 parts_sorters_.emplace_back(std::move(part));
1845 }
1846
Austin Schuhd2f96102020-12-01 20:27:29 -08001847 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001848 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001849 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001850 // We want to capture the earliest meaningful start time here. The start
1851 // time defaults to min_time when there's no meaningful value to report, so
1852 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001853 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1854 bool accept = false;
1855 // We want to prioritize start times from the logger node. Really, we
1856 // want to prioritize start times with a valid realtime_clock time. So,
1857 // if we have a start time without a RT clock, prefer a start time with a
1858 // RT clock, even it if is later.
1859 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1860 // We've got a good one. See if the current start time has a good RT
1861 // clock, or if we should use this one instead.
1862 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1863 accept = true;
1864 } else if (realtime_start_time_ == realtime_clock::min_time) {
1865 // The previous start time doesn't have a good RT time, so it is very
1866 // likely the start time from a remote part file. We just found a
1867 // better start time with a real RT time, so switch to that instead.
1868 accept = true;
1869 }
1870 } else if (realtime_start_time_ == realtime_clock::min_time) {
1871 // We don't have a RT time, so take the oldest.
1872 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1873 accept = true;
1874 }
1875 }
1876
1877 if (accept) {
1878 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1879 realtime_start_time_ = parts_sorter.realtime_start_time();
1880 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001881 }
1882 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001883
1884 // If there was no meaningful start time reported, just use min_time.
1885 if (monotonic_start_time_ == monotonic_clock::max_time) {
1886 monotonic_start_time_ = monotonic_clock::min_time;
1887 realtime_start_time_ = realtime_clock::min_time;
1888 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001889}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001890
Austin Schuh0ca51f32020-12-25 21:51:45 -08001891std::vector<const LogParts *> NodeMerger::Parts() const {
1892 std::vector<const LogParts *> p;
1893 p.reserve(parts_sorters_.size());
1894 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1895 p.emplace_back(&parts_sorter.parts());
1896 }
1897 return p;
1898}
1899
Austin Schuh8f52ed52020-11-30 23:12:39 -08001900Message *NodeMerger::Front() {
1901 // Return the current Front if we have one, otherwise go compute one.
1902 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001903 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001904 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001905 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001906 }
1907
1908 // Otherwise, do a simple search for the oldest message, deduplicating any
1909 // duplicates.
1910 Message *oldest = nullptr;
1911 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001912 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1913 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001914 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001915 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001916 continue;
1917 }
1918 if (oldest == nullptr || *m < *oldest) {
1919 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001920 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001921 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001922 // Found a duplicate. If there is a choice, we want the one which has
1923 // the timestamp time.
1924 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001925 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001926 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001927 current_->PopFront();
1928 current_ = &parts_sorter;
1929 oldest = m;
1930 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001931 CHECK_EQ(m->data->monotonic_timestamp_time,
1932 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001933 parts_sorter.PopFront();
1934 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001935 }
1936
1937 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001938 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001939 }
1940
Austin Schuhb000de62020-12-03 22:00:40 -08001941 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001942 CHECK_GE(oldest->timestamp.time, last_message_time_);
1943 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001944 monotonic_oldest_time_ =
1945 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001946 } else {
1947 last_message_time_ = monotonic_clock::max_time;
1948 }
1949
Austin Schuh8f52ed52020-11-30 23:12:39 -08001950 // Return the oldest message found. This will be nullptr if nothing was
1951 // found, indicating there is nothing left.
1952 return oldest;
1953}
1954
1955void NodeMerger::PopFront() {
1956 CHECK(current_ != nullptr) << "Popping before calling Front()";
1957 current_->PopFront();
1958 current_ = nullptr;
1959}
1960
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001961BootMerger::BootMerger(std::vector<LogParts> files) {
1962 std::vector<std::vector<LogParts>> boots;
1963
1964 // Now, we need to split things out by boot.
1965 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001966 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001967 if (boot_count + 1 > boots.size()) {
1968 boots.resize(boot_count + 1);
1969 }
1970 boots[boot_count].emplace_back(std::move(files[i]));
1971 }
1972
1973 node_mergers_.reserve(boots.size());
1974 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001975 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001976 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001977 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001978 }
1979 node_mergers_.emplace_back(
1980 std::make_unique<NodeMerger>(std::move(boots[i])));
1981 }
1982}
1983
1984Message *BootMerger::Front() {
1985 Message *result = node_mergers_[index_]->Front();
1986
1987 if (result != nullptr) {
1988 return result;
1989 }
1990
1991 if (index_ + 1u == node_mergers_.size()) {
1992 // At the end of the last node merger, just return.
1993 return nullptr;
1994 } else {
1995 ++index_;
1996 return Front();
1997 }
1998}
1999
2000void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
2001
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002002std::vector<const LogParts *> BootMerger::Parts() const {
2003 std::vector<const LogParts *> results;
2004 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
2005 std::vector<const LogParts *> node_parts = node_merger->Parts();
2006
2007 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2008 std::make_move_iterator(node_parts.end()));
2009 }
2010
2011 return results;
2012}
2013
Austin Schuhd2f96102020-12-01 20:27:29 -08002014TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002015 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002016 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002017 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002018 if (!configuration_) {
2019 configuration_ = part->config;
2020 } else {
2021 CHECK_EQ(configuration_.get(), part->config.get());
2022 }
2023 }
2024 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002025 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2026 // pretty simple.
2027 if (configuration::MultiNode(config)) {
2028 nodes_data_.resize(config->nodes()->size());
2029 const Node *my_node = config->nodes()->Get(node());
2030 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2031 const Node *node = config->nodes()->Get(node_index);
2032 NodeData *node_data = &nodes_data_[node_index];
2033 node_data->channels.resize(config->channels()->size());
2034 // We should save the channel if it is delivered to the node represented
2035 // by the NodeData, but not sent by that node. That combo means it is
2036 // forwarded.
2037 size_t channel_index = 0;
2038 node_data->any_delivered = false;
2039 for (const Channel *channel : *config->channels()) {
2040 node_data->channels[channel_index].delivered =
2041 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002042 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2043 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002044 node_data->any_delivered = node_data->any_delivered ||
2045 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002046 if (node_data->channels[channel_index].delivered) {
2047 const Connection *connection =
2048 configuration::ConnectionToNode(channel, node);
2049 node_data->channels[channel_index].time_to_live =
2050 chrono::nanoseconds(connection->time_to_live());
2051 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002052 ++channel_index;
2053 }
2054 }
2055
2056 for (const Channel *channel : *config->channels()) {
2057 source_node_.emplace_back(configuration::GetNodeIndex(
2058 config, channel->source_node()->string_view()));
2059 }
2060 }
2061}
2062
2063void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002064 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002065 CHECK_NE(timestamp_mapper->node(), node());
2066 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2067
2068 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002069 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002070 // we could needlessly save data.
2071 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002072 VLOG(1) << "Registering on node " << node() << " for peer node "
2073 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002074 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2075
2076 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002077
2078 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002079 }
2080}
2081
Austin Schuh79b30942021-01-24 22:32:21 -08002082void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002083 matched_messages_.emplace_back(
2084 TimestampedMessage{.channel_index = m->channel_index,
2085 .queue_index = m->queue_index,
2086 .monotonic_event_time = m->timestamp,
2087 .realtime_event_time = m->data->realtime_sent_time,
2088 .remote_queue_index = BootQueueIndex::Invalid(),
2089 .monotonic_remote_time = BootTimestamp::min_time(),
2090 .realtime_remote_time = realtime_clock::min_time,
2091 .monotonic_timestamp_time = BootTimestamp::min_time(),
2092 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002093}
2094
2095TimestampedMessage *TimestampMapper::Front() {
2096 // No need to fetch anything new. A previous message still exists.
2097 switch (first_message_) {
2098 case FirstMessage::kNeedsUpdate:
2099 break;
2100 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002101 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002102 case FirstMessage::kNullptr:
2103 return nullptr;
2104 }
2105
Austin Schuh79b30942021-01-24 22:32:21 -08002106 if (matched_messages_.empty()) {
2107 if (!QueueMatched()) {
2108 first_message_ = FirstMessage::kNullptr;
2109 return nullptr;
2110 }
2111 }
2112 first_message_ = FirstMessage::kInMessage;
2113 return &matched_messages_.front();
2114}
2115
2116bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002117 MatchResult result = MatchResult::kEndOfFile;
2118 do {
2119 result = MaybeQueueMatched();
2120 } while (result == MatchResult::kSkipped);
2121 return result == MatchResult::kQueued;
2122}
2123
2124bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2125 const TimestampedMessage & /*message*/) {
2126 if (replay_channels_callback_ &&
2127 !replay_channels_callback_(matched_messages_.back())) {
2128 matched_messages_.pop_back();
2129 return true;
2130 }
2131 return false;
2132}
2133
2134TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002135 if (nodes_data_.empty()) {
2136 // Simple path. We are single node, so there are no timestamps to match!
2137 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002138 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002139 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002140 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002141 }
Austin Schuh79b30942021-01-24 22:32:21 -08002142 // Enqueue this message into matched_messages_ so we have a place to
2143 // associate remote timestamps, and return it.
2144 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002145
Austin Schuh79b30942021-01-24 22:32:21 -08002146 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2147 last_message_time_ = matched_messages_.back().monotonic_event_time;
2148
2149 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002150 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002151 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002152 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2153 return MatchResult::kSkipped;
2154 }
2155 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002156 }
2157
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002158 // We need to only add messages to the list so they get processed for
2159 // messages which are delivered. Reuse the flow below which uses messages_
2160 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002161 if (messages_.empty()) {
2162 if (!Queue()) {
2163 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002164 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002165 }
2166
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002167 // Now that it has been added (and cannibalized), forget about it
2168 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002169 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002170 }
2171
2172 Message *m = &(messages_.front());
2173
2174 if (source_node_[m->channel_index] == node()) {
2175 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002176 QueueMessage(m);
2177 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2178 last_message_time_ = matched_messages_.back().monotonic_event_time;
2179 messages_.pop_front();
2180 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002181 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2182 return MatchResult::kSkipped;
2183 }
2184 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002185 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002186 // Got a timestamp, find the matching remote data, match it, and return
2187 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002188 Message data = MatchingMessageFor(*m);
2189
2190 // Return the data from the remote. The local message only has timestamp
2191 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002192 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002193 .channel_index = m->channel_index,
2194 .queue_index = m->queue_index,
2195 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002196 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002197 .remote_queue_index =
2198 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002199 .index = m->data->remote_queue_index.value()},
2200 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002201 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002202 .realtime_remote_time = m->data->realtime_remote_time.value(),
2203 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2204 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002205 .data = std::move(data.data)});
2206 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2207 last_message_time_ = matched_messages_.back().monotonic_event_time;
2208 // Since messages_ holds the data, drop it.
2209 messages_.pop_front();
2210 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002211 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2212 return MatchResult::kSkipped;
2213 }
2214 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002215 }
2216}
2217
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002218void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002219 while (last_message_time_ <= queue_time) {
2220 if (!QueueMatched()) {
2221 return;
2222 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002223 }
2224}
2225
Austin Schuhe639ea12021-01-25 13:00:22 -08002226void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002227 // Note: queueing for time doesn't really work well across boots. So we
2228 // just assume that if you are using this, you only care about the current
2229 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002230 //
2231 // TODO(austin): Is that the right concept?
2232 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002233 // Make sure we have something queued first. This makes the end time
2234 // calculation simpler, and is typically what folks want regardless.
2235 if (matched_messages_.empty()) {
2236 if (!QueueMatched()) {
2237 return;
2238 }
2239 }
2240
2241 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002242 std::max(monotonic_start_time(
2243 matched_messages_.front().monotonic_event_time.boot),
2244 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002245 time_estimation_buffer;
2246
2247 // Place sorted messages on the list until we have
2248 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2249 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002250 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002251 if (!QueueMatched()) {
2252 return;
2253 }
2254 }
2255}
2256
Austin Schuhd2f96102020-12-01 20:27:29 -08002257void TimestampMapper::PopFront() {
2258 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002259 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002260 first_message_ = FirstMessage::kNeedsUpdate;
2261
Austin Schuh79b30942021-01-24 22:32:21 -08002262 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002263}
2264
2265Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002266 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002267 CHECK_NOTNULL(message.data);
2268 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002269 const BootQueueIndex remote_queue_index =
2270 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002271 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002272
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002273 CHECK(message.data->monotonic_remote_time.has_value());
2274 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002275
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002276 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002277 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002278 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002279 const realtime_clock::time_point realtime_remote_time =
2280 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002281
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002282 TimestampMapper *peer =
2283 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002284
2285 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002286 // asked to pull a timestamp from a peer which doesn't exist, return an
2287 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002288 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002289 // TODO(austin): Make sure the tests hit all these paths with a boot count
2290 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002291 return Message{.channel_index = message.channel_index,
2292 .queue_index = remote_queue_index,
2293 .timestamp = monotonic_remote_time,
2294 .monotonic_remote_boot = 0xffffff,
2295 .monotonic_timestamp_boot = 0xffffff,
2296 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002297 }
2298
2299 // The queue which will have the matching data, if available.
2300 std::deque<Message> *data_queue =
2301 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2302
Austin Schuh79b30942021-01-24 22:32:21 -08002303 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002304
2305 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002306 return Message{.channel_index = message.channel_index,
2307 .queue_index = remote_queue_index,
2308 .timestamp = monotonic_remote_time,
2309 .monotonic_remote_boot = 0xffffff,
2310 .monotonic_timestamp_boot = 0xffffff,
2311 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002312 }
2313
Austin Schuhd2f96102020-12-01 20:27:29 -08002314 if (remote_queue_index < data_queue->front().queue_index ||
2315 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002316 return Message{.channel_index = message.channel_index,
2317 .queue_index = remote_queue_index,
2318 .timestamp = monotonic_remote_time,
2319 .monotonic_remote_boot = 0xffffff,
2320 .monotonic_timestamp_boot = 0xffffff,
2321 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002322 }
2323
Austin Schuh993ccb52020-12-12 15:59:32 -08002324 // The algorithm below is constant time with some assumptions. We need there
2325 // to be no missing messages in the data stream. This also assumes a queue
2326 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002327 if (data_queue->back().queue_index.boot ==
2328 data_queue->front().queue_index.boot &&
2329 (data_queue->back().queue_index.index -
2330 data_queue->front().queue_index.index + 1u ==
2331 data_queue->size())) {
2332 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002333 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002334 //
2335 // TODO(austin): Move if not reliable.
2336 Message result = (*data_queue)[remote_queue_index.index -
2337 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002338
2339 CHECK_EQ(result.timestamp, monotonic_remote_time)
2340 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002341 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002342 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2343 // Now drop the data off the front. We have deduplicated timestamps, so we
2344 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002345 data_queue->erase(
2346 data_queue->begin(),
2347 data_queue->begin() +
2348 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002349 return result;
2350 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002351 // TODO(austin): Binary search.
2352 auto it = std::find_if(
2353 data_queue->begin(), data_queue->end(),
2354 [remote_queue_index,
2355 remote_boot = monotonic_remote_time.boot](const Message &m) {
2356 return m.queue_index == remote_queue_index &&
2357 m.timestamp.boot == remote_boot;
2358 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002359 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002360 return Message{.channel_index = message.channel_index,
2361 .queue_index = remote_queue_index,
2362 .timestamp = monotonic_remote_time,
2363 .monotonic_remote_boot = 0xffffff,
2364 .monotonic_timestamp_boot = 0xffffff,
2365 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002366 }
2367
2368 Message result = std::move(*it);
2369
2370 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002371 << ": Queue index matches, but timestamp doesn't. Please "
2372 "investigate!";
2373 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2374 << ": Queue index matches, but timestamp doesn't. Please "
2375 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002376
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002377 // Erase everything up to this message. We want to keep 1 message in the
2378 // queue so we can handle reliable messages forwarded across boots.
2379 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002380
2381 return result;
2382 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002383}
2384
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002385void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002386 if (queued_until_ > t) {
2387 return;
2388 }
2389 while (true) {
2390 if (!messages_.empty() && messages_.back().timestamp > t) {
2391 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2392 return;
2393 }
2394
2395 if (!Queue()) {
2396 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002397 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002398 return;
2399 }
2400
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002401 // Now that it has been added (and cannibalized), forget about it
2402 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002403 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002404 }
2405}
2406
2407bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002408 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002409 if (m == nullptr) {
2410 return false;
2411 }
2412 for (NodeData &node_data : nodes_data_) {
2413 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002414 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002415 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002416 // If we have data but no timestamps (logs where the timestamps didn't get
2417 // logged are classic), we can grow this indefinitely. We don't need to
2418 // keep anything that is older than the last message returned.
2419
2420 // We have the time on the source node.
2421 // We care to wait until we have the time on the destination node.
2422 std::deque<Message> &messages =
2423 node_data.channels[m->channel_index].messages;
2424 // Max delay over the network is the TTL, so let's take the queue time and
2425 // add TTL to it. Don't forget any messages which are reliable until
2426 // someone can come up with a good reason to forget those too.
2427 if (node_data.channels[m->channel_index].time_to_live >
2428 chrono::nanoseconds(0)) {
2429 // We need to make *some* assumptions about network delay for this to
2430 // work. We want to only look at the RX side. This means we need to
2431 // track the last time a message was popped from any channel from the
2432 // node sending this message, and compare that to the max time we expect
2433 // that a message will take to be delivered across the network. This
2434 // assumes that messages are popped in time order as a proxy for
2435 // measuring the distributed time at this layer.
2436 //
2437 // Leave at least 1 message in here so we can handle reboots and
2438 // messages getting sent twice.
2439 while (messages.size() > 1u &&
2440 messages.begin()->timestamp +
2441 node_data.channels[m->channel_index].time_to_live +
2442 chrono::duration_cast<chrono::nanoseconds>(
2443 chrono::duration<double>(FLAGS_max_network_delay)) <
2444 last_popped_message_time_) {
2445 messages.pop_front();
2446 }
2447 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002448 node_data.channels[m->channel_index].messages.emplace_back(*m);
2449 }
2450 }
2451
2452 messages_.emplace_back(std::move(*m));
2453 return true;
2454}
2455
2456std::string TimestampMapper::DebugString() const {
2457 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002458 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002459 for (const Message &message : messages_) {
2460 ss << " " << message << "\n";
2461 }
2462 ss << "] queued_until " << queued_until_;
2463 for (const NodeData &ns : nodes_data_) {
2464 if (ns.peer == nullptr) continue;
2465 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2466 size_t channel_index = 0;
2467 for (const NodeData::ChannelData &channel_data :
2468 ns.peer->nodes_data_[node()].channels) {
2469 if (channel_data.messages.empty()) {
2470 continue;
2471 }
Austin Schuhb000de62020-12-03 22:00:40 -08002472
Austin Schuhd2f96102020-12-01 20:27:29 -08002473 ss << " channel " << channel_index << " [\n";
2474 for (const Message &m : channel_data.messages) {
2475 ss << " " << m << "\n";
2476 }
2477 ss << " ]\n";
2478 ++channel_index;
2479 }
2480 ss << "] queued_until " << ns.peer->queued_until_;
2481 }
2482 return ss.str();
2483}
2484
Austin Schuhee711052020-08-24 16:06:09 -07002485std::string MaybeNodeName(const Node *node) {
2486 if (node != nullptr) {
2487 return node->name()->str() + " ";
2488 }
2489 return "";
2490}
2491
Brian Silvermanf51499a2020-09-21 12:49:08 -07002492} // namespace aos::logger