blob: 990e742cf3b8559ea24876515647c61a77606f9e [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080013#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070014#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080015#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080016#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080017#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080018#include "gflags/gflags.h"
19#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080020
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070021#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070022#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070023#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070024#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070025#else
26#define ENABLE_LZMA 0
27#endif
28
29#if ENABLE_LZMA
30#include "aos/events/logging/lzma_encoder.h"
31#endif
Austin Schuh86110712022-09-16 15:40:54 -070032#if ENABLE_S3
33#include "aos/events/logging/s3_fetcher.h"
34#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070035
Austin Schuh48d10d62022-10-16 22:19:23 -070036DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080037 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070038DEFINE_double(
39 flush_period, 5.0,
40 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080041
Austin Schuha040c3f2021-02-13 16:09:07 -080042DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080043 max_network_delay, 1.0,
44 "Max time to assume a message takes to cross the network before we are "
45 "willing to drop it from our buffers and assume it didn't make it. "
46 "Increasing this number can increase memory usage depending on the packet "
47 "loss of your network or if the timestamps aren't logged for a message.");
48
49DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080050 max_out_of_order, -1,
51 "If set, this overrides the max out of order duration for a log file.");
52
Austin Schuh0e8db662021-07-06 10:43:47 -070053DEFINE_bool(workaround_double_headers, true,
54 "Some old log files have two headers at the beginning. Use the "
55 "last header as the actual header.");
56
Brian Smarttea913d42021-12-10 15:02:38 -080057DEFINE_bool(crash_on_corrupt_message, true,
58 "When true, MessageReader will crash the first time a message "
59 "with corrupted format is found. When false, the crash will be "
60 "suppressed, and any remaining readable messages will be "
61 "evaluated to present verified vs corrupted stats.");
62
63DEFINE_bool(ignore_corrupt_messages, false,
64 "When true, and crash_on_corrupt_message is false, then any "
65 "corrupt message found by MessageReader be silently ignored, "
66 "providing access to all uncorrupted messages in a logfile.");
67
Brian Silvermanf51499a2020-09-21 12:49:08 -070068namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Writes the value held by `t` to `*os`. An empty optional is written as the
// literal text "null".
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}

}  // namespace
82
Alexei Strots01395492023-03-20 13:59:56 -070083DetachedBufferWriter::DetachedBufferWriter(
84 std::unique_ptr<FileHandler> file_handler,
85 std::unique_ptr<DataEncoder> encoder)
86 : file_handler_(std::move(file_handler)), encoder_(std::move(encoder)) {
87 CHECK(file_handler_);
88 ran_out_of_space_ = file_handler_->OpenForWrite() == WriteCode::kOutOfSpace;
89 if (ran_out_of_space_) {
90 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -080091 }
92}
93
Austin Schuha36c8902019-12-30 18:07:15 -080094DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070095 Close();
96 if (ran_out_of_space_) {
97 CHECK(acknowledge_ran_out_of_space_)
98 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070099 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700100}
101
Brian Silvermand90905f2020-09-23 14:42:56 -0700102DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103 *this = std::move(other);
104}
105
Brian Silverman87ac0402020-09-17 14:47:01 -0700106// When other is destroyed "soon" (which it should be because we're getting an
107// rvalue reference to it), it will flush etc all the data we have queued up
108// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700109DetachedBufferWriter &DetachedBufferWriter::operator=(
110 DetachedBufferWriter &&other) {
Alexei Strots01395492023-03-20 13:59:56 -0700111 std::swap(file_handler_, other.file_handler_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700112 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700113 std::swap(ran_out_of_space_, other.ran_out_of_space_);
114 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800115 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800117}
118
Austin Schuh8bdfc492023-02-11 12:53:13 -0800119void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800120 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700121 if (ran_out_of_space_) {
122 // We don't want any later data to be written after space becomes
123 // available, so refuse to write anything more once we've dropped data
124 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700125 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800126 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700127
Austin Schuh8bdfc492023-02-11 12:53:13 -0800128 const size_t message_size = copier->size();
129 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700130
Austin Schuh8bdfc492023-02-11 12:53:13 -0800131 // Keep writing chunks until we've written it all. If we end up with a
132 // partial write, this means we need to flush to disk.
133 do {
Alexei Strots01395492023-03-20 13:59:56 -0700134 const size_t bytes_written =
135 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800136 CHECK(bytes_written != 0);
137
138 overall_bytes_written += bytes_written;
139 if (overall_bytes_written < message_size) {
140 VLOG(1) << "Flushing because of a partial write, tried to write "
141 << message_size << " wrote " << overall_bytes_written;
142 Flush(now);
143 }
144 } while (overall_bytes_written < message_size);
145
Austin Schuhbd06ae42021-03-31 22:48:21 -0700146 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800147}
148
Brian Silverman0465fcf2020-09-24 00:29:18 -0700149void DetachedBufferWriter::Close() {
Alexei Strots01395492023-03-20 13:59:56 -0700150 if (!file_handler_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700151 return;
152 }
153 encoder_->Finish();
154 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800155 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700156 }
Alexei Strots01395492023-03-20 13:59:56 -0700157 ran_out_of_space_ = file_handler_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700158}
159
Austin Schuh8bdfc492023-02-11 12:53:13 -0800160void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
161 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700162 if (ran_out_of_space_) {
163 // We don't want any later data to be written after space becomes available,
164 // so refuse to write anything more once we've dropped data because we ran
165 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700166 if (encoder_) {
167 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
168 encoder_->Clear(encoder_->queue().size());
169 } else {
170 VLOG(1) << "No queue to ignore";
171 }
172 return;
173 }
174
175 const auto queue = encoder_->queue();
176 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700177 return;
178 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700179
Alexei Strots01395492023-03-20 13:59:56 -0700180 const WriteResult result = file_handler_->Write(queue);
181 encoder_->Clear(result.messages_written);
182 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700183}
184
Austin Schuhbd06ae42021-03-31 22:48:21 -0700185void DetachedBufferWriter::FlushAtThreshold(
186 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700187 if (ran_out_of_space_) {
188 // We don't want any later data to be written after space becomes available,
189 // so refuse to write anything more once we've dropped data because we ran
190 // out of space.
191 if (encoder_) {
192 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
193 encoder_->Clear(encoder_->queue().size());
194 } else {
195 VLOG(1) << "No queue to ignore";
196 }
197 return;
198 }
199
Austin Schuhbd06ae42021-03-31 22:48:21 -0700200 // We don't want to flush the first time through. Otherwise we will flush as
201 // the log file header might be compressing, defeating any parallelism and
202 // queueing there.
203 if (last_flush_time_ == aos::monotonic_clock::min_time) {
204 last_flush_time_ = now;
205 }
206
Brian Silvermanf51499a2020-09-21 12:49:08 -0700207 // Flush if we are at the max number of iovs per writev, because there's no
208 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700209 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800210 while (encoder_->space() == 0 ||
211 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700212 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700213 (now > last_flush_time_ +
214 chrono::duration_cast<chrono::nanoseconds>(
215 chrono::duration<double>(FLAGS_flush_period)) &&
216 encoder_->queued_bytes() != 0)) {
217 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
218 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800219 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700220 }
Austin Schuha36c8902019-12-30 18:07:15 -0800221}
222
Austin Schuhf2d0e682022-10-16 14:20:58 -0700223// Do the magic dance to convert the endianness of the data and append it to the
224// buffer.
225namespace {
226
227// TODO(austin): Look at the generated code to see if building the header is
228// efficient or not.
229template <typename T>
230uint8_t *Push(uint8_t *buffer, const T data) {
231 const T endian_data = flatbuffers::EndianScalar<T>(data);
232 std::memcpy(buffer, &endian_data, sizeof(T));
233 return buffer + sizeof(T);
234}
235
236uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
237 std::memcpy(buffer, data, size);
238 return buffer + size;
239}
240
241uint8_t *Pad(uint8_t *buffer, size_t padding) {
242 std::memset(buffer, 0, padding);
243 return buffer + padding;
244}
245} // namespace
246
247flatbuffers::Offset<MessageHeader> PackRemoteMessage(
248 flatbuffers::FlatBufferBuilder *fbb,
249 const message_bridge::RemoteMessage *msg, int channel_index,
250 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
251 logger::MessageHeader::Builder message_header_builder(*fbb);
252 // Note: this must match the same order as MessageBridgeServer and
253 // PackMessage. We want identical headers to have identical
254 // on-the-wire formats to make comparing them easier.
255
256 message_header_builder.add_channel_index(channel_index);
257
258 message_header_builder.add_queue_index(msg->queue_index());
259 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
260 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
261
262 message_header_builder.add_monotonic_remote_time(
263 msg->monotonic_remote_time());
264 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
265 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
266
267 message_header_builder.add_monotonic_timestamp_time(
268 monotonic_timestamp_time.time_since_epoch().count());
269
270 return message_header_builder.Finish();
271}
272
// Writes a size-prefixed MessageHeader directly into `buffer`, without a
// FlatBufferBuilder. The header holds the same fields PackRemoteMessage sets.
// NOTE(review): presumably this is byte-identical to PackRemoteMessage's
// output; confirm with the corresponding unit test.
//
// Only bytes [start_byte, end_byte) of the message are written, so a caller
// can produce the message in 8-byte-aligned pieces. The switch jumps to the
// chunk that begins at start_byte, then falls through chunk by chunk until the
// chunk beginning at end_byte is reached. The layout comments inside are an
// annotated dump of the full message (the example values are arbitrary).
//
// Preconditions (DCHECKed only): start_byte and end_byte are multiples of 8,
// and start_byte <= end_byte <= PackRemoteMessageSize(). `buffer` must have
// room for end_byte - start_byte bytes.
//
// Returns end_byte - start_byte, the number of bytes written.
size_t PackRemoteMessageInline(
    uint8_t *buffer, const message_bridge::RemoteMessage *msg,
    int channel_index,
    const aos::monotonic_clock::time_point monotonic_timestamp_time,
    size_t start_byte, size_t end_byte) {
  const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
  DCHECK_EQ((start_byte % 8u), 0u);
  DCHECK_EQ((end_byte % 8u), 0u);
  DCHECK_LE(start_byte, end_byte);
  DCHECK_LE(end_byte, message_size);

  // Each case label is the offset of an 8 byte chunk. Entering at start_byte
  // and breaking when the chunk offset equals end_byte writes exactly the
  // requested range.
  switch (start_byte) {
    case 0x00u:
      if ((end_byte) == 0x00u) {
        break;
      }
      // clang-format off
      // header:
      // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
      buffer = Push<flatbuffers::uoffset_t>(
          buffer, message_size - sizeof(flatbuffers::uoffset_t));
      // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
      [[fallthrough]];
    case 0x08u:
      if ((end_byte) == 0x08u) {
        break;
      }
      //
      // padding:
      // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
      buffer = Pad(buffer, 6);
      //
      // vtable (aos.logger.MessageHeader):
      // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x10u:
      if ((end_byte) == 0x10u) {
        break;
      }
      // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
      // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
      // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
      // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
      [[fallthrough]];
    case 0x18u:
      if ((end_byte) == 0x18u) {
        break;
      }
      // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
      // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
      // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
      // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
      [[fallthrough]];
    case 0x20u:
      if ((end_byte) == 0x20u) {
        break;
      }
      // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
      // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
      //
      // root_table (aos.logger.MessageHeader):
      // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x28u:
      if ((end_byte) == 0x28u) {
        break;
      }
      // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
      buffer = Push<int64_t>(buffer,
                             monotonic_timestamp_time.time_since_epoch().count());
      [[fallthrough]];
    case 0x30u:
      if ((end_byte) == 0x30u) {
        break;
      }
      // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
      // TODO(austin): Can we re-arrange the order to ditch the padding?
      // (Answer is yes, but what is the impact elsewhere? It will change the
      // binary format)
      buffer = Pad(buffer, 4);
      // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
      [[fallthrough]];
    case 0x38u:
      if ((end_byte) == 0x38u) {
        break;
      }
      // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
      [[fallthrough]];
    case 0x40u:
      if ((end_byte) == 0x40u) {
        break;
      }
      // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
      [[fallthrough]];
    case 0x48u:
      if ((end_byte) == 0x48u) {
        break;
      }
      // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
      [[fallthrough]];
    case 0x50u:
      if ((end_byte) == 0x50u) {
        break;
      }
      // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
      [[fallthrough]];
    case 0x58u:
      if ((end_byte) == 0x58u) {
        break;
      }
      // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->queue_index());
      // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
      buffer = Push<uint32_t>(buffer, channel_index);
      // clang-format on
      [[fallthrough]];
    case 0x60u:
      // 0x60 is the end of the message: nothing is left to write.
      if ((end_byte) == 0x60u) {
        break;
      }
  }

  return end_byte - start_byte;
}
415
// Builds, in `fbb`, a MessageHeader for `context` on channel `channel_index`.
// `log_type` selects which fields are populated:
//   kLogMessage:                data, event (sent) times, queue_index.
//   kLogDeliveryTimeOnly:       no data; event times, remote times and both
//                               queue indices.
//   kLogMessageAndDeliveryTime: data plus everything kLogDeliveryTimeOnly has.
//   kLogRemoteMessage:          data, with the context's remote times and
//                               remote queue index stored in the sent/queue
//                               fields.
// The order of the add_* calls controls the on-the-wire layout. PackMessageInline
// hand-encodes the same layout (checked by the InlinePackMessage.Equivilent
// unit test, per its comment), so changes here must be mirrored there and in
// PackMessageHeaderSize.
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  // The data vector is created before the table builder is started below.
  // FlatBuffers does not allow creating it while a table is under
  // construction.
  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogMessageAndDeliveryTime:
    case LogType::kLogRemoteMessage:
      // Since the timestamps are 8 byte aligned, we are going to end up adding
      // padding in the middle of the message to pad everything out to 8 byte
      // alignment. That's rather wasteful. To make things efficient to mmap
      // while reading uncompressed logs, we'd actually rather the message be
      // aligned. So, force 8 byte alignment (enough to preserve alignment
      // inside the nested message so that we can read it without moving it)
      // here.
      fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      // Timestamps only; no payload is stored.
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  // These are split out into very explicit serialization calls because the
  // order here changes the order things are written out on the wire, and we
  // want to control and understand it here. Changing the order can increase
  // the amount of padding bytes in the middle.
  //
  // It is also easier to follow... And doesn't actually make things much
  // bigger.
  switch (log_type) {
    case LogType::kLogRemoteMessage:
      // The remote side's queue index and times are what get logged here, in
      // the queue_index / *_sent_time slots.
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_data(data_offset);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogDeliveryTimeOnly:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;

    case LogType::kLogMessage:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_data(data_offset);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;

    case LogType::kLogMessageAndDeliveryTime:
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_data(data_offset);
      break;
  }

  return message_header_builder.Finish();
}
500
Austin Schuhfa30c352022-10-16 11:12:02 -0700501flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
502 switch (log_type) {
503 case LogType::kLogMessage:
504 return
505 // Root table size + offset.
506 sizeof(flatbuffers::uoffset_t) * 2 +
507 // 6 padding bytes to pad the header out properly.
508 6 +
509 // vtable header (size + size of table)
510 sizeof(flatbuffers::voffset_t) * 2 +
511 // offsets to all the fields.
512 sizeof(flatbuffers::voffset_t) * 5 +
513 // pointer to vtable
514 sizeof(flatbuffers::soffset_t) +
515 // pointer to data
516 sizeof(flatbuffers::uoffset_t) +
517 // realtime_sent_time, monotonic_sent_time
518 sizeof(int64_t) * 2 +
519 // queue_index, channel_index
520 sizeof(uint32_t) * 2;
521
522 case LogType::kLogDeliveryTimeOnly:
523 return
524 // Root table size + offset.
525 sizeof(flatbuffers::uoffset_t) * 2 +
526 // 6 padding bytes to pad the header out properly.
527 4 +
528 // vtable header (size + size of table)
529 sizeof(flatbuffers::voffset_t) * 2 +
530 // offsets to all the fields.
531 sizeof(flatbuffers::voffset_t) * 8 +
532 // pointer to vtable
533 sizeof(flatbuffers::soffset_t) +
534 // remote_queue_index
535 sizeof(uint32_t) +
536 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
537 // monotonic_sent_time
538 sizeof(int64_t) * 4 +
539 // queue_index, channel_index
540 sizeof(uint32_t) * 2;
541
542 case LogType::kLogMessageAndDeliveryTime:
543 return
544 // Root table size + offset.
545 sizeof(flatbuffers::uoffset_t) * 2 +
546 // 4 padding bytes to pad the header out properly.
547 4 +
548 // vtable header (size + size of table)
549 sizeof(flatbuffers::voffset_t) * 2 +
550 // offsets to all the fields.
551 sizeof(flatbuffers::voffset_t) * 8 +
552 // pointer to vtable
553 sizeof(flatbuffers::soffset_t) +
554 // pointer to data
555 sizeof(flatbuffers::uoffset_t) +
556 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
557 // monotonic_sent_time
558 sizeof(int64_t) * 4 +
559 // remote_queue_index, queue_index, channel_index
560 sizeof(uint32_t) * 3;
561
562 case LogType::kLogRemoteMessage:
563 return
564 // Root table size + offset.
565 sizeof(flatbuffers::uoffset_t) * 2 +
566 // 6 padding bytes to pad the header out properly.
567 6 +
568 // vtable header (size + size of table)
569 sizeof(flatbuffers::voffset_t) * 2 +
570 // offsets to all the fields.
571 sizeof(flatbuffers::voffset_t) * 5 +
572 // pointer to vtable
573 sizeof(flatbuffers::soffset_t) +
574 // realtime_sent_time, monotonic_sent_time
575 sizeof(int64_t) * 2 +
576 // pointer to data
577 sizeof(flatbuffers::uoffset_t) +
578 // queue_index, channel_index
579 sizeof(uint32_t) * 2;
580 }
581 LOG(FATAL);
582}
583
James Kuszmaul9776b392023-01-14 14:08:08 -0800584flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700585 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
586 "Update size logic please.");
587 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700588 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700589 switch (log_type) {
590 case LogType::kLogDeliveryTimeOnly:
591 return PackMessageHeaderSize(log_type);
592
593 case LogType::kLogMessage:
594 case LogType::kLogMessageAndDeliveryTime:
595 case LogType::kLogRemoteMessage:
596 return PackMessageHeaderSize(log_type) +
597 // Vector...
598 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
599 }
600 LOG(FATAL);
601}
602
Austin Schuhfa30c352022-10-16 11:12:02 -0700603size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800604 int channel_index, LogType log_type, size_t start_byte,
605 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700606 // TODO(austin): Figure out how to copy directly from shared memory instead of
607 // first into the fetcher's memory and then into here. That would save a lot
608 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700609 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700610 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800611 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
612 DCHECK_EQ((start_byte % 8u), 0u);
613 DCHECK_EQ((end_byte % 8u), 0u);
614 DCHECK_LE(start_byte, end_byte);
615 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700616
617 // Pack all the data in. This is brittle but easy to change. Use the
618 // InlinePackMessage.Equivilent unit test to verify everything matches.
619 switch (log_type) {
620 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800621 switch (start_byte) {
622 case 0x00u:
623 if ((end_byte) == 0x00u) {
624 break;
625 }
626 // clang-format off
627 // header:
628 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
629 buffer = Push<flatbuffers::uoffset_t>(
630 buffer, message_size - sizeof(flatbuffers::uoffset_t));
631
632 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
633 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
634 [[fallthrough]];
635 case 0x08u:
636 if ((end_byte) == 0x08u) {
637 break;
638 }
639 //
640 // padding:
641 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
642 buffer = Pad(buffer, 6);
643 //
644 // vtable (aos.logger.MessageHeader):
645 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
646 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
647 [[fallthrough]];
648 case 0x10u:
649 if ((end_byte) == 0x10u) {
650 break;
651 }
652 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
653 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
654 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
655 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
656 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
657 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
658 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
659 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
660 [[fallthrough]];
661 case 0x18u:
662 if ((end_byte) == 0x18u) {
663 break;
664 }
665 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
666 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
667 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
668 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
669 //
670 // root_table (aos.logger.MessageHeader):
671 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
672 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
673 [[fallthrough]];
674 case 0x20u:
675 if ((end_byte) == 0x20u) {
676 break;
677 }
678 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
679 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
680 [[fallthrough]];
681 case 0x28u:
682 if ((end_byte) == 0x28u) {
683 break;
684 }
685 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
686 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
687 [[fallthrough]];
688 case 0x30u:
689 if ((end_byte) == 0x30u) {
690 break;
691 }
692 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
693 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
694 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
695 buffer = Push<uint32_t>(buffer, context.queue_index);
696 [[fallthrough]];
697 case 0x38u:
698 if ((end_byte) == 0x38u) {
699 break;
700 }
701 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
702 buffer = Push<uint32_t>(buffer, channel_index);
703 //
704 // vector (aos.logger.MessageHeader.data):
705 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
706 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
707 [[fallthrough]];
708 case 0x40u:
709 if ((end_byte) == 0x40u) {
710 break;
711 }
712 [[fallthrough]];
713 default:
714 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
715 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
716 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
717 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
718 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
719 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
720 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
721 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
722 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
723 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
724 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
725 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
726 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
727 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
728 //
729 // padding:
730 // +0x4E | 00 00 | uint8_t[2] | .. | padding
731 // clang-format on
732 if (start_byte <= 0x40 && end_byte == message_size) {
733 // The easy one, slap it all down.
734 buffer = PushBytes(buffer, context.data, context.size);
735 buffer =
736 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
737 } else {
738 const size_t data_start_byte =
739 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
740 const size_t data_end_byte = end_byte - 0x40;
741 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
742 if (data_start_byte < padded_size) {
743 buffer = PushBytes(
744 buffer,
745 reinterpret_cast<const uint8_t *>(context.data) +
746 data_start_byte,
747 std::min(context.size, data_end_byte) - data_start_byte);
748 if (data_end_byte == padded_size) {
749 // We can only pad the last 7 bytes, so this only gets written
750 // if we write the last byte.
751 buffer = Pad(buffer,
752 ((context.size + 7) & 0xfffffff8u) - context.size);
753 }
754 }
755 }
756 break;
757 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700758 break;
759
760 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800761 switch (start_byte) {
762 case 0x00u:
763 if ((end_byte) == 0x00u) {
764 break;
765 }
766 // clang-format off
767 // header:
768 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
769 buffer = Push<flatbuffers::uoffset_t>(
770 buffer, message_size - sizeof(flatbuffers::uoffset_t));
771 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
772 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700773
Austin Schuh71a40d42023-02-04 21:22:22 -0800774 [[fallthrough]];
775 case 0x08u:
776 if ((end_byte) == 0x08u) {
777 break;
778 }
779 //
780 // padding:
781 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
782 buffer = Pad(buffer, 4);
783 //
784 // vtable (aos.logger.MessageHeader):
785 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
786 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
787 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
788 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
789 [[fallthrough]];
790 case 0x10u:
791 if ((end_byte) == 0x10u) {
792 break;
793 }
794 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
795 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
796 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
797 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
798 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
799 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
800 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
801 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
802 [[fallthrough]];
803 case 0x18u:
804 if ((end_byte) == 0x18u) {
805 break;
806 }
807 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
808 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
809 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
810 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
811 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
812 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
813 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
814 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
815 [[fallthrough]];
816 case 0x20u:
817 if ((end_byte) == 0x20u) {
818 break;
819 }
820 //
821 // root_table (aos.logger.MessageHeader):
822 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
823 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
824 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
825 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
826 [[fallthrough]];
827 case 0x28u:
828 if ((end_byte) == 0x28u) {
829 break;
830 }
831 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
832 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
833 [[fallthrough]];
834 case 0x30u:
835 if ((end_byte) == 0x30u) {
836 break;
837 }
838 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
839 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
840 [[fallthrough]];
841 case 0x38u:
842 if ((end_byte) == 0x38u) {
843 break;
844 }
845 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
846 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
847 [[fallthrough]];
848 case 0x40u:
849 if ((end_byte) == 0x40u) {
850 break;
851 }
852 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
853 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
854 [[fallthrough]];
855 case 0x48u:
856 if ((end_byte) == 0x48u) {
857 break;
858 }
859 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
860 buffer = Push<uint32_t>(buffer, context.queue_index);
861 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
862 buffer = Push<uint32_t>(buffer, channel_index);
863
864 // clang-format on
865 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700866 break;
867
868 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800869 switch (start_byte) {
870 case 0x00u:
871 if ((end_byte) == 0x00u) {
872 break;
873 }
874 // clang-format off
875 // header:
876 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
877 buffer = Push<flatbuffers::uoffset_t>(
878 buffer, message_size - sizeof(flatbuffers::uoffset_t));
879 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
880 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
881 [[fallthrough]];
882 case 0x08u:
883 if ((end_byte) == 0x08u) {
884 break;
885 }
886 //
887 // padding:
888 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
889 buffer = Pad(buffer, 4);
890 //
891 // vtable (aos.logger.MessageHeader):
892 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
893 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
894 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
895 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
896 [[fallthrough]];
897 case 0x10u:
898 if ((end_byte) == 0x10u) {
899 break;
900 }
901 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
902 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
903 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
904 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
905 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
906 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
907 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
908 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
909 [[fallthrough]];
910 case 0x18u:
911 if ((end_byte) == 0x18u) {
912 break;
913 }
914 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
915 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
916 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
917 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
918 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
919 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
920 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
921 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
922 [[fallthrough]];
923 case 0x20u:
924 if ((end_byte) == 0x20u) {
925 break;
926 }
927 //
928 // root_table (aos.logger.MessageHeader):
929 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
930 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
931 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
932 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
933 [[fallthrough]];
934 case 0x28u:
935 if ((end_byte) == 0x28u) {
936 break;
937 }
938 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
939 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
940 [[fallthrough]];
941 case 0x30u:
942 if ((end_byte) == 0x30u) {
943 break;
944 }
945 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
946 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
947 [[fallthrough]];
948 case 0x38u:
949 if ((end_byte) == 0x38u) {
950 break;
951 }
952 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
953 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
954 [[fallthrough]];
955 case 0x40u:
956 if ((end_byte) == 0x40u) {
957 break;
958 }
959 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
960 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
961 [[fallthrough]];
962 case 0x48u:
963 if ((end_byte) == 0x48u) {
964 break;
965 }
966 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
967 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
968 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
969 buffer = Push<uint32_t>(buffer, context.queue_index);
970 [[fallthrough]];
971 case 0x50u:
972 if ((end_byte) == 0x50u) {
973 break;
974 }
975 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
976 buffer = Push<uint32_t>(buffer, channel_index);
977 //
978 // vector (aos.logger.MessageHeader.data):
979 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
980 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
981 [[fallthrough]];
982 case 0x58u:
983 if ((end_byte) == 0x58u) {
984 break;
985 }
986 [[fallthrough]];
987 default:
988 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
989 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
990 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
991 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
992 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
993 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
994 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
995 //
996 // padding:
997 // +0x5F | 00 | uint8_t[1] | . | padding
998 // clang-format on
999
1000 if (start_byte <= 0x58 && end_byte == message_size) {
1001 // The easy one, slap it all down.
1002 buffer = PushBytes(buffer, context.data, context.size);
1003 buffer =
1004 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1005 } else {
1006 const size_t data_start_byte =
1007 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1008 const size_t data_end_byte = end_byte - 0x58;
1009 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1010 if (data_start_byte < padded_size) {
1011 buffer = PushBytes(
1012 buffer,
1013 reinterpret_cast<const uint8_t *>(context.data) +
1014 data_start_byte,
1015 std::min(context.size, data_end_byte) - data_start_byte);
1016 if (data_end_byte == padded_size) {
1017 // We can only pad the last 7 bytes, so this only gets written
1018 // if we write the last byte.
1019 buffer = Pad(buffer,
1020 ((context.size + 7) & 0xfffffff8u) - context.size);
1021 }
1022 }
1023 }
1024
1025 break;
1026 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001027
1028 break;
1029
1030 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001031 switch (start_byte) {
1032 case 0x00u:
1033 if ((end_byte) == 0x00u) {
1034 break;
1035 }
1036 // This is the message we need to recreate.
1037 //
1038 // clang-format off
1039 // header:
1040 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1041 buffer = Push<flatbuffers::uoffset_t>(
1042 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1043 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1044 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1045 [[fallthrough]];
1046 case 0x08u:
1047 if ((end_byte) == 0x08u) {
1048 break;
1049 }
1050 //
1051 // padding:
1052 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1053 buffer = Pad(buffer, 6);
1054 //
1055 // vtable (aos.logger.MessageHeader):
1056 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1057 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1058 [[fallthrough]];
1059 case 0x10u:
1060 if ((end_byte) == 0x10u) {
1061 break;
1062 }
1063 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1064 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1065 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1066 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1067 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1068 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1069 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1070 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1071 [[fallthrough]];
1072 case 0x18u:
1073 if ((end_byte) == 0x18u) {
1074 break;
1075 }
1076 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1077 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1078 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1079 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1080 //
1081 // root_table (aos.logger.MessageHeader):
1082 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1083 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1084 [[fallthrough]];
1085 case 0x20u:
1086 if ((end_byte) == 0x20u) {
1087 break;
1088 }
1089 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1090 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1091 [[fallthrough]];
1092 case 0x28u:
1093 if ((end_byte) == 0x28u) {
1094 break;
1095 }
1096 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1097 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1098 [[fallthrough]];
1099 case 0x30u:
1100 if ((end_byte) == 0x30u) {
1101 break;
1102 }
1103 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1104 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1105 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1106 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1107 [[fallthrough]];
1108 case 0x38u:
1109 if ((end_byte) == 0x38u) {
1110 break;
1111 }
1112 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1113 buffer = Push<uint32_t>(buffer, channel_index);
1114 //
1115 // vector (aos.logger.MessageHeader.data):
1116 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1117 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1118 [[fallthrough]];
1119 case 0x40u:
1120 if ((end_byte) == 0x40u) {
1121 break;
1122 }
1123 [[fallthrough]];
1124 default:
1125 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1126 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1127 // ...
1128 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1129 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1130 //
1131 // padding:
1132 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1133 // clang-format on
1134 if (start_byte <= 0x40 && end_byte == message_size) {
1135 // The easy one, slap it all down.
1136 buffer = PushBytes(buffer, context.data, context.size);
1137 buffer =
1138 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1139 } else {
1140 const size_t data_start_byte =
1141 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1142 const size_t data_end_byte = end_byte - 0x40;
1143 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1144 if (data_start_byte < padded_size) {
1145 buffer = PushBytes(
1146 buffer,
1147 reinterpret_cast<const uint8_t *>(context.data) +
1148 data_start_byte,
1149 std::min(context.size, data_end_byte) - data_start_byte);
1150 if (data_end_byte == padded_size) {
1151 // We can only pad the last 7 bytes, so this only gets written
1152 // if we write the last byte.
1153 buffer = Pad(buffer,
1154 ((context.size + 7) & 0xfffffff8u) - context.size);
1155 }
1156 }
1157 }
1158 break;
1159 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001160 }
1161
Austin Schuh71a40d42023-02-04 21:22:22 -08001162 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001163}
1164
Austin Schuhcd368422021-11-22 21:23:29 -08001165SpanReader::SpanReader(std::string_view filename, bool quiet)
1166 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -07001167 static constexpr std::string_view kS3 = "s3:";
1168 if (filename.substr(0, kS3.size()) == kS3) {
1169#if ENABLE_S3
1170 decoder_ = std::make_unique<S3Fetcher>(filename);
1171#else
1172 LOG(FATAL) << "Reading files from S3 not supported on this platform";
1173#endif
1174 } else {
1175 decoder_ = std::make_unique<DummyDecoder>(filename);
1176 }
Tyler Chatow2015bc62021-08-04 21:15:09 -07001177
1178 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -07001179 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001180 if (filename.substr(filename.size() - kXz.size()) == kXz) {
1181#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -08001182 decoder_ =
1183 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001184#else
Austin Schuhcd368422021-11-22 21:23:29 -08001185 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001186 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
1187#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -07001188 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
1189 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001190 }
Austin Schuh05b70472020-01-01 17:11:17 -08001191}
1192
Austin Schuhcf5f6442021-07-06 10:43:28 -07001193absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001194 // Make sure we have enough for the size.
1195 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1196 if (!ReadBlock()) {
1197 return absl::Span<const uint8_t>();
1198 }
1199 }
1200
1201 // Now make sure we have enough for the message.
1202 const size_t data_size =
1203 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1204 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001205 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1206 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1207 LOG(ERROR) << " Rest of log file is "
1208 << absl::BytesToHexString(std::string_view(
1209 reinterpret_cast<const char *>(data_.data() +
1210 consumed_data_),
1211 data_.size() - consumed_data_));
1212 return absl::Span<const uint8_t>();
1213 }
Austin Schuh05b70472020-01-01 17:11:17 -08001214 while (data_.size() < consumed_data_ + data_size) {
1215 if (!ReadBlock()) {
1216 return absl::Span<const uint8_t>();
1217 }
1218 }
1219
1220 // And return it, consuming the data.
1221 const uint8_t *data_ptr = data_.data() + consumed_data_;
1222
Austin Schuh05b70472020-01-01 17:11:17 -08001223 return absl::Span<const uint8_t>(data_ptr, data_size);
1224}
1225
Austin Schuhcf5f6442021-07-06 10:43:28 -07001226void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001227 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001228 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1229 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001230 consumed_data_ += consumed_size;
1231 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001232}
1233
1234absl::Span<const uint8_t> SpanReader::ReadMessage() {
1235 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001236 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001237 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001238 } else {
1239 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001240 }
1241 return result;
1242}
1243
Austin Schuh05b70472020-01-01 17:11:17 -08001244bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001245 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1246 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001247 constexpr size_t kReadSize = 256 * 1024;
1248
1249 // Strip off any unused data at the front.
1250 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001251 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001252 consumed_data_ = 0;
1253 }
1254
1255 const size_t starting_size = data_.size();
1256
1257 // This should automatically grow the backing store. It won't shrink if we
1258 // get a small chunk later. This reduces allocations when we want to append
1259 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001260 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001261
Brian Silvermanf51499a2020-09-21 12:49:08 -07001262 const size_t count =
1263 decoder_->Read(data_.begin() + starting_size, data_.end());
1264 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001265 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001266 return false;
1267 }
Austin Schuh05b70472020-01-01 17:11:17 -08001268
Brian Smarttea913d42021-12-10 15:02:38 -08001269 total_read_ += count;
1270
Austin Schuh05b70472020-01-01 17:11:17 -08001271 return true;
1272}
1273
Austin Schuhadd6eb32020-11-09 21:24:26 -08001274std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001275 SpanReader *span_reader) {
1276 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001277
1278 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001279 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001280 return std::nullopt;
1281 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001282
Austin Schuh5212cad2020-09-09 23:12:09 -07001283 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001284 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001285 if (!result.Verify()) {
1286 return std::nullopt;
1287 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001288
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001289 // We only know of busted headers in the versions of the log file header
1290 // *before* the logger_sha1 field was added. At some point before that point,
1291 // the logic to track when a header has been written was rewritten in such a
1292 // way that it can't happen anymore. We've seen some logs where the body
1293 // parses as a header recently, so the simple solution of always looking is
1294 // failing us.
1295 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001296 while (true) {
1297 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001298 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001299 break;
1300 }
1301
1302 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1303 maybe_header_data);
1304 if (maybe_header.Verify()) {
1305 LOG(WARNING) << "Found duplicate LogFileHeader in "
1306 << span_reader->filename();
1307 ResizeableBuffer header_data_copy;
1308 header_data_copy.resize(maybe_header_data.size());
1309 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1310 header_data_copy.size());
1311 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1312 std::move(header_data_copy));
1313
1314 span_reader->ConsumeMessage();
1315 } else {
1316 break;
1317 }
1318 }
1319 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001320 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001321}
1322
Austin Schuh0e8db662021-07-06 10:43:47 -07001323std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1324 std::string_view filename) {
1325 SpanReader span_reader(filename);
1326 return ReadHeader(&span_reader);
1327}
1328
Austin Schuhadd6eb32020-11-09 21:24:26 -08001329std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001330 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001331 SpanReader span_reader(filename);
1332 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1333 for (size_t i = 0; i < n + 1; ++i) {
1334 data_span = span_reader.ReadMessage();
1335
1336 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001337 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001338 return std::nullopt;
1339 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001340 }
1341
Brian Silverman354697a2020-09-22 21:06:32 -07001342 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001343 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001344 if (!result.Verify()) {
1345 return std::nullopt;
1346 }
1347 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001348}
1349
Austin Schuh05b70472020-01-01 17:11:17 -08001350MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001351 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001352 raw_log_file_header_(
1353 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001354 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1355 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1356
Austin Schuh0e8db662021-07-06 10:43:47 -07001357 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1358 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001359
1360 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001361 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001362
Austin Schuh0e8db662021-07-06 10:43:47 -07001363 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001364
Austin Schuh5b728b72021-06-16 14:57:15 -07001365 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1366
Brian Smarttea913d42021-12-10 15:02:38 -08001367 total_verified_before_ = span_reader_.TotalConsumed();
1368
Austin Schuhcde938c2020-02-02 17:30:07 -08001369 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001370 FLAGS_max_out_of_order > 0
1371 ? chrono::duration_cast<chrono::nanoseconds>(
1372 chrono::duration<double>(FLAGS_max_out_of_order))
1373 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001374
1375 VLOG(1) << "Opened " << filename << " as node "
1376 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001377}
1378
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001379std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001380 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001381 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001382 if (is_corrupted()) {
1383 LOG(ERROR) << "Total corrupted volumes: before = "
1384 << total_verified_before_
1385 << " | corrupted = " << total_corrupted_
1386 << " | during = " << total_verified_during_
1387 << " | after = " << total_verified_after_ << std::endl;
1388 }
1389
1390 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001391 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1392 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001393 << span_reader_.TotalConsumed() << " bytes usable."
1394 << std::endl;
1395 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001396 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001397 }
1398
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001399 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001400
1401 if (crash_on_corrupt_message_flag_) {
1402 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001403 << total_verified_before_ << " found within "
1404 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001405 << "; set --nocrash_on_corrupt_message to see summary;"
1406 << " also set --ignore_corrupt_messages to process"
1407 << " anyway";
1408
1409 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001410 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001411 << " from " << filename() << std::endl;
1412
1413 total_corrupted_ += msg_data.size();
1414
1415 while (true) {
1416 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1417
James Kuszmaul9776b392023-01-14 14:08:08 -08001418 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001419 if (!ignore_corrupt_messages_flag_) {
1420 LOG(ERROR) << "Total corrupted volumes: before = "
1421 << total_verified_before_
1422 << " | corrupted = " << total_corrupted_
1423 << " | during = " << total_verified_during_
1424 << " | after = " << total_verified_after_ << std::endl;
1425
1426 if (span_reader_.IsIncomplete()) {
1427 LOG(ERROR) << "Unable to access some messages in " << filename()
1428 << " : " << span_reader_.TotalRead() << " bytes read, "
1429 << span_reader_.TotalConsumed() << " bytes usable."
1430 << std::endl;
1431 }
1432 return nullptr;
1433 }
1434 break;
1435 }
1436
1437 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1438
1439 if (!next_msg.Verify()) {
1440 total_corrupted_ += msg_data.size();
1441 total_verified_during_ += total_verified_after_;
1442 total_verified_after_ = 0;
1443
1444 } else {
1445 total_verified_after_ += msg_data.size();
1446 if (ignore_corrupt_messages_flag_) {
1447 msg = next_msg;
1448 break;
1449 }
1450 }
1451 }
1452 }
1453
1454 if (is_corrupted()) {
1455 total_verified_after_ += msg_data.size();
1456 } else {
1457 total_verified_before_ += msg_data.size();
1458 }
Austin Schuh05b70472020-01-01 17:11:17 -08001459
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001460 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001461
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001462 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001463
1464 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001465
1466 if (VLOG_IS_ON(3)) {
1467 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1468 } else if (VLOG_IS_ON(2)) {
1469 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1470 msg_copy.mutable_message()->clear_data();
1471 VLOG(2) << "Read from " << filename() << " data "
1472 << FlatbufferToJson(msg_copy);
1473 }
1474
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001475 return result;
1476}
1477
1478std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1479 const MessageHeader &message) {
1480 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1481
1482 UnpackedMessageHeader *const unpacked_message =
1483 reinterpret_cast<UnpackedMessageHeader *>(
1484 malloc(sizeof(UnpackedMessageHeader) + data_size +
1485 kChannelDataAlignment - 1));
1486
1487 CHECK(message.has_channel_index());
1488 CHECK(message.has_monotonic_sent_time());
1489
1490 absl::Span<uint8_t> span;
1491 if (data_size > 0) {
1492 span =
1493 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1494 &unpacked_message->actual_data[0], data_size)),
1495 data_size);
1496 }
1497
Austin Schuh826e6ce2021-11-18 20:33:10 -08001498 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001499 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001500 monotonic_remote_time = aos::monotonic_clock::time_point(
1501 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001502 }
1503 std::optional<realtime_clock::time_point> realtime_remote_time;
1504 if (message.has_realtime_remote_time()) {
1505 realtime_remote_time = realtime_clock::time_point(
1506 chrono::nanoseconds(message.realtime_remote_time()));
1507 }
1508
1509 std::optional<uint32_t> remote_queue_index;
1510 if (message.has_remote_queue_index()) {
1511 remote_queue_index = message.remote_queue_index();
1512 }
1513
James Kuszmaul9776b392023-01-14 14:08:08 -08001514 new (unpacked_message) UnpackedMessageHeader(
1515 message.channel_index(),
1516 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001517 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001518 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001519 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001520 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1521 remote_queue_index,
1522 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001523 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001524 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001525
1526 if (data_size > 0) {
1527 memcpy(span.data(), message.data()->data(), data_size);
1528 }
1529
1530 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1531 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001532}
1533
Austin Schuhc41603c2020-10-11 16:17:37 -07001534PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001535 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001536 if (parts_.parts.size() >= 2) {
1537 next_message_reader_.emplace(parts_.parts[1]);
1538 }
Austin Schuh48507722021-07-17 17:29:24 -07001539 ComputeBootCounts();
1540}
1541
1542void PartsMessageReader::ComputeBootCounts() {
1543 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1544 std::nullopt);
1545
1546 // We have 3 vintages of log files with different amounts of information.
1547 if (log_file_header()->has_boot_uuids()) {
1548 // The new hotness with the boots explicitly listed out. We can use the log
1549 // file header to compute the boot count of all relevant nodes.
1550 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1551 size_t node_index = 0;
1552 for (const flatbuffers::String *boot_uuid :
1553 *log_file_header()->boot_uuids()) {
1554 CHECK(parts_.boots);
1555 if (boot_uuid->size() != 0) {
1556 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1557 if (it != parts_.boots->boot_count_map.end()) {
1558 boot_counts_[node_index] = it->second;
1559 }
1560 } else if (parts().boots->boots[node_index].size() == 1u) {
1561 boot_counts_[node_index] = 0;
1562 }
1563 ++node_index;
1564 }
1565 } else {
1566 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1567 // single node log files with boot UUIDs in the header. We only know how to
1568 // order certain boots in certain circumstances.
1569 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1570 for (size_t node_index = 0; node_index < boot_counts_.size();
1571 ++node_index) {
1572 CHECK(parts_.boots);
1573 if (parts().boots->boots[node_index].size() == 1u) {
1574 boot_counts_[node_index] = 0;
1575 }
1576 }
1577 } else {
1578 // Really old single node logs without any UUIDs. They can't reboot.
1579 CHECK_EQ(boot_counts_.size(), 1u);
1580 boot_counts_[0] = 0u;
1581 }
1582 }
1583}
Austin Schuhc41603c2020-10-11 16:17:37 -07001584
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001585std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001586 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001587 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001588 message_reader_.ReadMessage();
1589 if (message) {
1590 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001591 const monotonic_clock::time_point monotonic_sent_time =
1592 message->monotonic_sent_time;
1593
1594 // TODO(austin): Does this work with startup? Might need to use the
1595 // start time.
1596 // TODO(austin): Does this work with startup when we don't know the
1597 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001598 if (monotonic_sent_time >
1599 parts_.monotonic_start_time + max_out_of_order_duration()) {
1600 after_start_ = true;
1601 }
1602 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001603 CHECK_GE(monotonic_sent_time,
1604 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001605 << ": Max out of order of " << max_out_of_order_duration().count()
1606 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001607 << parts_.monotonic_start_time << " currently reading "
1608 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001609 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001610 return message;
1611 }
1612 NextLog();
1613 }
Austin Schuh32f68492020-11-08 21:45:51 -08001614 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001615 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001616}
1617
1618void PartsMessageReader::NextLog() {
1619 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001620 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001621 done_ = true;
1622 return;
1623 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001624 CHECK(next_message_reader_);
1625 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001626 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001627 if (next_part_index_ + 1 < parts_.parts.size()) {
1628 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1629 } else {
1630 next_message_reader_.reset();
1631 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001632 ++next_part_index_;
1633}
1634
Austin Schuh1be0ce42020-11-29 22:43:26 -08001635bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001636 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001637
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001638 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001639 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001640 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001641 return false;
1642 }
1643
1644 if (this->channel_index < m2.channel_index) {
1645 return true;
1646 } else if (this->channel_index > m2.channel_index) {
1647 return false;
1648 }
1649
1650 return this->queue_index < m2.queue_index;
1651}
1652
1653bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001654bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001655 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001656
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001657 return timestamp.time == m2.timestamp.time &&
1658 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001659}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001660
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001661std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1662 os << "{.channel_index=" << m.channel_index
1663 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1664 << ", .realtime_sent_time=" << m.realtime_sent_time
1665 << ", .queue_index=" << m.queue_index;
1666 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001667 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001668 }
1669 os << ", .realtime_remote_time=";
1670 PrintOptionalOrNull(&os, m.realtime_remote_time);
1671 os << ", .remote_queue_index=";
1672 PrintOptionalOrNull(&os, m.remote_queue_index);
1673 if (m.has_monotonic_timestamp_time) {
1674 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1675 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001676 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001677 return os;
1678}
1679
Austin Schuh1be0ce42020-11-29 22:43:26 -08001680std::ostream &operator<<(std::ostream &os, const Message &m) {
1681 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001682 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001683 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001684 if (m.data->remote_queue_index.has_value()) {
1685 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1686 }
1687 if (m.data->monotonic_remote_time.has_value()) {
1688 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1689 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001690 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001691 }
1692 os << "}";
1693 return os;
1694}
1695
1696std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1697 os << "{.channel_index=" << m.channel_index
1698 << ", .queue_index=" << m.queue_index
1699 << ", .monotonic_event_time=" << m.monotonic_event_time
1700 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001701 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001702 os << ", .remote_queue_index=" << m.remote_queue_index;
1703 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001704 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001705 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1706 }
1707 if (m.realtime_remote_time != realtime_clock::min_time) {
1708 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1709 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001710 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001711 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1712 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001713 if (m.data != nullptr) {
1714 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001715 } else {
1716 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001717 }
1718 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001719 return os;
1720}
1721
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001722LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001723 : parts_message_reader_(log_parts),
1724 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1725}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001726
// Returns the oldest queued message once it is known to be safely sorted, or
// nullptr when the parts are exhausted and nothing is queued.
//
// Messages are read ahead from parts_message_reader_ until the oldest queued
// message is older than sorted_until(). sorted_until_ trails the newest
// timestamp read by max_out_of_order_duration(), so nothing read later can
// sort ahead of what is returned. The pointer stays valid until PopFront().
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      // Stop once the oldest message is inside the sorted window. The extra
      // start-time condition keeps reading until the window has passed the log
      // start.
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::shared_ptr<UnpackedMessageHeader> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // The timestamp time (if any) was recorded by the logger, so it belongs
      // to the logger's boot.
      size_t monotonic_timestamp_boot = 0;
      if (m->has_monotonic_timestamp_time) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      // Placeholder boot used when the message carries no remote time.
      // NOTE(review): presumably never read in that case; confirm.
      size_t monotonic_remote_boot = 0xffffff;

      // Remote times belong to the boot of the channel's source node, as
      // tracked per part file by the reader.
      if (m->monotonic_remote_time.has_value()) {
        const Node *node =
            parts().config->nodes()->Get(source_node_index_[m->channel_index]);

        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[m->channel_index]);
        CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                    << ", with index " << source_node_index_[m->channel_index];
        monotonic_remote_boot = *boot;
      }

      // Braced initializers are evaluated left to right, so every read of
      // `m->...` above `.data` happens before `m` is moved from.
      messages_.insert(
          Message{.channel_index = m->channel_index,
                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
                                                .index = m->queue_index},
                  .timestamp = BootTimestamp{.boot = parts().boot_count,
                                             .time = m->monotonic_sent_time},
                  .monotonic_remote_boot = monotonic_remote_boot,
                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
                  .data = std::move(m)});

      // Now, update sorted_until_ to match the new message.
      // The comparison guards against subtracting past min_time.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Sanity check: the returned times must never go backwards.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}
1798
1799void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1800
1801std::string LogPartsSorter::DebugString() const {
1802 std::stringstream ss;
1803 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001804 int count = 0;
1805 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001806 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001807 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1808 ss << m << "\n";
1809 } else if (no_dots) {
1810 ss << "...\n";
1811 no_dots = false;
1812 }
1813 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001814 }
1815 ss << "] <- " << parts_message_reader_.filename();
1816 return ss.str();
1817}
1818
Austin Schuhd2f96102020-12-01 20:27:29 -08001819NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1820 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001821 // Enforce that we are sorting things only from a single node from a single
1822 // boot.
1823 const std::string_view part0_node = parts[0].node;
1824 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001825 for (size_t i = 1; i < parts.size(); ++i) {
1826 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001827 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1828 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001829 }
Austin Schuh715adc12021-06-29 22:07:39 -07001830
1831 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1832
Austin Schuhd2f96102020-12-01 20:27:29 -08001833 for (LogParts &part : parts) {
1834 parts_sorters_.emplace_back(std::move(part));
1835 }
1836
Austin Schuhd2f96102020-12-01 20:27:29 -08001837 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001838 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001839 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001840 // We want to capture the earliest meaningful start time here. The start
1841 // time defaults to min_time when there's no meaningful value to report, so
1842 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001843 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1844 bool accept = false;
1845 // We want to prioritize start times from the logger node. Really, we
1846 // want to prioritize start times with a valid realtime_clock time. So,
1847 // if we have a start time without a RT clock, prefer a start time with a
1848 // RT clock, even it if is later.
1849 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1850 // We've got a good one. See if the current start time has a good RT
1851 // clock, or if we should use this one instead.
1852 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1853 accept = true;
1854 } else if (realtime_start_time_ == realtime_clock::min_time) {
1855 // The previous start time doesn't have a good RT time, so it is very
1856 // likely the start time from a remote part file. We just found a
1857 // better start time with a real RT time, so switch to that instead.
1858 accept = true;
1859 }
1860 } else if (realtime_start_time_ == realtime_clock::min_time) {
1861 // We don't have a RT time, so take the oldest.
1862 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1863 accept = true;
1864 }
1865 }
1866
1867 if (accept) {
1868 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1869 realtime_start_time_ = parts_sorter.realtime_start_time();
1870 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001871 }
1872 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001873
1874 // If there was no meaningful start time reported, just use min_time.
1875 if (monotonic_start_time_ == monotonic_clock::max_time) {
1876 monotonic_start_time_ = monotonic_clock::min_time;
1877 realtime_start_time_ = realtime_clock::min_time;
1878 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001879}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001880
Austin Schuh0ca51f32020-12-25 21:51:45 -08001881std::vector<const LogParts *> NodeMerger::Parts() const {
1882 std::vector<const LogParts *> p;
1883 p.reserve(parts_sorters_.size());
1884 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1885 p.emplace_back(&parts_sorter.parts());
1886 }
1887 return p;
1888}
1889
// Returns the oldest message across all sorters, or nullptr when none has
// anything left.
//
// The chosen sorter is cached in current_ until PopFront(), so repeated calls
// are cheap. When several sorters hold an equal message (same time, channel
// and queue index), only one copy is kept: the one that carries a
// monotonic_timestamp_time is preferred, and the others are popped here. Also
// recomputes sorted_until_ as the minimum across all sorters.
Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  // current_ is only set while its sorter has a front, and is cleared by
  // PopFront(), so `result` is expected to be non-null here.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has
      // the timestamp time.
      if (!m->data->has_monotonic_timestamp_time) {
        // Keep the copy we already have; discard this sorter's copy.
        parts_sorter.PopFront();
      } else if (!oldest->data->has_monotonic_timestamp_time) {
        // This copy is better: discard the previously chosen one and switch.
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        // Both have timestamp times; they must agree.
        CHECK_EQ(m->data->monotonic_timestamp_time,
                 oldest->data->monotonic_timestamp_time);
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  // Enforce monotonic output and track the oldest time ever returned.
  if (oldest) {
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
    monotonic_oldest_time_ =
        std::min(monotonic_oldest_time_, oldest->timestamp.time);
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found. This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}
1944
1945void NodeMerger::PopFront() {
1946 CHECK(current_ != nullptr) << "Popping before calling Front()";
1947 current_->PopFront();
1948 current_ = nullptr;
1949}
1950
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001951BootMerger::BootMerger(std::vector<LogParts> files) {
1952 std::vector<std::vector<LogParts>> boots;
1953
1954 // Now, we need to split things out by boot.
1955 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001956 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001957 if (boot_count + 1 > boots.size()) {
1958 boots.resize(boot_count + 1);
1959 }
1960 boots[boot_count].emplace_back(std::move(files[i]));
1961 }
1962
1963 node_mergers_.reserve(boots.size());
1964 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001965 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001966 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001967 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001968 }
1969 node_mergers_.emplace_back(
1970 std::make_unique<NodeMerger>(std::move(boots[i])));
1971 }
1972}
1973
1974Message *BootMerger::Front() {
1975 Message *result = node_mergers_[index_]->Front();
1976
1977 if (result != nullptr) {
1978 return result;
1979 }
1980
1981 if (index_ + 1u == node_mergers_.size()) {
1982 // At the end of the last node merger, just return.
1983 return nullptr;
1984 } else {
1985 ++index_;
1986 return Front();
1987 }
1988}
1989
1990void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1991
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001992std::vector<const LogParts *> BootMerger::Parts() const {
1993 std::vector<const LogParts *> results;
1994 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1995 std::vector<const LogParts *> node_parts = node_merger->Parts();
1996
1997 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1998 std::make_move_iterator(node_parts.end()));
1999 }
2000
2001 return results;
2002}
2003
Austin Schuhd2f96102020-12-01 20:27:29 -08002004TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002005 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002006 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002007 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002008 if (!configuration_) {
2009 configuration_ = part->config;
2010 } else {
2011 CHECK_EQ(configuration_.get(), part->config.get());
2012 }
2013 }
2014 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002015 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2016 // pretty simple.
2017 if (configuration::MultiNode(config)) {
2018 nodes_data_.resize(config->nodes()->size());
2019 const Node *my_node = config->nodes()->Get(node());
2020 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2021 const Node *node = config->nodes()->Get(node_index);
2022 NodeData *node_data = &nodes_data_[node_index];
2023 node_data->channels.resize(config->channels()->size());
2024 // We should save the channel if it is delivered to the node represented
2025 // by the NodeData, but not sent by that node. That combo means it is
2026 // forwarded.
2027 size_t channel_index = 0;
2028 node_data->any_delivered = false;
2029 for (const Channel *channel : *config->channels()) {
2030 node_data->channels[channel_index].delivered =
2031 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002032 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2033 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002034 node_data->any_delivered = node_data->any_delivered ||
2035 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002036 if (node_data->channels[channel_index].delivered) {
2037 const Connection *connection =
2038 configuration::ConnectionToNode(channel, node);
2039 node_data->channels[channel_index].time_to_live =
2040 chrono::nanoseconds(connection->time_to_live());
2041 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002042 ++channel_index;
2043 }
2044 }
2045
2046 for (const Channel *channel : *config->channels()) {
2047 source_node_.emplace_back(configuration::GetNodeIndex(
2048 config, channel->source_node()->string_view()));
2049 }
2050 }
2051}
2052
2053void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002054 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002055 CHECK_NE(timestamp_mapper->node(), node());
2056 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2057
2058 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002059 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002060 // we could needlessly save data.
2061 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002062 VLOG(1) << "Registering on node " << node() << " for peer node "
2063 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002064 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2065
2066 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002067
2068 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002069 }
2070}
2071
Austin Schuh79b30942021-01-24 22:32:21 -08002072void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002073 matched_messages_.emplace_back(
2074 TimestampedMessage{.channel_index = m->channel_index,
2075 .queue_index = m->queue_index,
2076 .monotonic_event_time = m->timestamp,
2077 .realtime_event_time = m->data->realtime_sent_time,
2078 .remote_queue_index = BootQueueIndex::Invalid(),
2079 .monotonic_remote_time = BootTimestamp::min_time(),
2080 .realtime_remote_time = realtime_clock::min_time,
2081 .monotonic_timestamp_time = BootTimestamp::min_time(),
2082 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002083}
2084
2085TimestampedMessage *TimestampMapper::Front() {
2086 // No need to fetch anything new. A previous message still exists.
2087 switch (first_message_) {
2088 case FirstMessage::kNeedsUpdate:
2089 break;
2090 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002091 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002092 case FirstMessage::kNullptr:
2093 return nullptr;
2094 }
2095
Austin Schuh79b30942021-01-24 22:32:21 -08002096 if (matched_messages_.empty()) {
2097 if (!QueueMatched()) {
2098 first_message_ = FirstMessage::kNullptr;
2099 return nullptr;
2100 }
2101 }
2102 first_message_ = FirstMessage::kInMessage;
2103 return &matched_messages_.front();
2104}
2105
2106bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002107 MatchResult result = MatchResult::kEndOfFile;
2108 do {
2109 result = MaybeQueueMatched();
2110 } while (result == MatchResult::kSkipped);
2111 return result == MatchResult::kQueued;
2112}
2113
2114bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2115 const TimestampedMessage & /*message*/) {
2116 if (replay_channels_callback_ &&
2117 !replay_channels_callback_(matched_messages_.back())) {
2118 matched_messages_.pop_back();
2119 return true;
2120 }
2121 return false;
2122}
2123
2124TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002125 if (nodes_data_.empty()) {
2126 // Simple path. We are single node, so there are no timestamps to match!
2127 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002128 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002129 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002130 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002131 }
Austin Schuh79b30942021-01-24 22:32:21 -08002132 // Enqueue this message into matched_messages_ so we have a place to
2133 // associate remote timestamps, and return it.
2134 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002135
Austin Schuh79b30942021-01-24 22:32:21 -08002136 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2137 last_message_time_ = matched_messages_.back().monotonic_event_time;
2138
2139 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002140 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002141 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002142 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2143 return MatchResult::kSkipped;
2144 }
2145 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002146 }
2147
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002148 // We need to only add messages to the list so they get processed for
2149 // messages which are delivered. Reuse the flow below which uses messages_
2150 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002151 if (messages_.empty()) {
2152 if (!Queue()) {
2153 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002154 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002155 }
2156
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002157 // Now that it has been added (and cannibalized), forget about it
2158 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002159 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002160 }
2161
2162 Message *m = &(messages_.front());
2163
2164 if (source_node_[m->channel_index] == node()) {
2165 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002166 QueueMessage(m);
2167 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2168 last_message_time_ = matched_messages_.back().monotonic_event_time;
2169 messages_.pop_front();
2170 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002171 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2172 return MatchResult::kSkipped;
2173 }
2174 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002175 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002176 // Got a timestamp, find the matching remote data, match it, and return
2177 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002178 Message data = MatchingMessageFor(*m);
2179
2180 // Return the data from the remote. The local message only has timestamp
2181 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002182 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002183 .channel_index = m->channel_index,
2184 .queue_index = m->queue_index,
2185 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002186 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002187 .remote_queue_index =
2188 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002189 .index = m->data->remote_queue_index.value()},
2190 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002191 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002192 .realtime_remote_time = m->data->realtime_remote_time.value(),
2193 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2194 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002195 .data = std::move(data.data)});
2196 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2197 last_message_time_ = matched_messages_.back().monotonic_event_time;
2198 // Since messages_ holds the data, drop it.
2199 messages_.pop_front();
2200 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002201 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2202 return MatchResult::kSkipped;
2203 }
2204 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002205 }
2206}
2207
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002208void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002209 while (last_message_time_ <= queue_time) {
2210 if (!QueueMatched()) {
2211 return;
2212 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002213 }
2214}
2215
Austin Schuhe639ea12021-01-25 13:00:22 -08002216void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002217 // Note: queueing for time doesn't really work well across boots. So we
2218 // just assume that if you are using this, you only care about the current
2219 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002220 //
2221 // TODO(austin): Is that the right concept?
2222 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002223 // Make sure we have something queued first. This makes the end time
2224 // calculation simpler, and is typically what folks want regardless.
2225 if (matched_messages_.empty()) {
2226 if (!QueueMatched()) {
2227 return;
2228 }
2229 }
2230
2231 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002232 std::max(monotonic_start_time(
2233 matched_messages_.front().monotonic_event_time.boot),
2234 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002235 time_estimation_buffer;
2236
2237 // Place sorted messages on the list until we have
2238 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2239 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002240 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002241 if (!QueueMatched()) {
2242 return;
2243 }
2244 }
2245}
2246
Austin Schuhd2f96102020-12-01 20:27:29 -08002247void TimestampMapper::PopFront() {
2248 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002249 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002250 first_message_ = FirstMessage::kNeedsUpdate;
2251
Austin Schuh79b30942021-01-24 22:32:21 -08002252 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002253}
2254
2255Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002256 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002257 CHECK_NOTNULL(message.data);
2258 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002259 const BootQueueIndex remote_queue_index =
2260 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002261 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002262
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002263 CHECK(message.data->monotonic_remote_time.has_value());
2264 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002265
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002266 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002267 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002268 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002269 const realtime_clock::time_point realtime_remote_time =
2270 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002271
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002272 TimestampMapper *peer =
2273 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002274
2275 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002276 // asked to pull a timestamp from a peer which doesn't exist, return an
2277 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002278 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002279 // TODO(austin): Make sure the tests hit all these paths with a boot count
2280 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002281 return Message{.channel_index = message.channel_index,
2282 .queue_index = remote_queue_index,
2283 .timestamp = monotonic_remote_time,
2284 .monotonic_remote_boot = 0xffffff,
2285 .monotonic_timestamp_boot = 0xffffff,
2286 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002287 }
2288
2289 // The queue which will have the matching data, if available.
2290 std::deque<Message> *data_queue =
2291 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2292
Austin Schuh79b30942021-01-24 22:32:21 -08002293 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002294
2295 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002296 return Message{.channel_index = message.channel_index,
2297 .queue_index = remote_queue_index,
2298 .timestamp = monotonic_remote_time,
2299 .monotonic_remote_boot = 0xffffff,
2300 .monotonic_timestamp_boot = 0xffffff,
2301 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002302 }
2303
Austin Schuhd2f96102020-12-01 20:27:29 -08002304 if (remote_queue_index < data_queue->front().queue_index ||
2305 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002306 return Message{.channel_index = message.channel_index,
2307 .queue_index = remote_queue_index,
2308 .timestamp = monotonic_remote_time,
2309 .monotonic_remote_boot = 0xffffff,
2310 .monotonic_timestamp_boot = 0xffffff,
2311 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002312 }
2313
Austin Schuh993ccb52020-12-12 15:59:32 -08002314 // The algorithm below is constant time with some assumptions. We need there
2315 // to be no missing messages in the data stream. This also assumes a queue
2316 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002317 if (data_queue->back().queue_index.boot ==
2318 data_queue->front().queue_index.boot &&
2319 (data_queue->back().queue_index.index -
2320 data_queue->front().queue_index.index + 1u ==
2321 data_queue->size())) {
2322 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002323 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002324 //
2325 // TODO(austin): Move if not reliable.
2326 Message result = (*data_queue)[remote_queue_index.index -
2327 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002328
2329 CHECK_EQ(result.timestamp, monotonic_remote_time)
2330 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002331 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002332 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2333 // Now drop the data off the front. We have deduplicated timestamps, so we
2334 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002335 data_queue->erase(
2336 data_queue->begin(),
2337 data_queue->begin() +
2338 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002339 return result;
2340 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002341 // TODO(austin): Binary search.
2342 auto it = std::find_if(
2343 data_queue->begin(), data_queue->end(),
2344 [remote_queue_index,
2345 remote_boot = monotonic_remote_time.boot](const Message &m) {
2346 return m.queue_index == remote_queue_index &&
2347 m.timestamp.boot == remote_boot;
2348 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002349 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002350 return Message{.channel_index = message.channel_index,
2351 .queue_index = remote_queue_index,
2352 .timestamp = monotonic_remote_time,
2353 .monotonic_remote_boot = 0xffffff,
2354 .monotonic_timestamp_boot = 0xffffff,
2355 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002356 }
2357
2358 Message result = std::move(*it);
2359
2360 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002361 << ": Queue index matches, but timestamp doesn't. Please "
2362 "investigate!";
2363 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2364 << ": Queue index matches, but timestamp doesn't. Please "
2365 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002366
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002367 // Erase everything up to this message. We want to keep 1 message in the
2368 // queue so we can handle reliable messages forwarded across boots.
2369 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002370
2371 return result;
2372 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002373}
2374
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002375void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002376 if (queued_until_ > t) {
2377 return;
2378 }
2379 while (true) {
2380 if (!messages_.empty() && messages_.back().timestamp > t) {
2381 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2382 return;
2383 }
2384
2385 if (!Queue()) {
2386 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002387 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002388 return;
2389 }
2390
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002391 // Now that it has been added (and cannibalized), forget about it
2392 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002393 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002394 }
2395}
2396
2397bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002398 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002399 if (m == nullptr) {
2400 return false;
2401 }
2402 for (NodeData &node_data : nodes_data_) {
2403 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002404 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002405 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002406 // If we have data but no timestamps (logs where the timestamps didn't get
2407 // logged are classic), we can grow this indefinitely. We don't need to
2408 // keep anything that is older than the last message returned.
2409
2410 // We have the time on the source node.
2411 // We care to wait until we have the time on the destination node.
2412 std::deque<Message> &messages =
2413 node_data.channels[m->channel_index].messages;
2414 // Max delay over the network is the TTL, so let's take the queue time and
2415 // add TTL to it. Don't forget any messages which are reliable until
2416 // someone can come up with a good reason to forget those too.
2417 if (node_data.channels[m->channel_index].time_to_live >
2418 chrono::nanoseconds(0)) {
2419 // We need to make *some* assumptions about network delay for this to
2420 // work. We want to only look at the RX side. This means we need to
2421 // track the last time a message was popped from any channel from the
2422 // node sending this message, and compare that to the max time we expect
2423 // that a message will take to be delivered across the network. This
2424 // assumes that messages are popped in time order as a proxy for
2425 // measuring the distributed time at this layer.
2426 //
2427 // Leave at least 1 message in here so we can handle reboots and
2428 // messages getting sent twice.
2429 while (messages.size() > 1u &&
2430 messages.begin()->timestamp +
2431 node_data.channels[m->channel_index].time_to_live +
2432 chrono::duration_cast<chrono::nanoseconds>(
2433 chrono::duration<double>(FLAGS_max_network_delay)) <
2434 last_popped_message_time_) {
2435 messages.pop_front();
2436 }
2437 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002438 node_data.channels[m->channel_index].messages.emplace_back(*m);
2439 }
2440 }
2441
2442 messages_.emplace_back(std::move(*m));
2443 return true;
2444}
2445
2446std::string TimestampMapper::DebugString() const {
2447 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002448 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002449 for (const Message &message : messages_) {
2450 ss << " " << message << "\n";
2451 }
2452 ss << "] queued_until " << queued_until_;
2453 for (const NodeData &ns : nodes_data_) {
2454 if (ns.peer == nullptr) continue;
2455 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2456 size_t channel_index = 0;
2457 for (const NodeData::ChannelData &channel_data :
2458 ns.peer->nodes_data_[node()].channels) {
2459 if (channel_data.messages.empty()) {
2460 continue;
2461 }
Austin Schuhb000de62020-12-03 22:00:40 -08002462
Austin Schuhd2f96102020-12-01 20:27:29 -08002463 ss << " channel " << channel_index << " [\n";
2464 for (const Message &m : channel_data.messages) {
2465 ss << " " << m << "\n";
2466 }
2467 ss << " ]\n";
2468 ++channel_index;
2469 }
2470 ss << "] queued_until " << ns.peer->queued_until_;
2471 }
2472 return ss.str();
2473}
2474
Austin Schuhee711052020-08-24 16:06:09 -07002475std::string MaybeNodeName(const Node *node) {
2476 if (node != nullptr) {
2477 return node->name()->str() + " ";
2478 }
2479 return "";
2480}
2481
Brian Silvermanf51499a2020-09-21 12:49:08 -07002482} // namespace aos::logger