blob: 342cf8b5d7e67b0ebbed1ec6fbd5a3dcae1632f1 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020#if defined(__x86_64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070021#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070022#elif defined(__aarch64__)
Tyler Chatow2015bc62021-08-04 21:15:09 -070023#define ENABLE_LZMA (!__has_feature(memory_sanitizer))
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070024#else
25#define ENABLE_LZMA 0
26#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
Austin Schuh86110712022-09-16 15:40:54 -070031#if ENABLE_S3
32#include "aos/events/logging/s3_fetcher.h"
33#endif
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070034
Austin Schuh48d10d62022-10-16 22:19:23 -070035DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080036 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070037DEFINE_double(
38 flush_period, 5.0,
39 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080040
Austin Schuha040c3f2021-02-13 16:09:07 -080041DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080042 max_network_delay, 1.0,
43 "Max time to assume a message takes to cross the network before we are "
44 "willing to drop it from our buffers and assume it didn't make it. "
45 "Increasing this number can increase memory usage depending on the packet "
46 "loss of your network or if the timestamps aren't logged for a message.");
47
48DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080049 max_out_of_order, -1,
50 "If set, this overrides the max out of order duration for a log file.");
51
Austin Schuh0e8db662021-07-06 10:43:47 -070052DEFINE_bool(workaround_double_headers, true,
53 "Some old log files have two headers at the beginning. Use the "
54 "last header as the actual header.");
55
Brian Smarttea913d42021-12-10 15:02:38 -080056DEFINE_bool(crash_on_corrupt_message, true,
57 "When true, MessageReader will crash the first time a message "
58 "with corrupted format is found. When false, the crash will be "
59 "suppressed, and any remaining readable messages will be "
60 "evaluated to present verified vs corrupted stats.");
61
62DEFINE_bool(ignore_corrupt_messages, false,
63 "When true, and crash_on_corrupt_message is false, then any "
64 "corrupt message found by MessageReader be silently ignored, "
65 "providing access to all uncorrupted messages in a logfile.");
66
Brian Silvermanf51499a2020-09-21 12:49:08 -070067namespace aos::logger {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070068namespace {
Austin Schuha36c8902019-12-30 18:07:15 -080069
Austin Schuh05b70472020-01-01 17:11:17 -080070namespace chrono = std::chrono;
71
// Streams the value held by `t` into `*os`, or the literal text "null" when
// `t` holds no value.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << *t;
}
80} // namespace
81
Austin Schuh48d10d62022-10-16 22:19:23 -070082DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
83 std::unique_ptr<DataEncoder> encoder)
Brian Silvermanf51499a2020-09-21 12:49:08 -070084 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070085 if (!util::MkdirPIfSpace(filename, 0777)) {
86 ran_out_of_space_ = true;
87 } else {
Austin Schuh48d10d62022-10-16 22:19:23 -070088 fd_ = open(filename_.c_str(),
Brian Silvermana9f2ec92020-10-06 18:00:53 -070089 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
90 if (fd_ == -1 && errno == ENOSPC) {
91 ran_out_of_space_ = true;
92 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070093 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
94 << " for writing";
95 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070096 }
97 }
Austin Schuha36c8902019-12-30 18:07:15 -080098}
99
100DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700101 Close();
102 if (ran_out_of_space_) {
103 CHECK(acknowledge_ran_out_of_space_)
104 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700105 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700106}
107
Brian Silvermand90905f2020-09-23 14:42:56 -0700108DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700109 *this = std::move(other);
110}
111
Brian Silverman87ac0402020-09-17 14:47:01 -0700112// When other is destroyed "soon" (which it should be because we're getting an
113// rvalue reference to it), it will flush etc all the data we have queued up
114// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700115DetachedBufferWriter &DetachedBufferWriter::operator=(
116 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700117 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700118 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700119 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700120 std::swap(ran_out_of_space_, other.ran_out_of_space_);
121 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700122 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700123 std::swap(max_write_time_, other.max_write_time_);
124 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
125 std::swap(max_write_time_messages_, other.max_write_time_messages_);
126 std::swap(total_write_time_, other.total_write_time_);
127 std::swap(total_write_count_, other.total_write_count_);
128 std::swap(total_write_messages_, other.total_write_messages_);
129 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800131}
132
Brian Silvermanf51499a2020-09-21 12:49:08 -0700133void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700134 if (ran_out_of_space_) {
135 // We don't want any later data to be written after space becomes
136 // available, so refuse to write anything more once we've dropped data
137 // because we ran out of space.
138 VLOG(1) << "Ignoring span: " << span.size();
139 return;
140 }
141
Austin Schuh48d10d62022-10-16 22:19:23 -0700142 if (!encoder_->HasSpace(span.size())) {
143 Flush();
144 CHECK(encoder_->HasSpace(span.size()));
145 }
146 DataEncoder::SpanCopier coppier(span);
147 encoder_->Encode(&coppier);
148 FlushAtThreshold(aos::monotonic_clock::now());
149}
Austin Schuha36c8902019-12-30 18:07:15 -0800150
Austin Schuh48d10d62022-10-16 22:19:23 -0700151void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
152 aos::monotonic_clock::time_point now) {
153 if (ran_out_of_space_) {
154 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800155 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700156
Austin Schuh48d10d62022-10-16 22:19:23 -0700157 if (!encoder_->HasSpace(coppier->size())) {
158 Flush();
159 CHECK(encoder_->HasSpace(coppier->size()));
160 }
161
162 encoder_->Encode(coppier);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700163 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800164}
165
Brian Silverman0465fcf2020-09-24 00:29:18 -0700166void DetachedBufferWriter::Close() {
167 if (fd_ == -1) {
168 return;
169 }
170 encoder_->Finish();
171 while (encoder_->queue_size() > 0) {
172 Flush();
173 }
174 if (close(fd_) == -1) {
175 if (errno == ENOSPC) {
176 ran_out_of_space_ = true;
177 } else {
178 PLOG(ERROR) << "Closing log file failed";
179 }
180 }
181 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700182 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700183}
184
Austin Schuha36c8902019-12-30 18:07:15 -0800185void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700186 if (ran_out_of_space_) {
187 // We don't want any later data to be written after space becomes available,
188 // so refuse to write anything more once we've dropped data because we ran
189 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700190 if (encoder_) {
191 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
192 encoder_->Clear(encoder_->queue().size());
193 } else {
194 VLOG(1) << "No queue to ignore";
195 }
196 return;
197 }
198
199 const auto queue = encoder_->queue();
200 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700201 return;
202 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700203
Austin Schuha36c8902019-12-30 18:07:15 -0800204 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700205 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
206 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800207 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700208 for (size_t i = 0; i < iovec_size; ++i) {
209 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
210 iovec_[i].iov_len = queue[i].size();
211 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800212 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700213
214 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800215 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700216 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700217 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700218
219 encoder_->Clear(iovec_size);
220
221 UpdateStatsForWrite(end - start, written, iovec_size);
222}
223
Brian Silverman0465fcf2020-09-24 00:29:18 -0700224void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
225 size_t write_size) {
226 if (write_return == -1 && errno == ENOSPC) {
227 ran_out_of_space_ = true;
228 return;
229 }
230 PCHECK(write_return >= 0) << ": write failed";
231 if (write_return < static_cast<ssize_t>(write_size)) {
232 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
233 // never seems to happen in any other case. If we ever want to log to a
234 // socket, this will happen more often. However, until we get there, we'll
235 // just assume it means we ran out of space.
236 ran_out_of_space_ = true;
237 return;
238 }
239}
240
Brian Silvermanf51499a2020-09-21 12:49:08 -0700241void DetachedBufferWriter::UpdateStatsForWrite(
242 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
243 if (duration > max_write_time_) {
244 max_write_time_ = duration;
245 max_write_time_bytes_ = written;
246 max_write_time_messages_ = iovec_size;
247 }
248 total_write_time_ += duration;
249 ++total_write_count_;
250 total_write_messages_ += iovec_size;
251 total_write_bytes_ += written;
252}
253
Austin Schuhbd06ae42021-03-31 22:48:21 -0700254void DetachedBufferWriter::FlushAtThreshold(
255 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700256 if (ran_out_of_space_) {
257 // We don't want any later data to be written after space becomes available,
258 // so refuse to write anything more once we've dropped data because we ran
259 // out of space.
260 if (encoder_) {
261 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
262 encoder_->Clear(encoder_->queue().size());
263 } else {
264 VLOG(1) << "No queue to ignore";
265 }
266 return;
267 }
268
Austin Schuhbd06ae42021-03-31 22:48:21 -0700269 // We don't want to flush the first time through. Otherwise we will flush as
270 // the log file header might be compressing, defeating any parallelism and
271 // queueing there.
272 if (last_flush_time_ == aos::monotonic_clock::min_time) {
273 last_flush_time_ = now;
274 }
275
Brian Silvermanf51499a2020-09-21 12:49:08 -0700276 // Flush if we are at the max number of iovs per writev, because there's no
277 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700278 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700279 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700280 encoder_->queue_size() >= IOV_MAX ||
281 now > last_flush_time_ +
282 chrono::duration_cast<chrono::nanoseconds>(
283 chrono::duration<double>(FLAGS_flush_period))) {
284 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700285 Flush();
286 }
Austin Schuha36c8902019-12-30 18:07:15 -0800287}
288
Austin Schuhf2d0e682022-10-16 14:20:58 -0700289// Do the magic dance to convert the endianness of the data and append it to the
290// buffer.
291namespace {
292
293// TODO(austin): Look at the generated code to see if building the header is
294// efficient or not.
295template <typename T>
296uint8_t *Push(uint8_t *buffer, const T data) {
297 const T endian_data = flatbuffers::EndianScalar<T>(data);
298 std::memcpy(buffer, &endian_data, sizeof(T));
299 return buffer + sizeof(T);
300}
301
// Copies `size` raw bytes from `data` to `buffer` and returns the position just
// past them.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  // memcpy returns its destination pointer.
  return static_cast<uint8_t *>(std::memcpy(buffer, data, size)) + size;
}
306
// Writes `padding` zero bytes at `buffer` and returns the position just past
// them.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  // memset returns its destination pointer.
  return static_cast<uint8_t *>(std::memset(buffer, 0, padding)) + padding;
}
311} // namespace
312
313flatbuffers::Offset<MessageHeader> PackRemoteMessage(
314 flatbuffers::FlatBufferBuilder *fbb,
315 const message_bridge::RemoteMessage *msg, int channel_index,
316 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
317 logger::MessageHeader::Builder message_header_builder(*fbb);
318 // Note: this must match the same order as MessageBridgeServer and
319 // PackMessage. We want identical headers to have identical
320 // on-the-wire formats to make comparing them easier.
321
322 message_header_builder.add_channel_index(channel_index);
323
324 message_header_builder.add_queue_index(msg->queue_index());
325 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
326 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
327
328 message_header_builder.add_monotonic_remote_time(
329 msg->monotonic_remote_time());
330 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
331 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
332
333 message_header_builder.add_monotonic_timestamp_time(
334 monotonic_timestamp_time.time_since_epoch().count());
335
336 return message_header_builder.Finish();
337}
338
339size_t PackRemoteMessageInline(
340 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
341 int channel_index,
342 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
343 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
344
345 // clang-format off
346 // header:
347 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
348 buffer = Push<flatbuffers::uoffset_t>(
349 buffer, message_size - sizeof(flatbuffers::uoffset_t));
350 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
351 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
352 //
353 // padding:
354 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
355 buffer = Pad(buffer, 6);
356 //
357 // vtable (aos.logger.MessageHeader):
358 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
359 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
360 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
361 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
362 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
363 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
364 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
365 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
366 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
367 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
368 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
369 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
370 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
371 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
372 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
373 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
374 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
375 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
376 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
377 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
378 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
379 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
380 //
381 // root_table (aos.logger.MessageHeader):
382 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
383 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
384 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
385 buffer = Push<int64_t>(buffer,
386 monotonic_timestamp_time.time_since_epoch().count());
387 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
388 // TODO(austin): Can we re-arrange the order to ditch the padding?
389 // (Answer is yes, but what is the impact elsewhere? It will change the
390 // binary format)
391 buffer = Pad(buffer, 4);
392 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
393 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
394 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
395 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
396 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
397 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
398 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
399 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
400 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
401 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
402 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
403 buffer = Push<uint32_t>(buffer, msg->queue_index());
404 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
405 buffer = Push<uint32_t>(buffer, channel_index);
406 // clang-format on
407
408 return message_size;
409}
410
Austin Schuha36c8902019-12-30 18:07:15 -0800411flatbuffers::Offset<MessageHeader> PackMessage(
412 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
413 int channel_index, LogType log_type) {
414 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
415
416 switch (log_type) {
417 case LogType::kLogMessage:
418 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800419 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700420 // Since the timestamps are 8 byte aligned, we are going to end up adding
421 // padding in the middle of the message to pad everything out to 8 byte
422 // alignment. That's rather wasteful. To make things efficient to mmap
423 // while reading uncompressed logs, we'd actually rather the message be
424 // aligned. So, force 8 byte alignment (enough to preserve alignment
425 // inside the nested message so that we can read it without moving it)
426 // here.
427 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700428 data_offset = fbb->CreateVector(
429 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800430 break;
431
432 case LogType::kLogDeliveryTimeOnly:
433 break;
434 }
435
436 MessageHeader::Builder message_header_builder(*fbb);
437 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800438
Austin Schuhfa30c352022-10-16 11:12:02 -0700439 // These are split out into very explicit serialization calls because the
440 // order here changes the order things are written out on the wire, and we
441 // want to control and understand it here. Changing the order can increase
442 // the amount of padding bytes in the middle.
443 //
444 // It is also easier to follow... And doesn't actually make things much bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800445 switch (log_type) {
446 case LogType::kLogRemoteMessage:
447 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700448 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800449 message_header_builder.add_monotonic_sent_time(
450 context.monotonic_remote_time.time_since_epoch().count());
451 message_header_builder.add_realtime_sent_time(
452 context.realtime_remote_time.time_since_epoch().count());
453 break;
454
Austin Schuh6f3babe2020-01-26 20:34:50 -0800455 case LogType::kLogDeliveryTimeOnly:
456 message_header_builder.add_queue_index(context.queue_index);
457 message_header_builder.add_monotonic_sent_time(
458 context.monotonic_event_time.time_since_epoch().count());
459 message_header_builder.add_realtime_sent_time(
460 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800461 message_header_builder.add_monotonic_remote_time(
462 context.monotonic_remote_time.time_since_epoch().count());
463 message_header_builder.add_realtime_remote_time(
464 context.realtime_remote_time.time_since_epoch().count());
465 message_header_builder.add_remote_queue_index(context.remote_queue_index);
466 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700467
468 case LogType::kLogMessage:
469 message_header_builder.add_queue_index(context.queue_index);
470 message_header_builder.add_data(data_offset);
471 message_header_builder.add_monotonic_sent_time(
472 context.monotonic_event_time.time_since_epoch().count());
473 message_header_builder.add_realtime_sent_time(
474 context.realtime_event_time.time_since_epoch().count());
475 break;
476
477 case LogType::kLogMessageAndDeliveryTime:
478 message_header_builder.add_queue_index(context.queue_index);
479 message_header_builder.add_remote_queue_index(context.remote_queue_index);
480 message_header_builder.add_monotonic_sent_time(
481 context.monotonic_event_time.time_since_epoch().count());
482 message_header_builder.add_realtime_sent_time(
483 context.realtime_event_time.time_since_epoch().count());
484 message_header_builder.add_monotonic_remote_time(
485 context.monotonic_remote_time.time_since_epoch().count());
486 message_header_builder.add_realtime_remote_time(
487 context.realtime_remote_time.time_since_epoch().count());
488 message_header_builder.add_data(data_offset);
489 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800490 }
491
492 return message_header_builder.Finish();
493}
494
Austin Schuhfa30c352022-10-16 11:12:02 -0700495flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
496 switch (log_type) {
497 case LogType::kLogMessage:
498 return
499 // Root table size + offset.
500 sizeof(flatbuffers::uoffset_t) * 2 +
501 // 6 padding bytes to pad the header out properly.
502 6 +
503 // vtable header (size + size of table)
504 sizeof(flatbuffers::voffset_t) * 2 +
505 // offsets to all the fields.
506 sizeof(flatbuffers::voffset_t) * 5 +
507 // pointer to vtable
508 sizeof(flatbuffers::soffset_t) +
509 // pointer to data
510 sizeof(flatbuffers::uoffset_t) +
511 // realtime_sent_time, monotonic_sent_time
512 sizeof(int64_t) * 2 +
513 // queue_index, channel_index
514 sizeof(uint32_t) * 2;
515
516 case LogType::kLogDeliveryTimeOnly:
517 return
518 // Root table size + offset.
519 sizeof(flatbuffers::uoffset_t) * 2 +
520 // 6 padding bytes to pad the header out properly.
521 4 +
522 // vtable header (size + size of table)
523 sizeof(flatbuffers::voffset_t) * 2 +
524 // offsets to all the fields.
525 sizeof(flatbuffers::voffset_t) * 8 +
526 // pointer to vtable
527 sizeof(flatbuffers::soffset_t) +
528 // remote_queue_index
529 sizeof(uint32_t) +
530 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
531 // monotonic_sent_time
532 sizeof(int64_t) * 4 +
533 // queue_index, channel_index
534 sizeof(uint32_t) * 2;
535
536 case LogType::kLogMessageAndDeliveryTime:
537 return
538 // Root table size + offset.
539 sizeof(flatbuffers::uoffset_t) * 2 +
540 // 4 padding bytes to pad the header out properly.
541 4 +
542 // vtable header (size + size of table)
543 sizeof(flatbuffers::voffset_t) * 2 +
544 // offsets to all the fields.
545 sizeof(flatbuffers::voffset_t) * 8 +
546 // pointer to vtable
547 sizeof(flatbuffers::soffset_t) +
548 // pointer to data
549 sizeof(flatbuffers::uoffset_t) +
550 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
551 // monotonic_sent_time
552 sizeof(int64_t) * 4 +
553 // remote_queue_index, queue_index, channel_index
554 sizeof(uint32_t) * 3;
555
556 case LogType::kLogRemoteMessage:
557 return
558 // Root table size + offset.
559 sizeof(flatbuffers::uoffset_t) * 2 +
560 // 6 padding bytes to pad the header out properly.
561 6 +
562 // vtable header (size + size of table)
563 sizeof(flatbuffers::voffset_t) * 2 +
564 // offsets to all the fields.
565 sizeof(flatbuffers::voffset_t) * 5 +
566 // pointer to vtable
567 sizeof(flatbuffers::soffset_t) +
568 // realtime_sent_time, monotonic_sent_time
569 sizeof(int64_t) * 2 +
570 // pointer to data
571 sizeof(flatbuffers::uoffset_t) +
572 // queue_index, channel_index
573 sizeof(uint32_t) * 2;
574 }
575 LOG(FATAL);
576}
577
578flatbuffers::uoffset_t PackMessageSize(LogType log_type,
Austin Schuh48d10d62022-10-16 22:19:23 -0700579 size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700580 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
581 "Update size logic please.");
582 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700583 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700584 switch (log_type) {
585 case LogType::kLogDeliveryTimeOnly:
586 return PackMessageHeaderSize(log_type);
587
588 case LogType::kLogMessage:
589 case LogType::kLogMessageAndDeliveryTime:
590 case LogType::kLogRemoteMessage:
591 return PackMessageHeaderSize(log_type) +
592 // Vector...
593 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
594 }
595 LOG(FATAL);
596}
597
Austin Schuhfa30c352022-10-16 11:12:02 -0700598size_t PackMessageInline(uint8_t *buffer, const Context &context,
599 int channel_index, LogType log_type) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700600 // TODO(austin): Figure out how to copy directly from shared memory instead of
601 // first into the fetcher's memory and then into here. That would save a lot
602 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700603 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700604 PackMessageSize(log_type, context.size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700605
606 buffer = Push<flatbuffers::uoffset_t>(
607 buffer, message_size - sizeof(flatbuffers::uoffset_t));
608
609 // Pack all the data in. This is brittle but easy to change. Use the
610 // InlinePackMessage.Equivilent unit test to verify everything matches.
611 switch (log_type) {
612 case LogType::kLogMessage:
613 // clang-format off
614 // header:
615 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
616 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
617 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
618 //
619 // padding:
620 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
621 buffer = Pad(buffer, 6);
622 //
623 // vtable (aos.logger.MessageHeader):
624 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
625 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
626 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
627 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
628 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
629 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
630 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
631 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
632 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
633 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
634 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
635 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
636 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
637 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
638 //
639 // root_table (aos.logger.MessageHeader):
640 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
641 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
642 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
643 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
644 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
645 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
646 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
647 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
648 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
649 buffer = Push<uint32_t>(buffer, context.queue_index);
650 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
651 buffer = Push<uint32_t>(buffer, channel_index);
652 //
653 // vector (aos.logger.MessageHeader.data):
654 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
655 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
656 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
657 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
658 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
659 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
660 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
661 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
662 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
663 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
664 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
665 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
666 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
667 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
668 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
669 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
670 buffer = PushBytes(buffer, context.data, context.size);
671 //
672 // padding:
673 // +0x4E | 00 00 | uint8_t[2] | .. | padding
674 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
675 // clang-format on
676 break;
677
678 case LogType::kLogDeliveryTimeOnly:
679 // clang-format off
680 // header:
681 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
682 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
683 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
684 //
685 // padding:
686 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
687 buffer = Pad(buffer, 4);
688 //
689 // vtable (aos.logger.MessageHeader):
690 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
691 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
692 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
693 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
694 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
695 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
696 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
697 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
698 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
699 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
700 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
701 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
702 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
703 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
704 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
705 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
706 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
707 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
708 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
709 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
710 //
711 // root_table (aos.logger.MessageHeader):
712 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
713 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
714 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
715 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
716 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
717 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
718 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
719 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
720 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
721 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
722 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
723 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
724 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
725 buffer = Push<uint32_t>(buffer, context.queue_index);
726 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
727 buffer = Push<uint32_t>(buffer, channel_index);
728
729 // clang-format on
730 break;
731
732 case LogType::kLogMessageAndDeliveryTime:
733 // clang-format off
734 // header:
735 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
736 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
737 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
738 //
739 // padding:
740 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
741 buffer = Pad(buffer, 4);
742 //
743 // vtable (aos.logger.MessageHeader):
744 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
745 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
746 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
747 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
748 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
749 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
750 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
751 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
752 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
753 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
754 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
755 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
756 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
757 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
758 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
759 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
760 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
761 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
762 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
763 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
764 //
765 // root_table (aos.logger.MessageHeader):
766 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
767 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
768 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
769 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
770 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
771 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
772 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
773 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
774 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
775 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
776 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
777 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
778 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
779 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
780 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
781 buffer = Push<uint32_t>(buffer, context.queue_index);
782 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
783 buffer = Push<uint32_t>(buffer, channel_index);
784 //
785 // vector (aos.logger.MessageHeader.data):
786 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
787 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
788 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
789 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
790 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
791 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
792 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
793 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
794 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
795 buffer = PushBytes(buffer, context.data, context.size);
796 //
797 // padding:
798 // +0x5F | 00 | uint8_t[1] | . | padding
799 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
800 // clang-format on
801
802 break;
803
804 case LogType::kLogRemoteMessage:
805 // This is the message we need to recreate.
806 //
807 // clang-format off
808 // header:
809 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
810 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
811 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
812 //
813 // padding:
814 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
815 buffer = Pad(buffer, 6);
816 //
817 // vtable (aos.logger.MessageHeader):
818 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
819 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
820 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
821 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
822 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
823 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
824 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
825 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
826 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
827 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
828 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
829 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
830 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
831 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
832 //
833 // root_table (aos.logger.MessageHeader):
834 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
835 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
836 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
837 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
838 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
839 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
840 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
841 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
842 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
843 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
844 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
845 buffer = Push<uint32_t>(buffer, channel_index);
846 //
847 // vector (aos.logger.MessageHeader.data):
848 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
849 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
850 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
851 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
852 // ...
853 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
854 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
855 buffer = PushBytes(buffer, context.data, context.size);
856 //
857 // padding:
858 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
859 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
860 // clang-format on
861 }
862
863 return message_size;
864}
865
Austin Schuhcd368422021-11-22 21:23:29 -0800866SpanReader::SpanReader(std::string_view filename, bool quiet)
867 : filename_(filename) {
Austin Schuh86110712022-09-16 15:40:54 -0700868 static constexpr std::string_view kS3 = "s3:";
869 if (filename.substr(0, kS3.size()) == kS3) {
870#if ENABLE_S3
871 decoder_ = std::make_unique<S3Fetcher>(filename);
872#else
873 LOG(FATAL) << "Reading files from S3 not supported on this platform";
874#endif
875 } else {
876 decoder_ = std::make_unique<DummyDecoder>(filename);
877 }
Tyler Chatow2015bc62021-08-04 21:15:09 -0700878
879 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700880 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700881 if (filename.substr(filename.size() - kXz.size()) == kXz) {
882#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -0800883 decoder_ =
884 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700885#else
Austin Schuhcd368422021-11-22 21:23:29 -0800886 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700887 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
888#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700889 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
890 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700891 }
Austin Schuh05b70472020-01-01 17:11:17 -0800892}
893
Austin Schuhcf5f6442021-07-06 10:43:28 -0700894absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800895 // Make sure we have enough for the size.
896 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
897 if (!ReadBlock()) {
898 return absl::Span<const uint8_t>();
899 }
900 }
901
902 // Now make sure we have enough for the message.
903 const size_t data_size =
904 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
905 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800906 if (data_size == sizeof(flatbuffers::uoffset_t)) {
907 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
908 LOG(ERROR) << " Rest of log file is "
909 << absl::BytesToHexString(std::string_view(
910 reinterpret_cast<const char *>(data_.data() +
911 consumed_data_),
912 data_.size() - consumed_data_));
913 return absl::Span<const uint8_t>();
914 }
Austin Schuh05b70472020-01-01 17:11:17 -0800915 while (data_.size() < consumed_data_ + data_size) {
916 if (!ReadBlock()) {
917 return absl::Span<const uint8_t>();
918 }
919 }
920
921 // And return it, consuming the data.
922 const uint8_t *data_ptr = data_.data() + consumed_data_;
923
Austin Schuh05b70472020-01-01 17:11:17 -0800924 return absl::Span<const uint8_t>(data_ptr, data_size);
925}
926
Austin Schuhcf5f6442021-07-06 10:43:28 -0700927void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -0800928 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -0700929 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
930 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -0800931 consumed_data_ += consumed_size;
932 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700933}
934
935absl::Span<const uint8_t> SpanReader::ReadMessage() {
936 absl::Span<const uint8_t> result = PeekMessage();
937 if (result != absl::Span<const uint8_t>()) {
938 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -0800939 } else {
940 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700941 }
942 return result;
943}
944
Austin Schuh05b70472020-01-01 17:11:17 -0800945bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700946 // This is the amount of data we grab at a time. Doing larger chunks minimizes
947 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800948 constexpr size_t kReadSize = 256 * 1024;
949
950 // Strip off any unused data at the front.
951 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700952 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800953 consumed_data_ = 0;
954 }
955
956 const size_t starting_size = data_.size();
957
958 // This should automatically grow the backing store. It won't shrink if we
959 // get a small chunk later. This reduces allocations when we want to append
960 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700961 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800962
Brian Silvermanf51499a2020-09-21 12:49:08 -0700963 const size_t count =
964 decoder_->Read(data_.begin() + starting_size, data_.end());
965 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800966 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800967 return false;
968 }
Austin Schuh05b70472020-01-01 17:11:17 -0800969
Brian Smarttea913d42021-12-10 15:02:38 -0800970 total_read_ += count;
971
Austin Schuh05b70472020-01-01 17:11:17 -0800972 return true;
973}
974
Austin Schuhadd6eb32020-11-09 21:24:26 -0800975std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700976 SpanReader *span_reader) {
977 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800978
979 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800980 if (config_data == absl::Span<const uint8_t>()) {
981 return std::nullopt;
982 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800983
Austin Schuh5212cad2020-09-09 23:12:09 -0700984 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700985 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800986 if (!result.Verify()) {
987 return std::nullopt;
988 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700989
Austin Schuhcc2c9a52022-12-12 15:55:13 -0800990 // We only know of busted headers in the versions of the log file header
991 // *before* the logger_sha1 field was added. At some point before that point,
992 // the logic to track when a header has been written was rewritten in such a
993 // way that it can't happen anymore. We've seen some logs where the body
994 // parses as a header recently, so the simple solution of always looking is
995 // failing us.
996 if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
Austin Schuh0e8db662021-07-06 10:43:47 -0700997 while (true) {
998 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
999 if (maybe_header_data == absl::Span<const uint8_t>()) {
1000 break;
1001 }
1002
1003 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
1004 maybe_header_data);
1005 if (maybe_header.Verify()) {
1006 LOG(WARNING) << "Found duplicate LogFileHeader in "
1007 << span_reader->filename();
1008 ResizeableBuffer header_data_copy;
1009 header_data_copy.resize(maybe_header_data.size());
1010 memcpy(header_data_copy.data(), maybe_header_data.begin(),
1011 header_data_copy.size());
1012 result = SizePrefixedFlatbufferVector<LogFileHeader>(
1013 std::move(header_data_copy));
1014
1015 span_reader->ConsumeMessage();
1016 } else {
1017 break;
1018 }
1019 }
1020 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001021 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001022}
1023
Austin Schuh0e8db662021-07-06 10:43:47 -07001024std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1025 std::string_view filename) {
1026 SpanReader span_reader(filename);
1027 return ReadHeader(&span_reader);
1028}
1029
Austin Schuhadd6eb32020-11-09 21:24:26 -08001030std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001031 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001032 SpanReader span_reader(filename);
1033 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1034 for (size_t i = 0; i < n + 1; ++i) {
1035 data_span = span_reader.ReadMessage();
1036
1037 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -08001038 if (data_span == absl::Span<const uint8_t>()) {
1039 return std::nullopt;
1040 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001041 }
1042
Brian Silverman354697a2020-09-22 21:06:32 -07001043 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001044 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001045 if (!result.Verify()) {
1046 return std::nullopt;
1047 }
1048 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001049}
1050
Austin Schuh05b70472020-01-01 17:11:17 -08001051MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001052 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001053 raw_log_file_header_(
1054 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001055 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1056 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1057
Austin Schuh0e8db662021-07-06 10:43:47 -07001058 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1059 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001060
1061 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001062 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001063
Austin Schuh0e8db662021-07-06 10:43:47 -07001064 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001065
Austin Schuh5b728b72021-06-16 14:57:15 -07001066 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1067
Brian Smarttea913d42021-12-10 15:02:38 -08001068 total_verified_before_ = span_reader_.TotalConsumed();
1069
Austin Schuhcde938c2020-02-02 17:30:07 -08001070 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001071 FLAGS_max_out_of_order > 0
1072 ? chrono::duration_cast<chrono::nanoseconds>(
1073 chrono::duration<double>(FLAGS_max_out_of_order))
1074 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001075
1076 VLOG(1) << "Opened " << filename << " as node "
1077 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001078}
1079
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001080std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001081 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1082 if (msg_data == absl::Span<const uint8_t>()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001083 if (is_corrupted()) {
1084 LOG(ERROR) << "Total corrupted volumes: before = "
1085 << total_verified_before_
1086 << " | corrupted = " << total_corrupted_
1087 << " | during = " << total_verified_during_
1088 << " | after = " << total_verified_after_ << std::endl;
1089 }
1090
1091 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001092 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1093 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001094 << span_reader_.TotalConsumed() << " bytes usable."
1095 << std::endl;
1096 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001097 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001098 }
1099
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001100 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001101
1102 if (crash_on_corrupt_message_flag_) {
1103 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001104 << total_verified_before_ << " found within "
1105 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001106 << "; set --nocrash_on_corrupt_message to see summary;"
1107 << " also set --ignore_corrupt_messages to process"
1108 << " anyway";
1109
1110 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001111 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001112 << " from " << filename() << std::endl;
1113
1114 total_corrupted_ += msg_data.size();
1115
1116 while (true) {
1117 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1118
1119 if (msg_data == absl::Span<const uint8_t>()) {
1120 if (!ignore_corrupt_messages_flag_) {
1121 LOG(ERROR) << "Total corrupted volumes: before = "
1122 << total_verified_before_
1123 << " | corrupted = " << total_corrupted_
1124 << " | during = " << total_verified_during_
1125 << " | after = " << total_verified_after_ << std::endl;
1126
1127 if (span_reader_.IsIncomplete()) {
1128 LOG(ERROR) << "Unable to access some messages in " << filename()
1129 << " : " << span_reader_.TotalRead() << " bytes read, "
1130 << span_reader_.TotalConsumed() << " bytes usable."
1131 << std::endl;
1132 }
1133 return nullptr;
1134 }
1135 break;
1136 }
1137
1138 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1139
1140 if (!next_msg.Verify()) {
1141 total_corrupted_ += msg_data.size();
1142 total_verified_during_ += total_verified_after_;
1143 total_verified_after_ = 0;
1144
1145 } else {
1146 total_verified_after_ += msg_data.size();
1147 if (ignore_corrupt_messages_flag_) {
1148 msg = next_msg;
1149 break;
1150 }
1151 }
1152 }
1153 }
1154
1155 if (is_corrupted()) {
1156 total_verified_after_ += msg_data.size();
1157 } else {
1158 total_verified_before_ += msg_data.size();
1159 }
Austin Schuh05b70472020-01-01 17:11:17 -08001160
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001161 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001162
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001163 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001164
1165 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001166
1167 if (VLOG_IS_ON(3)) {
1168 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1169 } else if (VLOG_IS_ON(2)) {
1170 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1171 msg_copy.mutable_message()->clear_data();
1172 VLOG(2) << "Read from " << filename() << " data "
1173 << FlatbufferToJson(msg_copy);
1174 }
1175
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001176 return result;
1177}
1178
1179std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1180 const MessageHeader &message) {
1181 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1182
1183 UnpackedMessageHeader *const unpacked_message =
1184 reinterpret_cast<UnpackedMessageHeader *>(
1185 malloc(sizeof(UnpackedMessageHeader) + data_size +
1186 kChannelDataAlignment - 1));
1187
1188 CHECK(message.has_channel_index());
1189 CHECK(message.has_monotonic_sent_time());
1190
1191 absl::Span<uint8_t> span;
1192 if (data_size > 0) {
1193 span =
1194 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1195 &unpacked_message->actual_data[0], data_size)),
1196 data_size);
1197 }
1198
Austin Schuh826e6ce2021-11-18 20:33:10 -08001199 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001200 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001201 monotonic_remote_time = aos::monotonic_clock::time_point(
1202 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001203 }
1204 std::optional<realtime_clock::time_point> realtime_remote_time;
1205 if (message.has_realtime_remote_time()) {
1206 realtime_remote_time = realtime_clock::time_point(
1207 chrono::nanoseconds(message.realtime_remote_time()));
1208 }
1209
1210 std::optional<uint32_t> remote_queue_index;
1211 if (message.has_remote_queue_index()) {
1212 remote_queue_index = message.remote_queue_index();
1213 }
1214
1215 new (unpacked_message) UnpackedMessageHeader{
1216 .channel_index = message.channel_index(),
1217 .monotonic_sent_time = monotonic_clock::time_point(
1218 chrono::nanoseconds(message.monotonic_sent_time())),
1219 .realtime_sent_time = realtime_clock::time_point(
1220 chrono::nanoseconds(message.realtime_sent_time())),
1221 .queue_index = message.queue_index(),
1222 .monotonic_remote_time = monotonic_remote_time,
1223 .realtime_remote_time = realtime_remote_time,
1224 .remote_queue_index = remote_queue_index,
1225 .monotonic_timestamp_time = monotonic_clock::time_point(
1226 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
1227 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
1228 .span = span};
1229
1230 if (data_size > 0) {
1231 memcpy(span.data(), message.data()->data(), data_size);
1232 }
1233
1234 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1235 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001236}
1237
Austin Schuhc41603c2020-10-11 16:17:37 -07001238PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001239 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001240 if (parts_.parts.size() >= 2) {
1241 next_message_reader_.emplace(parts_.parts[1]);
1242 }
Austin Schuh48507722021-07-17 17:29:24 -07001243 ComputeBootCounts();
1244}
1245
1246void PartsMessageReader::ComputeBootCounts() {
1247 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1248 std::nullopt);
1249
1250 // We have 3 vintages of log files with different amounts of information.
1251 if (log_file_header()->has_boot_uuids()) {
1252 // The new hotness with the boots explicitly listed out. We can use the log
1253 // file header to compute the boot count of all relevant nodes.
1254 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1255 size_t node_index = 0;
1256 for (const flatbuffers::String *boot_uuid :
1257 *log_file_header()->boot_uuids()) {
1258 CHECK(parts_.boots);
1259 if (boot_uuid->size() != 0) {
1260 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1261 if (it != parts_.boots->boot_count_map.end()) {
1262 boot_counts_[node_index] = it->second;
1263 }
1264 } else if (parts().boots->boots[node_index].size() == 1u) {
1265 boot_counts_[node_index] = 0;
1266 }
1267 ++node_index;
1268 }
1269 } else {
1270 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1271 // single node log files with boot UUIDs in the header. We only know how to
1272 // order certain boots in certain circumstances.
1273 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1274 for (size_t node_index = 0; node_index < boot_counts_.size();
1275 ++node_index) {
1276 CHECK(parts_.boots);
1277 if (parts().boots->boots[node_index].size() == 1u) {
1278 boot_counts_[node_index] = 0;
1279 }
1280 }
1281 } else {
1282 // Really old single node logs without any UUIDs. They can't reboot.
1283 CHECK_EQ(boot_counts_.size(), 1u);
1284 boot_counts_[0] = 0u;
1285 }
1286 }
1287}
Austin Schuhc41603c2020-10-11 16:17:37 -07001288
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001289std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001290 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001291 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001292 message_reader_.ReadMessage();
1293 if (message) {
1294 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001295 const monotonic_clock::time_point monotonic_sent_time =
1296 message->monotonic_sent_time;
1297
1298 // TODO(austin): Does this work with startup? Might need to use the
1299 // start time.
1300 // TODO(austin): Does this work with startup when we don't know the
1301 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001302 if (monotonic_sent_time >
1303 parts_.monotonic_start_time + max_out_of_order_duration()) {
1304 after_start_ = true;
1305 }
1306 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001307 CHECK_GE(monotonic_sent_time,
1308 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001309 << ": Max out of order of " << max_out_of_order_duration().count()
1310 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001311 << parts_.monotonic_start_time << " currently reading "
1312 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001313 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001314 return message;
1315 }
1316 NextLog();
1317 }
Austin Schuh32f68492020-11-08 21:45:51 -08001318 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001319 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001320}
1321
1322void PartsMessageReader::NextLog() {
1323 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001324 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001325 done_ = true;
1326 return;
1327 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001328 CHECK(next_message_reader_);
1329 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001330 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001331 if (next_part_index_ + 1 < parts_.parts.size()) {
1332 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1333 } else {
1334 next_message_reader_.reset();
1335 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001336 ++next_part_index_;
1337}
1338
Austin Schuh1be0ce42020-11-29 22:43:26 -08001339bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001340 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001341
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001342 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001343 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001344 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001345 return false;
1346 }
1347
1348 if (this->channel_index < m2.channel_index) {
1349 return true;
1350 } else if (this->channel_index > m2.channel_index) {
1351 return false;
1352 }
1353
1354 return this->queue_index < m2.queue_index;
1355}
1356
1357bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001358bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001359 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001360
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001361 return timestamp.time == m2.timestamp.time &&
1362 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001363}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001364
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001365std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1366 os << "{.channel_index=" << m.channel_index
1367 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1368 << ", .realtime_sent_time=" << m.realtime_sent_time
1369 << ", .queue_index=" << m.queue_index;
1370 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001371 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001372 }
1373 os << ", .realtime_remote_time=";
1374 PrintOptionalOrNull(&os, m.realtime_remote_time);
1375 os << ", .remote_queue_index=";
1376 PrintOptionalOrNull(&os, m.remote_queue_index);
1377 if (m.has_monotonic_timestamp_time) {
1378 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1379 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001380 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001381 return os;
1382}
1383
Austin Schuh1be0ce42020-11-29 22:43:26 -08001384std::ostream &operator<<(std::ostream &os, const Message &m) {
1385 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001386 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001387 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001388 if (m.data->remote_queue_index.has_value()) {
1389 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1390 }
1391 if (m.data->monotonic_remote_time.has_value()) {
1392 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1393 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001394 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001395 }
1396 os << "}";
1397 return os;
1398}
1399
1400std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1401 os << "{.channel_index=" << m.channel_index
1402 << ", .queue_index=" << m.queue_index
1403 << ", .monotonic_event_time=" << m.monotonic_event_time
1404 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001405 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001406 os << ", .remote_queue_index=" << m.remote_queue_index;
1407 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001408 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001409 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1410 }
1411 if (m.realtime_remote_time != realtime_clock::min_time) {
1412 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1413 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001414 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001415 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1416 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001417 if (m.data != nullptr) {
1418 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001419 } else {
1420 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001421 }
1422 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001423 return os;
1424}
1425
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001426LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001427 : parts_message_reader_(log_parts),
1428 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1429}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001430
1431Message *LogPartsSorter::Front() {
1432 // Queue up data until enough data has been queued that the front message is
1433 // sorted enough to be safe to pop. This may do nothing, so we should make
1434 // sure the nothing path is checked quickly.
1435 if (sorted_until() != monotonic_clock::max_time) {
1436 while (true) {
Austin Schuh48507722021-07-17 17:29:24 -07001437 if (!messages_.empty() &&
1438 messages_.begin()->timestamp.time < sorted_until() &&
Austin Schuhb000de62020-12-03 22:00:40 -08001439 sorted_until() >= monotonic_start_time()) {
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001440 break;
1441 }
1442
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001443 std::shared_ptr<UnpackedMessageHeader> m =
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001444 parts_message_reader_.ReadMessage();
1445 // No data left, sorted forever, work through what is left.
1446 if (!m) {
1447 sorted_until_ = monotonic_clock::max_time;
1448 break;
1449 }
1450
Austin Schuh48507722021-07-17 17:29:24 -07001451 size_t monotonic_timestamp_boot = 0;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001452 if (m->has_monotonic_timestamp_time) {
Austin Schuh48507722021-07-17 17:29:24 -07001453 monotonic_timestamp_boot = parts().logger_boot_count;
1454 }
1455 size_t monotonic_remote_boot = 0xffffff;
1456
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001457 if (m->monotonic_remote_time.has_value()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001458 const Node *node =
1459 parts().config->nodes()->Get(source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001460
Austin Schuh48507722021-07-17 17:29:24 -07001461 std::optional<size_t> boot = parts_message_reader_.boot_count(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001462 source_node_index_[m->channel_index]);
milind-ua50344f2021-08-25 18:22:20 -07001463 CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
Austin Schuh60e77942022-05-16 17:48:24 -07001464 << ", with index " << source_node_index_[m->channel_index];
Austin Schuh48507722021-07-17 17:29:24 -07001465 monotonic_remote_boot = *boot;
1466 }
1467
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001468 messages_.insert(
1469 Message{.channel_index = m->channel_index,
1470 .queue_index = BootQueueIndex{.boot = parts().boot_count,
1471 .index = m->queue_index},
1472 .timestamp = BootTimestamp{.boot = parts().boot_count,
1473 .time = m->monotonic_sent_time},
1474 .monotonic_remote_boot = monotonic_remote_boot,
1475 .monotonic_timestamp_boot = monotonic_timestamp_boot,
1476 .data = std::move(m)});
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001477
1478 // Now, update sorted_until_ to match the new message.
1479 if (parts_message_reader_.newest_timestamp() >
1480 monotonic_clock::min_time +
1481 parts_message_reader_.max_out_of_order_duration()) {
1482 sorted_until_ = parts_message_reader_.newest_timestamp() -
1483 parts_message_reader_.max_out_of_order_duration();
1484 } else {
1485 sorted_until_ = monotonic_clock::min_time;
1486 }
1487 }
1488 }
1489
1490 // Now that we have enough data queued, return a pointer to the oldest piece
1491 // of data if it exists.
1492 if (messages_.empty()) {
Austin Schuhb000de62020-12-03 22:00:40 -08001493 last_message_time_ = monotonic_clock::max_time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001494 return nullptr;
1495 }
1496
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001497 CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
Austin Schuh315b96b2020-12-11 21:21:12 -08001498 << DebugString() << " reading " << parts_message_reader_.filename();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001499 last_message_time_ = messages_.begin()->timestamp.time;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001500 return &(*messages_.begin());
1501}
1502
1503void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1504
1505std::string LogPartsSorter::DebugString() const {
1506 std::stringstream ss;
1507 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001508 int count = 0;
1509 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001510 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001511 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1512 ss << m << "\n";
1513 } else if (no_dots) {
1514 ss << "...\n";
1515 no_dots = false;
1516 }
1517 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001518 }
1519 ss << "] <- " << parts_message_reader_.filename();
1520 return ss.str();
1521}
1522
Austin Schuhd2f96102020-12-01 20:27:29 -08001523NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1524 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001525 // Enforce that we are sorting things only from a single node from a single
1526 // boot.
1527 const std::string_view part0_node = parts[0].node;
1528 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001529 for (size_t i = 1; i < parts.size(); ++i) {
1530 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001531 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1532 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001533 }
Austin Schuh715adc12021-06-29 22:07:39 -07001534
1535 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1536
Austin Schuhd2f96102020-12-01 20:27:29 -08001537 for (LogParts &part : parts) {
1538 parts_sorters_.emplace_back(std::move(part));
1539 }
1540
Austin Schuhd2f96102020-12-01 20:27:29 -08001541 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001542 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001543 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001544 // We want to capture the earliest meaningful start time here. The start
1545 // time defaults to min_time when there's no meaningful value to report, so
1546 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001547 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1548 bool accept = false;
1549 // We want to prioritize start times from the logger node. Really, we
1550 // want to prioritize start times with a valid realtime_clock time. So,
1551 // if we have a start time without a RT clock, prefer a start time with a
1552 // RT clock, even it if is later.
1553 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1554 // We've got a good one. See if the current start time has a good RT
1555 // clock, or if we should use this one instead.
1556 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1557 accept = true;
1558 } else if (realtime_start_time_ == realtime_clock::min_time) {
1559 // The previous start time doesn't have a good RT time, so it is very
1560 // likely the start time from a remote part file. We just found a
1561 // better start time with a real RT time, so switch to that instead.
1562 accept = true;
1563 }
1564 } else if (realtime_start_time_ == realtime_clock::min_time) {
1565 // We don't have a RT time, so take the oldest.
1566 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1567 accept = true;
1568 }
1569 }
1570
1571 if (accept) {
1572 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1573 realtime_start_time_ = parts_sorter.realtime_start_time();
1574 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001575 }
1576 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001577
1578 // If there was no meaningful start time reported, just use min_time.
1579 if (monotonic_start_time_ == monotonic_clock::max_time) {
1580 monotonic_start_time_ = monotonic_clock::min_time;
1581 realtime_start_time_ = realtime_clock::min_time;
1582 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001583}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001584
Austin Schuh0ca51f32020-12-25 21:51:45 -08001585std::vector<const LogParts *> NodeMerger::Parts() const {
1586 std::vector<const LogParts *> p;
1587 p.reserve(parts_sorters_.size());
1588 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1589 p.emplace_back(&parts_sorter.parts());
1590 }
1591 return p;
1592}
1593
Austin Schuh8f52ed52020-11-30 23:12:39 -08001594Message *NodeMerger::Front() {
1595 // Return the current Front if we have one, otherwise go compute one.
1596 if (current_ != nullptr) {
Austin Schuhb000de62020-12-03 22:00:40 -08001597 Message *result = current_->Front();
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001598 CHECK_GE(result->timestamp.time, last_message_time_);
Austin Schuhb000de62020-12-03 22:00:40 -08001599 return result;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001600 }
1601
1602 // Otherwise, do a simple search for the oldest message, deduplicating any
1603 // duplicates.
1604 Message *oldest = nullptr;
1605 sorted_until_ = monotonic_clock::max_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001606 for (LogPartsSorter &parts_sorter : parts_sorters_) {
1607 Message *m = parts_sorter.Front();
Austin Schuh8f52ed52020-11-30 23:12:39 -08001608 if (!m) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001609 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001610 continue;
1611 }
1612 if (oldest == nullptr || *m < *oldest) {
1613 oldest = m;
Austin Schuhd2f96102020-12-01 20:27:29 -08001614 current_ = &parts_sorter;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001615 } else if (*m == *oldest) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001616 // Found a duplicate. If there is a choice, we want the one which has
1617 // the timestamp time.
1618 if (!m->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001619 parts_sorter.PopFront();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001620 } else if (!oldest->data->has_monotonic_timestamp_time) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001621 current_->PopFront();
1622 current_ = &parts_sorter;
1623 oldest = m;
1624 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001625 CHECK_EQ(m->data->monotonic_timestamp_time,
1626 oldest->data->monotonic_timestamp_time);
Austin Schuh8bf1e632021-01-02 22:41:04 -08001627 parts_sorter.PopFront();
1628 }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001629 }
1630
1631 // PopFront may change this, so compute it down here.
Austin Schuhd2f96102020-12-01 20:27:29 -08001632 sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
Austin Schuh8f52ed52020-11-30 23:12:39 -08001633 }
1634
Austin Schuhb000de62020-12-03 22:00:40 -08001635 if (oldest) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001636 CHECK_GE(oldest->timestamp.time, last_message_time_);
1637 last_message_time_ = oldest->timestamp.time;
Austin Schuh5dd22842021-11-17 16:09:39 -08001638 monotonic_oldest_time_ =
1639 std::min(monotonic_oldest_time_, oldest->timestamp.time);
Austin Schuhb000de62020-12-03 22:00:40 -08001640 } else {
1641 last_message_time_ = monotonic_clock::max_time;
1642 }
1643
Austin Schuh8f52ed52020-11-30 23:12:39 -08001644 // Return the oldest message found. This will be nullptr if nothing was
1645 // found, indicating there is nothing left.
1646 return oldest;
1647}
1648
1649void NodeMerger::PopFront() {
1650 CHECK(current_ != nullptr) << "Popping before calling Front()";
1651 current_->PopFront();
1652 current_ = nullptr;
1653}
1654
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001655BootMerger::BootMerger(std::vector<LogParts> files) {
1656 std::vector<std::vector<LogParts>> boots;
1657
1658 // Now, we need to split things out by boot.
1659 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001660 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001661 if (boot_count + 1 > boots.size()) {
1662 boots.resize(boot_count + 1);
1663 }
1664 boots[boot_count].emplace_back(std::move(files[i]));
1665 }
1666
1667 node_mergers_.reserve(boots.size());
1668 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001669 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001670 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001671 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001672 }
1673 node_mergers_.emplace_back(
1674 std::make_unique<NodeMerger>(std::move(boots[i])));
1675 }
1676}
1677
1678Message *BootMerger::Front() {
1679 Message *result = node_mergers_[index_]->Front();
1680
1681 if (result != nullptr) {
1682 return result;
1683 }
1684
1685 if (index_ + 1u == node_mergers_.size()) {
1686 // At the end of the last node merger, just return.
1687 return nullptr;
1688 } else {
1689 ++index_;
1690 return Front();
1691 }
1692}
1693
1694void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1695
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001696std::vector<const LogParts *> BootMerger::Parts() const {
1697 std::vector<const LogParts *> results;
1698 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1699 std::vector<const LogParts *> node_parts = node_merger->Parts();
1700
1701 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1702 std::make_move_iterator(node_parts.end()));
1703 }
1704
1705 return results;
1706}
1707
Austin Schuhd2f96102020-12-01 20:27:29 -08001708TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001709 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001710 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001711 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001712 if (!configuration_) {
1713 configuration_ = part->config;
1714 } else {
1715 CHECK_EQ(configuration_.get(), part->config.get());
1716 }
1717 }
1718 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001719 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1720 // pretty simple.
1721 if (configuration::MultiNode(config)) {
1722 nodes_data_.resize(config->nodes()->size());
1723 const Node *my_node = config->nodes()->Get(node());
1724 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1725 const Node *node = config->nodes()->Get(node_index);
1726 NodeData *node_data = &nodes_data_[node_index];
1727 node_data->channels.resize(config->channels()->size());
1728 // We should save the channel if it is delivered to the node represented
1729 // by the NodeData, but not sent by that node. That combo means it is
1730 // forwarded.
1731 size_t channel_index = 0;
1732 node_data->any_delivered = false;
1733 for (const Channel *channel : *config->channels()) {
1734 node_data->channels[channel_index].delivered =
1735 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001736 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1737 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001738 node_data->any_delivered = node_data->any_delivered ||
1739 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08001740 if (node_data->channels[channel_index].delivered) {
1741 const Connection *connection =
1742 configuration::ConnectionToNode(channel, node);
1743 node_data->channels[channel_index].time_to_live =
1744 chrono::nanoseconds(connection->time_to_live());
1745 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001746 ++channel_index;
1747 }
1748 }
1749
1750 for (const Channel *channel : *config->channels()) {
1751 source_node_.emplace_back(configuration::GetNodeIndex(
1752 config, channel->source_node()->string_view()));
1753 }
1754 }
1755}
1756
1757void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001758 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001759 CHECK_NE(timestamp_mapper->node(), node());
1760 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1761
1762 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001763 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001764 // we could needlessly save data.
1765 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001766 VLOG(1) << "Registering on node " << node() << " for peer node "
1767 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001768 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1769
1770 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001771
1772 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001773 }
1774}
1775
Austin Schuh79b30942021-01-24 22:32:21 -08001776void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07001777 matched_messages_.emplace_back(
1778 TimestampedMessage{.channel_index = m->channel_index,
1779 .queue_index = m->queue_index,
1780 .monotonic_event_time = m->timestamp,
1781 .realtime_event_time = m->data->realtime_sent_time,
1782 .remote_queue_index = BootQueueIndex::Invalid(),
1783 .monotonic_remote_time = BootTimestamp::min_time(),
1784 .realtime_remote_time = realtime_clock::min_time,
1785 .monotonic_timestamp_time = BootTimestamp::min_time(),
1786 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001787}
1788
1789TimestampedMessage *TimestampMapper::Front() {
1790 // No need to fetch anything new. A previous message still exists.
1791 switch (first_message_) {
1792 case FirstMessage::kNeedsUpdate:
1793 break;
1794 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001795 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001796 case FirstMessage::kNullptr:
1797 return nullptr;
1798 }
1799
Austin Schuh79b30942021-01-24 22:32:21 -08001800 if (matched_messages_.empty()) {
1801 if (!QueueMatched()) {
1802 first_message_ = FirstMessage::kNullptr;
1803 return nullptr;
1804 }
1805 }
1806 first_message_ = FirstMessage::kInMessage;
1807 return &matched_messages_.front();
1808}
1809
1810bool TimestampMapper::QueueMatched() {
Austin Schuhd2f96102020-12-01 20:27:29 -08001811 if (nodes_data_.empty()) {
1812 // Simple path. We are single node, so there are no timestamps to match!
1813 CHECK_EQ(messages_.size(), 0u);
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001814 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001815 if (!m) {
Austin Schuh79b30942021-01-24 22:32:21 -08001816 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001817 }
Austin Schuh79b30942021-01-24 22:32:21 -08001818 // Enqueue this message into matched_messages_ so we have a place to
1819 // associate remote timestamps, and return it.
1820 QueueMessage(m);
Austin Schuhd2f96102020-12-01 20:27:29 -08001821
Austin Schuh79b30942021-01-24 22:32:21 -08001822 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1823 last_message_time_ = matched_messages_.back().monotonic_event_time;
1824
1825 // We are thin wrapper around node_merger. Call it directly.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001826 boot_merger_.PopFront();
Austin Schuh79b30942021-01-24 22:32:21 -08001827 timestamp_callback_(&matched_messages_.back());
1828 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001829 }
1830
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001831 // We need to only add messages to the list so they get processed for
1832 // messages which are delivered. Reuse the flow below which uses messages_
1833 // by just adding the new message to messages_ and continuing.
Austin Schuhd2f96102020-12-01 20:27:29 -08001834 if (messages_.empty()) {
1835 if (!Queue()) {
1836 // Found nothing to add, we are out of data!
Austin Schuh79b30942021-01-24 22:32:21 -08001837 return false;
Austin Schuhd2f96102020-12-01 20:27:29 -08001838 }
1839
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001840 // Now that it has been added (and cannibalized), forget about it
1841 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001842 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08001843 }
1844
1845 Message *m = &(messages_.front());
1846
1847 if (source_node_[m->channel_index] == node()) {
1848 // From us, just forward it on, filling the remote data in as invalid.
Austin Schuh79b30942021-01-24 22:32:21 -08001849 QueueMessage(m);
1850 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1851 last_message_time_ = matched_messages_.back().monotonic_event_time;
1852 messages_.pop_front();
1853 timestamp_callback_(&matched_messages_.back());
1854 return true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001855 } else {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001856 // Got a timestamp, find the matching remote data, match it, and return
1857 // it.
Austin Schuhd2f96102020-12-01 20:27:29 -08001858 Message data = MatchingMessageFor(*m);
1859
1860 // Return the data from the remote. The local message only has timestamp
1861 // info which isn't relevant anymore once extracted.
Austin Schuh79b30942021-01-24 22:32:21 -08001862 matched_messages_.emplace_back(TimestampedMessage{
Austin Schuhd2f96102020-12-01 20:27:29 -08001863 .channel_index = m->channel_index,
1864 .queue_index = m->queue_index,
1865 .monotonic_event_time = m->timestamp,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001866 .realtime_event_time = m->data->realtime_sent_time,
Austin Schuh58646e22021-08-23 23:51:46 -07001867 .remote_queue_index =
1868 BootQueueIndex{.boot = m->monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001869 .index = m->data->remote_queue_index.value()},
1870 .monotonic_remote_time = {m->monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001871 m->data->monotonic_remote_time.value()},
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001872 .realtime_remote_time = m->data->realtime_remote_time.value(),
1873 .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
1874 m->data->monotonic_timestamp_time},
Austin Schuh79b30942021-01-24 22:32:21 -08001875 .data = std::move(data.data)});
1876 CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
1877 last_message_time_ = matched_messages_.back().monotonic_event_time;
1878 // Since messages_ holds the data, drop it.
1879 messages_.pop_front();
1880 timestamp_callback_(&matched_messages_.back());
1881 return true;
1882 }
1883}
1884
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001885void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001886 while (last_message_time_ <= queue_time) {
1887 if (!QueueMatched()) {
1888 return;
1889 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001890 }
1891}
1892
Austin Schuhe639ea12021-01-25 13:00:22 -08001893void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001894 // Note: queueing for time doesn't really work well across boots. So we
1895 // just assume that if you are using this, you only care about the current
1896 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001897 //
1898 // TODO(austin): Is that the right concept?
1899 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001900 // Make sure we have something queued first. This makes the end time
1901 // calculation simpler, and is typically what folks want regardless.
1902 if (matched_messages_.empty()) {
1903 if (!QueueMatched()) {
1904 return;
1905 }
1906 }
1907
1908 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001909 std::max(monotonic_start_time(
1910 matched_messages_.front().monotonic_event_time.boot),
1911 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001912 time_estimation_buffer;
1913
1914 // Place sorted messages on the list until we have
1915 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1916 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001917 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001918 if (!QueueMatched()) {
1919 return;
1920 }
1921 }
1922}
1923
Austin Schuhd2f96102020-12-01 20:27:29 -08001924void TimestampMapper::PopFront() {
1925 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08001926 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001927 first_message_ = FirstMessage::kNeedsUpdate;
1928
Austin Schuh79b30942021-01-24 22:32:21 -08001929 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001930}
1931
1932Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001933 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001934 CHECK_NOTNULL(message.data);
1935 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001936 const BootQueueIndex remote_queue_index =
1937 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001938 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001939
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001940 CHECK(message.data->monotonic_remote_time.has_value());
1941 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001942
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001943 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001944 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001945 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001946 const realtime_clock::time_point realtime_remote_time =
1947 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001948
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001949 TimestampMapper *peer =
1950 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001951
1952 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001953 // asked to pull a timestamp from a peer which doesn't exist, return an
1954 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001955 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001956 // TODO(austin): Make sure the tests hit all these paths with a boot count
1957 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001958 return Message{.channel_index = message.channel_index,
1959 .queue_index = remote_queue_index,
1960 .timestamp = monotonic_remote_time,
1961 .monotonic_remote_boot = 0xffffff,
1962 .monotonic_timestamp_boot = 0xffffff,
1963 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001964 }
1965
1966 // The queue which will have the matching data, if available.
1967 std::deque<Message> *data_queue =
1968 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1969
Austin Schuh79b30942021-01-24 22:32:21 -08001970 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001971
1972 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001973 return Message{.channel_index = message.channel_index,
1974 .queue_index = remote_queue_index,
1975 .timestamp = monotonic_remote_time,
1976 .monotonic_remote_boot = 0xffffff,
1977 .monotonic_timestamp_boot = 0xffffff,
1978 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001979 }
1980
Austin Schuhd2f96102020-12-01 20:27:29 -08001981 if (remote_queue_index < data_queue->front().queue_index ||
1982 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07001983 return Message{.channel_index = message.channel_index,
1984 .queue_index = remote_queue_index,
1985 .timestamp = monotonic_remote_time,
1986 .monotonic_remote_boot = 0xffffff,
1987 .monotonic_timestamp_boot = 0xffffff,
1988 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001989 }
1990
Austin Schuh993ccb52020-12-12 15:59:32 -08001991 // The algorithm below is constant time with some assumptions. We need there
1992 // to be no missing messages in the data stream. This also assumes a queue
1993 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001994 if (data_queue->back().queue_index.boot ==
1995 data_queue->front().queue_index.boot &&
1996 (data_queue->back().queue_index.index -
1997 data_queue->front().queue_index.index + 1u ==
1998 data_queue->size())) {
1999 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08002000 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07002001 //
2002 // TODO(austin): Move if not reliable.
2003 Message result = (*data_queue)[remote_queue_index.index -
2004 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08002005
2006 CHECK_EQ(result.timestamp, monotonic_remote_time)
2007 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08002008 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08002009 << ": Queue index matches, but timestamp doesn't. Please investigate!";
2010 // Now drop the data off the front. We have deduplicated timestamps, so we
2011 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07002012 data_queue->erase(
2013 data_queue->begin(),
2014 data_queue->begin() +
2015 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08002016 return result;
2017 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002018 // TODO(austin): Binary search.
2019 auto it = std::find_if(
2020 data_queue->begin(), data_queue->end(),
2021 [remote_queue_index,
2022 remote_boot = monotonic_remote_time.boot](const Message &m) {
2023 return m.queue_index == remote_queue_index &&
2024 m.timestamp.boot == remote_boot;
2025 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002026 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002027 return Message{.channel_index = message.channel_index,
2028 .queue_index = remote_queue_index,
2029 .timestamp = monotonic_remote_time,
2030 .monotonic_remote_boot = 0xffffff,
2031 .monotonic_timestamp_boot = 0xffffff,
2032 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002033 }
2034
2035 Message result = std::move(*it);
2036
2037 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002038 << ": Queue index matches, but timestamp doesn't. Please "
2039 "investigate!";
2040 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2041 << ": Queue index matches, but timestamp doesn't. Please "
2042 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002043
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002044 // Erase everything up to this message. We want to keep 1 message in the
2045 // queue so we can handle reliable messages forwarded across boots.
2046 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002047
2048 return result;
2049 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002050}
2051
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002052void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002053 if (queued_until_ > t) {
2054 return;
2055 }
2056 while (true) {
2057 if (!messages_.empty() && messages_.back().timestamp > t) {
2058 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2059 return;
2060 }
2061
2062 if (!Queue()) {
2063 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002064 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002065 return;
2066 }
2067
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002068 // Now that it has been added (and cannibalized), forget about it
2069 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002070 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002071 }
2072}
2073
2074bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002075 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002076 if (m == nullptr) {
2077 return false;
2078 }
2079 for (NodeData &node_data : nodes_data_) {
2080 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002081 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002082 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002083 // If we have data but no timestamps (logs where the timestamps didn't get
2084 // logged are classic), we can grow this indefinitely. We don't need to
2085 // keep anything that is older than the last message returned.
2086
2087 // We have the time on the source node.
2088 // We care to wait until we have the time on the destination node.
2089 std::deque<Message> &messages =
2090 node_data.channels[m->channel_index].messages;
2091 // Max delay over the network is the TTL, so let's take the queue time and
2092 // add TTL to it. Don't forget any messages which are reliable until
2093 // someone can come up with a good reason to forget those too.
2094 if (node_data.channels[m->channel_index].time_to_live >
2095 chrono::nanoseconds(0)) {
2096 // We need to make *some* assumptions about network delay for this to
2097 // work. We want to only look at the RX side. This means we need to
2098 // track the last time a message was popped from any channel from the
2099 // node sending this message, and compare that to the max time we expect
2100 // that a message will take to be delivered across the network. This
2101 // assumes that messages are popped in time order as a proxy for
2102 // measuring the distributed time at this layer.
2103 //
2104 // Leave at least 1 message in here so we can handle reboots and
2105 // messages getting sent twice.
2106 while (messages.size() > 1u &&
2107 messages.begin()->timestamp +
2108 node_data.channels[m->channel_index].time_to_live +
2109 chrono::duration_cast<chrono::nanoseconds>(
2110 chrono::duration<double>(FLAGS_max_network_delay)) <
2111 last_popped_message_time_) {
2112 messages.pop_front();
2113 }
2114 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002115 node_data.channels[m->channel_index].messages.emplace_back(*m);
2116 }
2117 }
2118
2119 messages_.emplace_back(std::move(*m));
2120 return true;
2121}
2122
2123std::string TimestampMapper::DebugString() const {
2124 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002125 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002126 for (const Message &message : messages_) {
2127 ss << " " << message << "\n";
2128 }
2129 ss << "] queued_until " << queued_until_;
2130 for (const NodeData &ns : nodes_data_) {
2131 if (ns.peer == nullptr) continue;
2132 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2133 size_t channel_index = 0;
2134 for (const NodeData::ChannelData &channel_data :
2135 ns.peer->nodes_data_[node()].channels) {
2136 if (channel_data.messages.empty()) {
2137 continue;
2138 }
Austin Schuhb000de62020-12-03 22:00:40 -08002139
Austin Schuhd2f96102020-12-01 20:27:29 -08002140 ss << " channel " << channel_index << " [\n";
2141 for (const Message &m : channel_data.messages) {
2142 ss << " " << m << "\n";
2143 }
2144 ss << " ]\n";
2145 ++channel_index;
2146 }
2147 ss << "] queued_until " << ns.peer->queued_until_;
2148 }
2149 return ss.str();
2150}
2151
Austin Schuhee711052020-08-24 16:06:09 -07002152std::string MaybeNodeName(const Node *node) {
2153 if (node != nullptr) {
2154 return node->name()->str() + " ";
2155 }
2156 return "";
2157}
2158
Brian Silvermanf51499a2020-09-21 12:49:08 -07002159} // namespace aos::logger