blob: 86c89a82f0f0b81b2fa90f68c9ab121bfcba44ea [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
// __has_feature is a Clang extension. Provide a fallback so that the
// ENABLE_LZMA expression below still preprocesses on compilers which do not
// define it.
#ifndef __has_feature
#define __has_feature(x) 0
#endif

// LZMA is only enabled on 64-bit x86 and ARM targets, and is turned off when
// building with MemorySanitizer.
#if defined(__x86_64__) || defined(__aarch64__)
#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
#else
#define ENABLE_LZMA 0
#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
Austin Schuh86110712022-09-16 15:40:54 -070031#if ENABLE_S3
32#include "aos/events/logging/s3_fetcher.h"
33#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070034
Austin Schuh48d10d62022-10-16 22:19:23 -070035DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080036 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070037DEFINE_double(
38 flush_period, 5.0,
39 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080040
Austin Schuha040c3f2021-02-13 16:09:07 -080041DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080042 max_network_delay, 1.0,
43 "Max time to assume a message takes to cross the network before we are "
44 "willing to drop it from our buffers and assume it didn't make it. "
45 "Increasing this number can increase memory usage depending on the packet "
46 "loss of your network or if the timestamps aren't logged for a message.");
47
48DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080049 max_out_of_order, -1,
50 "If set, this overrides the max out of order duration for a log file.");
51
Austin Schuh0e8db662021-07-06 10:43:47 -070052DEFINE_bool(workaround_double_headers, true,
53 "Some old log files have two headers at the beginning. Use the "
54 "last header as the actual header.");
55
Brian Smarttea913d42021-12-10 15:02:38 -080056DEFINE_bool(crash_on_corrupt_message, true,
57 "When true, MessageReader will crash the first time a message "
58 "with corrupted format is found. When false, the crash will be "
59 "suppressed, and any remaining readable messages will be "
60 "evaluated to present verified vs corrupted stats.");
61
62DEFINE_bool(ignore_corrupt_messages, false,
63 "When true, and crash_on_corrupt_message is false, then any "
64 "corrupt message found by MessageReader be silently ignored, "
65 "providing access to all uncorrupted messages in a logfile.");
66
Brian Silvermanf51499a2020-09-21 12:49:08 -070067namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070068namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080069
Austin Schuh05b70472020-01-01 17:11:17 -080070namespace chrono = std::chrono;
71
// Streams the value held by t into *os, or the literal text "null" when t is
// empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}
80} // namespace
81
Austin Schuh48d10d62022-10-16 22:19:23 -070082DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
83 std::unique_ptr<DataEncoder> encoder)
Brian Silvermanf51499a2020-09-21 12:49:08 -070084 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070085 if (!util::MkdirPIfSpace(filename, 0777)) {
86 ran_out_of_space_ = true;
87 } else {
James Kuszmaul9776b392023-01-14 14:08:08 -080088 fd_ = open(filename_.c_str(), O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
Brian Silvermana9f2ec92020-10-06 18:00:53 -070089 if (fd_ == -1 && errno == ENOSPC) {
90 ran_out_of_space_ = true;
91 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070092 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
93 << " for writing";
94 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070095 }
96 }
Austin Schuha36c8902019-12-30 18:07:15 -080097}
98
99DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700100 Close();
101 if (ran_out_of_space_) {
102 CHECK(acknowledge_ran_out_of_space_)
103 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700104 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105}
106
Brian Silvermand90905f2020-09-23 14:42:56 -0700107DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700108 *this = std::move(other);
109}
110
Brian Silverman87ac0402020-09-17 14:47:01 -0700111// When other is destroyed "soon" (which it should be because we're getting an
112// rvalue reference to it), it will flush etc all the data we have queued up
113// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700114DetachedBufferWriter &DetachedBufferWriter::operator=(
115 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700117 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700118 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700119 std::swap(ran_out_of_space_, other.ran_out_of_space_);
120 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700121 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700122 std::swap(max_write_time_, other.max_write_time_);
123 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
124 std::swap(max_write_time_messages_, other.max_write_time_messages_);
125 std::swap(total_write_time_, other.total_write_time_);
126 std::swap(total_write_count_, other.total_write_count_);
127 std::swap(total_write_messages_, other.total_write_messages_);
128 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700129 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800130}
131
Austin Schuh8bdfc492023-02-11 12:53:13 -0800132void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
Austin Schuh7ef11a42023-02-04 17:15:12 -0800133 aos::monotonic_clock::time_point now) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700134 if (ran_out_of_space_) {
135 // We don't want any later data to be written after space becomes
136 // available, so refuse to write anything more once we've dropped data
137 // because we ran out of space.
Austin Schuh48d10d62022-10-16 22:19:23 -0700138 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800139 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700140
Austin Schuh8bdfc492023-02-11 12:53:13 -0800141 const size_t message_size = copier->size();
142 size_t overall_bytes_written = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -0700143
Austin Schuh8bdfc492023-02-11 12:53:13 -0800144 // Keep writing chunks until we've written it all. If we end up with a
145 // partial write, this means we need to flush to disk.
146 do {
147 const size_t bytes_written = encoder_->Encode(copier, overall_bytes_written);
148 CHECK(bytes_written != 0);
149
150 overall_bytes_written += bytes_written;
151 if (overall_bytes_written < message_size) {
152 VLOG(1) << "Flushing because of a partial write, tried to write "
153 << message_size << " wrote " << overall_bytes_written;
154 Flush(now);
155 }
156 } while (overall_bytes_written < message_size);
157
Austin Schuhbd06ae42021-03-31 22:48:21 -0700158 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800159}
160
Brian Silverman0465fcf2020-09-24 00:29:18 -0700161void DetachedBufferWriter::Close() {
162 if (fd_ == -1) {
163 return;
164 }
165 encoder_->Finish();
166 while (encoder_->queue_size() > 0) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800167 Flush(monotonic_clock::max_time);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700168 }
169 if (close(fd_) == -1) {
170 if (errno == ENOSPC) {
171 ran_out_of_space_ = true;
172 } else {
173 PLOG(ERROR) << "Closing log file failed";
174 }
175 }
176 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700177 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700178}
179
Austin Schuh8bdfc492023-02-11 12:53:13 -0800180void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) {
181 last_flush_time_ = now;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700182 if (ran_out_of_space_) {
183 // We don't want any later data to be written after space becomes available,
184 // so refuse to write anything more once we've dropped data because we ran
185 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700186 if (encoder_) {
187 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
188 encoder_->Clear(encoder_->queue().size());
189 } else {
190 VLOG(1) << "No queue to ignore";
191 }
192 return;
193 }
194
195 const auto queue = encoder_->queue();
196 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700197 return;
198 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700199
Austin Schuha36c8902019-12-30 18:07:15 -0800200 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700201 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
202 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800203 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700204 for (size_t i = 0; i < iovec_size; ++i) {
205 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
206 iovec_[i].iov_len = queue[i].size();
207 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800208 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700209
210 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800211 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700212 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700213 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700214
215 encoder_->Clear(iovec_size);
216
217 UpdateStatsForWrite(end - start, written, iovec_size);
218}
219
Brian Silverman0465fcf2020-09-24 00:29:18 -0700220void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
221 size_t write_size) {
222 if (write_return == -1 && errno == ENOSPC) {
223 ran_out_of_space_ = true;
224 return;
225 }
226 PCHECK(write_return >= 0) << ": write failed";
227 if (write_return < static_cast<ssize_t>(write_size)) {
228 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
229 // never seems to happen in any other case. If we ever want to log to a
230 // socket, this will happen more often. However, until we get there, we'll
231 // just assume it means we ran out of space.
232 ran_out_of_space_ = true;
233 return;
234 }
235}
236
Brian Silvermanf51499a2020-09-21 12:49:08 -0700237void DetachedBufferWriter::UpdateStatsForWrite(
238 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
239 if (duration > max_write_time_) {
240 max_write_time_ = duration;
241 max_write_time_bytes_ = written;
242 max_write_time_messages_ = iovec_size;
243 }
244 total_write_time_ += duration;
245 ++total_write_count_;
246 total_write_messages_ += iovec_size;
247 total_write_bytes_ += written;
248}
249
Austin Schuhbd06ae42021-03-31 22:48:21 -0700250void DetachedBufferWriter::FlushAtThreshold(
251 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700252 if (ran_out_of_space_) {
253 // We don't want any later data to be written after space becomes available,
254 // so refuse to write anything more once we've dropped data because we ran
255 // out of space.
256 if (encoder_) {
257 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
258 encoder_->Clear(encoder_->queue().size());
259 } else {
260 VLOG(1) << "No queue to ignore";
261 }
262 return;
263 }
264
Austin Schuhbd06ae42021-03-31 22:48:21 -0700265 // We don't want to flush the first time through. Otherwise we will flush as
266 // the log file header might be compressing, defeating any parallelism and
267 // queueing there.
268 if (last_flush_time_ == aos::monotonic_clock::min_time) {
269 last_flush_time_ = now;
270 }
271
Brian Silvermanf51499a2020-09-21 12:49:08 -0700272 // Flush if we are at the max number of iovs per writev, because there's no
273 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700274 // data queued up or if it has been long enough.
Austin Schuh8bdfc492023-02-11 12:53:13 -0800275 while (encoder_->space() == 0 ||
276 encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700277 encoder_->queue_size() >= IOV_MAX ||
278 now > last_flush_time_ +
279 chrono::duration_cast<chrono::nanoseconds>(
280 chrono::duration<double>(FLAGS_flush_period))) {
Austin Schuh8bdfc492023-02-11 12:53:13 -0800281 VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_;
282 Flush(now);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700283 }
Austin Schuha36c8902019-12-30 18:07:15 -0800284}
285
Austin Schuhf2d0e682022-10-16 14:20:58 -0700286// Do the magic dance to convert the endianness of the data and append it to the
287// buffer.
288namespace {
289
290// TODO(austin): Look at the generated code to see if building the header is
291// efficient or not.
292template <typename T>
293uint8_t *Push(uint8_t *buffer, const T data) {
294 const T endian_data = flatbuffers::EndianScalar<T>(data);
295 std::memcpy(buffer, &endian_data, sizeof(T));
296 return buffer + sizeof(T);
297}
298
// Copies size bytes from data to buffer and returns the address just past the
// bytes written.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  void *const written = std::memcpy(buffer, data, size);
  return static_cast<uint8_t *>(written) + size;
}
303
// Writes padding zero bytes at buffer and returns the address just past them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  void *const zeroed = std::memset(buffer, 0, padding);
  return static_cast<uint8_t *>(zeroed) + padding;
}
308} // namespace
309
310flatbuffers::Offset<MessageHeader> PackRemoteMessage(
311 flatbuffers::FlatBufferBuilder *fbb,
312 const message_bridge::RemoteMessage *msg, int channel_index,
313 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
314 logger::MessageHeader::Builder message_header_builder(*fbb);
315 // Note: this must match the same order as MessageBridgeServer and
316 // PackMessage. We want identical headers to have identical
317 // on-the-wire formats to make comparing them easier.
318
319 message_header_builder.add_channel_index(channel_index);
320
321 message_header_builder.add_queue_index(msg->queue_index());
322 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
323 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
324
325 message_header_builder.add_monotonic_remote_time(
326 msg->monotonic_remote_time());
327 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
328 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
329
330 message_header_builder.add_monotonic_timestamp_time(
331 monotonic_timestamp_time.time_since_epoch().count());
332
333 return message_header_builder.Finish();
334}
335
// Writes the bytes [start_byte, end_byte) of the size-prefixed MessageHeader
// for msg directly into buffer, without going through a FlatBufferBuilder.
//
// The layout is fixed and is emitted in 8 byte pieces.  The switch jumps to the
// piece which begins at start_byte and falls through from piece to piece until
// the one which begins at end_byte is reached.  buffer is where the first of
// those pieces is written; it is not offset by start_byte here.
//
// start_byte and end_byte must both be multiples of 8, with
// start_byte <= end_byte <= PackRemoteMessageSize().  This is only verified
// with DCHECKs.
//
// Returns the number of bytes in the requested range, end_byte - start_byte.
size_t PackRemoteMessageInline(
    uint8_t *buffer, const message_bridge::RemoteMessage *msg,
    int channel_index,
    const aos::monotonic_clock::time_point monotonic_timestamp_time,
    size_t start_byte, size_t end_byte) {
  const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
  DCHECK_EQ((start_byte % 8u), 0u);
  DCHECK_EQ((end_byte % 8u), 0u);
  DCHECK_LE(start_byte, end_byte);
  DCHECK_LE(end_byte, message_size);

  // Each case label is the offset of an 8 byte piece of the message.  Every
  // piece first checks whether it is the end of the requested range before
  // writing anything.
  switch (start_byte) {
    case 0x00u:
      if ((end_byte) == 0x00u) {
        break;
      }
      // clang-format off
      // header:
      //   +0x00 | 5C 00 00 00             | UOffset32  | 0x0000005C (92) Loc: +0x5C                | size prefix
      buffer = Push<flatbuffers::uoffset_t>(
          buffer, message_size - sizeof(flatbuffers::uoffset_t));
      //   +0x04 | 20 00 00 00             | UOffset32  | 0x00000020 (32) Loc: +0x24                | offset to root table `aos.logger.MessageHeader`
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
      [[fallthrough]];
    case 0x08u:
      if ((end_byte) == 0x08u) {
        break;
      }
      //
      // padding:
      //   +0x08 | 00 00 00 00 00 00       | uint8_t[6] | ......                                    | padding
      buffer = Pad(buffer, 6);
      //
      // vtable (aos.logger.MessageHeader):
      //   +0x0E | 16 00                   | uint16_t   | 0x0016 (22)                               | size of this vtable
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x10u:
      if ((end_byte) == 0x10u) {
        break;
      }
      //   +0x10 | 3C 00                   | uint16_t   | 0x003C (60)                               | size of referring table
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
      //   +0x12 | 38 00                   | VOffset16  | 0x0038 (56)                               | offset to field `channel_index` (id: 0)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
      //   +0x14 | 2C 00                   | VOffset16  | 0x002C (44)                               | offset to field `monotonic_sent_time` (id: 1)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
      //   +0x16 | 24 00                   | VOffset16  | 0x0024 (36)                               | offset to field `realtime_sent_time` (id: 2)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
      [[fallthrough]];
    case 0x18u:
      if ((end_byte) == 0x18u) {
        break;
      }
      //   +0x18 | 34 00                   | VOffset16  | 0x0034 (52)                               | offset to field `queue_index` (id: 3)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
      //   +0x1A | 00 00                   | VOffset16  | 0x0000 (0)                                | offset to field `data` (id: 4) <null> (Vector)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
      //   +0x1C | 1C 00                   | VOffset16  | 0x001C (28)                               | offset to field `monotonic_remote_time` (id: 5)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
      //   +0x1E | 14 00                   | VOffset16  | 0x0014 (20)                               | offset to field `realtime_remote_time` (id: 6)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
      [[fallthrough]];
    case 0x20u:
      if ((end_byte) == 0x20u) {
        break;
      }
      //   +0x20 | 10 00                   | VOffset16  | 0x0010 (16)                               | offset to field `remote_queue_index` (id: 7)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
      //   +0x22 | 04 00                   | VOffset16  | 0x0004 (4)                                | offset to field `monotonic_timestamp_time` (id: 8)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
      //
      // root_table (aos.logger.MessageHeader):
      //   +0x24 | 16 00 00 00             | SOffset32  | 0x00000016 (22) Loc: +0x0E                | offset to vtable
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
      [[fallthrough]];
    case 0x28u:
      if ((end_byte) == 0x28u) {
        break;
      }
      //   +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t    | 0x71B1A8A411D80BF6 (8192514619791117302)  | table field `monotonic_timestamp_time` (Long)
      buffer = Push<int64_t>(buffer,
                             monotonic_timestamp_time.time_since_epoch().count());
      [[fallthrough]];
    case 0x30u:
      if ((end_byte) == 0x30u) {
        break;
      }
      //   +0x30 | 00 00 00 00             | uint8_t[4] | ....                                      | padding
      // TODO(austin): Can we re-arrange the order to ditch the padding?
      // (Answer is yes, but what is the impact elsewhere?  It will change the
      // binary format)
      buffer = Pad(buffer, 4);
      //   +0x34 | 75 00 00 00             | uint32_t   | 0x00000075 (117)                          | table field `remote_queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
      [[fallthrough]];
    case 0x38u:
      if ((end_byte) == 0x38u) {
        break;
      }
      //   +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t    | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
      [[fallthrough]];
    case 0x40u:
      if ((end_byte) == 0x40u) {
        break;
      }
      //   +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t    | 0x1D26A7C1F33040D5 (2100550727665467605)  | table field `monotonic_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
      [[fallthrough]];
    case 0x48u:
      if ((end_byte) == 0x48u) {
        break;
      }
      //   +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t    | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
      [[fallthrough]];
    case 0x50u:
      if ((end_byte) == 0x50u) {
        break;
      }
      //   +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t    | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
      [[fallthrough]];
    case 0x58u:
      if ((end_byte) == 0x58u) {
        break;
      }
      //   +0x58 | 33 00 00 00             | uint32_t   | 0x00000033 (51)                           | table field `queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->queue_index());
      //   +0x5C | 76 00 00 00             | uint32_t   | 0x00000076 (118)                          | table field `channel_index` (UInt)
      buffer = Push<uint32_t>(buffer, channel_index);
      // clang-format on
      [[fallthrough]];
    case 0x60u:
      if ((end_byte) == 0x60u) {
        break;
      }
  }

  return end_byte - start_byte;
}
478
Austin Schuha36c8902019-12-30 18:07:15 -0800479flatbuffers::Offset<MessageHeader> PackMessage(
480 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
481 int channel_index, LogType log_type) {
482 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
483
484 switch (log_type) {
485 case LogType::kLogMessage:
486 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800487 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700488 // Since the timestamps are 8 byte aligned, we are going to end up adding
489 // padding in the middle of the message to pad everything out to 8 byte
490 // alignment. That's rather wasteful. To make things efficient to mmap
491 // while reading uncompressed logs, we'd actually rather the message be
492 // aligned. So, force 8 byte alignment (enough to preserve alignment
493 // inside the nested message so that we can read it without moving it)
494 // here.
495 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700496 data_offset = fbb->CreateVector(
497 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800498 break;
499
500 case LogType::kLogDeliveryTimeOnly:
501 break;
502 }
503
504 MessageHeader::Builder message_header_builder(*fbb);
505 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800506
Austin Schuhfa30c352022-10-16 11:12:02 -0700507 // These are split out into very explicit serialization calls because the
508 // order here changes the order things are written out on the wire, and we
509 // want to control and understand it here. Changing the order can increase
510 // the amount of padding bytes in the middle.
511 //
James Kuszmaul9776b392023-01-14 14:08:08 -0800512 // It is also easier to follow... And doesn't actually make things much
513 // bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800514 switch (log_type) {
515 case LogType::kLogRemoteMessage:
516 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700517 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800518 message_header_builder.add_monotonic_sent_time(
519 context.monotonic_remote_time.time_since_epoch().count());
520 message_header_builder.add_realtime_sent_time(
521 context.realtime_remote_time.time_since_epoch().count());
522 break;
523
Austin Schuh6f3babe2020-01-26 20:34:50 -0800524 case LogType::kLogDeliveryTimeOnly:
525 message_header_builder.add_queue_index(context.queue_index);
526 message_header_builder.add_monotonic_sent_time(
527 context.monotonic_event_time.time_since_epoch().count());
528 message_header_builder.add_realtime_sent_time(
529 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800530 message_header_builder.add_monotonic_remote_time(
531 context.monotonic_remote_time.time_since_epoch().count());
532 message_header_builder.add_realtime_remote_time(
533 context.realtime_remote_time.time_since_epoch().count());
534 message_header_builder.add_remote_queue_index(context.remote_queue_index);
535 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700536
537 case LogType::kLogMessage:
538 message_header_builder.add_queue_index(context.queue_index);
539 message_header_builder.add_data(data_offset);
540 message_header_builder.add_monotonic_sent_time(
541 context.monotonic_event_time.time_since_epoch().count());
542 message_header_builder.add_realtime_sent_time(
543 context.realtime_event_time.time_since_epoch().count());
544 break;
545
546 case LogType::kLogMessageAndDeliveryTime:
547 message_header_builder.add_queue_index(context.queue_index);
548 message_header_builder.add_remote_queue_index(context.remote_queue_index);
549 message_header_builder.add_monotonic_sent_time(
550 context.monotonic_event_time.time_since_epoch().count());
551 message_header_builder.add_realtime_sent_time(
552 context.realtime_event_time.time_since_epoch().count());
553 message_header_builder.add_monotonic_remote_time(
554 context.monotonic_remote_time.time_since_epoch().count());
555 message_header_builder.add_realtime_remote_time(
556 context.realtime_remote_time.time_since_epoch().count());
557 message_header_builder.add_data(data_offset);
558 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800559 }
560
561 return message_header_builder.Finish();
562}
563
Austin Schuhfa30c352022-10-16 11:12:02 -0700564flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
565 switch (log_type) {
566 case LogType::kLogMessage:
567 return
568 // Root table size + offset.
569 sizeof(flatbuffers::uoffset_t) * 2 +
570 // 6 padding bytes to pad the header out properly.
571 6 +
572 // vtable header (size + size of table)
573 sizeof(flatbuffers::voffset_t) * 2 +
574 // offsets to all the fields.
575 sizeof(flatbuffers::voffset_t) * 5 +
576 // pointer to vtable
577 sizeof(flatbuffers::soffset_t) +
578 // pointer to data
579 sizeof(flatbuffers::uoffset_t) +
580 // realtime_sent_time, monotonic_sent_time
581 sizeof(int64_t) * 2 +
582 // queue_index, channel_index
583 sizeof(uint32_t) * 2;
584
585 case LogType::kLogDeliveryTimeOnly:
586 return
587 // Root table size + offset.
588 sizeof(flatbuffers::uoffset_t) * 2 +
589 // 6 padding bytes to pad the header out properly.
590 4 +
591 // vtable header (size + size of table)
592 sizeof(flatbuffers::voffset_t) * 2 +
593 // offsets to all the fields.
594 sizeof(flatbuffers::voffset_t) * 8 +
595 // pointer to vtable
596 sizeof(flatbuffers::soffset_t) +
597 // remote_queue_index
598 sizeof(uint32_t) +
599 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
600 // monotonic_sent_time
601 sizeof(int64_t) * 4 +
602 // queue_index, channel_index
603 sizeof(uint32_t) * 2;
604
605 case LogType::kLogMessageAndDeliveryTime:
606 return
607 // Root table size + offset.
608 sizeof(flatbuffers::uoffset_t) * 2 +
609 // 4 padding bytes to pad the header out properly.
610 4 +
611 // vtable header (size + size of table)
612 sizeof(flatbuffers::voffset_t) * 2 +
613 // offsets to all the fields.
614 sizeof(flatbuffers::voffset_t) * 8 +
615 // pointer to vtable
616 sizeof(flatbuffers::soffset_t) +
617 // pointer to data
618 sizeof(flatbuffers::uoffset_t) +
619 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
620 // monotonic_sent_time
621 sizeof(int64_t) * 4 +
622 // remote_queue_index, queue_index, channel_index
623 sizeof(uint32_t) * 3;
624
625 case LogType::kLogRemoteMessage:
626 return
627 // Root table size + offset.
628 sizeof(flatbuffers::uoffset_t) * 2 +
629 // 6 padding bytes to pad the header out properly.
630 6 +
631 // vtable header (size + size of table)
632 sizeof(flatbuffers::voffset_t) * 2 +
633 // offsets to all the fields.
634 sizeof(flatbuffers::voffset_t) * 5 +
635 // pointer to vtable
636 sizeof(flatbuffers::soffset_t) +
637 // realtime_sent_time, monotonic_sent_time
638 sizeof(int64_t) * 2 +
639 // pointer to data
640 sizeof(flatbuffers::uoffset_t) +
641 // queue_index, channel_index
642 sizeof(uint32_t) * 2;
643 }
644 LOG(FATAL);
645}
646
James Kuszmaul9776b392023-01-14 14:08:08 -0800647flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700648 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
649 "Update size logic please.");
650 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700651 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700652 switch (log_type) {
653 case LogType::kLogDeliveryTimeOnly:
654 return PackMessageHeaderSize(log_type);
655
656 case LogType::kLogMessage:
657 case LogType::kLogMessageAndDeliveryTime:
658 case LogType::kLogRemoteMessage:
659 return PackMessageHeaderSize(log_type) +
660 // Vector...
661 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
662 }
663 LOG(FATAL);
664}
665
Austin Schuhfa30c352022-10-16 11:12:02 -0700666size_t PackMessageInline(uint8_t *buffer, const Context &context,
Austin Schuh71a40d42023-02-04 21:22:22 -0800667 int channel_index, LogType log_type, size_t start_byte,
668 size_t end_byte) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700669 // TODO(austin): Figure out how to copy directly from shared memory instead of
670 // first into the fetcher's memory and then into here. That would save a lot
671 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700672 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700673 PackMessageSize(log_type, context.size);
Austin Schuh71a40d42023-02-04 21:22:22 -0800674 DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length...";
675 DCHECK_EQ((start_byte % 8u), 0u);
676 DCHECK_EQ((end_byte % 8u), 0u);
677 DCHECK_LE(start_byte, end_byte);
678 DCHECK_LE(end_byte, message_size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700679
680 // Pack all the data in. This is brittle but easy to change. Use the
681 // InlinePackMessage.Equivilent unit test to verify everything matches.
682 switch (log_type) {
683 case LogType::kLogMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -0800684 switch (start_byte) {
685 case 0x00u:
686 if ((end_byte) == 0x00u) {
687 break;
688 }
689 // clang-format off
690 // header:
691 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
692 buffer = Push<flatbuffers::uoffset_t>(
693 buffer, message_size - sizeof(flatbuffers::uoffset_t));
694
695 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
696 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
697 [[fallthrough]];
698 case 0x08u:
699 if ((end_byte) == 0x08u) {
700 break;
701 }
702 //
703 // padding:
704 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
705 buffer = Pad(buffer, 6);
706 //
707 // vtable (aos.logger.MessageHeader):
708 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
709 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
710 [[fallthrough]];
711 case 0x10u:
712 if ((end_byte) == 0x10u) {
713 break;
714 }
715 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
716 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
717 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
718 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
719 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
720 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
721 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
722 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
723 [[fallthrough]];
724 case 0x18u:
725 if ((end_byte) == 0x18u) {
726 break;
727 }
728 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
729 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
730 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
731 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
732 //
733 // root_table (aos.logger.MessageHeader):
734 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
735 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
736 [[fallthrough]];
737 case 0x20u:
738 if ((end_byte) == 0x20u) {
739 break;
740 }
741 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
742 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
743 [[fallthrough]];
744 case 0x28u:
745 if ((end_byte) == 0x28u) {
746 break;
747 }
748 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
749 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
750 [[fallthrough]];
751 case 0x30u:
752 if ((end_byte) == 0x30u) {
753 break;
754 }
755 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
756 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
757 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
758 buffer = Push<uint32_t>(buffer, context.queue_index);
759 [[fallthrough]];
760 case 0x38u:
761 if ((end_byte) == 0x38u) {
762 break;
763 }
764 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
765 buffer = Push<uint32_t>(buffer, channel_index);
766 //
767 // vector (aos.logger.MessageHeader.data):
768 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
769 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
770 [[fallthrough]];
771 case 0x40u:
772 if ((end_byte) == 0x40u) {
773 break;
774 }
775 [[fallthrough]];
776 default:
777 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
778 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
779 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
780 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
781 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
782 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
783 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
784 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
785 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
786 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
787 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
788 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
789 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
790 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
791 //
792 // padding:
793 // +0x4E | 00 00 | uint8_t[2] | .. | padding
794 // clang-format on
795 if (start_byte <= 0x40 && end_byte == message_size) {
796 // The easy one, slap it all down.
797 buffer = PushBytes(buffer, context.data, context.size);
798 buffer =
799 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
800 } else {
801 const size_t data_start_byte =
802 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
803 const size_t data_end_byte = end_byte - 0x40;
804 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
805 if (data_start_byte < padded_size) {
806 buffer = PushBytes(
807 buffer,
808 reinterpret_cast<const uint8_t *>(context.data) +
809 data_start_byte,
810 std::min(context.size, data_end_byte) - data_start_byte);
811 if (data_end_byte == padded_size) {
812 // We can only pad the last 7 bytes, so this only gets written
813 // if we write the last byte.
814 buffer = Pad(buffer,
815 ((context.size + 7) & 0xfffffff8u) - context.size);
816 }
817 }
818 }
819 break;
820 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700821 break;
822
823 case LogType::kLogDeliveryTimeOnly:
Austin Schuh71a40d42023-02-04 21:22:22 -0800824 switch (start_byte) {
825 case 0x00u:
826 if ((end_byte) == 0x00u) {
827 break;
828 }
829 // clang-format off
830 // header:
831 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
832 buffer = Push<flatbuffers::uoffset_t>(
833 buffer, message_size - sizeof(flatbuffers::uoffset_t));
834 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
835 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
Austin Schuhfa30c352022-10-16 11:12:02 -0700836
Austin Schuh71a40d42023-02-04 21:22:22 -0800837 [[fallthrough]];
838 case 0x08u:
839 if ((end_byte) == 0x08u) {
840 break;
841 }
842 //
843 // padding:
844 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
845 buffer = Pad(buffer, 4);
846 //
847 // vtable (aos.logger.MessageHeader):
848 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
849 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
850 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
851 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
852 [[fallthrough]];
853 case 0x10u:
854 if ((end_byte) == 0x10u) {
855 break;
856 }
857 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
858 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
859 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
860 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
861 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
862 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
863 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
864 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
865 [[fallthrough]];
866 case 0x18u:
867 if ((end_byte) == 0x18u) {
868 break;
869 }
870 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
871 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
872 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
873 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
874 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
875 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
876 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
877 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
878 [[fallthrough]];
879 case 0x20u:
880 if ((end_byte) == 0x20u) {
881 break;
882 }
883 //
884 // root_table (aos.logger.MessageHeader):
885 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
886 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
887 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
888 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
889 [[fallthrough]];
890 case 0x28u:
891 if ((end_byte) == 0x28u) {
892 break;
893 }
894 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
895 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
896 [[fallthrough]];
897 case 0x30u:
898 if ((end_byte) == 0x30u) {
899 break;
900 }
901 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
902 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
903 [[fallthrough]];
904 case 0x38u:
905 if ((end_byte) == 0x38u) {
906 break;
907 }
908 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
909 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
910 [[fallthrough]];
911 case 0x40u:
912 if ((end_byte) == 0x40u) {
913 break;
914 }
915 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
916 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
917 [[fallthrough]];
918 case 0x48u:
919 if ((end_byte) == 0x48u) {
920 break;
921 }
922 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
923 buffer = Push<uint32_t>(buffer, context.queue_index);
924 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
925 buffer = Push<uint32_t>(buffer, channel_index);
926
927 // clang-format on
928 }
Austin Schuhfa30c352022-10-16 11:12:02 -0700929 break;
930
931 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh71a40d42023-02-04 21:22:22 -0800932 switch (start_byte) {
933 case 0x00u:
934 if ((end_byte) == 0x00u) {
935 break;
936 }
937 // clang-format off
938 // header:
939 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
940 buffer = Push<flatbuffers::uoffset_t>(
941 buffer, message_size - sizeof(flatbuffers::uoffset_t));
942 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
943 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
944 [[fallthrough]];
945 case 0x08u:
946 if ((end_byte) == 0x08u) {
947 break;
948 }
949 //
950 // padding:
951 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
952 buffer = Pad(buffer, 4);
953 //
954 // vtable (aos.logger.MessageHeader):
955 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
956 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
957 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
958 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
959 [[fallthrough]];
960 case 0x10u:
961 if ((end_byte) == 0x10u) {
962 break;
963 }
964 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
965 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
966 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
967 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
968 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
969 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
970 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
971 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
972 [[fallthrough]];
973 case 0x18u:
974 if ((end_byte) == 0x18u) {
975 break;
976 }
977 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
978 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
979 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
980 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
981 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
982 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
983 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
984 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
985 [[fallthrough]];
986 case 0x20u:
987 if ((end_byte) == 0x20u) {
988 break;
989 }
990 //
991 // root_table (aos.logger.MessageHeader):
992 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
993 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
994 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
995 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
996 [[fallthrough]];
997 case 0x28u:
998 if ((end_byte) == 0x28u) {
999 break;
1000 }
1001 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
1002 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1003 [[fallthrough]];
1004 case 0x30u:
1005 if ((end_byte) == 0x30u) {
1006 break;
1007 }
1008 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
1009 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1010 [[fallthrough]];
1011 case 0x38u:
1012 if ((end_byte) == 0x38u) {
1013 break;
1014 }
1015 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
1016 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
1017 [[fallthrough]];
1018 case 0x40u:
1019 if ((end_byte) == 0x40u) {
1020 break;
1021 }
1022 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
1023 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
1024 [[fallthrough]];
1025 case 0x48u:
1026 if ((end_byte) == 0x48u) {
1027 break;
1028 }
1029 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
1030 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1031 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
1032 buffer = Push<uint32_t>(buffer, context.queue_index);
1033 [[fallthrough]];
1034 case 0x50u:
1035 if ((end_byte) == 0x50u) {
1036 break;
1037 }
1038 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
1039 buffer = Push<uint32_t>(buffer, channel_index);
1040 //
1041 // vector (aos.logger.MessageHeader.data):
1042 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
1043 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1044 [[fallthrough]];
1045 case 0x58u:
1046 if ((end_byte) == 0x58u) {
1047 break;
1048 }
1049 [[fallthrough]];
1050 default:
1051 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
1052 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
1053 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
1054 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
1055 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
1056 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
1057 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
1058 //
1059 // padding:
1060 // +0x5F | 00 | uint8_t[1] | . | padding
1061 // clang-format on
1062
1063 if (start_byte <= 0x58 && end_byte == message_size) {
1064 // The easy one, slap it all down.
1065 buffer = PushBytes(buffer, context.data, context.size);
1066 buffer =
1067 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1068 } else {
1069 const size_t data_start_byte =
1070 start_byte < 0x58 ? 0x0u : (start_byte - 0x58);
1071 const size_t data_end_byte = end_byte - 0x58;
1072 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1073 if (data_start_byte < padded_size) {
1074 buffer = PushBytes(
1075 buffer,
1076 reinterpret_cast<const uint8_t *>(context.data) +
1077 data_start_byte,
1078 std::min(context.size, data_end_byte) - data_start_byte);
1079 if (data_end_byte == padded_size) {
1080 // We can only pad the last 7 bytes, so this only gets written
1081 // if we write the last byte.
1082 buffer = Pad(buffer,
1083 ((context.size + 7) & 0xfffffff8u) - context.size);
1084 }
1085 }
1086 }
1087
1088 break;
1089 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001090
1091 break;
1092
1093 case LogType::kLogRemoteMessage:
Austin Schuh71a40d42023-02-04 21:22:22 -08001094 switch (start_byte) {
1095 case 0x00u:
1096 if ((end_byte) == 0x00u) {
1097 break;
1098 }
1099 // This is the message we need to recreate.
1100 //
1101 // clang-format off
1102 // header:
1103 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
1104 buffer = Push<flatbuffers::uoffset_t>(
1105 buffer, message_size - sizeof(flatbuffers::uoffset_t));
1106 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
1107 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
1108 [[fallthrough]];
1109 case 0x08u:
1110 if ((end_byte) == 0x08u) {
1111 break;
1112 }
1113 //
1114 // padding:
1115 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1116 buffer = Pad(buffer, 6);
1117 //
1118 // vtable (aos.logger.MessageHeader):
1119 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
1120 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
1121 [[fallthrough]];
1122 case 0x10u:
1123 if ((end_byte) == 0x10u) {
1124 break;
1125 }
1126 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
1127 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
1128 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
1129 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
1130 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
1131 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
1132 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
1133 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
1134 [[fallthrough]];
1135 case 0x18u:
1136 if ((end_byte) == 0x18u) {
1137 break;
1138 }
1139 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
1140 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
1141 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
1142 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
1143 //
1144 // root_table (aos.logger.MessageHeader):
1145 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
1146 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
1147 [[fallthrough]];
1148 case 0x20u:
1149 if ((end_byte) == 0x20u) {
1150 break;
1151 }
1152 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
1153 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
1154 [[fallthrough]];
1155 case 0x28u:
1156 if ((end_byte) == 0x28u) {
1157 break;
1158 }
1159 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
1160 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
1161 [[fallthrough]];
1162 case 0x30u:
1163 if ((end_byte) == 0x30u) {
1164 break;
1165 }
1166 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
1167 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
1168 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
1169 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
1170 [[fallthrough]];
1171 case 0x38u:
1172 if ((end_byte) == 0x38u) {
1173 break;
1174 }
1175 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
1176 buffer = Push<uint32_t>(buffer, channel_index);
1177 //
1178 // vector (aos.logger.MessageHeader.data):
1179 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
1180 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
1181 [[fallthrough]];
1182 case 0x40u:
1183 if ((end_byte) == 0x40u) {
1184 break;
1185 }
1186 [[fallthrough]];
1187 default:
1188 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
1189 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
1190 // ...
1191 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
1192 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
1193 //
1194 // padding:
1195 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
1196 // clang-format on
1197 if (start_byte <= 0x40 && end_byte == message_size) {
1198 // The easy one, slap it all down.
1199 buffer = PushBytes(buffer, context.data, context.size);
1200 buffer =
1201 Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
1202 } else {
1203 const size_t data_start_byte =
1204 start_byte < 0x40 ? 0x0u : (start_byte - 0x40);
1205 const size_t data_end_byte = end_byte - 0x40;
1206 const size_t padded_size = ((context.size + 7) & 0xfffffff8u);
1207 if (data_start_byte < padded_size) {
1208 buffer = PushBytes(
1209 buffer,
1210 reinterpret_cast<const uint8_t *>(context.data) +
1211 data_start_byte,
1212 std::min(context.size, data_end_byte) - data_start_byte);
1213 if (data_end_byte == padded_size) {
1214 // We can only pad the last 7 bytes, so this only gets written
1215 // if we write the last byte.
1216 buffer = Pad(buffer,
1217 ((context.size + 7) & 0xfffffff8u) - context.size);
1218 }
1219 }
1220 }
1221 break;
1222 }
Austin Schuhfa30c352022-10-16 11:12:02 -07001223 }
1224
Austin Schuh71a40d42023-02-04 21:22:22 -08001225 return end_byte - start_byte;
Austin Schuhfa30c352022-10-16 11:12:02 -07001226}
1227
Austin Schuhcd368422021-11-22 21:23:29 -08001228SpanReader::SpanReader(std::string_view filename, bool quiet)
1229 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -07001230 static constexpr std::string_view kS3 = "s3:";
1231 if (filename.substr(0, kS3.size()) == kS3) {
1232#if ENABLE_S3
1233 decoder_ = std::make_unique<S3Fetcher>(filename);
1234#else
1235 LOG(FATAL) << "Reading files from S3 not supported on this platform";
1236#endif
1237 } else {
1238 decoder_ = std::make_unique<DummyDecoder>(filename);
1239 }
Tyler Chatow2015bc62021-08-04 21:15:09 -07001240
1241 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -07001242 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001243 if (filename.substr(filename.size() - kXz.size()) == kXz) {
1244#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -08001245 decoder_ =
1246 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001247#else
Austin Schuhcd368422021-11-22 21:23:29 -08001248 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001249 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
1250#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -07001251 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
1252 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001253 }
Austin Schuh05b70472020-01-01 17:11:17 -08001254}
1255
Austin Schuhcf5f6442021-07-06 10:43:28 -07001256absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001257 // Make sure we have enough for the size.
1258 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
1259 if (!ReadBlock()) {
1260 return absl::Span<const uint8_t>();
1261 }
1262 }
1263
1264 // Now make sure we have enough for the message.
1265 const size_t data_size =
1266 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1267 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -08001268 if (data_size == sizeof(flatbuffers::uoffset_t)) {
1269 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
1270 LOG(ERROR) << " Rest of log file is "
1271 << absl::BytesToHexString(std::string_view(
1272 reinterpret_cast<const char *>(data_.data() +
1273 consumed_data_),
1274 data_.size() - consumed_data_));
1275 return absl::Span<const uint8_t>();
1276 }
Austin Schuh05b70472020-01-01 17:11:17 -08001277 while (data_.size() < consumed_data_ + data_size) {
1278 if (!ReadBlock()) {
1279 return absl::Span<const uint8_t>();
1280 }
1281 }
1282
1283 // And return it, consuming the data.
1284 const uint8_t *data_ptr = data_.data() + consumed_data_;
1285
Austin Schuh05b70472020-01-01 17:11:17 -08001286 return absl::Span<const uint8_t>(data_ptr, data_size);
1287}
1288
Austin Schuhcf5f6442021-07-06 10:43:28 -07001289void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -08001290 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -07001291 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
1292 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -08001293 consumed_data_ += consumed_size;
1294 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001295}
1296
1297absl::Span<const uint8_t> SpanReader::ReadMessage() {
1298 absl::Span<const uint8_t> result = PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001299 if (!result.empty()) {
Austin Schuhcf5f6442021-07-06 10:43:28 -07001300 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -08001301 } else {
1302 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -07001303 }
1304 return result;
1305}
1306
Austin Schuh05b70472020-01-01 17:11:17 -08001307bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001308 // This is the amount of data we grab at a time. Doing larger chunks minimizes
1309 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -08001310 constexpr size_t kReadSize = 256 * 1024;
1311
1312 // Strip off any unused data at the front.
1313 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -07001314 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -08001315 consumed_data_ = 0;
1316 }
1317
1318 const size_t starting_size = data_.size();
1319
1320 // This should automatically grow the backing store. It won't shrink if we
1321 // get a small chunk later. This reduces allocations when we want to append
1322 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -07001323 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -08001324
Brian Silvermanf51499a2020-09-21 12:49:08 -07001325 const size_t count =
1326 decoder_->Read(data_.begin() + starting_size, data_.end());
1327 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -08001328 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -08001329 return false;
1330 }
Austin Schuh05b70472020-01-01 17:11:17 -08001331
Brian Smarttea913d42021-12-10 15:02:38 -08001332 total_read_ += count;
1333
Austin Schuh05b70472020-01-01 17:11:17 -08001334 return true;
1335}
1336
Austin Schuhadd6eb32020-11-09 21:24:26 -08001337std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -07001338 SpanReader *span_reader) {
1339 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -08001340
1341 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001342 if (config_data.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001343 return std::nullopt;
1344 }
Austin Schuh6f3babe2020-01-26 20:34:50 -08001345
Austin Schuh5212cad2020-09-09 23:12:09 -07001346 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001347 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -08001348 if (!result.Verify()) {
1349 return std::nullopt;
1350 }
Austin Schuh0e8db662021-07-06 10:43:47 -07001351
Austin Schuhcc2c9a52022-12-12 15:55:13 -08001352 // We only know of busted headers in the versions of the log file header
1353 // *before* the logger_sha1 field was added. At some point before that point,
1354 // the logic to track when a header has been written was rewritten in such a
1355 // way that it can't happen anymore. We've seen some logs where the body
1356 // parses as a header recently, so the simple solution of always looking is
1357 // failing us.
1358 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001359 while (true) {
1360 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001361 if (maybe_header_data.empty()) {
Austin Schuh0e8db662021-07-06 10:43:47 -07001362 break;
1363 }
1364
1365 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1366 maybe_header_data);
1367 if (maybe_header.Verify()) {
1368 LOG(WARNING) << "Found duplicate LogFileHeader in "
1369 << span_reader->filename();
1370 ResizeableBuffer header_data_copy;
1371 header_data_copy.resize(maybe_header_data.size());
1372 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1373 header_data_copy.size());
1374 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1375 std::move(header_data_copy));
1376
1377 span_reader->ConsumeMessage();
1378 } else {
1379 break;
1380 }
1381 }
1382 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001383 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001384}
1385
Austin Schuh0e8db662021-07-06 10:43:47 -07001386std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1387 std::string_view filename) {
1388 SpanReader span_reader(filename);
1389 return ReadHeader(&span_reader);
1390}
1391
Austin Schuhadd6eb32020-11-09 21:24:26 -08001392std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001393 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001394 SpanReader span_reader(filename);
1395 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1396 for (size_t i = 0; i < n + 1; ++i) {
1397 data_span = span_reader.ReadMessage();
1398
1399 // Make sure something was read.
James Kuszmaul9776b392023-01-14 14:08:08 -08001400 if (data_span.empty()) {
Austin Schuh3bd4c402020-11-06 18:19:06 -08001401 return std::nullopt;
1402 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001403 }
1404
Brian Silverman354697a2020-09-22 21:06:32 -07001405 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001406 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001407 if (!result.Verify()) {
1408 return std::nullopt;
1409 }
1410 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001411}
1412
Austin Schuh05b70472020-01-01 17:11:17 -08001413MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001414 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001415 raw_log_file_header_(
1416 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001417 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1418 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1419
Austin Schuh0e8db662021-07-06 10:43:47 -07001420 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1421 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001422
1423 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001424 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001425
Austin Schuh0e8db662021-07-06 10:43:47 -07001426 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001427
Austin Schuh5b728b72021-06-16 14:57:15 -07001428 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1429
Brian Smarttea913d42021-12-10 15:02:38 -08001430 total_verified_before_ = span_reader_.TotalConsumed();
1431
Austin Schuhcde938c2020-02-02 17:30:07 -08001432 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001433 FLAGS_max_out_of_order > 0
1434 ? chrono::duration_cast<chrono::nanoseconds>(
1435 chrono::duration<double>(FLAGS_max_out_of_order))
1436 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001437
1438 VLOG(1) << "Opened " << filename << " as node "
1439 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001440}
1441
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001442std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001443 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
James Kuszmaul9776b392023-01-14 14:08:08 -08001444 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001445 if (is_corrupted()) {
1446 LOG(ERROR) << "Total corrupted volumes: before = "
1447 << total_verified_before_
1448 << " | corrupted = " << total_corrupted_
1449 << " | during = " << total_verified_during_
1450 << " | after = " << total_verified_after_ << std::endl;
1451 }
1452
1453 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001454 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1455 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001456 << span_reader_.TotalConsumed() << " bytes usable."
1457 << std::endl;
1458 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001459 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001460 }
1461
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001462 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001463
1464 if (crash_on_corrupt_message_flag_) {
1465 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001466 << total_verified_before_ << " found within "
1467 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001468 << "; set --nocrash_on_corrupt_message to see summary;"
1469 << " also set --ignore_corrupt_messages to process"
1470 << " anyway";
1471
1472 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001473 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001474 << " from " << filename() << std::endl;
1475
1476 total_corrupted_ += msg_data.size();
1477
1478 while (true) {
1479 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1480
James Kuszmaul9776b392023-01-14 14:08:08 -08001481 if (msg_data.empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001482 if (!ignore_corrupt_messages_flag_) {
1483 LOG(ERROR) << "Total corrupted volumes: before = "
1484 << total_verified_before_
1485 << " | corrupted = " << total_corrupted_
1486 << " | during = " << total_verified_during_
1487 << " | after = " << total_verified_after_ << std::endl;
1488
1489 if (span_reader_.IsIncomplete()) {
1490 LOG(ERROR) << "Unable to access some messages in " << filename()
1491 << " : " << span_reader_.TotalRead() << " bytes read, "
1492 << span_reader_.TotalConsumed() << " bytes usable."
1493 << std::endl;
1494 }
1495 return nullptr;
1496 }
1497 break;
1498 }
1499
1500 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1501
1502 if (!next_msg.Verify()) {
1503 total_corrupted_ += msg_data.size();
1504 total_verified_during_ += total_verified_after_;
1505 total_verified_after_ = 0;
1506
1507 } else {
1508 total_verified_after_ += msg_data.size();
1509 if (ignore_corrupt_messages_flag_) {
1510 msg = next_msg;
1511 break;
1512 }
1513 }
1514 }
1515 }
1516
1517 if (is_corrupted()) {
1518 total_verified_after_ += msg_data.size();
1519 } else {
1520 total_verified_before_ += msg_data.size();
1521 }
Austin Schuh05b70472020-01-01 17:11:17 -08001522
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001523 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001524
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001525 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001526
1527 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001528
1529 if (VLOG_IS_ON(3)) {
1530 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1531 } else if (VLOG_IS_ON(2)) {
1532 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1533 msg_copy.mutable_message()->clear_data();
1534 VLOG(2) << "Read from " << filename() << " data "
1535 << FlatbufferToJson(msg_copy);
1536 }
1537
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001538 return result;
1539}
1540
1541std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1542 const MessageHeader &message) {
1543 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1544
1545 UnpackedMessageHeader *const unpacked_message =
1546 reinterpret_cast<UnpackedMessageHeader *>(
1547 malloc(sizeof(UnpackedMessageHeader) + data_size +
1548 kChannelDataAlignment - 1));
1549
1550 CHECK(message.has_channel_index());
1551 CHECK(message.has_monotonic_sent_time());
1552
1553 absl::Span<uint8_t> span;
1554 if (data_size > 0) {
1555 span =
1556 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1557 &unpacked_message->actual_data[0], data_size)),
1558 data_size);
1559 }
1560
Austin Schuh826e6ce2021-11-18 20:33:10 -08001561 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001562 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001563 monotonic_remote_time = aos::monotonic_clock::time_point(
1564 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001565 }
1566 std::optional<realtime_clock::time_point> realtime_remote_time;
1567 if (message.has_realtime_remote_time()) {
1568 realtime_remote_time = realtime_clock::time_point(
1569 chrono::nanoseconds(message.realtime_remote_time()));
1570 }
1571
1572 std::optional<uint32_t> remote_queue_index;
1573 if (message.has_remote_queue_index()) {
1574 remote_queue_index = message.remote_queue_index();
1575 }
1576
James Kuszmaul9776b392023-01-14 14:08:08 -08001577 new (unpacked_message) UnpackedMessageHeader(
1578 message.channel_index(),
1579 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001580 chrono::nanoseconds(message.monotonic_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001581 realtime_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001582 chrono::nanoseconds(message.realtime_sent_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001583 message.queue_index(), monotonic_remote_time, realtime_remote_time,
1584 remote_queue_index,
1585 monotonic_clock::time_point(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001586 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
James Kuszmaul9776b392023-01-14 14:08:08 -08001587 message.has_monotonic_timestamp_time(), span);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001588
1589 if (data_size > 0) {
1590 memcpy(span.data(), message.data()->data(), data_size);
1591 }
1592
1593 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1594 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001595}
1596
Austin Schuhc41603c2020-10-11 16:17:37 -07001597PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001598 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001599 if (parts_.parts.size() >= 2) {
1600 next_message_reader_.emplace(parts_.parts[1]);
1601 }
Austin Schuh48507722021-07-17 17:29:24 -07001602 ComputeBootCounts();
1603}
1604
1605void PartsMessageReader::ComputeBootCounts() {
1606 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1607 std::nullopt);
1608
1609 // We have 3 vintages of log files with different amounts of information.
1610 if (log_file_header()->has_boot_uuids()) {
1611 // The new hotness with the boots explicitly listed out. We can use the log
1612 // file header to compute the boot count of all relevant nodes.
1613 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1614 size_t node_index = 0;
1615 for (const flatbuffers::String *boot_uuid :
1616 *log_file_header()->boot_uuids()) {
1617 CHECK(parts_.boots);
1618 if (boot_uuid->size() != 0) {
1619 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1620 if (it != parts_.boots->boot_count_map.end()) {
1621 boot_counts_[node_index] = it->second;
1622 }
1623 } else if (parts().boots->boots[node_index].size() == 1u) {
1624 boot_counts_[node_index] = 0;
1625 }
1626 ++node_index;
1627 }
1628 } else {
1629 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1630 // single node log files with boot UUIDs in the header. We only know how to
1631 // order certain boots in certain circumstances.
1632 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1633 for (size_t node_index = 0; node_index < boot_counts_.size();
1634 ++node_index) {
1635 CHECK(parts_.boots);
1636 if (parts().boots->boots[node_index].size() == 1u) {
1637 boot_counts_[node_index] = 0;
1638 }
1639 }
1640 } else {
1641 // Really old single node logs without any UUIDs. They can't reboot.
1642 CHECK_EQ(boot_counts_.size(), 1u);
1643 boot_counts_[0] = 0u;
1644 }
1645 }
1646}
Austin Schuhc41603c2020-10-11 16:17:37 -07001647
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001648std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001649 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001650 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001651 message_reader_.ReadMessage();
1652 if (message) {
1653 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001654 const monotonic_clock::time_point monotonic_sent_time =
1655 message->monotonic_sent_time;
1656
1657 // TODO(austin): Does this work with startup? Might need to use the
1658 // start time.
1659 // TODO(austin): Does this work with startup when we don't know the
1660 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001661 if (monotonic_sent_time >
1662 parts_.monotonic_start_time + max_out_of_order_duration()) {
1663 after_start_ = true;
1664 }
1665 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001666 CHECK_GE(monotonic_sent_time,
1667 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001668 << ": Max out of order of " << max_out_of_order_duration().count()
1669 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001670 << parts_.monotonic_start_time << " currently reading "
1671 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001672 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001673 return message;
1674 }
1675 NextLog();
1676 }
Austin Schuh32f68492020-11-08 21:45:51 -08001677 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001678 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001679}
1680
1681void PartsMessageReader::NextLog() {
1682 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001683 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001684 done_ = true;
1685 return;
1686 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001687 CHECK(next_message_reader_);
1688 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001689 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001690 if (next_part_index_ + 1 < parts_.parts.size()) {
1691 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1692 } else {
1693 next_message_reader_.reset();
1694 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001695 ++next_part_index_;
1696}
1697
Austin Schuh1be0ce42020-11-29 22:43:26 -08001698bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001699 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001700
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001701 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001702 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001703 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001704 return false;
1705 }
1706
1707 if (this->channel_index < m2.channel_index) {
1708 return true;
1709 } else if (this->channel_index > m2.channel_index) {
1710 return false;
1711 }
1712
1713 return this->queue_index < m2.queue_index;
1714}
1715
1716bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001717bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001718 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001719
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001720 return timestamp.time == m2.timestamp.time &&
1721 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001722}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001723
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001724std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1725 os << "{.channel_index=" << m.channel_index
1726 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1727 << ", .realtime_sent_time=" << m.realtime_sent_time
1728 << ", .queue_index=" << m.queue_index;
1729 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001730 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001731 }
1732 os << ", .realtime_remote_time=";
1733 PrintOptionalOrNull(&os, m.realtime_remote_time);
1734 os << ", .remote_queue_index=";
1735 PrintOptionalOrNull(&os, m.remote_queue_index);
1736 if (m.has_monotonic_timestamp_time) {
1737 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1738 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001739 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001740 return os;
1741}
1742
Austin Schuh1be0ce42020-11-29 22:43:26 -08001743std::ostream &operator<<(std::ostream &os, const Message &m) {
1744 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001745 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001746 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001747 if (m.data->remote_queue_index.has_value()) {
1748 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1749 }
1750 if (m.data->monotonic_remote_time.has_value()) {
1751 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1752 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001753 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001754 }
1755 os << "}";
1756 return os;
1757}
1758
1759std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1760 os << "{.channel_index=" << m.channel_index
1761 << ", .queue_index=" << m.queue_index
1762 << ", .monotonic_event_time=" << m.monotonic_event_time
1763 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001764 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001765 os << ", .remote_queue_index=" << m.remote_queue_index;
1766 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001767 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001768 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1769 }
1770 if (m.realtime_remote_time != realtime_clock::min_time) {
1771 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1772 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001773 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001774 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1775 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001776 if (m.data != nullptr) {
1777 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001778 } else {
1779 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001780 }
1781 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001782 return os;
1783}
1784
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001785LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001786 : parts_message_reader_(log_parts),
1787 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1788}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001789
1790Message *LogPartsSorter::Front() {
1791 // Queue up data until enough data has been queued that the front message is
1792 // sorted enough to be safe to pop. This may do nothing, so we should make
1793 // sure the nothing path is checked quickly.
1794 if (sorted_until() != monotonic_clock::max_time) {
1795 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001796 if (!messages_.empty() &&
1797 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001798 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001799 break;
1800 }
1801
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001802 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001803 parts_message_reader_.ReadMessage();
1804 // No data left, sorted forever, work through what is left.
1805 if (!m) {
1806 sorted_until_ = monotonic_clock::max_time;
1807 break;
1808 }
1809
Austin Schuh48507722021-07-17 17:29:24 -07001810 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001811 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001812 monotonic_timestamp_boot = parts().logger_boot_count;
1813 }
1814 size_t monotonic_remote_boot = 0xffffff;
1815
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001816 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001817 const Node *node =
1818 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001819
Austin Schuh48507722021-07-17 17:29:24 -07001820 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001821 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001822 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001823 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001824 monotonic_remote_boot = *boot;
1825 }
1826
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001827 messages_.insert(
1828 Message{.channel_index = m->channel_index,
1829 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1830 .index = m->queue_index},
1831 .timestamp = BootTimestamp{.boot = parts().boot_count,
1832 .time = m->monotonic_sent_time},
1833 .monotonic_remote_boot = monotonic_remote_boot,
1834 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1835 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001836
1837 // Now, update sorted_until_ to match the new message.
1838 if (parts_message_reader_.newest_timestamp() >
1839 monotonic_clock::min_time +
1840 parts_message_reader_.max_out_of_order_duration()) {
1841 sorted_until_ = parts_message_reader_.newest_timestamp() -
1842 parts_message_reader_.max_out_of_order_duration();
1843 } else {
1844 sorted_until_ = monotonic_clock::min_time;
1845 }
1846 }
1847 }
1848
1849 // Now that we have enough data queued, return a pointer to the oldest piece
1850 // of data if it exists.
1851 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001852 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001853 return nullptr;
1854 }
1855
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001856 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001857 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001858 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001859 return &(*messages_.begin());
1860}
1861
1862void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1863
1864std::string LogPartsSorter::DebugString() const {
1865 std::stringstream ss;
1866 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001867 int count = 0;
1868 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001869 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001870 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1871 ss << m << "\n";
1872 } else if (no_dots) {
1873 ss << "...\n";
1874 no_dots = false;
1875 }
1876 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001877 }
1878 ss << "] <- " << parts_message_reader_.filename();
1879 return ss.str();
1880}
1881
Austin Schuhd2f96102020-12-01 20:27:29 -08001882NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1883 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001884 // Enforce that we are sorting things only from a single node from a single
1885 // boot.
1886 const std::string_view part0_node = parts[0].node;
1887 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001888 for (size_t i = 1; i < parts.size(); ++i) {
1889 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001890 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1891 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001892 }
Austin Schuh715adc12021-06-29 22:07:39 -07001893
1894 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1895
Austin Schuhd2f96102020-12-01 20:27:29 -08001896 for (LogParts &part : parts) {
1897 parts_sorters_.emplace_back(std::move(part));
1898 }
1899
Austin Schuhd2f96102020-12-01 20:27:29 -08001900 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001901 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001902 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001903 // We want to capture the earliest meaningful start time here. The start
1904 // time defaults to min_time when there's no meaningful value to report, so
1905 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001906 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1907 bool accept = false;
1908 // We want to prioritize start times from the logger node. Really, we
1909 // want to prioritize start times with a valid realtime_clock time. So,
1910 // if we have a start time without a RT clock, prefer a start time with a
1911 // RT clock, even it if is later.
1912 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1913 // We've got a good one. See if the current start time has a good RT
1914 // clock, or if we should use this one instead.
1915 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1916 accept = true;
1917 } else if (realtime_start_time_ == realtime_clock::min_time) {
1918 // The previous start time doesn't have a good RT time, so it is very
1919 // likely the start time from a remote part file. We just found a
1920 // better start time with a real RT time, so switch to that instead.
1921 accept = true;
1922 }
1923 } else if (realtime_start_time_ == realtime_clock::min_time) {
1924 // We don't have a RT time, so take the oldest.
1925 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1926 accept = true;
1927 }
1928 }
1929
1930 if (accept) {
1931 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1932 realtime_start_time_ = parts_sorter.realtime_start_time();
1933 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001934 }
1935 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001936
1937 // If there was no meaningful start time reported, just use min_time.
1938 if (monotonic_start_time_ == monotonic_clock::max_time) {
1939 monotonic_start_time_ = monotonic_clock::min_time;
1940 realtime_start_time_ = realtime_clock::min_time;
1941 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001942}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001943
Austin Schuh0ca51f32020-12-25 21:51:45 -08001944std::vector<const LogParts *> NodeMerger::Parts() const {
1945 std::vector<const LogParts *> p;
1946 p.reserve(parts_sorters_.size());
1947 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1948 p.emplace_back(&parts_sorter.parts());
1949 }
1950 return p;
1951}
1952
Austin Schuh8f52ed52020-11-30 23:12:39 -08001953Message *NodeMerger::Front() {
1954 // Return the current Front if we have one, otherwise go compute one.
1955 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001956 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001957 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001958 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001959 }
1960
1961 // Otherwise, do a simple search for the oldest message, deduplicating any
1962 // duplicates.
1963 Message *oldest = nullptr;
1964 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001965 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1966 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001967 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001968 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001969 continue;
1970 }
1971 if (oldest == nullptr || *m < *oldest) {
1972 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001973 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001974 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001975 // Found a duplicate. If there is a choice, we want the one which has
1976 // the timestamp time.
1977 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001978 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001979 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001980 current_->PopFront();
1981 current_ = &parts_sorter;
1982 oldest = m;
1983 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001984 CHECK_EQ(m->data->monotonic_timestamp_time,
1985 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001986 parts_sorter.PopFront();
1987 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001988 }
1989
1990 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001991 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001992 }
1993
Austin Schuhb000de62020-12-03 22:00:40 -08001994 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001995 CHECK_GE(oldest->timestamp.time, last_message_time_);
1996 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001997 monotonic_oldest_time_ =
1998 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001999 } else {
2000 last_message_time_ = monotonic_clock::max_time;
2001 }
2002
Austin Schuh8f52ed52020-11-30 23:12:39 -08002003 // Return the oldest message found. This will be nullptr if nothing was
2004 // found, indicating there is nothing left.
2005 return oldest;
2006}
2007
2008void NodeMerger::PopFront() {
2009 CHECK(current_ != nullptr) << "Popping before calling Front()";
2010 current_->PopFront();
2011 current_ = nullptr;
2012}
2013
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002014BootMerger::BootMerger(std::vector<LogParts> files) {
2015 std::vector<std::vector<LogParts>> boots;
2016
2017 // Now, we need to split things out by boot.
2018 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002019 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002020 if (boot_count + 1 > boots.size()) {
2021 boots.resize(boot_count + 1);
2022 }
2023 boots[boot_count].emplace_back(std::move(files[i]));
2024 }
2025
2026 node_mergers_.reserve(boots.size());
2027 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07002028 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002029 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07002030 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07002031 }
2032 node_mergers_.emplace_back(
2033 std::make_unique<NodeMerger>(std::move(boots[i])));
2034 }
2035}
2036
2037Message *BootMerger::Front() {
2038 Message *result = node_mergers_[index_]->Front();
2039
2040 if (result != nullptr) {
2041 return result;
2042 }
2043
2044 if (index_ + 1u == node_mergers_.size()) {
2045 // At the end of the last node merger, just return.
2046 return nullptr;
2047 } else {
2048 ++index_;
2049 return Front();
2050 }
2051}
2052
2053void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
2054
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002055std::vector<const LogParts *> BootMerger::Parts() const {
2056 std::vector<const LogParts *> results;
2057 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
2058 std::vector<const LogParts *> node_parts = node_merger->Parts();
2059
2060 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
2061 std::make_move_iterator(node_parts.end()));
2062 }
2063
2064 return results;
2065}
2066
Austin Schuhd2f96102020-12-01 20:27:29 -08002067TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002068 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08002069 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002070 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002071 if (!configuration_) {
2072 configuration_ = part->config;
2073 } else {
2074 CHECK_EQ(configuration_.get(), part->config.get());
2075 }
2076 }
2077 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08002078 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
2079 // pretty simple.
2080 if (configuration::MultiNode(config)) {
2081 nodes_data_.resize(config->nodes()->size());
2082 const Node *my_node = config->nodes()->Get(node());
2083 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
2084 const Node *node = config->nodes()->Get(node_index);
2085 NodeData *node_data = &nodes_data_[node_index];
2086 node_data->channels.resize(config->channels()->size());
2087 // We should save the channel if it is delivered to the node represented
2088 // by the NodeData, but not sent by that node. That combo means it is
2089 // forwarded.
2090 size_t channel_index = 0;
2091 node_data->any_delivered = false;
2092 for (const Channel *channel : *config->channels()) {
2093 node_data->channels[channel_index].delivered =
2094 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08002095 configuration::ChannelIsSendableOnNode(channel, my_node) &&
2096 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08002097 node_data->any_delivered = node_data->any_delivered ||
2098 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08002099 if (node_data->channels[channel_index].delivered) {
2100 const Connection *connection =
2101 configuration::ConnectionToNode(channel, node);
2102 node_data->channels[channel_index].time_to_live =
2103 chrono::nanoseconds(connection->time_to_live());
2104 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002105 ++channel_index;
2106 }
2107 }
2108
2109 for (const Channel *channel : *config->channels()) {
2110 source_node_.emplace_back(configuration::GetNodeIndex(
2111 config, channel->source_node()->string_view()));
2112 }
2113 }
2114}
2115
2116void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08002117 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08002118 CHECK_NE(timestamp_mapper->node(), node());
2119 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
2120
2121 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002122 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08002123 // we could needlessly save data.
2124 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08002125 VLOG(1) << "Registering on node " << node() << " for peer node "
2126 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08002127 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
2128
2129 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07002130
2131 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08002132 }
2133}
2134
Austin Schuh79b30942021-01-24 22:32:21 -08002135void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07002136 matched_messages_.emplace_back(
2137 TimestampedMessage{.channel_index = m->channel_index,
2138 .queue_index = m->queue_index,
2139 .monotonic_event_time = m->timestamp,
2140 .realtime_event_time = m->data->realtime_sent_time,
2141 .remote_queue_index = BootQueueIndex::Invalid(),
2142 .monotonic_remote_time = BootTimestamp::min_time(),
2143 .realtime_remote_time = realtime_clock::min_time,
2144 .monotonic_timestamp_time = BootTimestamp::min_time(),
2145 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08002146}
2147
2148TimestampedMessage *TimestampMapper::Front() {
2149 // No need to fetch anything new. A previous message still exists.
2150 switch (first_message_) {
2151 case FirstMessage::kNeedsUpdate:
2152 break;
2153 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08002154 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002155 case FirstMessage::kNullptr:
2156 return nullptr;
2157 }
2158
Austin Schuh79b30942021-01-24 22:32:21 -08002159 if (matched_messages_.empty()) {
2160 if (!QueueMatched()) {
2161 first_message_ = FirstMessage::kNullptr;
2162 return nullptr;
2163 }
2164 }
2165 first_message_ = FirstMessage::kInMessage;
2166 return &matched_messages_.front();
2167}
2168
2169bool TimestampMapper::QueueMatched() {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002170 MatchResult result = MatchResult::kEndOfFile;
2171 do {
2172 result = MaybeQueueMatched();
2173 } while (result == MatchResult::kSkipped);
2174 return result == MatchResult::kQueued;
2175}
2176
2177bool TimestampMapper::CheckReplayChannelsAndMaybePop(
2178 const TimestampedMessage & /*message*/) {
2179 if (replay_channels_callback_ &&
2180 !replay_channels_callback_(matched_messages_.back())) {
2181 matched_messages_.pop_back();
2182 return true;
2183 }
2184 return false;
2185}
2186
2187TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08002188 if (nodes_data_.empty()) {
2189 // Simple path. We are single node, so there are no timestamps to match!
2190 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002191 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002192 if (!m) {
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002193 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002194 }
Austin Schuh79b30942021-01-24 22:32:21 -08002195 // Enqueue this message into matched_messages_ so we have a place to
2196 // associate remote timestamps, and return it.
2197 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08002198
Austin Schuh79b30942021-01-24 22:32:21 -08002199 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2200 last_message_time_ = matched_messages_.back().monotonic_event_time;
2201
2202 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002203 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08002204 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002205 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2206 return MatchResult::kSkipped;
2207 }
2208 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002209 }
2210
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002211 // We need to only add messages to the list so they get processed for
2212 // messages which are delivered. Reuse the flow below which uses messages_
2213 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08002214 if (messages_.empty()) {
2215 if (!Queue()) {
2216 // Found nothing to add, we are out of data!
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002217 return MatchResult::kEndOfFile;
Austin Schuhd2f96102020-12-01 20:27:29 -08002218 }
2219
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002220 // Now that it has been added (and cannibalized), forget about it
2221 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002222 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002223 }
2224
2225 Message *m = &(messages_.front());
2226
2227 if (source_node_[m->channel_index] == node()) {
2228 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08002229 QueueMessage(m);
2230 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2231 last_message_time_ = matched_messages_.back().monotonic_event_time;
2232 messages_.pop_front();
2233 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002234 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2235 return MatchResult::kSkipped;
2236 }
2237 return MatchResult::kQueued;
Austin Schuhd2f96102020-12-01 20:27:29 -08002238 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002239 // Got a timestamp, find the matching remote data, match it, and return
2240 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08002241 Message data = MatchingMessageFor(*m);
2242
2243 // Return the data from the remote. The local message only has timestamp
2244 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08002245 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08002246 .channel_index = m->channel_index,
2247 .queue_index = m->queue_index,
2248 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002249 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07002250 .remote_queue_index =
2251 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002252 .index = m->data->remote_queue_index.value()},
2253 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002254 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002255 .realtime_remote_time = m->data->realtime_remote_time.value(),
2256 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
2257 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08002258 .data = std::move(data.data)});
2259 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
2260 last_message_time_ = matched_messages_.back().monotonic_event_time;
2261 // Since messages_ holds the data, drop it.
2262 messages_.pop_front();
2263 timestamp_callback_(&matched_messages_.back());
Eric Schmiedebergb38477e2022-12-02 16:08:04 -07002264 if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
2265 return MatchResult::kSkipped;
2266 }
2267 return MatchResult::kQueued;
Austin Schuh79b30942021-01-24 22:32:21 -08002268 }
2269}
2270
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002271void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08002272 while (last_message_time_ <= queue_time) {
2273 if (!QueueMatched()) {
2274 return;
2275 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002276 }
2277}
2278
Austin Schuhe639ea12021-01-25 13:00:22 -08002279void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002280 // Note: queueing for time doesn't really work well across boots. So we
2281 // just assume that if you are using this, you only care about the current
2282 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002283 //
2284 // TODO(austin): Is that the right concept?
2285 //
Austin Schuhe639ea12021-01-25 13:00:22 -08002286 // Make sure we have something queued first. This makes the end time
2287 // calculation simpler, and is typically what folks want regardless.
2288 if (matched_messages_.empty()) {
2289 if (!QueueMatched()) {
2290 return;
2291 }
2292 }
2293
2294 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002295 std::max(monotonic_start_time(
2296 matched_messages_.front().monotonic_event_time.boot),
2297 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08002298 time_estimation_buffer;
2299
2300 // Place sorted messages on the list until we have
2301 // --time_estimation_buffer_seconds seconds queued up (but queue at least
2302 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002303 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08002304 if (!QueueMatched()) {
2305 return;
2306 }
2307 }
2308}
2309
Austin Schuhd2f96102020-12-01 20:27:29 -08002310void TimestampMapper::PopFront() {
2311 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08002312 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002313 first_message_ = FirstMessage::kNeedsUpdate;
2314
Austin Schuh79b30942021-01-24 22:32:21 -08002315 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002316}
2317
2318Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002319 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002320 CHECK_NOTNULL(message.data);
2321 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07002322 const BootQueueIndex remote_queue_index =
2323 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002324 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08002325
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002326 CHECK(message.data->monotonic_remote_time.has_value());
2327 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08002328
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002329 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07002330 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08002331 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002332 const realtime_clock::time_point realtime_remote_time =
2333 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08002334
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002335 TimestampMapper *peer =
2336 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08002337
2338 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002339 // asked to pull a timestamp from a peer which doesn't exist, return an
2340 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08002341 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002342 // TODO(austin): Make sure the tests hit all these paths with a boot count
2343 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002344 return Message{.channel_index = message.channel_index,
2345 .queue_index = remote_queue_index,
2346 .timestamp = monotonic_remote_time,
2347 .monotonic_remote_boot = 0xffffff,
2348 .monotonic_timestamp_boot = 0xffffff,
2349 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08002350 }
2351
2352 // The queue which will have the matching data, if available.
2353 std::deque<Message> *data_queue =
2354 &peer->nodes_data_[node()].channels[message.channel_index].messages;
2355
Austin Schuh79b30942021-01-24 22:32:21 -08002356 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08002357
2358 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002359 return Message{.channel_index = message.channel_index,
2360 .queue_index = remote_queue_index,
2361 .timestamp = monotonic_remote_time,
2362 .monotonic_remote_boot = 0xffffff,
2363 .monotonic_timestamp_boot = 0xffffff,
2364 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002365 }
2366
Austin Schuhd2f96102020-12-01 20:27:29 -08002367 if (remote_queue_index < data_queue->front().queue_index ||
2368 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07002369 return Message{.channel_index = message.channel_index,
2370 .queue_index = remote_queue_index,
2371 .timestamp = monotonic_remote_time,
2372 .monotonic_remote_boot = 0xffffff,
2373 .monotonic_timestamp_boot = 0xffffff,
2374 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08002375 }
2376
Austin Schuh993ccb52020-12-12 15:59:32 -08002377 // The algorithm below is constant time with some assumptions. We need there
2378 // to be no missing messages in the data stream. This also assumes a queue
2379 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07002380 if (data_queue->back().queue_index.boot ==
2381 data_queue->front().queue_index.boot &&
2382 (data_queue->back().queue_index.index -
2383 data_queue->front().queue_index.index + 1u ==
2384 data_queue->size())) {
2385 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002386 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002387 //
2388 // TODO(austin): Move if not reliable.
2389 Message result = (*data_queue)[remote_queue_index.index -
2390 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002391
2392 CHECK_EQ(result.timestamp, monotonic_remote_time)
2393 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002394 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002395 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2396 // Now drop the data off the front. We have deduplicated timestamps, so we
2397 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002398 data_queue->erase(
2399 data_queue->begin(),
2400 data_queue->begin() +
2401 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002402 return result;
2403 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002404 // TODO(austin): Binary search.
2405 auto it = std::find_if(
2406 data_queue->begin(), data_queue->end(),
2407 [remote_queue_index,
2408 remote_boot = monotonic_remote_time.boot](const Message &m) {
2409 return m.queue_index == remote_queue_index &&
2410 m.timestamp.boot == remote_boot;
2411 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002412 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002413 return Message{.channel_index = message.channel_index,
2414 .queue_index = remote_queue_index,
2415 .timestamp = monotonic_remote_time,
2416 .monotonic_remote_boot = 0xffffff,
2417 .monotonic_timestamp_boot = 0xffffff,
2418 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002419 }
2420
2421 Message result = std::move(*it);
2422
2423 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002424 << ": Queue index matches, but timestamp doesn't. Please "
2425 "investigate!";
2426 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2427 << ": Queue index matches, but timestamp doesn't. Please "
2428 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002429
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002430 // Erase everything up to this message. We want to keep 1 message in the
2431 // queue so we can handle reliable messages forwarded across boots.
2432 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002433
2434 return result;
2435 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002436}
2437
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002438void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002439 if (queued_until_ > t) {
2440 return;
2441 }
2442 while (true) {
2443 if (!messages_.empty() && messages_.back().timestamp > t) {
2444 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2445 return;
2446 }
2447
2448 if (!Queue()) {
2449 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002450 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002451 return;
2452 }
2453
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002454 // Now that it has been added (and cannibalized), forget about it
2455 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002456 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002457 }
2458}
2459
2460bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002461 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002462 if (m == nullptr) {
2463 return false;
2464 }
2465 for (NodeData &node_data : nodes_data_) {
2466 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002467 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002468 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002469 // If we have data but no timestamps (logs where the timestamps didn't get
2470 // logged are classic), we can grow this indefinitely. We don't need to
2471 // keep anything that is older than the last message returned.
2472
2473 // We have the time on the source node.
2474 // We care to wait until we have the time on the destination node.
2475 std::deque<Message> &messages =
2476 node_data.channels[m->channel_index].messages;
2477 // Max delay over the network is the TTL, so let's take the queue time and
2478 // add TTL to it. Don't forget any messages which are reliable until
2479 // someone can come up with a good reason to forget those too.
2480 if (node_data.channels[m->channel_index].time_to_live >
2481 chrono::nanoseconds(0)) {
2482 // We need to make *some* assumptions about network delay for this to
2483 // work. We want to only look at the RX side. This means we need to
2484 // track the last time a message was popped from any channel from the
2485 // node sending this message, and compare that to the max time we expect
2486 // that a message will take to be delivered across the network. This
2487 // assumes that messages are popped in time order as a proxy for
2488 // measuring the distributed time at this layer.
2489 //
2490 // Leave at least 1 message in here so we can handle reboots and
2491 // messages getting sent twice.
2492 while (messages.size() > 1u &&
2493 messages.begin()->timestamp +
2494 node_data.channels[m->channel_index].time_to_live +
2495 chrono::duration_cast<chrono::nanoseconds>(
2496 chrono::duration<double>(FLAGS_max_network_delay)) <
2497 last_popped_message_time_) {
2498 messages.pop_front();
2499 }
2500 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002501 node_data.channels[m->channel_index].messages.emplace_back(*m);
2502 }
2503 }
2504
2505 messages_.emplace_back(std::move(*m));
2506 return true;
2507}
2508
2509std::string TimestampMapper::DebugString() const {
2510 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002511 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002512 for (const Message &message : messages_) {
2513 ss << " " << message << "\n";
2514 }
2515 ss << "] queued_until " << queued_until_;
2516 for (const NodeData &ns : nodes_data_) {
2517 if (ns.peer == nullptr) continue;
2518 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2519 size_t channel_index = 0;
2520 for (const NodeData::ChannelData &channel_data :
2521 ns.peer->nodes_data_[node()].channels) {
2522 if (channel_data.messages.empty()) {
2523 continue;
2524 }
Austin Schuhb000de62020-12-03 22:00:40 -08002525
Austin Schuhd2f96102020-12-01 20:27:29 -08002526 ss << " channel " << channel_index << " [\n";
2527 for (const Message &m : channel_data.messages) {
2528 ss << " " << m << "\n";
2529 }
2530 ss << " ]\n";
2531 ++channel_index;
2532 }
2533 ss << "] queued_until " << ns.peer->queued_until_;
2534 }
2535 return ss.str();
2536}
2537
Austin Schuhee711052020-08-24 16:06:09 -07002538std::string MaybeNodeName(const Node *node) {
2539 if (node != nullptr) {
2540 return node->name()->str() + " ";
2541 }
2542 return "";
2543}
2544
Brian Silvermanf51499a2020-09-21 12:49:08 -07002545} // namespace aos::logger