blob: 27c6c3b37d78c0c9ec64da6f6e8ba83197f87e29 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Alexei Strots01395492023-03-20 13:59:56 -070010#include <filesystem>
Austin Schuha36c8902019-12-30 18:07:15 -080011
Austin Schuhe4fca832020-03-07 16:58:53 -080012#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080013#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070014#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080015#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080016#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080017#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080018#include "gflags/gflags.h"
19#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080020
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070021#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070022#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070023#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070024#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070025#else
26#define ENABLE_LZMA 0
27#endif
28
29#if ENABLE_LZMA
30#include "aos/events/logging/lzma_encoder.h"
31#endif
Austin Schuh86110712022-09-16 15:40:54 -070032#if ENABLE_S3
33#include "aos/events/logging/s3_fetcher.h"
34#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070035
Austin Schuh48d10d62022-10-16 22:19:23 -070036DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080037 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070038DEFINE_double(
39 flush_period, 5.0,
40 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080041
Austin Schuha040c3f2021-02-13 16:09:07 -080042DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080043 max_network_delay, 1.0,
44 "Max time to assume a message takes to cross the network before we are "
45 "willing to drop it from our buffers and assume it didn't make it. "
46 "Increasing this number can increase memory usage depending on the packet "
47 "loss of your network or if the timestamps aren't logged for a message.");
48
49DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080050 max_out_of_order, -1,
51 "If set, this overrides the max out of order duration for a log file.");
52
Austin Schuh0e8db662021-07-06 10:43:47 -070053DEFINE_bool(workaround_double_headers, true,
54 "Some old log files have two headers at the beginning. Use the "
55 "last header as the actual header.");
56
Brian Smarttea913d42021-12-10 15:02:38 -080057DEFINE_bool(crash_on_corrupt_message, true,
58 "When true, MessageReader will crash the first time a message "
59 "with corrupted format is found. When false, the crash will be "
60 "suppressed, and any remaining readable messages will be "
61 "evaluated to present verified vs corrupted stats.");
62
63DEFINE_bool(ignore_corrupt_messages, false,
64 "When true, and crash_on_corrupt_message is false, then any "
65 "corrupt message found by MessageReader be silently ignored, "
66 "providing access to all uncorrupted messages in a logfile.");
67
Brian Silvermanf51499a2020-09-21 12:49:08 -070068namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070069namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080070
Austin Schuh05b70472020-01-01 17:11:17 -080071namespace chrono = std::chrono;
72
// Streams the value held by t to *os, or the literal text "null" when t is
// empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << *t;
}
81} // namespace
82
Alexei Strots01395492023-03-20 13:59:56 -070083DetachedBufferWriter::DetachedBufferWriter(
84 std::unique_ptr<FileHandler> file_handler,
85 std::unique_ptr<DataEncoder> encoder)
86 : file_handler_(std::move(file_handler)), encoder_(std::move(encoder)) {
87 CHECK(file_handler_);
88 ran_out_of_space_ = file_handler_->OpenForWrite() == WriteCode::kOutOfSpace;
89 if (ran_out_of_space_) {
90 LOG(WARNING) << "And we are out of space";
Austin Schuh4c3cdb72023-02-11 15:05:26 -080091 }
92}
93
Austin Schuha36c8902019-12-30 18:07:15 -080094DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070095 Close();
96 if (ran_out_of_space_) {
97 CHECK(acknowledge_ran_out_of_space_)
98 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -070099 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700100}
101
Brian Silvermand90905f2020-09-23 14:42:56 -0700102DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103 *this = std::move(other);
104}
105
Brian Silverman87ac0402020-09-17 14:47:01 -0700106// When other is destroyed "soon" (which it should be because we're getting an
107// rvalue reference to it), it will flush etc all the data we have queued up
108// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700109DetachedBufferWriter &DetachedBufferWriter::operator=(
110 DetachedBufferWriter &&other) {
Alexei Strots01395492023-03-20 13:59:56 -0700111 std::swap(file_handler_, other.file_handler_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700112 std::swap(encoder_, other.encoder_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700113 std::swap(ran_out_of_space_, other.ran_out_of_space_);
114 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh4c3cdb72023-02-11 15:05:26 -0800115 std::swap(last_flush_time_, other.last_flush_time_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800117}
118
Austin Schuh8bdfc492023-02-11 12:53:13 -0800119void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800120 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700121 if (ran_out_of_space_) {
122 // We don't want any later data to be written after space becomes
123 // available, so refuse to write anything more once we've dropped data
124 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700125 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800126 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700127
Austin Schuh8bdfc492023-02-11 12:53:13 -0800128 const size_t message_size = copier->size();
129 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700130
Austin Schuh8bdfc492023-02-11 12:53:13 -0800131 // Keep writing chunks until we've written it all. If we end up with a
132 // partial write, this means we need to flush to disk.
133 do {
Alexei Strots01395492023-03-20 13:59:56 -0700134 const size_t bytes_written =
135 encoder_->Encode(copier, overall_bytes_written);
Austin Schuh8bdfc492023-02-11 12:53:13 -0800136 CHECK(bytes_written != 0);
137
138 overall_bytes_written += bytes_written;
139 if (overall_bytes_written < message_size) {
140 VLOG(1) << "Flushing because of a partial write, tried to write "
141 << message_size << " wrote " << overall_bytes_written;
142 Flush(now);
143 }
144 } while (overall_bytes_written < message_size);
145
Austin Schuhbd06ae42021-03-31 22:48:21 -0700146 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800147}
148
Brian Silverman0465fcf2020-09-24 00:29:18 -0700149void DetachedBufferWriter::Close() {
Alexei Strots01395492023-03-20 13:59:56 -0700150 if (!file_handler_->is_open()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700151 return;
152 }
153 encoder_->Finish();
154 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800155 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700156 }
Austin Schuhb2461652023-05-01 08:30:56 -0700157 encoder_.reset();
Alexei Strots01395492023-03-20 13:59:56 -0700158 ran_out_of_space_ = file_handler_->Close() == WriteCode::kOutOfSpace;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700159}
160
Austin Schuh8bdfc492023-02-11 12:53:13 -0800161void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
162 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700163 if (ran_out_of_space_) {
164 // We don't want any later data to be written after space becomes available,
165 // so refuse to write anything more once we've dropped data because we ran
166 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700167 if (encoder_) {
168 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
169 encoder_->Clear(encoder_->queue().size());
170 } else {
171 VLOG(1) << "No queue to ignore";
172 }
173 return;
174 }
175
176 const auto queue = encoder_->queue();
177 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700178 return;
179 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700180
Alexei Strots01395492023-03-20 13:59:56 -0700181 const WriteResult result = file_handler_->Write(queue);
182 encoder_->Clear(result.messages_written);
183 ran_out_of_space_ = result.code == WriteCode::kOutOfSpace;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700184}
185
Austin Schuhbd06ae42021-03-31 22:48:21 -0700186void DetachedBufferWriter::FlushAtThreshold(
187 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700188 if (ran_out_of_space_) {
189 // We don't want any later data to be written after space becomes available,
190 // so refuse to write anything more once we've dropped data because we ran
191 // out of space.
192 if (encoder_) {
193 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
194 encoder_->Clear(encoder_->queue().size());
195 } else {
196 VLOG(1) << "No queue to ignore";
197 }
198 return;
199 }
200
Austin Schuhbd06ae42021-03-31 22:48:21 -0700201 // We don't want to flush the first time through. Otherwise we will flush as
202 // the log file header might be compressing, defeating any parallelism and
203 // queueing there.
204 if (last_flush_time_ == aos::monotonic_clock::min_time) {
205 last_flush_time_ = now;
206 }
207
Brian Silvermanf51499a2020-09-21 12:49:08 -0700208 // Flush if we are at the max number of iovs per writev, because there's no
209 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700210 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800211 while (encoder_->space() == 0 ||
212 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700213 encoder_->queue_size() >= IOV_MAX ||
Austin Schuh3ebaf782023-04-07 16:03:28 -0700214 (now > last_flush_time_ +
215 chrono::duration_cast<chrono::nanoseconds>(
216 chrono::duration<double>(FLAGS_flush_period)) &&
217 encoder_->queued_bytes() != 0)) {
218 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
219 << " queued bytes " << encoder_->queued_bytes();
Austin Schuh8bdfc492023-02-11 12:53:13 -0800220 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700221 }
Austin Schuha36c8902019-12-30 18:07:15 -0800222}
223
Austin Schuhf2d0e682022-10-16 14:20:58 -0700224// Do the magic dance to convert the endianness of the data and append it to the
225// buffer.
226namespace {
227
228// TODO(austin): Look at the generated code to see if building the header is
229// efficient or not.
230template <typename T>
231uint8_t *Push(uint8_t *buffer, const T data) {
232 const T endian_data = flatbuffers::EndianScalar<T>(data);
233 std::memcpy(buffer, &endian_data, sizeof(T));
234 return buffer + sizeof(T);
235}
236
// Copies size raw bytes from data to buffer and returns the position just past
// the copied bytes.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  // memcpy hands back its destination, so step past what was just written.
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
241
// Writes padding zero bytes at buffer and returns the position just past them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  // memset hands back its destination, so step past what was just zeroed.
  return static_cast<uint8_t *>(std::memset(buffer, 0, padding)) + padding;
}
246} // namespace
247
248flatbuffers::Offset<MessageHeader> PackRemoteMessage(
249 flatbuffers::FlatBufferBuilder *fbb,
250 const message_bridge::RemoteMessage *msg, int channel_index,
251 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
252 logger::MessageHeader::Builder message_header_builder(*fbb);
253 // Note: this must match the same order as MessageBridgeServer and
254 // PackMessage. We want identical headers to have identical
255 // on-the-wire formats to make comparing them easier.
256
257 message_header_builder.add_channel_index(channel_index);
258
259 message_header_builder.add_queue_index(msg->queue_index());
260 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
261 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
262
263 message_header_builder.add_monotonic_remote_time(
264 msg->monotonic_remote_time());
265 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
266 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
267
268 message_header_builder.add_monotonic_timestamp_time(
269 monotonic_timestamp_time.time_since_epoch().count());
270
271 return message_header_builder.Finish();
272}
273
274size_t PackRemoteMessageInline(
275 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
276 int channel_index,
Austin Schuh71a40d42023-02-04 21:22:22 -0800277 const aos::monotonic_clock::time_point monotonic_timestamp_time,
278 size_t start_byte, size_t end_byte) {
Austin Schuhf2d0e682022-10-16 14:20:58 -0700279 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
Austin Schuh71a40d42023-02-04 21:22:22 -0800280 DCHECK_EQ((start_byte % 8u), 0u);
281 DCHECK_EQ((end_byte % 8u), 0u);
282 DCHECK_LE(start_byte, end_byte);
283 DCHECK_LE(end_byte, message_size);
Austin Schuhf2d0e682022-10-16 14:20:58 -0700284
Austin Schuh71a40d42023-02-04 21:22:22 -0800285 switch (start_byte) {
286 case 0x00u:
287 if ((end_byte) == 0x00u) {
288 break;
289 }
290 // clang-format off
291 // header:
292 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
293 buffer = Push<flatbuffers::uoffset_t>(
294 buffer, message_size - sizeof(flatbuffers::uoffset_t));
295 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
296 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
297 [[fallthrough]];
298 case 0x08u:
299 if ((end_byte) == 0x08u) {
300 break;
301 }
302 //
303 // padding:
304 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
305 buffer = Pad(buffer, 6);
306 //
307 // vtable (aos.logger.MessageHeader):
308 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
309 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
310 [[fallthrough]];
311 case 0x10u:
312 if ((end_byte) == 0x10u) {
313 break;
314 }
315 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
316 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
317 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
318 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
319 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
320 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
321 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
322 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
323 [[fallthrough]];
324 case 0x18u:
325 if ((end_byte) == 0x18u) {
326 break;
327 }
328 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
329 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
330 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
331 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
332 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
333 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
334 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
335 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
336 [[fallthrough]];
337 case 0x20u:
338 if ((end_byte) == 0x20u) {
339 break;
340 }
341 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
342 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
343 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
344 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
345 //
346 // root_table (aos.logger.MessageHeader):
347 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
348 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
349 [[fallthrough]];
350 case 0x28u:
351 if ((end_byte) == 0x28u) {
352 break;
353 }
354 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
355 buffer = Push<int64_t>(buffer,
356 monotonic_timestamp_time.time_since_epoch().count());
357 [[fallthrough]];
358 case 0x30u:
359 if ((end_byte) == 0x30u) {
360 break;
361 }
362 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
363 // TODO(austin): Can we re-arrange the order to ditch the padding?
364 // (Answer is yes, but what is the impact elsewhere? It will change the
365 // binary format)
366 buffer = Pad(buffer, 4);
367 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
368 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
369 [[fallthrough]];
370 case 0x38u:
371 if ((end_byte) == 0x38u) {
372 break;
373 }
374 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
375 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
376 [[fallthrough]];
377 case 0x40u:
378 if ((end_byte) == 0x40u) {
379 break;
380 }
381 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
382 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
383 [[fallthrough]];
384 case 0x48u:
385 if ((end_byte) == 0x48u) {
386 break;
387 }
388 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
389 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
390 [[fallthrough]];
391 case 0x50u:
392 if ((end_byte) == 0x50u) {
393 break;
394 }
395 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
396 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
397 [[fallthrough]];
398 case 0x58u:
399 if ((end_byte) == 0x58u) {
400 break;
401 }
402 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
403 buffer = Push<uint32_t>(buffer, msg->queue_index());
404 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
405 buffer = Push<uint32_t>(buffer, channel_index);
406 // clang-format on
407 [[fallthrough]];
408 case 0x60u:
409 if ((end_byte) == 0x60u) {
410 break;
411 }
412 }
Austin Schuhf2d0e682022-10-16 14:20:58 -0700413
Austin Schuh71a40d42023-02-04 21:22:22 -0800414 return end_byte - start_byte;
Austin Schuhf2d0e682022-10-16 14:20:58 -0700415}
416
Austin Schuha36c8902019-12-30 18:07:15 -0800417flatbuffers::Offset<MessageHeader> PackMessage(
418 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
419 int channel_index, LogType log_type) {
420 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
421
422 switch (log_type) {
423 case LogType::kLogMessage:
424 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800425 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700426 // Since the timestamps are 8 byte aligned, we are going to end up adding
427 // padding in the middle of the message to pad everything out to 8 byte
428 // alignment. That's rather wasteful. To make things efficient to mmap
429 // while reading uncompressed logs, we'd actually rather the message be
430 // aligned. So, force 8 byte alignment (enough to preserve alignment
431 // inside the nested message so that we can read it without moving it)
432 // here.
433 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700434 data_offset = fbb->CreateVector(
435 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800436 break;
437
438 case LogType::kLogDeliveryTimeOnly:
439 break;
440 }
441
442 MessageHeader::Builder message_header_builder(*fbb);
443 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800444
Austin Schuhfa30c352022-10-16 11:12:02 -0700445 // These are split out into very explicit serialization calls because the
446 // order here changes the order things are written out on the wire, and we
447 // want to control and understand it here. Changing the order can increase
448 // the amount of padding bytes in the middle.
449 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800450 // It is also easier to follow... And doesn't actually make things much
451 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800452 switch (log_type) {
453 case LogType::kLogRemoteMessage:
454 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700455 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800456 message_header_builder.add_monotonic_sent_time(
457 context.monotonic_remote_time.time_since_epoch().count());
458 message_header_builder.add_realtime_sent_time(
459 context.realtime_remote_time.time_since_epoch().count());
460 break;
461
Austin Schuh6f3babe2020-01-26 20:34:50 -0800462 case LogType::kLogDeliveryTimeOnly:
463 message_header_builder.add_queue_index(context.queue_index);
464 message_header_builder.add_monotonic_sent_time(
465 context.monotonic_event_time.time_since_epoch().count());
466 message_header_builder.add_realtime_sent_time(
467 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800468 message_header_builder.add_monotonic_remote_time(
469 context.monotonic_remote_time.time_since_epoch().count());
470 message_header_builder.add_realtime_remote_time(
471 context.realtime_remote_time.time_since_epoch().count());
472 message_header_builder.add_remote_queue_index(context.remote_queue_index);
473 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700474
475 case LogType::kLogMessage:
476 message_header_builder.add_queue_index(context.queue_index);
477 message_header_builder.add_data(data_offset);
478 message_header_builder.add_monotonic_sent_time(
479 context.monotonic_event_time.time_since_epoch().count());
480 message_header_builder.add_realtime_sent_time(
481 context.realtime_event_time.time_since_epoch().count());
482 break;
483
484 case LogType::kLogMessageAndDeliveryTime:
485 message_header_builder.add_queue_index(context.queue_index);
486 message_header_builder.add_remote_queue_index(context.remote_queue_index);
487 message_header_builder.add_monotonic_sent_time(
488 context.monotonic_event_time.time_since_epoch().count());
489 message_header_builder.add_realtime_sent_time(
490 context.realtime_event_time.time_since_epoch().count());
491 message_header_builder.add_monotonic_remote_time(
492 context.monotonic_remote_time.time_since_epoch().count());
493 message_header_builder.add_realtime_remote_time(
494 context.realtime_remote_time.time_since_epoch().count());
495 message_header_builder.add_data(data_offset);
496 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800497 }
498
499 return message_header_builder.Finish();
500}
501
Austin Schuhfa30c352022-10-16 11:12:02 -0700502flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
503 switch (log_type) {
504 case LogType::kLogMessage:
505 return
506 // Root table size + offset.
507 sizeof(flatbuffers::uoffset_t) * 2 +
508 // 6 padding bytes to pad the header out properly.
509 6 +
510 // vtable header (size + size of table)
511 sizeof(flatbuffers::voffset_t) * 2 +
512 // offsets to all the fields.
513 sizeof(flatbuffers::voffset_t) * 5 +
514 // pointer to vtable
515 sizeof(flatbuffers::soffset_t) +
516 // pointer to data
517 sizeof(flatbuffers::uoffset_t) +
518 // realtime_sent_time, monotonic_sent_time
519 sizeof(int64_t) * 2 +
520 // queue_index, channel_index
521 sizeof(uint32_t) * 2;
522
523 case LogType::kLogDeliveryTimeOnly:
524 return
525 // Root table size + offset.
526 sizeof(flatbuffers::uoffset_t) * 2 +
527 // 6 padding bytes to pad the header out properly.
528 4 +
529 // vtable header (size + size of table)
530 sizeof(flatbuffers::voffset_t) * 2 +
531 // offsets to all the fields.
532 sizeof(flatbuffers::voffset_t) * 8 +
533 // pointer to vtable
534 sizeof(flatbuffers::soffset_t) +
535 // remote_queue_index
536 sizeof(uint32_t) +
537 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
538 // monotonic_sent_time
539 sizeof(int64_t) * 4 +
540 // queue_index, channel_index
541 sizeof(uint32_t) * 2;
542
543 case LogType::kLogMessageAndDeliveryTime:
544 return
545 // Root table size + offset.
546 sizeof(flatbuffers::uoffset_t) * 2 +
547 // 4 padding bytes to pad the header out properly.
548 4 +
549 // vtable header (size + size of table)
550 sizeof(flatbuffers::voffset_t) * 2 +
551 // offsets to all the fields.
552 sizeof(flatbuffers::voffset_t) * 8 +
553 // pointer to vtable
554 sizeof(flatbuffers::soffset_t) +
555 // pointer to data
556 sizeof(flatbuffers::uoffset_t) +
557 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
558 // monotonic_sent_time
559 sizeof(int64_t) * 4 +
560 // remote_queue_index, queue_index, channel_index
561 sizeof(uint32_t) * 3;
562
563 case LogType::kLogRemoteMessage:
564 return
565 // Root table size + offset.
566 sizeof(flatbuffers::uoffset_t) * 2 +
567 // 6 padding bytes to pad the header out properly.
568 6 +
569 // vtable header (size + size of table)
570 sizeof(flatbuffers::voffset_t) * 2 +
571 // offsets to all the fields.
572 sizeof(flatbuffers::voffset_t) * 5 +
573 // pointer to vtable
574 sizeof(flatbuffers::soffset_t) +
575 // realtime_sent_time, monotonic_sent_time
576 sizeof(int64_t) * 2 +
577 // pointer to data
578 sizeof(flatbuffers::uoffset_t) +
579 // queue_index, channel_index
580 sizeof(uint32_t) * 2;
581 }
582 LOG(FATAL);
583}
584
James Kuszmaul9776b392023-01-14 14:08:08 -0800585flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700586 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
587 "Update size logic please.");
588 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700589 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700590 switch (log_type) {
591 case LogType::kLogDeliveryTimeOnly:
592 return PackMessageHeaderSize(log_type);
593
594 case LogType::kLogMessage:
595 case LogType::kLogMessageAndDeliveryTime:
596 case LogType::kLogRemoteMessage:
597 return PackMessageHeaderSize(log_type) +
598 // Vector...
599 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
600 }
601 LOG(FATAL);
602}
603
Austin Schuhfa30c352022-10-16 11:12:02 -0700604size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800605 int channel_index, LogType log_type, size_t start_byte,
606 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700607 // TODO(austin): Figure out how to copy directly from shared memory instead of
608 // first into the fetcher's memory and then into here. That would save a lot
609 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700610 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700611 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800612 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
613 DCHECK_EQ((start_byte % 8u), 0u);
614 DCHECK_EQ((end_byte % 8u), 0u);
615 DCHECK_LE(start_byte, end_byte);
616 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700617
618 // Pack all the data in. This is brittle but easy to change. Use the
619 // InlinePackMessage.Equivilent unit test to verify everything matches.
620 switch (log_type) {
621 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800622 switch (start_byte) {
623 case 0x00u:
624 if ((end_byte) == 0x00u) {
625 break;
626 }
627 // clang-format off
628 // header:
629 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
630 buffer = Push<flatbuffers::uoffset_t>(
631 buffer, message_size - sizeof(flatbuffers::uoffset_t));
632
633 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
634 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
635 [[fallthrough]];
636 case 0x08u:
637 if ((end_byte) == 0x08u) {
638 break;
639 }
640 //
641 // padding:
642 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
643 buffer = Pad(buffer, 6);
644 //
645 // vtable (aos.logger.MessageHeader):
646 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
647 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
648 [[fallthrough]];
649 case 0x10u:
650 if ((end_byte) == 0x10u) {
651 break;
652 }
653 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
654 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
655 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
656 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
657 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
658 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
659 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
660 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
661 [[fallthrough]];
662 case 0x18u:
663 if ((end_byte) == 0x18u) {
664 break;
665 }
666 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
667 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
668 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
669 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
670 //
671 // root_table (aos.logger.MessageHeader):
672 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
673 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
674 [[fallthrough]];
675 case 0x20u:
676 if ((end_byte) == 0x20u) {
677 break;
678 }
679 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
680 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
681 [[fallthrough]];
682 case 0x28u:
683 if ((end_byte) == 0x28u) {
684 break;
685 }
686 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
687 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
688 [[fallthrough]];
689 case 0x30u:
690 if ((end_byte) == 0x30u) {
691 break;
692 }
693 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
694 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
695 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
696 buffer = Push<uint32_t>(buffer, context.queue_index);
697 [[fallthrough]];
698 case 0x38u:
699 if ((end_byte) == 0x38u) {
700 break;
701 }
702 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
703 buffer = Push<uint32_t>(buffer, channel_index);
704 //
705 // vector (aos.logger.MessageHeader.data):
706 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
707 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
708 [[fallthrough]];
709 case 0x40u:
710 if ((end_byte) == 0x40u) {
711 break;
712 }
713 [[fallthrough]];
714 default:
715 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
716 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
717 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
718 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
719 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
720 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
721 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
722 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
723 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
724 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
725 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
726 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
727 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
728 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
729 //
730 // padding:
731 // +0x4E | 00 00 | uint8_t[2] | .. | padding
732 // clang-format on
733 if (start_byte <= 0x40 && end_byte == message_size) {
734 // The easy one, slap it all down.
735 buffer = PushBytes(buffer, context.data, context.size);
736 buffer =
737 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
738 } else {
739 const size_t data_start_byte =
740 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
741 const size_t data_end_byte = end_byte - 0x40;
742 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
743 if (data_start_byte < padded_size) {
744 buffer = PushBytes(
745 buffer,
746 reinterpret_cast<const uint8_t *>(context.data) +
747 data_start_byte,
748 std::min(context.size, data_end_byte) - data_start_byte);
749 if (data_end_byte == padded_size) {
750 // We can only pad the last 7 bytes, so this only gets written
751 // if we write the last byte.
752 buffer = Pad(buffer,
753 ((context.size + 7) & 0xfffffff8u) - context.size);
754 }
755 }
756 }
757 break;
758 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700759 break;
760
761 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800762 switch (start_byte) {
763 case 0x00u:
764 if ((end_byte) == 0x00u) {
765 break;
766 }
767 // clang-format off
768 // header:
769 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
770 buffer = Push<flatbuffers::uoffset_t>(
771 buffer, message_size - sizeof(flatbuffers::uoffset_t));
772 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
773 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700774
Austin Schuh71a40d42023-02-04 21:22:22 -0800775 [[fallthrough]];
776 case 0x08u:
777 if ((end_byte) == 0x08u) {
778 break;
779 }
780 //
781 // padding:
782 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
783 buffer = Pad(buffer, 4);
784 //
785 // vtable (aos.logger.MessageHeader):
786 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
787 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
788 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
789 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
790 [[fallthrough]];
791 case 0x10u:
792 if ((end_byte) == 0x10u) {
793 break;
794 }
795 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
796 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
797 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
798 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
799 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
800 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
801 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
802 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
803 [[fallthrough]];
804 case 0x18u:
805 if ((end_byte) == 0x18u) {
806 break;
807 }
808 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
809 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
810 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
811 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
812 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
813 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
814 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
815 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
816 [[fallthrough]];
817 case 0x20u:
818 if ((end_byte) == 0x20u) {
819 break;
820 }
821 //
822 // root_table (aos.logger.MessageHeader):
823 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
824 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
825 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
826 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
827 [[fallthrough]];
828 case 0x28u:
829 if ((end_byte) == 0x28u) {
830 break;
831 }
832 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
833 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
834 [[fallthrough]];
835 case 0x30u:
836 if ((end_byte) == 0x30u) {
837 break;
838 }
839 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
840 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
841 [[fallthrough]];
842 case 0x38u:
843 if ((end_byte) == 0x38u) {
844 break;
845 }
846 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
847 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
848 [[fallthrough]];
849 case 0x40u:
850 if ((end_byte) == 0x40u) {
851 break;
852 }
853 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
854 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
855 [[fallthrough]];
856 case 0x48u:
857 if ((end_byte) == 0x48u) {
858 break;
859 }
860 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
861 buffer = Push<uint32_t>(buffer, context.queue_index);
862 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
863 buffer = Push<uint32_t>(buffer, channel_index);
864
865 // clang-format on
866 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700867 break;
868
869 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800870 switch (start_byte) {
871 case 0x00u:
872 if ((end_byte) == 0x00u) {
873 break;
874 }
875 // clang-format off
876 // header:
877 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
878 buffer = Push<flatbuffers::uoffset_t>(
879 buffer, message_size - sizeof(flatbuffers::uoffset_t));
880 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
881 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
882 [[fallthrough]];
883 case 0x08u:
884 if ((end_byte) == 0x08u) {
885 break;
886 }
887 //
888 // padding:
889 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
890 buffer = Pad(buffer, 4);
891 //
892 // vtable (aos.logger.MessageHeader):
893 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
894 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
895 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
896 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
897 [[fallthrough]];
898 case 0x10u:
899 if ((end_byte) == 0x10u) {
900 break;
901 }
902 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
903 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
904 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
905 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
906 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
907 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
908 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
909 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
910 [[fallthrough]];
911 case 0x18u:
912 if ((end_byte) == 0x18u) {
913 break;
914 }
915 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
916 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
917 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
918 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
919 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
920 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
921 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
922 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
923 [[fallthrough]];
924 case 0x20u:
925 if ((end_byte) == 0x20u) {
926 break;
927 }
928 //
929 // root_table (aos.logger.MessageHeader):
930 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
931 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
932 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
933 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
934 [[fallthrough]];
935 case 0x28u:
936 if ((end_byte) == 0x28u) {
937 break;
938 }
939 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
940 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
941 [[fallthrough]];
942 case 0x30u:
943 if ((end_byte) == 0x30u) {
944 break;
945 }
946 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
947 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
948 [[fallthrough]];
949 case 0x38u:
950 if ((end_byte) == 0x38u) {
951 break;
952 }
953 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
954 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
955 [[fallthrough]];
956 case 0x40u:
957 if ((end_byte) == 0x40u) {
958 break;
959 }
960 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
961 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
962 [[fallthrough]];
963 case 0x48u:
964 if ((end_byte) == 0x48u) {
965 break;
966 }
967 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
968 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
969 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
970 buffer = Push<uint32_t>(buffer, context.queue_index);
971 [[fallthrough]];
972 case 0x50u:
973 if ((end_byte) == 0x50u) {
974 break;
975 }
976 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
977 buffer = Push<uint32_t>(buffer, channel_index);
978 //
979 // vector (aos.logger.MessageHeader.data):
980 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
981 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
982 [[fallthrough]];
983 case 0x58u:
984 if ((end_byte) == 0x58u) {
985 break;
986 }
987 [[fallthrough]];
988 default:
989 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
990 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
991 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
992 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
993 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
994 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
995 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
996 //
997 // padding:
998 // +0x5F | 00 | uint8_t[1] | . | padding
999 // clang-format on
1000
1001 if (start_byte <= 0x58 && end_byte == message_size) {
1002 // The easy one, slap it all down.
1003 buffer = PushBytes(buffer, context.data, context.size);
1004 buffer =
1005 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1006 } else {
1007 const size_t data_start_byte =
1008 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1009 const size_t data_end_byte = end_byte - 0x58;
1010 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1011 if (data_start_byte < padded_size) {
1012 buffer = PushBytes(
1013 buffer,
1014 reinterpret_cast<const uint8_t *>(context.data) +
1015 data_start_byte,
1016 std::min(context.size, data_end_byte) - data_start_byte);
1017 if (data_end_byte == padded_size) {
1018 // We can only pad the last 7 bytes, so this only gets written
1019 // if we write the last byte.
1020 buffer = Pad(buffer,
1021 ((context.size + 7) & 0xfffffff8u) - context.size);
1022 }
1023 }
1024 }
1025
1026 break;
1027 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001028
1029 break;
1030
1031 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001032 switch (start_byte) {
1033 case 0x00u:
1034 if ((end_byte) == 0x00u) {
1035 break;
1036 }
1037 // This is the message we need to recreate.
1038 //
1039 // clang-format off
1040 // header:
1041 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1042 buffer = Push<flatbuffers::uoffset_t>(
1043 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1044 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1045 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1046 [[fallthrough]];
1047 case 0x08u:
1048 if ((end_byte) == 0x08u) {
1049 break;
1050 }
1051 //
1052 // padding:
1053 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1054 buffer = Pad(buffer, 6);
1055 //
1056 // vtable (aos.logger.MessageHeader):
1057 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1058 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1059 [[fallthrough]];
1060 case 0x10u:
1061 if ((end_byte) == 0x10u) {
1062 break;
1063 }
1064 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1065 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1066 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1067 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1068 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1069 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1070 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1071 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1072 [[fallthrough]];
1073 case 0x18u:
1074 if ((end_byte) == 0x18u) {
1075 break;
1076 }
1077 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1078 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1079 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1080 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1081 //
1082 // root_table (aos.logger.MessageHeader):
1083 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1084 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1085 [[fallthrough]];
1086 case 0x20u:
1087 if ((end_byte) == 0x20u) {
1088 break;
1089 }
1090 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1091 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1092 [[fallthrough]];
1093 case 0x28u:
1094 if ((end_byte) == 0x28u) {
1095 break;
1096 }
1097 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1098 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1099 [[fallthrough]];
1100 case 0x30u:
1101 if ((end_byte) == 0x30u) {
1102 break;
1103 }
1104 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1105 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1106 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1107 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1108 [[fallthrough]];
1109 case 0x38u:
1110 if ((end_byte) == 0x38u) {
1111 break;
1112 }
1113 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1114 buffer = Push<uint32_t>(buffer, channel_index);
1115 //
1116 // vector (aos.logger.MessageHeader.data):
1117 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1118 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1119 [[fallthrough]];
1120 case 0x40u:
1121 if ((end_byte) == 0x40u) {
1122 break;
1123 }
1124 [[fallthrough]];
1125 default:
1126 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1127 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1128 // ...
1129 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1130 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1131 //
1132 // padding:
1133 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1134 // clang-format on
1135 if (start_byte <= 0x40 && end_byte == message_size) {
1136 // The easy one, slap it all down.
1137 buffer = PushBytes(buffer, context.data, context.size);
1138 buffer =
1139 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1140 } else {
1141 const size_t data_start_byte =
1142 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1143 const size_t data_end_byte = end_byte - 0x40;
1144 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1145 if (data_start_byte < padded_size) {
1146 buffer = PushBytes(
1147 buffer,
1148 reinterpret_cast<const uint8_t *>(context.data) +
1149 data_start_byte,
1150 std::min(context.size, data_end_byte) - data_start_byte);
1151 if (data_end_byte == padded_size) {
1152 // We can only pad the last 7 bytes, so this only gets written
1153 // if we write the last byte.
1154 buffer = Pad(buffer,
1155 ((context.size + 7) & 0xfffffff8u) - context.size);
1156 }
1157 }
1158 }
1159 break;
1160 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001161 }
1162
Austin Schuh71a40d42023-02-04 21:22:22 -08001163 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001164}
1165
Austin Schuhcd368422021-11-22 21:23:29 -08001166SpanReader::SpanReader(std::string_view filename, bool quiet)
1167 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -07001168 static constexpr std::string_view kS3 = "s3:";
1169 if (filename.substr(0, kS3.size()) == kS3) {
1170#if ENABLE_S3
1171 decoder_ = std::make_unique<S3Fetcher>(filename);
1172#else
1173 LOG(FATAL) << "Reading files from S3 not supported on this platform";
1174#endif
1175 } else {
1176 decoder_ = std::make_unique<DummyDecoder>(filename);
1177 }
Tyler Chatow2015bc62021-08-04 21:15:09 -07001178
1179 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -07001180 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001181 if (filename.substr(filename.size() - kXz.size()) == kXz) {
1182#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -08001183 decoder_ =
1184 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001185#else
Austin Schuhcd368422021-11-22 21:23:29 -08001186 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001187 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
1188#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -07001189 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
1190 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001191 }
Austin Schuh05b70472020-01-01 17:11:17 -08001192}
1193
Austin Schuhcf5f6442021-07-06 10:43:28 -07001194absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001195 // Make sure we have enough for the size.
1196 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1197 if (!ReadBlock()) {
1198 return absl::Span<const uint8_t>();
1199 }
1200 }
1201
1202 // Now make sure we have enough for the message.
1203 const size_t data_size =
1204 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1205 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001206 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1207 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1208 LOG(ERROR) << " Rest of log file is "
1209 << absl::BytesToHexString(std::string_view(
1210 reinterpret_cast<const char *>(data_.data() +
1211 consumed_data_),
1212 data_.size() - consumed_data_));
1213 return absl::Span<const uint8_t>();
1214 }
Austin Schuh05b70472020-01-01 17:11:17 -08001215 while (data_.size() < consumed_data_ + data_size) {
1216 if (!ReadBlock()) {
1217 return absl::Span<const uint8_t>();
1218 }
1219 }
1220
1221 // And return it, consuming the data.
1222 const uint8_t *data_ptr = data_.data() + consumed_data_;
1223
Austin Schuh05b70472020-01-01 17:11:17 -08001224 return absl::Span<const uint8_t>(data_ptr, data_size);
1225}
1226
Austin Schuhcf5f6442021-07-06 10:43:28 -07001227void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001228 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001229 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1230 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001231 consumed_data_ += consumed_size;
1232 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001233}
1234
1235absl::Span<const uint8_t> SpanReader::ReadMessage() {
1236 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001237 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001238 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001239 } else {
1240 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001241 }
1242 return result;
1243}
1244
Austin Schuh05b70472020-01-01 17:11:17 -08001245bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001246 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1247 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001248 constexpr size_t kReadSize = 256 * 1024;
1249
1250 // Strip off any unused data at the front.
1251 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001252 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001253 consumed_data_ = 0;
1254 }
1255
1256 const size_t starting_size = data_.size();
1257
1258 // This should automatically grow the backing store. It won't shrink if we
1259 // get a small chunk later. This reduces allocations when we want to append
1260 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001261 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001262
Brian Silvermanf51499a2020-09-21 12:49:08 -07001263 const size_t count =
1264 decoder_->Read(data_.begin() + starting_size, data_.end());
1265 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001266 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001267 return false;
1268 }
Austin Schuh05b70472020-01-01 17:11:17 -08001269
Brian Smarttea913d42021-12-10 15:02:38 -08001270 total_read_ += count;
1271
Austin Schuh05b70472020-01-01 17:11:17 -08001272 return true;
1273}
1274
Austin Schuhadd6eb32020-11-09 21:24:26 -08001275std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001276 SpanReader *span_reader) {
1277 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001278
1279 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001280 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001281 return std::nullopt;
1282 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001283
Austin Schuh5212cad2020-09-09 23:12:09 -07001284 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001285 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001286 if (!result.Verify()) {
1287 return std::nullopt;
1288 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001289
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001290 // We only know of busted headers in the versions of the log file header
1291 // *before* the logger_sha1 field was added. At some point before that point,
1292 // the logic to track when a header has been written was rewritten in such a
1293 // way that it can't happen anymore. We've seen some logs where the body
1294 // parses as a header recently, so the simple solution of always looking is
1295 // failing us.
1296 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001297 while (true) {
1298 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001299 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001300 break;
1301 }
1302
1303 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1304 maybe_header_data);
1305 if (maybe_header.Verify()) {
1306 LOG(WARNING) << "Found duplicate LogFileHeader in "
1307 << span_reader->filename();
1308 ResizeableBuffer header_data_copy;
1309 header_data_copy.resize(maybe_header_data.size());
1310 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1311 header_data_copy.size());
1312 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1313 std::move(header_data_copy));
1314
1315 span_reader->ConsumeMessage();
1316 } else {
1317 break;
1318 }
1319 }
1320 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001321 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001322}
1323
Austin Schuh0e8db662021-07-06 10:43:47 -07001324std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1325 std::string_view filename) {
1326 SpanReader span_reader(filename);
1327 return ReadHeader(&span_reader);
1328}
1329
Austin Schuhadd6eb32020-11-09 21:24:26 -08001330std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001331 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001332 SpanReader span_reader(filename);
1333 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1334 for (size_t i = 0; i < n + 1; ++i) {
1335 data_span = span_reader.ReadMessage();
1336
1337 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001338 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001339 return std::nullopt;
1340 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001341 }
1342
Brian Silverman354697a2020-09-22 21:06:32 -07001343 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001344 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001345 if (!result.Verify()) {
1346 return std::nullopt;
1347 }
1348 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001349}
1350
Austin Schuh05b70472020-01-01 17:11:17 -08001351MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001352 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001353 raw_log_file_header_(
1354 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001355 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1356 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1357
Austin Schuh0e8db662021-07-06 10:43:47 -07001358 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1359 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001360
1361 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001362 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001363
Austin Schuh0e8db662021-07-06 10:43:47 -07001364 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001365
Austin Schuh5b728b72021-06-16 14:57:15 -07001366 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1367
Brian Smarttea913d42021-12-10 15:02:38 -08001368 total_verified_before_ = span_reader_.TotalConsumed();
1369
Austin Schuhcde938c2020-02-02 17:30:07 -08001370 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001371 FLAGS_max_out_of_order > 0
1372 ? chrono::duration_cast<chrono::nanoseconds>(
1373 chrono::duration<double>(FLAGS_max_out_of_order))
1374 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001375
1376 VLOG(1) << "Opened " << filename << " as node "
1377 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001378}
1379
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001380std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001381 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001382 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001383 if (is_corrupted()) {
1384 LOG(ERROR) << "Total corrupted volumes: before = "
1385 << total_verified_before_
1386 << " | corrupted = " << total_corrupted_
1387 << " | during = " << total_verified_during_
1388 << " | after = " << total_verified_after_ << std::endl;
1389 }
1390
1391 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001392 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1393 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001394 << span_reader_.TotalConsumed() << " bytes usable."
1395 << std::endl;
1396 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001397 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001398 }
1399
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001400 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001401
1402 if (crash_on_corrupt_message_flag_) {
1403 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001404 << total_verified_before_ << " found within "
1405 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001406 << "; set --nocrash_on_corrupt_message to see summary;"
1407 << " also set --ignore_corrupt_messages to process"
1408 << " anyway";
1409
1410 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001411 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001412 << " from " << filename() << std::endl;
1413
1414 total_corrupted_ += msg_data.size();
1415
1416 while (true) {
1417 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1418
James Kuszmaul9776b392023-01-14 14:08:08 -08001419 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001420 if (!ignore_corrupt_messages_flag_) {
1421 LOG(ERROR) << "Total corrupted volumes: before = "
1422 << total_verified_before_
1423 << " | corrupted = " << total_corrupted_
1424 << " | during = " << total_verified_during_
1425 << " | after = " << total_verified_after_ << std::endl;
1426
1427 if (span_reader_.IsIncomplete()) {
1428 LOG(ERROR) << "Unable to access some messages in " << filename()
1429 << " : " << span_reader_.TotalRead() << " bytes read, "
1430 << span_reader_.TotalConsumed() << " bytes usable."
1431 << std::endl;
1432 }
1433 return nullptr;
1434 }
1435 break;
1436 }
1437
1438 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1439
1440 if (!next_msg.Verify()) {
1441 total_corrupted_ += msg_data.size();
1442 total_verified_during_ += total_verified_after_;
1443 total_verified_after_ = 0;
1444
1445 } else {
1446 total_verified_after_ += msg_data.size();
1447 if (ignore_corrupt_messages_flag_) {
1448 msg = next_msg;
1449 break;
1450 }
1451 }
1452 }
1453 }
1454
1455 if (is_corrupted()) {
1456 total_verified_after_ += msg_data.size();
1457 } else {
1458 total_verified_before_ += msg_data.size();
1459 }
Austin Schuh05b70472020-01-01 17:11:17 -08001460
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001461 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001462
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001463 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001464
1465 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001466
1467 if (VLOG_IS_ON(3)) {
1468 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1469 } else if (VLOG_IS_ON(2)) {
1470 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1471 msg_copy.mutable_message()->clear_data();
1472 VLOG(2) << "Read from " << filename() << " data "
1473 << FlatbufferToJson(msg_copy);
1474 }
1475
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001476 return result;
1477}
1478
1479std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1480 const MessageHeader &message) {
1481 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1482
1483 UnpackedMessageHeader *const unpacked_message =
1484 reinterpret_cast<UnpackedMessageHeader *>(
1485 malloc(sizeof(UnpackedMessageHeader) + data_size +
1486 kChannelDataAlignment - 1));
1487
1488 CHECK(message.has_channel_index());
1489 CHECK(message.has_monotonic_sent_time());
1490
1491 absl::Span<uint8_t> span;
1492 if (data_size > 0) {
1493 span =
1494 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1495 &unpacked_message->actual_data[0], data_size)),
1496 data_size);
1497 }
1498
Austin Schuh826e6ce2021-11-18 20:33:10 -08001499 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001500 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001501 monotonic_remote_time = aos::monotonic_clock::time_point(
1502 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001503 }
1504 std::optional<realtime_clock::time_point> realtime_remote_time;
1505 if (message.has_realtime_remote_time()) {
1506 realtime_remote_time = realtime_clock::time_point(
1507 chrono::nanoseconds(message.realtime_remote_time()));
1508 }
1509
1510 std::optional<uint32_t> remote_queue_index;
1511 if (message.has_remote_queue_index()) {
1512 remote_queue_index = message.remote_queue_index();
1513 }
1514
James Kuszmaul9776b392023-01-14 14:08:08 -08001515 new (unpacked_message) UnpackedMessageHeader(
1516 message.channel_index(),
1517 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001518 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001519 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001520 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001521 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1522 remote_queue_index,
1523 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001524 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001525 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001526
1527 if (data_size > 0) {
1528 memcpy(span.data(), message.data()->data(), data_size);
1529 }
1530
1531 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1532 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001533}
1534
Austin Schuhc41603c2020-10-11 16:17:37 -07001535PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001536 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001537 if (parts_.parts.size() >= 2) {
1538 next_message_reader_.emplace(parts_.parts[1]);
1539 }
Austin Schuh48507722021-07-17 17:29:24 -07001540 ComputeBootCounts();
1541}
1542
1543void PartsMessageReader::ComputeBootCounts() {
1544 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1545 std::nullopt);
1546
1547 // We have 3 vintages of log files with different amounts of information.
1548 if (log_file_header()->has_boot_uuids()) {
1549 // The new hotness with the boots explicitly listed out. We can use the log
1550 // file header to compute the boot count of all relevant nodes.
1551 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1552 size_t node_index = 0;
1553 for (const flatbuffers::String *boot_uuid :
1554 *log_file_header()->boot_uuids()) {
1555 CHECK(parts_.boots);
1556 if (boot_uuid->size() != 0) {
1557 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1558 if (it != parts_.boots->boot_count_map.end()) {
1559 boot_counts_[node_index] = it->second;
1560 }
1561 } else if (parts().boots->boots[node_index].size() == 1u) {
1562 boot_counts_[node_index] = 0;
1563 }
1564 ++node_index;
1565 }
1566 } else {
1567 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1568 // single node log files with boot UUIDs in the header. We only know how to
1569 // order certain boots in certain circumstances.
1570 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1571 for (size_t node_index = 0; node_index < boot_counts_.size();
1572 ++node_index) {
1573 CHECK(parts_.boots);
1574 if (parts().boots->boots[node_index].size() == 1u) {
1575 boot_counts_[node_index] = 0;
1576 }
1577 }
1578 } else {
1579 // Really old single node logs without any UUIDs. They can't reboot.
1580 CHECK_EQ(boot_counts_.size(), 1u);
1581 boot_counts_[0] = 0u;
1582 }
1583 }
1584}
Austin Schuhc41603c2020-10-11 16:17:37 -07001585
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001586std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001587 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001589 message_reader_.ReadMessage();
1590 if (message) {
1591 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001592 const monotonic_clock::time_point monotonic_sent_time =
1593 message->monotonic_sent_time;
1594
1595 // TODO(austin): Does this work with startup? Might need to use the
1596 // start time.
1597 // TODO(austin): Does this work with startup when we don't know the
1598 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001599 if (monotonic_sent_time >
1600 parts_.monotonic_start_time + max_out_of_order_duration()) {
1601 after_start_ = true;
1602 }
1603 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001604 CHECK_GE(monotonic_sent_time,
1605 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001606 << ": Max out of order of " << max_out_of_order_duration().count()
1607 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001608 << parts_.monotonic_start_time << " currently reading "
1609 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001610 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001611 return message;
1612 }
1613 NextLog();
1614 }
Austin Schuh32f68492020-11-08 21:45:51 -08001615 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001616 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001617}
1618
1619void PartsMessageReader::NextLog() {
1620 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001621 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001622 done_ = true;
1623 return;
1624 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001625 CHECK(next_message_reader_);
1626 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001627 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001628 if (next_part_index_ + 1 < parts_.parts.size()) {
1629 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1630 } else {
1631 next_message_reader_.reset();
1632 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001633 ++next_part_index_;
1634}
1635
Austin Schuh1be0ce42020-11-29 22:43:26 -08001636bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001637 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001638
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001639 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001640 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001641 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001642 return false;
1643 }
1644
1645 if (this->channel_index < m2.channel_index) {
1646 return true;
1647 } else if (this->channel_index > m2.channel_index) {
1648 return false;
1649 }
1650
1651 return this->queue_index < m2.queue_index;
1652}
1653
1654bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001655bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001656 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001657
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001658 return timestamp.time == m2.timestamp.time &&
1659 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001660}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001661
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001662std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1663 os << "{.channel_index=" << m.channel_index
1664 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1665 << ", .realtime_sent_time=" << m.realtime_sent_time
1666 << ", .queue_index=" << m.queue_index;
1667 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001668 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001669 }
1670 os << ", .realtime_remote_time=";
1671 PrintOptionalOrNull(&os, m.realtime_remote_time);
1672 os << ", .remote_queue_index=";
1673 PrintOptionalOrNull(&os, m.remote_queue_index);
1674 if (m.has_monotonic_timestamp_time) {
1675 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1676 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001677 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001678 return os;
1679}
1680
Austin Schuh1be0ce42020-11-29 22:43:26 -08001681std::ostream &operator<<(std::ostream &os, const Message &m) {
1682 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001683 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001684 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001685 if (m.data->remote_queue_index.has_value()) {
1686 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1687 }
1688 if (m.data->monotonic_remote_time.has_value()) {
1689 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1690 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001691 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001692 }
1693 os << "}";
1694 return os;
1695}
1696
1697std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1698 os << "{.channel_index=" << m.channel_index
1699 << ", .queue_index=" << m.queue_index
1700 << ", .monotonic_event_time=" << m.monotonic_event_time
1701 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001702 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001703 os << ", .remote_queue_index=" << m.remote_queue_index;
1704 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001705 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001706 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1707 }
1708 if (m.realtime_remote_time != realtime_clock::min_time) {
1709 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1710 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001711 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001712 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1713 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001714 if (m.data != nullptr) {
1715 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001716 } else {
1717 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001718 }
1719 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001720 return os;
1721}
1722
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001723LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001724 : parts_message_reader_(log_parts),
1725 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1726}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001727
1728Message *LogPartsSorter::Front() {
1729 // Queue up data until enough data has been queued that the front message is
1730 // sorted enough to be safe to pop. This may do nothing, so we should make
1731 // sure the nothing path is checked quickly.
1732 if (sorted_until() != monotonic_clock::max_time) {
1733 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001734 if (!messages_.empty() &&
1735 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001736 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001737 break;
1738 }
1739
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001740 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001741 parts_message_reader_.ReadMessage();
1742 // No data left, sorted forever, work through what is left.
1743 if (!m) {
1744 sorted_until_ = monotonic_clock::max_time;
1745 break;
1746 }
1747
Austin Schuh48507722021-07-17 17:29:24 -07001748 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001749 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001750 monotonic_timestamp_boot = parts().logger_boot_count;
1751 }
1752 size_t monotonic_remote_boot = 0xffffff;
1753
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001754 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001755 const Node *node =
1756 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001757
Austin Schuh48507722021-07-17 17:29:24 -07001758 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001759 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001760 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001761 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001762 monotonic_remote_boot = *boot;
1763 }
1764
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001765 messages_.insert(
1766 Message{.channel_index = m->channel_index,
1767 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1768 .index = m->queue_index},
1769 .timestamp = BootTimestamp{.boot = parts().boot_count,
1770 .time = m->monotonic_sent_time},
1771 .monotonic_remote_boot = monotonic_remote_boot,
1772 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1773 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001774
1775 // Now, update sorted_until_ to match the new message.
1776 if (parts_message_reader_.newest_timestamp() >
1777 monotonic_clock::min_time +
1778 parts_message_reader_.max_out_of_order_duration()) {
1779 sorted_until_ = parts_message_reader_.newest_timestamp() -
1780 parts_message_reader_.max_out_of_order_duration();
1781 } else {
1782 sorted_until_ = monotonic_clock::min_time;
1783 }
1784 }
1785 }
1786
1787 // Now that we have enough data queued, return a pointer to the oldest piece
1788 // of data if it exists.
1789 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001790 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001791 return nullptr;
1792 }
1793
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001794 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001795 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001796 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001797 return &(*messages_.begin());
1798}
1799
1800void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1801
1802std::string LogPartsSorter::DebugString() const {
1803 std::stringstream ss;
1804 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001805 int count = 0;
1806 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001807 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001808 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1809 ss << m << "\n";
1810 } else if (no_dots) {
1811 ss << "...\n";
1812 no_dots = false;
1813 }
1814 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001815 }
1816 ss << "] <- " << parts_message_reader_.filename();
1817 return ss.str();
1818}
1819
Austin Schuhd2f96102020-12-01 20:27:29 -08001820NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1821 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001822 // Enforce that we are sorting things only from a single node from a single
1823 // boot.
1824 const std::string_view part0_node = parts[0].node;
1825 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001826 for (size_t i = 1; i < parts.size(); ++i) {
1827 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001828 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1829 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001830 }
Austin Schuh715adc12021-06-29 22:07:39 -07001831
1832 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1833
Austin Schuhd2f96102020-12-01 20:27:29 -08001834 for (LogParts &part : parts) {
1835 parts_sorters_.emplace_back(std::move(part));
1836 }
1837
Austin Schuhd2f96102020-12-01 20:27:29 -08001838 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001839 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001840 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001841 // We want to capture the earliest meaningful start time here. The start
1842 // time defaults to min_time when there's no meaningful value to report, so
1843 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001844 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1845 bool accept = false;
1846 // We want to prioritize start times from the logger node. Really, we
1847 // want to prioritize start times with a valid realtime_clock time. So,
1848 // if we have a start time without a RT clock, prefer a start time with a
1849 // RT clock, even it if is later.
1850 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1851 // We've got a good one. See if the current start time has a good RT
1852 // clock, or if we should use this one instead.
1853 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1854 accept = true;
1855 } else if (realtime_start_time_ == realtime_clock::min_time) {
1856 // The previous start time doesn't have a good RT time, so it is very
1857 // likely the start time from a remote part file. We just found a
1858 // better start time with a real RT time, so switch to that instead.
1859 accept = true;
1860 }
1861 } else if (realtime_start_time_ == realtime_clock::min_time) {
1862 // We don't have a RT time, so take the oldest.
1863 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1864 accept = true;
1865 }
1866 }
1867
1868 if (accept) {
1869 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1870 realtime_start_time_ = parts_sorter.realtime_start_time();
1871 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001872 }
1873 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001874
1875 // If there was no meaningful start time reported, just use min_time.
1876 if (monotonic_start_time_ == monotonic_clock::max_time) {
1877 monotonic_start_time_ = monotonic_clock::min_time;
1878 realtime_start_time_ = realtime_clock::min_time;
1879 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001880}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001881
Austin Schuh0ca51f32020-12-25 21:51:45 -08001882std::vector<const LogParts *> NodeMerger::Parts() const {
1883 std::vector<const LogParts *> p;
1884 p.reserve(parts_sorters_.size());
1885 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1886 p.emplace_back(&parts_sorter.parts());
1887 }
1888 return p;
1889}
1890
Austin Schuh8f52ed52020-11-30 23:12:39 -08001891Message *NodeMerger::Front() {
1892 // Return the current Front if we have one, otherwise go compute one.
1893 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001894 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001895 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001896 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001897 }
1898
1899 // Otherwise, do a simple search for the oldest message, deduplicating any
1900 // duplicates.
1901 Message *oldest = nullptr;
1902 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001903 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1904 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001905 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001906 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001907 continue;
1908 }
1909 if (oldest == nullptr || *m < *oldest) {
1910 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001911 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001912 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001913 // Found a duplicate. If there is a choice, we want the one which has
1914 // the timestamp time.
1915 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001916 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001917 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001918 current_->PopFront();
1919 current_ = &parts_sorter;
1920 oldest = m;
1921 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001922 CHECK_EQ(m->data->monotonic_timestamp_time,
1923 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001924 parts_sorter.PopFront();
1925 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001926 }
1927
1928 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001929 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001930 }
1931
Austin Schuhb000de62020-12-03 22:00:40 -08001932 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001933 CHECK_GE(oldest->timestamp.time, last_message_time_);
1934 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001935 monotonic_oldest_time_ =
1936 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001937 } else {
1938 last_message_time_ = monotonic_clock::max_time;
1939 }
1940
Austin Schuh8f52ed52020-11-30 23:12:39 -08001941 // Return the oldest message found. This will be nullptr if nothing was
1942 // found, indicating there is nothing left.
1943 return oldest;
1944}
1945
1946void NodeMerger::PopFront() {
1947 CHECK(current_ != nullptr) << "Popping before calling Front()";
1948 current_->PopFront();
1949 current_ = nullptr;
1950}
1951
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001952BootMerger::BootMerger(std::vector<LogParts> files) {
1953 std::vector<std::vector<LogParts>> boots;
1954
1955 // Now, we need to split things out by boot.
1956 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001957 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001958 if (boot_count + 1 > boots.size()) {
1959 boots.resize(boot_count + 1);
1960 }
1961 boots[boot_count].emplace_back(std::move(files[i]));
1962 }
1963
1964 node_mergers_.reserve(boots.size());
1965 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001966 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001967 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001968 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001969 }
1970 node_mergers_.emplace_back(
1971 std::make_unique<NodeMerger>(std::move(boots[i])));
1972 }
1973}
1974
1975Message *BootMerger::Front() {
1976 Message *result = node_mergers_[index_]->Front();
1977
1978 if (result != nullptr) {
1979 return result;
1980 }
1981
1982 if (index_ + 1u == node_mergers_.size()) {
1983 // At the end of the last node merger, just return.
1984 return nullptr;
1985 } else {
1986 ++index_;
1987 return Front();
1988 }
1989}
1990
1991void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1992
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001993std::vector<const LogParts *> BootMerger::Parts() const {
1994 std::vector<const LogParts *> results;
1995 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1996 std::vector<const LogParts *> node_parts = node_merger->Parts();
1997
1998 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1999 std::make_move_iterator(node_parts.end()));
2000 }
2001
2002 return results;
2003}
2004
Austin Schuhd2f96102020-12-01 20:27:29 -08002005TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002006 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002007 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002008 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002009 if (!configuration_) {
2010 configuration_ = part->config;
2011 } else {
2012 CHECK_EQ(configuration_.get(), part->config.get());
2013 }
2014 }
2015 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002016 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2017 // pretty simple.
2018 if (configuration::MultiNode(config)) {
2019 nodes_data_.resize(config->nodes()->size());
2020 const Node *my_node = config->nodes()->Get(node());
2021 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2022 const Node *node = config->nodes()->Get(node_index);
2023 NodeData *node_data = &nodes_data_[node_index];
2024 node_data->channels.resize(config->channels()->size());
2025 // We should save the channel if it is delivered to the node represented
2026 // by the NodeData, but not sent by that node. That combo means it is
2027 // forwarded.
2028 size_t channel_index = 0;
2029 node_data->any_delivered = false;
2030 for (const Channel *channel : *config->channels()) {
2031 node_data->channels[channel_index].delivered =
2032 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002033 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2034 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002035 node_data->any_delivered = node_data->any_delivered ||
2036 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002037 if (node_data->channels[channel_index].delivered) {
2038 const Connection *connection =
2039 configuration::ConnectionToNode(channel, node);
2040 node_data->channels[channel_index].time_to_live =
2041 chrono::nanoseconds(connection->time_to_live());
2042 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002043 ++channel_index;
2044 }
2045 }
2046
2047 for (const Channel *channel : *config->channels()) {
2048 source_node_.emplace_back(configuration::GetNodeIndex(
2049 config, channel->source_node()->string_view()));
2050 }
2051 }
2052}
2053
2054void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002055 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002056 CHECK_NE(timestamp_mapper->node(), node());
2057 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2058
2059 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002060 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002061 // we could needlessly save data.
2062 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002063 VLOG(1) << "Registering on node " << node() << " for peer node "
2064 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002065 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2066
2067 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002068
2069 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002070 }
2071}
2072
Austin Schuh79b30942021-01-24 22:32:21 -08002073void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002074 matched_messages_.emplace_back(
2075 TimestampedMessage{.channel_index = m->channel_index,
2076 .queue_index = m->queue_index,
2077 .monotonic_event_time = m->timestamp,
2078 .realtime_event_time = m->data->realtime_sent_time,
2079 .remote_queue_index = BootQueueIndex::Invalid(),
2080 .monotonic_remote_time = BootTimestamp::min_time(),
2081 .realtime_remote_time = realtime_clock::min_time,
2082 .monotonic_timestamp_time = BootTimestamp::min_time(),
2083 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002084}
2085
2086TimestampedMessage *TimestampMapper::Front() {
2087 // No need to fetch anything new. A previous message still exists.
2088 switch (first_message_) {
2089 case FirstMessage::kNeedsUpdate:
2090 break;
2091 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002092 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002093 case FirstMessage::kNullptr:
2094 return nullptr;
2095 }
2096
Austin Schuh79b30942021-01-24 22:32:21 -08002097 if (matched_messages_.empty()) {
2098 if (!QueueMatched()) {
2099 first_message_ = FirstMessage::kNullptr;
2100 return nullptr;
2101 }
2102 }
2103 first_message_ = FirstMessage::kInMessage;
2104 return &matched_messages_.front();
2105}
2106
2107bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002108 MatchResult result = MatchResult::kEndOfFile;
2109 do {
2110 result = MaybeQueueMatched();
2111 } while (result == MatchResult::kSkipped);
2112 return result == MatchResult::kQueued;
2113}
2114
2115bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2116 const TimestampedMessage & /*message*/) {
2117 if (replay_channels_callback_ &&
2118 !replay_channels_callback_(matched_messages_.back())) {
2119 matched_messages_.pop_back();
2120 return true;
2121 }
2122 return false;
2123}
2124
2125TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002126 if (nodes_data_.empty()) {
2127 // Simple path. We are single node, so there are no timestamps to match!
2128 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002129 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002130 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002131 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002132 }
Austin Schuh79b30942021-01-24 22:32:21 -08002133 // Enqueue this message into matched_messages_ so we have a place to
2134 // associate remote timestamps, and return it.
2135 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002136
Austin Schuh79b30942021-01-24 22:32:21 -08002137 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2138 last_message_time_ = matched_messages_.back().monotonic_event_time;
2139
2140 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002141 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002142 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002143 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2144 return MatchResult::kSkipped;
2145 }
2146 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002147 }
2148
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002149 // We need to only add messages to the list so they get processed for
2150 // messages which are delivered. Reuse the flow below which uses messages_
2151 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002152 if (messages_.empty()) {
2153 if (!Queue()) {
2154 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002155 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002156 }
2157
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002158 // Now that it has been added (and cannibalized), forget about it
2159 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002160 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002161 }
2162
2163 Message *m = &(messages_.front());
2164
2165 if (source_node_[m->channel_index] == node()) {
2166 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002167 QueueMessage(m);
2168 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2169 last_message_time_ = matched_messages_.back().monotonic_event_time;
2170 messages_.pop_front();
2171 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002172 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2173 return MatchResult::kSkipped;
2174 }
2175 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002176 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002177 // Got a timestamp, find the matching remote data, match it, and return
2178 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002179 Message data = MatchingMessageFor(*m);
2180
2181 // Return the data from the remote. The local message only has timestamp
2182 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002183 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002184 .channel_index = m->channel_index,
2185 .queue_index = m->queue_index,
2186 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002187 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002188 .remote_queue_index =
2189 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002190 .index = m->data->remote_queue_index.value()},
2191 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002192 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002193 .realtime_remote_time = m->data->realtime_remote_time.value(),
2194 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2195 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002196 .data = std::move(data.data)});
2197 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2198 last_message_time_ = matched_messages_.back().monotonic_event_time;
2199 // Since messages_ holds the data, drop it.
2200 messages_.pop_front();
2201 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002202 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2203 return MatchResult::kSkipped;
2204 }
2205 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002206 }
2207}
2208
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002209void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002210 while (last_message_time_ <= queue_time) {
2211 if (!QueueMatched()) {
2212 return;
2213 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002214 }
2215}
2216
Austin Schuhe639ea12021-01-25 13:00:22 -08002217void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002218 // Note: queueing for time doesn't really work well across boots. So we
2219 // just assume that if you are using this, you only care about the current
2220 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002221 //
2222 // TODO(austin): Is that the right concept?
2223 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002224 // Make sure we have something queued first. This makes the end time
2225 // calculation simpler, and is typically what folks want regardless.
2226 if (matched_messages_.empty()) {
2227 if (!QueueMatched()) {
2228 return;
2229 }
2230 }
2231
2232 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002233 std::max(monotonic_start_time(
2234 matched_messages_.front().monotonic_event_time.boot),
2235 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002236 time_estimation_buffer;
2237
2238 // Place sorted messages on the list until we have
2239 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2240 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002241 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002242 if (!QueueMatched()) {
2243 return;
2244 }
2245 }
2246}
2247
Austin Schuhd2f96102020-12-01 20:27:29 -08002248void TimestampMapper::PopFront() {
2249 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002250 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002251 first_message_ = FirstMessage::kNeedsUpdate;
2252
Austin Schuh79b30942021-01-24 22:32:21 -08002253 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002254}
2255
2256Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002257 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002258 CHECK_NOTNULL(message.data);
2259 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002260 const BootQueueIndex remote_queue_index =
2261 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002262 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002263
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002264 CHECK(message.data->monotonic_remote_time.has_value());
2265 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002266
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002267 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002268 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002269 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002270 const realtime_clock::time_point realtime_remote_time =
2271 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002272
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002273 TimestampMapper *peer =
2274 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002275
2276 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002277 // asked to pull a timestamp from a peer which doesn't exist, return an
2278 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002279 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002280 // TODO(austin): Make sure the tests hit all these paths with a boot count
2281 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002282 return Message{.channel_index = message.channel_index,
2283 .queue_index = remote_queue_index,
2284 .timestamp = monotonic_remote_time,
2285 .monotonic_remote_boot = 0xffffff,
2286 .monotonic_timestamp_boot = 0xffffff,
2287 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002288 }
2289
2290 // The queue which will have the matching data, if available.
2291 std::deque<Message> *data_queue =
2292 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2293
Austin Schuh79b30942021-01-24 22:32:21 -08002294 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002295
2296 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002297 return Message{.channel_index = message.channel_index,
2298 .queue_index = remote_queue_index,
2299 .timestamp = monotonic_remote_time,
2300 .monotonic_remote_boot = 0xffffff,
2301 .monotonic_timestamp_boot = 0xffffff,
2302 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002303 }
2304
Austin Schuhd2f96102020-12-01 20:27:29 -08002305 if (remote_queue_index < data_queue->front().queue_index ||
2306 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002307 return Message{.channel_index = message.channel_index,
2308 .queue_index = remote_queue_index,
2309 .timestamp = monotonic_remote_time,
2310 .monotonic_remote_boot = 0xffffff,
2311 .monotonic_timestamp_boot = 0xffffff,
2312 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002313 }
2314
Austin Schuh993ccb52020-12-12 15:59:32 -08002315 // The algorithm below is constant time with some assumptions. We need there
2316 // to be no missing messages in the data stream. This also assumes a queue
2317 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002318 if (data_queue->back().queue_index.boot ==
2319 data_queue->front().queue_index.boot &&
2320 (data_queue->back().queue_index.index -
2321 data_queue->front().queue_index.index + 1u ==
2322 data_queue->size())) {
2323 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002324 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002325 //
2326 // TODO(austin): Move if not reliable.
2327 Message result = (*data_queue)[remote_queue_index.index -
2328 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002329
2330 CHECK_EQ(result.timestamp, monotonic_remote_time)
2331 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002332 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002333 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2334 // Now drop the data off the front. We have deduplicated timestamps, so we
2335 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002336 data_queue->erase(
2337 data_queue->begin(),
2338 data_queue->begin() +
2339 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002340 return result;
2341 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002342 // TODO(austin): Binary search.
2343 auto it = std::find_if(
2344 data_queue->begin(), data_queue->end(),
2345 [remote_queue_index,
2346 remote_boot = monotonic_remote_time.boot](const Message &m) {
2347 return m.queue_index == remote_queue_index &&
2348 m.timestamp.boot == remote_boot;
2349 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002350 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002351 return Message{.channel_index = message.channel_index,
2352 .queue_index = remote_queue_index,
2353 .timestamp = monotonic_remote_time,
2354 .monotonic_remote_boot = 0xffffff,
2355 .monotonic_timestamp_boot = 0xffffff,
2356 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002357 }
2358
2359 Message result = std::move(*it);
2360
2361 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002362 << ": Queue index matches, but timestamp doesn't. Please "
2363 "investigate!";
2364 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2365 << ": Queue index matches, but timestamp doesn't. Please "
2366 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002367
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002368 // Erase everything up to this message. We want to keep 1 message in the
2369 // queue so we can handle reliable messages forwarded across boots.
2370 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002371
2372 return result;
2373 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002374}
2375
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002376void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002377 if (queued_until_ > t) {
2378 return;
2379 }
2380 while (true) {
2381 if (!messages_.empty() && messages_.back().timestamp > t) {
2382 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2383 return;
2384 }
2385
2386 if (!Queue()) {
2387 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002388 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002389 return;
2390 }
2391
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002392 // Now that it has been added (and cannibalized), forget about it
2393 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002394 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002395 }
2396}
2397
2398bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002399 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002400 if (m == nullptr) {
2401 return false;
2402 }
2403 for (NodeData &node_data : nodes_data_) {
2404 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002405 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002406 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002407 // If we have data but no timestamps (logs where the timestamps didn't get
2408 // logged are classic), we can grow this indefinitely. We don't need to
2409 // keep anything that is older than the last message returned.
2410
2411 // We have the time on the source node.
2412 // We care to wait until we have the time on the destination node.
2413 std::deque<Message> &messages =
2414 node_data.channels[m->channel_index].messages;
2415 // Max delay over the network is the TTL, so let's take the queue time and
2416 // add TTL to it. Don't forget any messages which are reliable until
2417 // someone can come up with a good reason to forget those too.
2418 if (node_data.channels[m->channel_index].time_to_live >
2419 chrono::nanoseconds(0)) {
2420 // We need to make *some* assumptions about network delay for this to
2421 // work. We want to only look at the RX side. This means we need to
2422 // track the last time a message was popped from any channel from the
2423 // node sending this message, and compare that to the max time we expect
2424 // that a message will take to be delivered across the network. This
2425 // assumes that messages are popped in time order as a proxy for
2426 // measuring the distributed time at this layer.
2427 //
2428 // Leave at least 1 message in here so we can handle reboots and
2429 // messages getting sent twice.
2430 while (messages.size() > 1u &&
2431 messages.begin()->timestamp +
2432 node_data.channels[m->channel_index].time_to_live +
2433 chrono::duration_cast<chrono::nanoseconds>(
2434 chrono::duration<double>(FLAGS_max_network_delay)) <
2435 last_popped_message_time_) {
2436 messages.pop_front();
2437 }
2438 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002439 node_data.channels[m->channel_index].messages.emplace_back(*m);
2440 }
2441 }
2442
2443 messages_.emplace_back(std::move(*m));
2444 return true;
2445}
2446
2447std::string TimestampMapper::DebugString() const {
2448 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002449 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002450 for (const Message &message : messages_) {
2451 ss << " " << message << "\n";
2452 }
2453 ss << "] queued_until " << queued_until_;
2454 for (const NodeData &ns : nodes_data_) {
2455 if (ns.peer == nullptr) continue;
2456 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2457 size_t channel_index = 0;
2458 for (const NodeData::ChannelData &channel_data :
2459 ns.peer->nodes_data_[node()].channels) {
2460 if (channel_data.messages.empty()) {
2461 continue;
2462 }
Austin Schuhb000de62020-12-03 22:00:40 -08002463
Austin Schuhd2f96102020-12-01 20:27:29 -08002464 ss << " channel " << channel_index << " [\n";
2465 for (const Message &m : channel_data.messages) {
2466 ss << " " << m << "\n";
2467 }
2468 ss << " ]\n";
2469 ++channel_index;
2470 }
2471 ss << "] queued_until " << ns.peer->queued_until_;
2472 }
2473 return ss.str();
2474}
2475
Austin Schuhee711052020-08-24 16:06:09 -07002476std::string MaybeNodeName(const Node *node) {
2477 if (node != nullptr) {
2478 return node->name()->str() + " ";
2479 }
2480 return "";
2481}
2482
Brian Silvermanf51499a2020-09-21 12:49:08 -07002483} // namespace aos::logger