blob: 4c06f0aa0751d1657a179e0107c1dd4a6cd52366 [file] [log] [blame]
Austin Schuha36c8902019-12-30 18:07:15 -08001#include "aos/events/logging/logfile_utils.h"
2
3#include <fcntl.h>
Austin Schuha36c8902019-12-30 18:07:15 -08004#include <sys/stat.h>
5#include <sys/types.h>
6#include <sys/uio.h>
7
Brian Silvermanf51499a2020-09-21 12:49:08 -07008#include <algorithm>
9#include <climits>
Austin Schuha36c8902019-12-30 18:07:15 -080010
Austin Schuhe4fca832020-03-07 16:58:53 -080011#include "absl/strings/escaping.h"
Austin Schuh05b70472020-01-01 17:11:17 -080012#include "aos/configuration.h"
James Kuszmauldd0a5042021-10-28 23:38:04 -070013#include "aos/events/logging/snappy_encoder.h"
Austin Schuhfa895892020-01-07 20:07:41 -080014#include "aos/flatbuffer_merge.h"
Austin Schuh6f3babe2020-01-26 20:34:50 -080015#include "aos/util/file.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "flatbuffers/flatbuffers.h"
Austin Schuh05b70472020-01-01 17:11:17 -080017#include "gflags/gflags.h"
18#include "glog/logging.h"
Austin Schuha36c8902019-12-30 18:07:15 -080019
// LZMA support is only built for x86_64 and aarch64, and is turned off when
// building under MemorySanitizer.
//
// __has_feature is a clang extension (GCC only recently added it). Probe for
// it before using it; otherwise `#if ENABLE_LZMA` fails to preprocess on
// compilers that don't provide it. Inside a false `#if` group the nested
// `#if __has_feature(...)` is skipped, not evaluated, so this is safe there.
#if defined(__has_feature)
#if __has_feature(memory_sanitizer)
#define AOS_LOGFILE_UTILS_MSAN 1
#endif
#endif
#ifndef AOS_LOGFILE_UTILS_MSAN
#define AOS_LOGFILE_UTILS_MSAN 0
#endif

#if (defined(__x86_64__) || defined(__aarch64__)) && !AOS_LOGFILE_UTILS_MSAN
#define ENABLE_LZMA 1
#else
#define ENABLE_LZMA 0
#endif
27
28#if ENABLE_LZMA
29#include "aos/events/logging/lzma_encoder.h"
30#endif
31
Austin Schuh48d10d62022-10-16 22:19:23 -070032DEFINE_int32(flush_size, 128 * 1024,
Austin Schuha36c8902019-12-30 18:07:15 -080033 "Number of outstanding bytes to allow before flushing to disk.");
Austin Schuhbd06ae42021-03-31 22:48:21 -070034DEFINE_double(
35 flush_period, 5.0,
36 "Max time to let data sit in the queue before flushing in seconds.");
Austin Schuha36c8902019-12-30 18:07:15 -080037
Austin Schuha040c3f2021-02-13 16:09:07 -080038DEFINE_double(
Austin Schuh6a7358f2021-11-18 22:40:40 -080039 max_network_delay, 1.0,
40 "Max time to assume a message takes to cross the network before we are "
41 "willing to drop it from our buffers and assume it didn't make it. "
42 "Increasing this number can increase memory usage depending on the packet "
43 "loss of your network or if the timestamps aren't logged for a message.");
44
45DEFINE_double(
Austin Schuha040c3f2021-02-13 16:09:07 -080046 max_out_of_order, -1,
47 "If set, this overrides the max out of order duration for a log file.");
48
Austin Schuh0e8db662021-07-06 10:43:47 -070049DEFINE_bool(workaround_double_headers, true,
50 "Some old log files have two headers at the beginning. Use the "
51 "last header as the actual header.");
52
Brian Smarttea913d42021-12-10 15:02:38 -080053DEFINE_bool(crash_on_corrupt_message, true,
54 "When true, MessageReader will crash the first time a message "
55 "with corrupted format is found. When false, the crash will be "
56 "suppressed, and any remaining readable messages will be "
57 "evaluated to present verified vs corrupted stats.");
58
59DEFINE_bool(ignore_corrupt_messages, false,
60 "When true, and crash_on_corrupt_message is false, then any "
61 "corrupt message found by MessageReader be silently ignored, "
62 "providing access to all uncorrupted messages in a logfile.");
63
Brian Silvermanf51499a2020-09-21 12:49:08 -070064namespace aos::logger {
namespace {

namespace chrono = std::chrono;

// Streams the value held by `t` into `os`, or the literal text "null" when `t`
// holds nothing.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t) {
    *os << "null";
    return;
  }
  *os << t.value();
}

}  // namespace
78
Austin Schuh48d10d62022-10-16 22:19:23 -070079DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
80 std::unique_ptr<DataEncoder> encoder)
Brian Silvermanf51499a2020-09-21 12:49:08 -070081 : filename_(filename), encoder_(std::move(encoder)) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -070082 if (!util::MkdirPIfSpace(filename, 0777)) {
83 ran_out_of_space_ = true;
84 } else {
Austin Schuh48d10d62022-10-16 22:19:23 -070085 fd_ = open(filename_.c_str(),
Brian Silvermana9f2ec92020-10-06 18:00:53 -070086 O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
87 if (fd_ == -1 && errno == ENOSPC) {
88 ran_out_of_space_ = true;
89 } else {
Austin Schuh58646e22021-08-23 23:51:46 -070090 PCHECK(fd_ != -1) << ": Failed to open " << this->filename()
91 << " for writing";
92 VLOG(1) << "Opened " << this->filename() << " for writing";
Brian Silvermana9f2ec92020-10-06 18:00:53 -070093 }
94 }
Austin Schuha36c8902019-12-30 18:07:15 -080095}
96
97DetachedBufferWriter::~DetachedBufferWriter() {
Brian Silverman0465fcf2020-09-24 00:29:18 -070098 Close();
99 if (ran_out_of_space_) {
100 CHECK(acknowledge_ran_out_of_space_)
101 << ": Unacknowledged out of disk space, log file was not completed";
Brian Silvermanf51499a2020-09-21 12:49:08 -0700102 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700103}
104
Brian Silvermand90905f2020-09-23 14:42:56 -0700105DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700106 *this = std::move(other);
107}
108
Brian Silverman87ac0402020-09-17 14:47:01 -0700109// When other is destroyed "soon" (which it should be because we're getting an
110// rvalue reference to it), it will flush etc all the data we have queued up
111// (because that data will then be its data).
Austin Schuh2f8fd752020-09-01 22:38:28 -0700112DetachedBufferWriter &DetachedBufferWriter::operator=(
113 DetachedBufferWriter &&other) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700114 std::swap(filename_, other.filename_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700115 std::swap(encoder_, other.encoder_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700116 std::swap(fd_, other.fd_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700117 std::swap(ran_out_of_space_, other.ran_out_of_space_);
118 std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700119 std::swap(iovec_, other.iovec_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700120 std::swap(max_write_time_, other.max_write_time_);
121 std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
122 std::swap(max_write_time_messages_, other.max_write_time_messages_);
123 std::swap(total_write_time_, other.total_write_time_);
124 std::swap(total_write_count_, other.total_write_count_);
125 std::swap(total_write_messages_, other.total_write_messages_);
126 std::swap(total_write_bytes_, other.total_write_bytes_);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700127 return *this;
Austin Schuha36c8902019-12-30 18:07:15 -0800128}
129
Brian Silvermanf51499a2020-09-21 12:49:08 -0700130void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700131 if (ran_out_of_space_) {
132 // We don't want any later data to be written after space becomes
133 // available, so refuse to write anything more once we've dropped data
134 // because we ran out of space.
135 VLOG(1) << "Ignoring span: " << span.size();
136 return;
137 }
138
Austin Schuh48d10d62022-10-16 22:19:23 -0700139 if (!encoder_->HasSpace(span.size())) {
140 Flush();
141 CHECK(encoder_->HasSpace(span.size()));
142 }
143 DataEncoder::SpanCopier coppier(span);
144 encoder_->Encode(&coppier);
145 FlushAtThreshold(aos::monotonic_clock::now());
146}
Austin Schuha36c8902019-12-30 18:07:15 -0800147
Austin Schuh48d10d62022-10-16 22:19:23 -0700148void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
149 aos::monotonic_clock::time_point now) {
150 if (ran_out_of_space_) {
151 return;
Austin Schuha36c8902019-12-30 18:07:15 -0800152 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700153
Austin Schuh48d10d62022-10-16 22:19:23 -0700154 if (!encoder_->HasSpace(coppier->size())) {
155 Flush();
156 CHECK(encoder_->HasSpace(coppier->size()));
157 }
158
159 encoder_->Encode(coppier);
Austin Schuhbd06ae42021-03-31 22:48:21 -0700160 FlushAtThreshold(now);
Austin Schuha36c8902019-12-30 18:07:15 -0800161}
162
Brian Silverman0465fcf2020-09-24 00:29:18 -0700163void DetachedBufferWriter::Close() {
164 if (fd_ == -1) {
165 return;
166 }
167 encoder_->Finish();
168 while (encoder_->queue_size() > 0) {
169 Flush();
170 }
171 if (close(fd_) == -1) {
172 if (errno == ENOSPC) {
173 ran_out_of_space_ = true;
174 } else {
175 PLOG(ERROR) << "Closing log file failed";
176 }
177 }
178 fd_ = -1;
Austin Schuh58646e22021-08-23 23:51:46 -0700179 VLOG(1) << "Closed " << filename();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700180}
181
Austin Schuha36c8902019-12-30 18:07:15 -0800182void DetachedBufferWriter::Flush() {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700183 if (ran_out_of_space_) {
184 // We don't want any later data to be written after space becomes available,
185 // so refuse to write anything more once we've dropped data because we ran
186 // out of space.
Austin Schuha426f1f2021-03-31 22:27:41 -0700187 if (encoder_) {
188 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
189 encoder_->Clear(encoder_->queue().size());
190 } else {
191 VLOG(1) << "No queue to ignore";
192 }
193 return;
194 }
195
196 const auto queue = encoder_->queue();
197 if (queue.empty()) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700198 return;
199 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700200
Austin Schuha36c8902019-12-30 18:07:15 -0800201 iovec_.clear();
Brian Silvermanf51499a2020-09-21 12:49:08 -0700202 const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
203 iovec_.resize(iovec_size);
Austin Schuha36c8902019-12-30 18:07:15 -0800204 size_t counted_size = 0;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700205 for (size_t i = 0; i < iovec_size; ++i) {
206 iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
207 iovec_[i].iov_len = queue[i].size();
208 counted_size += iovec_[i].iov_len;
Austin Schuha36c8902019-12-30 18:07:15 -0800209 }
Brian Silvermanf51499a2020-09-21 12:49:08 -0700210
211 const auto start = aos::monotonic_clock::now();
Austin Schuha36c8902019-12-30 18:07:15 -0800212 const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
Brian Silvermanf51499a2020-09-21 12:49:08 -0700213 const auto end = aos::monotonic_clock::now();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700214 HandleWriteReturn(written, counted_size);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700215
216 encoder_->Clear(iovec_size);
217
218 UpdateStatsForWrite(end - start, written, iovec_size);
219}
220
Brian Silverman0465fcf2020-09-24 00:29:18 -0700221void DetachedBufferWriter::HandleWriteReturn(ssize_t write_return,
222 size_t write_size) {
223 if (write_return == -1 && errno == ENOSPC) {
224 ran_out_of_space_ = true;
225 return;
226 }
227 PCHECK(write_return >= 0) << ": write failed";
228 if (write_return < static_cast<ssize_t>(write_size)) {
229 // Sometimes this happens instead of ENOSPC. On a real filesystem, this
230 // never seems to happen in any other case. If we ever want to log to a
231 // socket, this will happen more often. However, until we get there, we'll
232 // just assume it means we ran out of space.
233 ran_out_of_space_ = true;
234 return;
235 }
236}
237
Brian Silvermanf51499a2020-09-21 12:49:08 -0700238void DetachedBufferWriter::UpdateStatsForWrite(
239 aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
240 if (duration > max_write_time_) {
241 max_write_time_ = duration;
242 max_write_time_bytes_ = written;
243 max_write_time_messages_ = iovec_size;
244 }
245 total_write_time_ += duration;
246 ++total_write_count_;
247 total_write_messages_ += iovec_size;
248 total_write_bytes_ += written;
249}
250
Austin Schuhbd06ae42021-03-31 22:48:21 -0700251void DetachedBufferWriter::FlushAtThreshold(
252 aos::monotonic_clock::time_point now) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700253 if (ran_out_of_space_) {
254 // We don't want any later data to be written after space becomes available,
255 // so refuse to write anything more once we've dropped data because we ran
256 // out of space.
257 if (encoder_) {
258 VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
259 encoder_->Clear(encoder_->queue().size());
260 } else {
261 VLOG(1) << "No queue to ignore";
262 }
263 return;
264 }
265
Austin Schuhbd06ae42021-03-31 22:48:21 -0700266 // We don't want to flush the first time through. Otherwise we will flush as
267 // the log file header might be compressing, defeating any parallelism and
268 // queueing there.
269 if (last_flush_time_ == aos::monotonic_clock::min_time) {
270 last_flush_time_ = now;
271 }
272
Brian Silvermanf51499a2020-09-21 12:49:08 -0700273 // Flush if we are at the max number of iovs per writev, because there's no
274 // point queueing up any more data in memory. Also flush once we have enough
Austin Schuhbd06ae42021-03-31 22:48:21 -0700275 // data queued up or if it has been long enough.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700276 while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
Austin Schuhbd06ae42021-03-31 22:48:21 -0700277 encoder_->queue_size() >= IOV_MAX ||
278 now > last_flush_time_ +
279 chrono::duration_cast<chrono::nanoseconds>(
280 chrono::duration<double>(FLAGS_flush_period))) {
281 last_flush_time_ = now;
Brian Silvermanf51499a2020-09-21 12:49:08 -0700282 Flush();
283 }
Austin Schuha36c8902019-12-30 18:07:15 -0800284}
285
Austin Schuhf2d0e682022-10-16 14:20:58 -0700286// Do the magic dance to convert the endianness of the data and append it to the
287// buffer.
288namespace {
289
290// TODO(austin): Look at the generated code to see if building the header is
291// efficient or not.
292template <typename T>
293uint8_t *Push(uint8_t *buffer, const T data) {
294 const T endian_data = flatbuffers::EndianScalar<T>(data);
295 std::memcpy(buffer, &endian_data, sizeof(T));
296 return buffer + sizeof(T);
297}
298
299uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
300 std::memcpy(buffer, data, size);
301 return buffer + size;
302}
303
304uint8_t *Pad(uint8_t *buffer, size_t padding) {
305 std::memset(buffer, 0, padding);
306 return buffer + padding;
307}
308} // namespace
309
310flatbuffers::Offset<MessageHeader> PackRemoteMessage(
311 flatbuffers::FlatBufferBuilder *fbb,
312 const message_bridge::RemoteMessage *msg, int channel_index,
313 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
314 logger::MessageHeader::Builder message_header_builder(*fbb);
315 // Note: this must match the same order as MessageBridgeServer and
316 // PackMessage. We want identical headers to have identical
317 // on-the-wire formats to make comparing them easier.
318
319 message_header_builder.add_channel_index(channel_index);
320
321 message_header_builder.add_queue_index(msg->queue_index());
322 message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
323 message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());
324
325 message_header_builder.add_monotonic_remote_time(
326 msg->monotonic_remote_time());
327 message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
328 message_header_builder.add_remote_queue_index(msg->remote_queue_index());
329
330 message_header_builder.add_monotonic_timestamp_time(
331 monotonic_timestamp_time.time_since_epoch().count());
332
333 return message_header_builder.Finish();
334}
335
336size_t PackRemoteMessageInline(
337 uint8_t *buffer, const message_bridge::RemoteMessage *msg,
338 int channel_index,
339 const aos::monotonic_clock::time_point monotonic_timestamp_time) {
340 const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
341
342 // clang-format off
343 // header:
344 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
345 buffer = Push<flatbuffers::uoffset_t>(
346 buffer, message_size - sizeof(flatbuffers::uoffset_t));
347 // +0x04 | 20 00 00 00 | UOffset32 | 0x00000020 (32) Loc: +0x24 | offset to root table `aos.logger.MessageHeader`
348 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x20);
349 //
350 // padding:
351 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
352 buffer = Pad(buffer, 6);
353 //
354 // vtable (aos.logger.MessageHeader):
355 // +0x0E | 16 00 | uint16_t | 0x0016 (22) | size of this vtable
356 buffer = Push<flatbuffers::voffset_t>(buffer, 0x16);
357 // +0x10 | 3C 00 | uint16_t | 0x003C (60) | size of referring table
358 buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
359 // +0x12 | 38 00 | VOffset16 | 0x0038 (56) | offset to field `channel_index` (id: 0)
360 buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
361 // +0x14 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `monotonic_sent_time` (id: 1)
362 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
363 // +0x16 | 24 00 | VOffset16 | 0x0024 (36) | offset to field `realtime_sent_time` (id: 2)
364 buffer = Push<flatbuffers::voffset_t>(buffer, 0x24);
365 // +0x18 | 34 00 | VOffset16 | 0x0034 (52) | offset to field `queue_index` (id: 3)
366 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
367 // +0x1A | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
368 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
369 // +0x1C | 1C 00 | VOffset16 | 0x001C (28) | offset to field `monotonic_remote_time` (id: 5)
370 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
371 // +0x1E | 14 00 | VOffset16 | 0x0014 (20) | offset to field `realtime_remote_time` (id: 6)
372 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
373 // +0x20 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `remote_queue_index` (id: 7)
374 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
375 // +0x22 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `monotonic_timestamp_time` (id: 8)
376 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
377 //
378 // root_table (aos.logger.MessageHeader):
379 // +0x24 | 16 00 00 00 | SOffset32 | 0x00000016 (22) Loc: +0x0E | offset to vtable
380 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x16);
381 // +0x28 | F6 0B D8 11 A4 A8 B1 71 | int64_t | 0x71B1A8A411D80BF6 (8192514619791117302) | table field `monotonic_timestamp_time` (Long)
382 buffer = Push<int64_t>(buffer,
383 monotonic_timestamp_time.time_since_epoch().count());
384 // +0x30 | 00 00 00 00 | uint8_t[4] | .... | padding
385 // TODO(austin): Can we re-arrange the order to ditch the padding?
386 // (Answer is yes, but what is the impact elsewhere? It will change the
387 // binary format)
388 buffer = Pad(buffer, 4);
389 // +0x34 | 75 00 00 00 | uint32_t | 0x00000075 (117) | table field `remote_queue_index` (UInt)
390 buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
391 // +0x38 | AA B0 43 0A 35 BE FA D2 | int64_t | 0xD2FABE350A43B0AA (-3244071446552268630) | table field `realtime_remote_time` (Long)
392 buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
393 // +0x40 | D5 40 30 F3 C1 A7 26 1D | int64_t | 0x1D26A7C1F33040D5 (2100550727665467605) | table field `monotonic_remote_time` (Long)
394 buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
395 // +0x48 | 5B 25 32 A1 4A E8 46 CA | int64_t | 0xCA46E84AA132255B (-3871151422448720549) | table field `realtime_sent_time` (Long)
396 buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
397 // +0x50 | 49 7D 45 1F 8C 36 6B A3 | int64_t | 0xA36B368C1F457D49 (-6671178447571288759) | table field `monotonic_sent_time` (Long)
398 buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
399 // +0x58 | 33 00 00 00 | uint32_t | 0x00000033 (51) | table field `queue_index` (UInt)
400 buffer = Push<uint32_t>(buffer, msg->queue_index());
401 // +0x5C | 76 00 00 00 | uint32_t | 0x00000076 (118) | table field `channel_index` (UInt)
402 buffer = Push<uint32_t>(buffer, channel_index);
403 // clang-format on
404
405 return message_size;
406}
407
Austin Schuha36c8902019-12-30 18:07:15 -0800408flatbuffers::Offset<MessageHeader> PackMessage(
409 flatbuffers::FlatBufferBuilder *fbb, const Context &context,
410 int channel_index, LogType log_type) {
411 flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
412
413 switch (log_type) {
414 case LogType::kLogMessage:
415 case LogType::kLogMessageAndDeliveryTime:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800416 case LogType::kLogRemoteMessage:
Austin Schuhfa30c352022-10-16 11:12:02 -0700417 // Since the timestamps are 8 byte aligned, we are going to end up adding
418 // padding in the middle of the message to pad everything out to 8 byte
419 // alignment. That's rather wasteful. To make things efficient to mmap
420 // while reading uncompressed logs, we'd actually rather the message be
421 // aligned. So, force 8 byte alignment (enough to preserve alignment
422 // inside the nested message so that we can read it without moving it)
423 // here.
424 fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
Brian Silvermaneaa41d62020-07-08 19:47:35 -0700425 data_offset = fbb->CreateVector(
426 static_cast<const uint8_t *>(context.data), context.size);
Austin Schuha36c8902019-12-30 18:07:15 -0800427 break;
428
429 case LogType::kLogDeliveryTimeOnly:
430 break;
431 }
432
433 MessageHeader::Builder message_header_builder(*fbb);
434 message_header_builder.add_channel_index(channel_index);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800435
Austin Schuhfa30c352022-10-16 11:12:02 -0700436 // These are split out into very explicit serialization calls because the
437 // order here changes the order things are written out on the wire, and we
438 // want to control and understand it here. Changing the order can increase
439 // the amount of padding bytes in the middle.
440 //
441 // It is also easier to follow... And doesn't actually make things much bigger.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800442 switch (log_type) {
443 case LogType::kLogRemoteMessage:
444 message_header_builder.add_queue_index(context.remote_queue_index);
Austin Schuhfa30c352022-10-16 11:12:02 -0700445 message_header_builder.add_data(data_offset);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800446 message_header_builder.add_monotonic_sent_time(
447 context.monotonic_remote_time.time_since_epoch().count());
448 message_header_builder.add_realtime_sent_time(
449 context.realtime_remote_time.time_since_epoch().count());
450 break;
451
Austin Schuh6f3babe2020-01-26 20:34:50 -0800452 case LogType::kLogDeliveryTimeOnly:
453 message_header_builder.add_queue_index(context.queue_index);
454 message_header_builder.add_monotonic_sent_time(
455 context.monotonic_event_time.time_since_epoch().count());
456 message_header_builder.add_realtime_sent_time(
457 context.realtime_event_time.time_since_epoch().count());
Austin Schuha36c8902019-12-30 18:07:15 -0800458 message_header_builder.add_monotonic_remote_time(
459 context.monotonic_remote_time.time_since_epoch().count());
460 message_header_builder.add_realtime_remote_time(
461 context.realtime_remote_time.time_since_epoch().count());
462 message_header_builder.add_remote_queue_index(context.remote_queue_index);
463 break;
Austin Schuhfa30c352022-10-16 11:12:02 -0700464
465 case LogType::kLogMessage:
466 message_header_builder.add_queue_index(context.queue_index);
467 message_header_builder.add_data(data_offset);
468 message_header_builder.add_monotonic_sent_time(
469 context.monotonic_event_time.time_since_epoch().count());
470 message_header_builder.add_realtime_sent_time(
471 context.realtime_event_time.time_since_epoch().count());
472 break;
473
474 case LogType::kLogMessageAndDeliveryTime:
475 message_header_builder.add_queue_index(context.queue_index);
476 message_header_builder.add_remote_queue_index(context.remote_queue_index);
477 message_header_builder.add_monotonic_sent_time(
478 context.monotonic_event_time.time_since_epoch().count());
479 message_header_builder.add_realtime_sent_time(
480 context.realtime_event_time.time_since_epoch().count());
481 message_header_builder.add_monotonic_remote_time(
482 context.monotonic_remote_time.time_since_epoch().count());
483 message_header_builder.add_realtime_remote_time(
484 context.realtime_remote_time.time_since_epoch().count());
485 message_header_builder.add_data(data_offset);
486 break;
Austin Schuha36c8902019-12-30 18:07:15 -0800487 }
488
489 return message_header_builder.Finish();
490}
491
Austin Schuhfa30c352022-10-16 11:12:02 -0700492flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
493 switch (log_type) {
494 case LogType::kLogMessage:
495 return
496 // Root table size + offset.
497 sizeof(flatbuffers::uoffset_t) * 2 +
498 // 6 padding bytes to pad the header out properly.
499 6 +
500 // vtable header (size + size of table)
501 sizeof(flatbuffers::voffset_t) * 2 +
502 // offsets to all the fields.
503 sizeof(flatbuffers::voffset_t) * 5 +
504 // pointer to vtable
505 sizeof(flatbuffers::soffset_t) +
506 // pointer to data
507 sizeof(flatbuffers::uoffset_t) +
508 // realtime_sent_time, monotonic_sent_time
509 sizeof(int64_t) * 2 +
510 // queue_index, channel_index
511 sizeof(uint32_t) * 2;
512
513 case LogType::kLogDeliveryTimeOnly:
514 return
515 // Root table size + offset.
516 sizeof(flatbuffers::uoffset_t) * 2 +
517 // 6 padding bytes to pad the header out properly.
518 4 +
519 // vtable header (size + size of table)
520 sizeof(flatbuffers::voffset_t) * 2 +
521 // offsets to all the fields.
522 sizeof(flatbuffers::voffset_t) * 8 +
523 // pointer to vtable
524 sizeof(flatbuffers::soffset_t) +
525 // remote_queue_index
526 sizeof(uint32_t) +
527 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
528 // monotonic_sent_time
529 sizeof(int64_t) * 4 +
530 // queue_index, channel_index
531 sizeof(uint32_t) * 2;
532
533 case LogType::kLogMessageAndDeliveryTime:
534 return
535 // Root table size + offset.
536 sizeof(flatbuffers::uoffset_t) * 2 +
537 // 4 padding bytes to pad the header out properly.
538 4 +
539 // vtable header (size + size of table)
540 sizeof(flatbuffers::voffset_t) * 2 +
541 // offsets to all the fields.
542 sizeof(flatbuffers::voffset_t) * 8 +
543 // pointer to vtable
544 sizeof(flatbuffers::soffset_t) +
545 // pointer to data
546 sizeof(flatbuffers::uoffset_t) +
547 // realtime_remote_time, monotonic_remote_time, realtime_sent_time,
548 // monotonic_sent_time
549 sizeof(int64_t) * 4 +
550 // remote_queue_index, queue_index, channel_index
551 sizeof(uint32_t) * 3;
552
553 case LogType::kLogRemoteMessage:
554 return
555 // Root table size + offset.
556 sizeof(flatbuffers::uoffset_t) * 2 +
557 // 6 padding bytes to pad the header out properly.
558 6 +
559 // vtable header (size + size of table)
560 sizeof(flatbuffers::voffset_t) * 2 +
561 // offsets to all the fields.
562 sizeof(flatbuffers::voffset_t) * 5 +
563 // pointer to vtable
564 sizeof(flatbuffers::soffset_t) +
565 // realtime_sent_time, monotonic_sent_time
566 sizeof(int64_t) * 2 +
567 // pointer to data
568 sizeof(flatbuffers::uoffset_t) +
569 // queue_index, channel_index
570 sizeof(uint32_t) * 2;
571 }
572 LOG(FATAL);
573}
574
575flatbuffers::uoffset_t PackMessageSize(LogType log_type,
Austin Schuh48d10d62022-10-16 22:19:23 -0700576 size_t data_size) {
Austin Schuhfa30c352022-10-16 11:12:02 -0700577 static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
578 "Update size logic please.");
579 const flatbuffers::uoffset_t aligned_data_length =
Austin Schuh48d10d62022-10-16 22:19:23 -0700580 ((data_size + 7) & 0xfffffff8u);
Austin Schuhfa30c352022-10-16 11:12:02 -0700581 switch (log_type) {
582 case LogType::kLogDeliveryTimeOnly:
583 return PackMessageHeaderSize(log_type);
584
585 case LogType::kLogMessage:
586 case LogType::kLogMessageAndDeliveryTime:
587 case LogType::kLogRemoteMessage:
588 return PackMessageHeaderSize(log_type) +
589 // Vector...
590 sizeof(flatbuffers::uoffset_t) + aligned_data_length;
591 }
592 LOG(FATAL);
593}
594
Austin Schuhfa30c352022-10-16 11:12:02 -0700595size_t PackMessageInline(uint8_t *buffer, const Context &context,
596 int channel_index, LogType log_type) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700597 // TODO(austin): Figure out how to copy directly from shared memory instead of
598 // first into the fetcher's memory and then into here. That would save a lot
599 // of memory.
Austin Schuhfa30c352022-10-16 11:12:02 -0700600 const flatbuffers::uoffset_t message_size =
Austin Schuh48d10d62022-10-16 22:19:23 -0700601 PackMessageSize(log_type, context.size);
Austin Schuhfa30c352022-10-16 11:12:02 -0700602
603 buffer = Push<flatbuffers::uoffset_t>(
604 buffer, message_size - sizeof(flatbuffers::uoffset_t));
605
606 // Pack all the data in. This is brittle but easy to change. Use the
607 // InlinePackMessage.Equivilent unit test to verify everything matches.
608 switch (log_type) {
609 case LogType::kLogMessage:
610 // clang-format off
611 // header:
612 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
613 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
614 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
615 //
616 // padding:
617 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
618 buffer = Pad(buffer, 6);
619 //
620 // vtable (aos.logger.MessageHeader):
621 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
622 buffer = Push<flatbuffers::voffset_t>(buffer, 0xe);
623 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
624 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
625 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
626 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
627 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
628 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
629 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
630 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
631 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
632 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
633 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
634 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
635 //
636 // root_table (aos.logger.MessageHeader):
637 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
638 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e);
639 // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long)
640 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
641 // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long)
642 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
643 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
644 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c);
645 // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt)
646 buffer = Push<uint32_t>(buffer, context.queue_index);
647 // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt)
648 buffer = Push<uint32_t>(buffer, channel_index);
649 //
650 // vector (aos.logger.MessageHeader.data):
651 // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items)
652 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
653 // +0x40 | FF | uint8_t | 0xFF (255) | value[0]
654 // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1]
655 // +0x42 | EE | uint8_t | 0xEE (238) | value[2]
656 // +0x43 | 00 | uint8_t | 0x00 (0) | value[3]
657 // +0x44 | 20 | uint8_t | 0x20 (32) | value[4]
658 // +0x45 | 4D | uint8_t | 0x4D (77) | value[5]
659 // +0x46 | FF | uint8_t | 0xFF (255) | value[6]
660 // +0x47 | 25 | uint8_t | 0x25 (37) | value[7]
661 // +0x48 | 3C | uint8_t | 0x3C (60) | value[8]
662 // +0x49 | 17 | uint8_t | 0x17 (23) | value[9]
663 // +0x4A | 65 | uint8_t | 0x65 (101) | value[10]
664 // +0x4B | 2F | uint8_t | 0x2F (47) | value[11]
665 // +0x4C | 63 | uint8_t | 0x63 (99) | value[12]
666 // +0x4D | 58 | uint8_t | 0x58 (88) | value[13]
667 buffer = PushBytes(buffer, context.data, context.size);
668 //
669 // padding:
670 // +0x4E | 00 00 | uint8_t[2] | .. | padding
671 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
672 // clang-format on
673 break;
674
675 case LogType::kLogDeliveryTimeOnly:
676 // clang-format off
677 // header:
678 // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix
679 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
680 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
681 //
682 // padding:
683 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
684 buffer = Pad(buffer, 4);
685 //
686 // vtable (aos.logger.MessageHeader):
687 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
688 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
689 // +0x0E | 30 00 | uint16_t | 0x0030 (48) | size of referring table
690 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
691 // +0x10 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `channel_index` (id: 0)
692 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
693 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
694 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
695 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
696 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
697 // +0x16 | 28 00 | VOffset16 | 0x0028 (40) | offset to field `queue_index` (id: 3)
698 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
699 // +0x18 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector)
700 buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
701 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
702 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
703 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
704 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
705 // +0x1E | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7)
706 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
707 //
708 // root_table (aos.logger.MessageHeader):
709 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
710 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
711 // +0x24 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `remote_queue_index` (UInt)
712 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
713 // +0x28 | C6 85 F1 AB 83 B5 CD EB | int64_t | 0xEBCDB583ABF185C6 (-1455307527440726586) | table field `realtime_remote_time` (Long)
714 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
715 // +0x30 | 47 24 D3 97 1E 42 2D 99 | int64_t | 0x992D421E97D32447 (-7409193112790948793) | table field `monotonic_remote_time` (Long)
716 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
717 // +0x38 | C8 B9 A7 AB 79 F2 CD 60 | int64_t | 0x60CDF279ABA7B9C8 (6975498002251626952) | table field `realtime_sent_time` (Long)
718 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
719 // +0x40 | EA 8F 2A 0F AF 01 7A AB | int64_t | 0xAB7A01AF0F2A8FEA (-6090553694679822358) | table field `monotonic_sent_time` (Long)
720 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
721 // +0x48 | F5 00 00 00 | uint32_t | 0x000000F5 (245) | table field `queue_index` (UInt)
722 buffer = Push<uint32_t>(buffer, context.queue_index);
723 // +0x4C | 88 00 00 00 | uint32_t | 0x00000088 (136) | table field `channel_index` (UInt)
724 buffer = Push<uint32_t>(buffer, channel_index);
725
726 // clang-format on
727 break;
728
729 case LogType::kLogMessageAndDeliveryTime:
730 // clang-format off
731 // header:
732 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
733 // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader`
734 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c);
735 //
736 // padding:
737 // +0x08 | 00 00 00 00 | uint8_t[4] | .... | padding
738 buffer = Pad(buffer, 4);
739 //
740 // vtable (aos.logger.MessageHeader):
741 // +0x0C | 14 00 | uint16_t | 0x0014 (20) | size of this vtable
742 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
743 // +0x0E | 34 00 | uint16_t | 0x0034 (52) | size of referring table
744 buffer = Push<flatbuffers::voffset_t>(buffer, 0x34);
745 // +0x10 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `channel_index` (id: 0)
746 buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
747 // +0x12 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `monotonic_sent_time` (id: 1)
748 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
749 // +0x14 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `realtime_sent_time` (id: 2)
750 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
751 // +0x16 | 2C 00 | VOffset16 | 0x002C (44) | offset to field `queue_index` (id: 3)
752 buffer = Push<flatbuffers::voffset_t>(buffer, 0x2c);
753 // +0x18 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `data` (id: 4)
754 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
755 // +0x1A | 10 00 | VOffset16 | 0x0010 (16) | offset to field `monotonic_remote_time` (id: 5)
756 buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
757 // +0x1C | 08 00 | VOffset16 | 0x0008 (8) | offset to field `realtime_remote_time` (id: 6)
758 buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
759 // +0x1E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `remote_queue_index` (id: 7)
760 buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
761 //
762 // root_table (aos.logger.MessageHeader):
763 // +0x20 | 14 00 00 00 | SOffset32 | 0x00000014 (20) Loc: +0x0C | offset to vtable
764 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x14);
765 // +0x24 | 30 00 00 00 | UOffset32 | 0x00000030 (48) Loc: +0x54 | offset to field `data` (vector)
766 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x30);
767 // +0x28 | C4 C8 87 BF 40 6C 1F 29 | int64_t | 0x291F6C40BF87C8C4 (2963206105180129476) | table field `realtime_remote_time` (Long)
768 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
769 // +0x30 | 0F 00 26 FD D2 6D C0 1F | int64_t | 0x1FC06DD2FD26000F (2287949363661897743) | table field `monotonic_remote_time` (Long)
770 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
771 // +0x38 | 29 75 09 C0 73 73 BF 88 | int64_t | 0x88BF7373C0097529 (-8593022623019338455) | table field `realtime_sent_time` (Long)
772 buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count());
773 // +0x40 | 6D 8A AE 04 50 25 9C E9 | int64_t | 0xE99C255004AE8A6D (-1613373540899321235) | table field `monotonic_sent_time` (Long)
774 buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count());
775 // +0x48 | 47 00 00 00 | uint32_t | 0x00000047 (71) | table field `remote_queue_index` (UInt)
776 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
777 // +0x4C | 4C 00 00 00 | uint32_t | 0x0000004C (76) | table field `queue_index` (UInt)
778 buffer = Push<uint32_t>(buffer, context.queue_index);
779 // +0x50 | 72 00 00 00 | uint32_t | 0x00000072 (114) | table field `channel_index` (UInt)
780 buffer = Push<uint32_t>(buffer, channel_index);
781 //
782 // vector (aos.logger.MessageHeader.data):
783 // +0x54 | 07 00 00 00 | uint32_t | 0x00000007 (7) | length of vector (# items)
784 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
785 // +0x58 | B1 | uint8_t | 0xB1 (177) | value[0]
786 // +0x59 | 4A | uint8_t | 0x4A (74) | value[1]
787 // +0x5A | 50 | uint8_t | 0x50 (80) | value[2]
788 // +0x5B | 24 | uint8_t | 0x24 (36) | value[3]
789 // +0x5C | AF | uint8_t | 0xAF (175) | value[4]
790 // +0x5D | C8 | uint8_t | 0xC8 (200) | value[5]
791 // +0x5E | D5 | uint8_t | 0xD5 (213) | value[6]
792 buffer = PushBytes(buffer, context.data, context.size);
793 //
794 // padding:
795 // +0x5F | 00 | uint8_t[1] | . | padding
796 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
797 // clang-format on
798
799 break;
800
801 case LogType::kLogRemoteMessage:
802 // This is the message we need to recreate.
803 //
804 // clang-format off
805 // header:
806 // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix
807 // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader`
808 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
809 //
810 // padding:
811 // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
812 buffer = Pad(buffer, 6);
813 //
814 // vtable (aos.logger.MessageHeader):
815 // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable
816 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e);
817 // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table
818 buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
819 // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0)
820 buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c);
821 // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1)
822 buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c);
823 // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2)
824 buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
825 // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3)
826 buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
827 // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4)
828 buffer = Push<flatbuffers::voffset_t>(buffer, 0x14);
829 //
830 // root_table (aos.logger.MessageHeader):
831 // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable
832 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E);
833 // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long)
834 buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count());
835 // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long)
836 buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count());
837 // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector)
838 buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C);
839 // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt)
840 buffer = Push<uint32_t>(buffer, context.remote_queue_index);
841 // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt)
842 buffer = Push<uint32_t>(buffer, channel_index);
843 //
844 // vector (aos.logger.MessageHeader.data):
845 // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items)
846 buffer = Push<flatbuffers::uoffset_t>(buffer, context.size);
847 // +0x40 | 38 | uint8_t | 0x38 (56) | value[0]
848 // +0x41 | 1A | uint8_t | 0x1A (26) | value[1]
849 // ...
850 // +0x58 | 90 | uint8_t | 0x90 (144) | value[24]
851 // +0x59 | 92 | uint8_t | 0x92 (146) | value[25]
852 buffer = PushBytes(buffer, context.data, context.size);
853 //
854 // padding:
855 // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding
856 buffer = Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size);
857 // clang-format on
858 }
859
860 return message_size;
861}
862
Austin Schuhcd368422021-11-22 21:23:29 -0800863SpanReader::SpanReader(std::string_view filename, bool quiet)
864 : filename_(filename) {
Tyler Chatow2015bc62021-08-04 21:15:09 -0700865 decoder_ = std::make_unique<DummyDecoder>(filename);
866
867 static constexpr std::string_view kXz = ".xz";
James Kuszmauldd0a5042021-10-28 23:38:04 -0700868 static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700869 if (filename.substr(filename.size() - kXz.size()) == kXz) {
870#if ENABLE_LZMA
Austin Schuhcd368422021-11-22 21:23:29 -0800871 decoder_ =
872 std::make_unique<ThreadedLzmaDecoder>(std::move(decoder_), quiet);
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700873#else
Austin Schuhcd368422021-11-22 21:23:29 -0800874 (void)quiet;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700875 LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
876#endif
James Kuszmauldd0a5042021-10-28 23:38:04 -0700877 } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) {
878 decoder_ = std::make_unique<SnappyDecoder>(std::move(decoder_));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700879 }
Austin Schuh05b70472020-01-01 17:11:17 -0800880}
881
Austin Schuhcf5f6442021-07-06 10:43:28 -0700882absl::Span<const uint8_t> SpanReader::PeekMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -0800883 // Make sure we have enough for the size.
884 if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
885 if (!ReadBlock()) {
886 return absl::Span<const uint8_t>();
887 }
888 }
889
890 // Now make sure we have enough for the message.
891 const size_t data_size =
892 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
893 sizeof(flatbuffers::uoffset_t);
Austin Schuhe4fca832020-03-07 16:58:53 -0800894 if (data_size == sizeof(flatbuffers::uoffset_t)) {
895 LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping.";
896 LOG(ERROR) << " Rest of log file is "
897 << absl::BytesToHexString(std::string_view(
898 reinterpret_cast<const char *>(data_.data() +
899 consumed_data_),
900 data_.size() - consumed_data_));
901 return absl::Span<const uint8_t>();
902 }
Austin Schuh05b70472020-01-01 17:11:17 -0800903 while (data_.size() < consumed_data_ + data_size) {
904 if (!ReadBlock()) {
905 return absl::Span<const uint8_t>();
906 }
907 }
908
909 // And return it, consuming the data.
910 const uint8_t *data_ptr = data_.data() + consumed_data_;
911
Austin Schuh05b70472020-01-01 17:11:17 -0800912 return absl::Span<const uint8_t>(data_ptr, data_size);
913}
914
Austin Schuhcf5f6442021-07-06 10:43:28 -0700915void SpanReader::ConsumeMessage() {
Brian Smarttea913d42021-12-10 15:02:38 -0800916 size_t consumed_size =
Austin Schuhcf5f6442021-07-06 10:43:28 -0700917 flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
918 sizeof(flatbuffers::uoffset_t);
Brian Smarttea913d42021-12-10 15:02:38 -0800919 consumed_data_ += consumed_size;
920 total_consumed_ += consumed_size;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700921}
922
923absl::Span<const uint8_t> SpanReader::ReadMessage() {
924 absl::Span<const uint8_t> result = PeekMessage();
925 if (result != absl::Span<const uint8_t>()) {
926 ConsumeMessage();
Brian Smarttea913d42021-12-10 15:02:38 -0800927 } else {
928 is_finished_ = true;
Austin Schuhcf5f6442021-07-06 10:43:28 -0700929 }
930 return result;
931}
932
Austin Schuh05b70472020-01-01 17:11:17 -0800933bool SpanReader::ReadBlock() {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700934 // This is the amount of data we grab at a time. Doing larger chunks minimizes
935 // syscalls and helps decompressors batch things more efficiently.
Austin Schuh05b70472020-01-01 17:11:17 -0800936 constexpr size_t kReadSize = 256 * 1024;
937
938 // Strip off any unused data at the front.
939 if (consumed_data_ != 0) {
Brian Silvermanf51499a2020-09-21 12:49:08 -0700940 data_.erase_front(consumed_data_);
Austin Schuh05b70472020-01-01 17:11:17 -0800941 consumed_data_ = 0;
942 }
943
944 const size_t starting_size = data_.size();
945
946 // This should automatically grow the backing store. It won't shrink if we
947 // get a small chunk later. This reduces allocations when we want to append
948 // more data.
Brian Silvermanf51499a2020-09-21 12:49:08 -0700949 data_.resize(starting_size + kReadSize);
Austin Schuh05b70472020-01-01 17:11:17 -0800950
Brian Silvermanf51499a2020-09-21 12:49:08 -0700951 const size_t count =
952 decoder_->Read(data_.begin() + starting_size, data_.end());
953 data_.resize(starting_size + count);
Austin Schuh05b70472020-01-01 17:11:17 -0800954 if (count == 0) {
Austin Schuh05b70472020-01-01 17:11:17 -0800955 return false;
956 }
Austin Schuh05b70472020-01-01 17:11:17 -0800957
Brian Smarttea913d42021-12-10 15:02:38 -0800958 total_read_ += count;
959
Austin Schuh05b70472020-01-01 17:11:17 -0800960 return true;
961}
962
Austin Schuhadd6eb32020-11-09 21:24:26 -0800963std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
Austin Schuh0e8db662021-07-06 10:43:47 -0700964 SpanReader *span_reader) {
965 absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800966
967 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -0800968 if (config_data == absl::Span<const uint8_t>()) {
969 return std::nullopt;
970 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800971
Austin Schuh5212cad2020-09-09 23:12:09 -0700972 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -0700973 SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
Austin Schuhe09beb12020-12-11 20:04:27 -0800974 if (!result.Verify()) {
975 return std::nullopt;
976 }
Austin Schuh0e8db662021-07-06 10:43:47 -0700977
978 if (FLAGS_workaround_double_headers) {
979 while (true) {
980 absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
981 if (maybe_header_data == absl::Span<const uint8_t>()) {
982 break;
983 }
984
985 aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
986 maybe_header_data);
987 if (maybe_header.Verify()) {
988 LOG(WARNING) << "Found duplicate LogFileHeader in "
989 << span_reader->filename();
990 ResizeableBuffer header_data_copy;
991 header_data_copy.resize(maybe_header_data.size());
992 memcpy(header_data_copy.data(), maybe_header_data.begin(),
993 header_data_copy.size());
994 result = SizePrefixedFlatbufferVector<LogFileHeader>(
995 std::move(header_data_copy));
996
997 span_reader->ConsumeMessage();
998 } else {
999 break;
1000 }
1001 }
1002 }
Austin Schuhe09beb12020-12-11 20:04:27 -08001003 return result;
Austin Schuh6f3babe2020-01-26 20:34:50 -08001004}
1005
Austin Schuh0e8db662021-07-06 10:43:47 -07001006std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
1007 std::string_view filename) {
1008 SpanReader span_reader(filename);
1009 return ReadHeader(&span_reader);
1010}
1011
Austin Schuhadd6eb32020-11-09 21:24:26 -08001012std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
Austin Schuh3bd4c402020-11-06 18:19:06 -08001013 std::string_view filename, size_t n) {
Austin Schuh5212cad2020-09-09 23:12:09 -07001014 SpanReader span_reader(filename);
1015 absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
1016 for (size_t i = 0; i < n + 1; ++i) {
1017 data_span = span_reader.ReadMessage();
1018
1019 // Make sure something was read.
Austin Schuh3bd4c402020-11-06 18:19:06 -08001020 if (data_span == absl::Span<const uint8_t>()) {
1021 return std::nullopt;
1022 }
Austin Schuh5212cad2020-09-09 23:12:09 -07001023 }
1024
Brian Silverman354697a2020-09-22 21:06:32 -07001025 // And copy the config so we have it forever, removing the size prefix.
Austin Schuhb929c4e2021-07-12 15:32:53 -07001026 SizePrefixedFlatbufferVector<MessageHeader> result(data_span);
Austin Schuhe09beb12020-12-11 20:04:27 -08001027 if (!result.Verify()) {
1028 return std::nullopt;
1029 }
1030 return result;
Austin Schuh5212cad2020-09-09 23:12:09 -07001031}
1032
Austin Schuh05b70472020-01-01 17:11:17 -08001033MessageReader::MessageReader(std::string_view filename)
Austin Schuh97789fc2020-08-01 14:42:45 -07001034 : span_reader_(filename),
Austin Schuhadd6eb32020-11-09 21:24:26 -08001035 raw_log_file_header_(
1036 SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001037 set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
1038 set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
1039
Austin Schuh0e8db662021-07-06 10:43:47 -07001040 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
1041 raw_log_file_header = ReadHeader(&span_reader_);
Austin Schuh05b70472020-01-01 17:11:17 -08001042
1043 // Make sure something was read.
Austin Schuh0e8db662021-07-06 10:43:47 -07001044 CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
Austin Schuh05b70472020-01-01 17:11:17 -08001045
Austin Schuh0e8db662021-07-06 10:43:47 -07001046 raw_log_file_header_ = std::move(*raw_log_file_header);
Austin Schuh05b70472020-01-01 17:11:17 -08001047
Austin Schuh5b728b72021-06-16 14:57:15 -07001048 CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
1049
Brian Smarttea913d42021-12-10 15:02:38 -08001050 total_verified_before_ = span_reader_.TotalConsumed();
1051
Austin Schuhcde938c2020-02-02 17:30:07 -08001052 max_out_of_order_duration_ =
Austin Schuha040c3f2021-02-13 16:09:07 -08001053 FLAGS_max_out_of_order > 0
1054 ? chrono::duration_cast<chrono::nanoseconds>(
1055 chrono::duration<double>(FLAGS_max_out_of_order))
1056 : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
Austin Schuhcde938c2020-02-02 17:30:07 -08001057
1058 VLOG(1) << "Opened " << filename << " as node "
1059 << FlatbufferToJson(log_file_header()->node());
Austin Schuh05b70472020-01-01 17:11:17 -08001060}
1061
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001062std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
Austin Schuh05b70472020-01-01 17:11:17 -08001063 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1064 if (msg_data == absl::Span<const uint8_t>()) {
Brian Smarttea913d42021-12-10 15:02:38 -08001065 if (is_corrupted()) {
1066 LOG(ERROR) << "Total corrupted volumes: before = "
1067 << total_verified_before_
1068 << " | corrupted = " << total_corrupted_
1069 << " | during = " << total_verified_during_
1070 << " | after = " << total_verified_after_ << std::endl;
1071 }
1072
1073 if (span_reader_.IsIncomplete()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001074 LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
1075 << span_reader_.TotalRead() << " bytes read, "
Brian Smarttea913d42021-12-10 15:02:38 -08001076 << span_reader_.TotalConsumed() << " bytes usable."
1077 << std::endl;
1078 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001079 return nullptr;
Austin Schuh05b70472020-01-01 17:11:17 -08001080 }
1081
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001082 SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
Brian Smarttea913d42021-12-10 15:02:38 -08001083
1084 if (crash_on_corrupt_message_flag_) {
1085 CHECK(msg.Verify()) << "Corrupted message at offset "
Austin Schuh60e77942022-05-16 17:48:24 -07001086 << total_verified_before_ << " found within "
1087 << filename()
Brian Smarttea913d42021-12-10 15:02:38 -08001088 << "; set --nocrash_on_corrupt_message to see summary;"
1089 << " also set --ignore_corrupt_messages to process"
1090 << " anyway";
1091
1092 } else if (!msg.Verify()) {
Austin Schuh60e77942022-05-16 17:48:24 -07001093 LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
Brian Smarttea913d42021-12-10 15:02:38 -08001094 << " from " << filename() << std::endl;
1095
1096 total_corrupted_ += msg_data.size();
1097
1098 while (true) {
1099 absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
1100
1101 if (msg_data == absl::Span<const uint8_t>()) {
1102 if (!ignore_corrupt_messages_flag_) {
1103 LOG(ERROR) << "Total corrupted volumes: before = "
1104 << total_verified_before_
1105 << " | corrupted = " << total_corrupted_
1106 << " | during = " << total_verified_during_
1107 << " | after = " << total_verified_after_ << std::endl;
1108
1109 if (span_reader_.IsIncomplete()) {
1110 LOG(ERROR) << "Unable to access some messages in " << filename()
1111 << " : " << span_reader_.TotalRead() << " bytes read, "
1112 << span_reader_.TotalConsumed() << " bytes usable."
1113 << std::endl;
1114 }
1115 return nullptr;
1116 }
1117 break;
1118 }
1119
1120 SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
1121
1122 if (!next_msg.Verify()) {
1123 total_corrupted_ += msg_data.size();
1124 total_verified_during_ += total_verified_after_;
1125 total_verified_after_ = 0;
1126
1127 } else {
1128 total_verified_after_ += msg_data.size();
1129 if (ignore_corrupt_messages_flag_) {
1130 msg = next_msg;
1131 break;
1132 }
1133 }
1134 }
1135 }
1136
1137 if (is_corrupted()) {
1138 total_verified_after_ += msg_data.size();
1139 } else {
1140 total_verified_before_ += msg_data.size();
1141 }
Austin Schuh05b70472020-01-01 17:11:17 -08001142
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001143 auto result = UnpackedMessageHeader::MakeMessage(msg.message());
Austin Schuh0e8db662021-07-06 10:43:47 -07001144
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001145 const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
Austin Schuh05b70472020-01-01 17:11:17 -08001146
1147 newest_timestamp_ = std::max(newest_timestamp_, timestamp);
Austin Schuhd1873292021-11-18 15:35:30 -08001148
1149 if (VLOG_IS_ON(3)) {
1150 VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
1151 } else if (VLOG_IS_ON(2)) {
1152 SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
1153 msg_copy.mutable_message()->clear_data();
1154 VLOG(2) << "Read from " << filename() << " data "
1155 << FlatbufferToJson(msg_copy);
1156 }
1157
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001158 return result;
1159}
1160
1161std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
1162 const MessageHeader &message) {
1163 const size_t data_size = message.has_data() ? message.data()->size() : 0;
1164
1165 UnpackedMessageHeader *const unpacked_message =
1166 reinterpret_cast<UnpackedMessageHeader *>(
1167 malloc(sizeof(UnpackedMessageHeader) + data_size +
1168 kChannelDataAlignment - 1));
1169
1170 CHECK(message.has_channel_index());
1171 CHECK(message.has_monotonic_sent_time());
1172
1173 absl::Span<uint8_t> span;
1174 if (data_size > 0) {
1175 span =
1176 absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
1177 &unpacked_message->actual_data[0], data_size)),
1178 data_size);
1179 }
1180
Austin Schuh826e6ce2021-11-18 20:33:10 -08001181 std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001182 if (message.has_monotonic_remote_time()) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001183 monotonic_remote_time = aos::monotonic_clock::time_point(
1184 std::chrono::nanoseconds(message.monotonic_remote_time()));
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001185 }
1186 std::optional<realtime_clock::time_point> realtime_remote_time;
1187 if (message.has_realtime_remote_time()) {
1188 realtime_remote_time = realtime_clock::time_point(
1189 chrono::nanoseconds(message.realtime_remote_time()));
1190 }
1191
1192 std::optional<uint32_t> remote_queue_index;
1193 if (message.has_remote_queue_index()) {
1194 remote_queue_index = message.remote_queue_index();
1195 }
1196
1197 new (unpacked_message) UnpackedMessageHeader{
1198 .channel_index = message.channel_index(),
1199 .monotonic_sent_time = monotonic_clock::time_point(
1200 chrono::nanoseconds(message.monotonic_sent_time())),
1201 .realtime_sent_time = realtime_clock::time_point(
1202 chrono::nanoseconds(message.realtime_sent_time())),
1203 .queue_index = message.queue_index(),
1204 .monotonic_remote_time = monotonic_remote_time,
1205 .realtime_remote_time = realtime_remote_time,
1206 .remote_queue_index = remote_queue_index,
1207 .monotonic_timestamp_time = monotonic_clock::time_point(
1208 std::chrono::nanoseconds(message.monotonic_timestamp_time())),
1209 .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
1210 .span = span};
1211
1212 if (data_size > 0) {
1213 memcpy(span.data(), message.data()->data(), data_size);
1214 }
1215
1216 return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
1217 &DestroyAndFree);
Austin Schuh05b70472020-01-01 17:11:17 -08001218}
1219
Austin Schuhc41603c2020-10-11 16:17:37 -07001220PartsMessageReader::PartsMessageReader(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001221 : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001222 if (parts_.parts.size() >= 2) {
1223 next_message_reader_.emplace(parts_.parts[1]);
1224 }
Austin Schuh48507722021-07-17 17:29:24 -07001225 ComputeBootCounts();
1226}
1227
1228void PartsMessageReader::ComputeBootCounts() {
1229 boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
1230 std::nullopt);
1231
1232 // We have 3 vintages of log files with different amounts of information.
1233 if (log_file_header()->has_boot_uuids()) {
1234 // The new hotness with the boots explicitly listed out. We can use the log
1235 // file header to compute the boot count of all relevant nodes.
1236 CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
1237 size_t node_index = 0;
1238 for (const flatbuffers::String *boot_uuid :
1239 *log_file_header()->boot_uuids()) {
1240 CHECK(parts_.boots);
1241 if (boot_uuid->size() != 0) {
1242 auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
1243 if (it != parts_.boots->boot_count_map.end()) {
1244 boot_counts_[node_index] = it->second;
1245 }
1246 } else if (parts().boots->boots[node_index].size() == 1u) {
1247 boot_counts_[node_index] = 0;
1248 }
1249 ++node_index;
1250 }
1251 } else {
1252 // Older multi-node logs which are guarenteed to have UUIDs logged, or
1253 // single node log files with boot UUIDs in the header. We only know how to
1254 // order certain boots in certain circumstances.
1255 if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
1256 for (size_t node_index = 0; node_index < boot_counts_.size();
1257 ++node_index) {
1258 CHECK(parts_.boots);
1259 if (parts().boots->boots[node_index].size() == 1u) {
1260 boot_counts_[node_index] = 0;
1261 }
1262 }
1263 } else {
1264 // Really old single node logs without any UUIDs. They can't reboot.
1265 CHECK_EQ(boot_counts_.size(), 1u);
1266 boot_counts_[0] = 0u;
1267 }
1268 }
1269}
Austin Schuhc41603c2020-10-11 16:17:37 -07001270
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001271std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
Austin Schuhc41603c2020-10-11 16:17:37 -07001272 while (!done_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001273 std::shared_ptr<UnpackedMessageHeader> message =
Austin Schuhc41603c2020-10-11 16:17:37 -07001274 message_reader_.ReadMessage();
1275 if (message) {
1276 newest_timestamp_ = message_reader_.newest_timestamp();
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001277 const monotonic_clock::time_point monotonic_sent_time =
1278 message->monotonic_sent_time;
1279
1280 // TODO(austin): Does this work with startup? Might need to use the
1281 // start time.
1282 // TODO(austin): Does this work with startup when we don't know the
1283 // remote start time too? Look at one of those logs to compare.
Austin Schuh315b96b2020-12-11 21:21:12 -08001284 if (monotonic_sent_time >
1285 parts_.monotonic_start_time + max_out_of_order_duration()) {
1286 after_start_ = true;
1287 }
1288 if (after_start_) {
Austin Schuhb000de62020-12-03 22:00:40 -08001289 CHECK_GE(monotonic_sent_time,
1290 newest_timestamp_ - max_out_of_order_duration())
Austin Schuha040c3f2021-02-13 16:09:07 -08001291 << ": Max out of order of " << max_out_of_order_duration().count()
1292 << "ns exceeded. " << parts_ << ", start time is "
Austin Schuh315b96b2020-12-11 21:21:12 -08001293 << parts_.monotonic_start_time << " currently reading "
1294 << filename();
Austin Schuhb000de62020-12-03 22:00:40 -08001295 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001296 return message;
1297 }
1298 NextLog();
1299 }
Austin Schuh32f68492020-11-08 21:45:51 -08001300 newest_timestamp_ = monotonic_clock::max_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001301 return nullptr;
Austin Schuhc41603c2020-10-11 16:17:37 -07001302}
1303
1304void PartsMessageReader::NextLog() {
1305 if (next_part_index_ == parts_.parts.size()) {
Brian Silvermanfee16972021-09-14 12:06:38 -07001306 CHECK(!next_message_reader_);
Austin Schuhc41603c2020-10-11 16:17:37 -07001307 done_ = true;
1308 return;
1309 }
Brian Silvermanfee16972021-09-14 12:06:38 -07001310 CHECK(next_message_reader_);
1311 message_reader_ = std::move(*next_message_reader_);
Austin Schuh48507722021-07-17 17:29:24 -07001312 ComputeBootCounts();
Brian Silvermanfee16972021-09-14 12:06:38 -07001313 if (next_part_index_ + 1 < parts_.parts.size()) {
1314 next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
1315 } else {
1316 next_message_reader_.reset();
1317 }
Austin Schuhc41603c2020-10-11 16:17:37 -07001318 ++next_part_index_;
1319}
1320
Austin Schuh1be0ce42020-11-29 22:43:26 -08001321bool Message::operator<(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001322 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001323
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001324 if (this->timestamp.time < m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001325 return true;
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001326 } else if (this->timestamp.time > m2.timestamp.time) {
Austin Schuh1be0ce42020-11-29 22:43:26 -08001327 return false;
1328 }
1329
1330 if (this->channel_index < m2.channel_index) {
1331 return true;
1332 } else if (this->channel_index > m2.channel_index) {
1333 return false;
1334 }
1335
1336 return this->queue_index < m2.queue_index;
1337}
1338
1339bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
Austin Schuh8f52ed52020-11-30 23:12:39 -08001340bool Message::operator==(const Message &m2) const {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001341 CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001342
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001343 return timestamp.time == m2.timestamp.time &&
1344 channel_index == m2.channel_index && queue_index == m2.queue_index;
Austin Schuh8f52ed52020-11-30 23:12:39 -08001345}
Austin Schuh1be0ce42020-11-29 22:43:26 -08001346
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001347std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
1348 os << "{.channel_index=" << m.channel_index
1349 << ", .monotonic_sent_time=" << m.monotonic_sent_time
1350 << ", .realtime_sent_time=" << m.realtime_sent_time
1351 << ", .queue_index=" << m.queue_index;
1352 if (m.monotonic_remote_time) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001353 os << ", .monotonic_remote_time=" << *m.monotonic_remote_time;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001354 }
1355 os << ", .realtime_remote_time=";
1356 PrintOptionalOrNull(&os, m.realtime_remote_time);
1357 os << ", .remote_queue_index=";
1358 PrintOptionalOrNull(&os, m.remote_queue_index);
1359 if (m.has_monotonic_timestamp_time) {
1360 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1361 }
Austin Schuh22cf7862022-09-19 19:09:42 -07001362 os << "}";
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001363 return os;
1364}
1365
Austin Schuh1be0ce42020-11-29 22:43:26 -08001366std::ostream &operator<<(std::ostream &os, const Message &m) {
1367 os << "{.channel_index=" << m.channel_index
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001368 << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001369 if (m.data != nullptr) {
Austin Schuh826e6ce2021-11-18 20:33:10 -08001370 if (m.data->remote_queue_index.has_value()) {
1371 os << ", .remote_queue_index=" << *m.data->remote_queue_index;
1372 }
1373 if (m.data->monotonic_remote_time.has_value()) {
1374 os << ", .monotonic_remote_time=" << *m.data->monotonic_remote_time;
1375 }
Austin Schuhfb1b3292021-11-16 21:20:15 -08001376 os << ", .data=" << m.data;
Austin Schuhd2f96102020-12-01 20:27:29 -08001377 }
1378 os << "}";
1379 return os;
1380}
1381
1382std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
1383 os << "{.channel_index=" << m.channel_index
1384 << ", .queue_index=" << m.queue_index
1385 << ", .monotonic_event_time=" << m.monotonic_event_time
1386 << ", .realtime_event_time=" << m.realtime_event_time;
Austin Schuh58646e22021-08-23 23:51:46 -07001387 if (m.remote_queue_index != BootQueueIndex::Invalid()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001388 os << ", .remote_queue_index=" << m.remote_queue_index;
1389 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001390 if (m.monotonic_remote_time != BootTimestamp::min_time()) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001391 os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
1392 }
1393 if (m.realtime_remote_time != realtime_clock::min_time) {
1394 os << ", .realtime_remote_time=" << m.realtime_remote_time;
1395 }
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001396 if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
Austin Schuh8bf1e632021-01-02 22:41:04 -08001397 os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
1398 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001399 if (m.data != nullptr) {
1400 os << ", .data=" << *m.data;
Austin Schuh22cf7862022-09-19 19:09:42 -07001401 } else {
1402 os << ", .data=nullptr";
Austin Schuhd2f96102020-12-01 20:27:29 -08001403 }
1404 os << "}";
Austin Schuh1be0ce42020-11-29 22:43:26 -08001405 return os;
1406}
1407
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001408LogPartsSorter::LogPartsSorter(LogParts log_parts)
Austin Schuh48507722021-07-17 17:29:24 -07001409 : parts_message_reader_(log_parts),
1410 source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
1411}
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001412
// Returns a pointer to the oldest queued message for this LogParts, or
// nullptr if nothing is left to return.
//
// Messages are read from parts_message_reader_ and inserted into the sorted
// messages_ container until the front message is strictly older than
// sorted_until() (and sorted_until() has reached monotonic_start_time()).
// sorted_until_ trails the newest timestamp read by
// max_out_of_order_duration(), so nothing read later can sort ahead of the
// returned message.
Message *LogPartsSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      // Done once the front message is inside the sorted horizon and the
      // horizon has reached the log start time.
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::shared_ptr<UnpackedMessageHeader> m =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!m) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // A logged timestamp time is attributed to the logger's boot.
      size_t monotonic_timestamp_boot = 0;
      if (m->has_monotonic_timestamp_time) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      // Sentinel boot, kept when the message carries no remote time.
      size_t monotonic_remote_boot = 0xffffff;

      if (m->monotonic_remote_time.has_value()) {
        // Only used to make the CHECK message below readable.
        const Node *node =
            parts().config->nodes()->Get(source_node_index_[m->channel_index]);

        // The remote time belongs to whichever boot the channel's source node
        // was on for this part.
        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[m->channel_index]);
        CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                    << ", with index " << source_node_index_[m->channel_index];
        monotonic_remote_boot = *boot;
      }

      // The local send time and queue index belong to this part's boot. The
      // designated initializers are evaluated in order, so m is read above
      // before it is moved into .data.
      messages_.insert(
          Message{.channel_index = m->channel_index,
                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
                                                .index = m->queue_index},
                  .timestamp = BootTimestamp{.boot = parts().boot_count,
                                             .time = m->monotonic_sent_time},
                  .monotonic_remote_boot = monotonic_remote_boot,
                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
                  .data = std::move(m)});

      // Now, update sorted_until_ to match the new message.
      // The horizon is newest_timestamp - max_out_of_order_duration, clamped
      // to min_time so the subtraction cannot go below the clock's minimum.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Messages must come out in non-decreasing time order.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  return &(*messages_.begin());
}
1484
1485void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
1486
1487std::string LogPartsSorter::DebugString() const {
1488 std::stringstream ss;
1489 ss << "messages: [\n";
Austin Schuh315b96b2020-12-11 21:21:12 -08001490 int count = 0;
1491 bool no_dots = true;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001492 for (const Message &m : messages_) {
Austin Schuh315b96b2020-12-11 21:21:12 -08001493 if (count < 15 || count > static_cast<int>(messages_.size()) - 15) {
1494 ss << m << "\n";
1495 } else if (no_dots) {
1496 ss << "...\n";
1497 no_dots = false;
1498 }
1499 ++count;
Austin Schuh4b5c22a2020-11-30 22:58:43 -08001500 }
1501 ss << "] <- " << parts_message_reader_.filename();
1502 return ss.str();
1503}
1504
Austin Schuhd2f96102020-12-01 20:27:29 -08001505NodeMerger::NodeMerger(std::vector<LogParts> parts) {
1506 CHECK_GE(parts.size(), 1u);
Austin Schuh715adc12021-06-29 22:07:39 -07001507 // Enforce that we are sorting things only from a single node from a single
1508 // boot.
1509 const std::string_view part0_node = parts[0].node;
1510 const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
Austin Schuhd2f96102020-12-01 20:27:29 -08001511 for (size_t i = 1; i < parts.size(); ++i) {
1512 CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
Austin Schuh715adc12021-06-29 22:07:39 -07001513 CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
1514 << ": Can't merge different boots.";
Austin Schuhd2f96102020-12-01 20:27:29 -08001515 }
Austin Schuh715adc12021-06-29 22:07:39 -07001516
1517 node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
1518
Austin Schuhd2f96102020-12-01 20:27:29 -08001519 for (LogParts &part : parts) {
1520 parts_sorters_.emplace_back(std::move(part));
1521 }
1522
Austin Schuhd2f96102020-12-01 20:27:29 -08001523 monotonic_start_time_ = monotonic_clock::max_time;
Austin Schuh9dc42612021-09-20 20:41:29 -07001524 realtime_start_time_ = realtime_clock::min_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001525 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001526 // We want to capture the earliest meaningful start time here. The start
1527 // time defaults to min_time when there's no meaningful value to report, so
1528 // let's ignore those.
Austin Schuh9dc42612021-09-20 20:41:29 -07001529 if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
1530 bool accept = false;
1531 // We want to prioritize start times from the logger node. Really, we
1532 // want to prioritize start times with a valid realtime_clock time. So,
1533 // if we have a start time without a RT clock, prefer a start time with a
1534 // RT clock, even it if is later.
1535 if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
1536 // We've got a good one. See if the current start time has a good RT
1537 // clock, or if we should use this one instead.
1538 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1539 accept = true;
1540 } else if (realtime_start_time_ == realtime_clock::min_time) {
1541 // The previous start time doesn't have a good RT time, so it is very
1542 // likely the start time from a remote part file. We just found a
1543 // better start time with a real RT time, so switch to that instead.
1544 accept = true;
1545 }
1546 } else if (realtime_start_time_ == realtime_clock::min_time) {
1547 // We don't have a RT time, so take the oldest.
1548 if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
1549 accept = true;
1550 }
1551 }
1552
1553 if (accept) {
1554 monotonic_start_time_ = parts_sorter.monotonic_start_time();
1555 realtime_start_time_ = parts_sorter.realtime_start_time();
1556 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001557 }
1558 }
Sanjay Narayanan9896c752021-09-01 16:16:48 -07001559
1560 // If there was no meaningful start time reported, just use min_time.
1561 if (monotonic_start_time_ == monotonic_clock::max_time) {
1562 monotonic_start_time_ = monotonic_clock::min_time;
1563 realtime_start_time_ = realtime_clock::min_time;
1564 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001565}
Austin Schuh8f52ed52020-11-30 23:12:39 -08001566
Austin Schuh0ca51f32020-12-25 21:51:45 -08001567std::vector<const LogParts *> NodeMerger::Parts() const {
1568 std::vector<const LogParts *> p;
1569 p.reserve(parts_sorters_.size());
1570 for (const LogPartsSorter &parts_sorter : parts_sorters_) {
1571 p.emplace_back(&parts_sorter.parts());
1572 }
1573 return p;
1574}
1575
// Returns the oldest message across all of this node's parts sorters, or
// nullptr if none of them has anything left. The chosen sorter is cached in
// current_ until PopFront() is called.
//
// Messages that compare equal across sorters are duplicates. The extra copy
// is popped here, keeping the copy that carries a monotonic_timestamp_time
// when only one of them does.
Message *NodeMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  Message *oldest = nullptr;
  // Ends up as the minimum sorted_until() across all sorters.
  sorted_until_ = monotonic_clock::max_time;
  for (LogPartsSorter &parts_sorter : parts_sorters_) {
    Message *m = parts_sorter.Front();
    if (!m) {
      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *m < *oldest) {
      oldest = m;
      current_ = &parts_sorter;
    } else if (*m == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has
      // the timestamp time.
      if (!m->data->has_monotonic_timestamp_time) {
        // m adds nothing over oldest; drop m.
        parts_sorter.PopFront();
      } else if (!oldest->data->has_monotonic_timestamp_time) {
        // m has the timestamp time and oldest does not; drop oldest, keep m.
        current_->PopFront();
        current_ = &parts_sorter;
        oldest = m;
      } else {
        // Both copies have it, so they must agree; drop m.
        CHECK_EQ(m->data->monotonic_timestamp_time,
                 oldest->data->monotonic_timestamp_time);
        parts_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
  }

  if (oldest) {
    // Output must be non-decreasing in time.
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
    monotonic_oldest_time_ =
        std::min(monotonic_oldest_time_, oldest->timestamp.time);
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found. This will be nullptr if nothing was
  // found, indicating there is nothing left.
  return oldest;
}
1630
1631void NodeMerger::PopFront() {
1632 CHECK(current_ != nullptr) << "Popping before calling Front()";
1633 current_->PopFront();
1634 current_ = nullptr;
1635}
1636
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001637BootMerger::BootMerger(std::vector<LogParts> files) {
1638 std::vector<std::vector<LogParts>> boots;
1639
1640 // Now, we need to split things out by boot.
1641 for (size_t i = 0; i < files.size(); ++i) {
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001642 const size_t boot_count = files[i].boot_count;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001643 if (boot_count + 1 > boots.size()) {
1644 boots.resize(boot_count + 1);
1645 }
1646 boots[boot_count].emplace_back(std::move(files[i]));
1647 }
1648
1649 node_mergers_.reserve(boots.size());
1650 for (size_t i = 0; i < boots.size(); ++i) {
Austin Schuh48507722021-07-17 17:29:24 -07001651 VLOG(2) << "Boot " << i;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001652 for (auto &p : boots[i]) {
Austin Schuh48507722021-07-17 17:29:24 -07001653 VLOG(2) << "Part " << p;
Austin Schuhf16ef6a2021-06-30 21:48:17 -07001654 }
1655 node_mergers_.emplace_back(
1656 std::make_unique<NodeMerger>(std::move(boots[i])));
1657 }
1658}
1659
1660Message *BootMerger::Front() {
1661 Message *result = node_mergers_[index_]->Front();
1662
1663 if (result != nullptr) {
1664 return result;
1665 }
1666
1667 if (index_ + 1u == node_mergers_.size()) {
1668 // At the end of the last node merger, just return.
1669 return nullptr;
1670 } else {
1671 ++index_;
1672 return Front();
1673 }
1674}
1675
1676void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
1677
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001678std::vector<const LogParts *> BootMerger::Parts() const {
1679 std::vector<const LogParts *> results;
1680 for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
1681 std::vector<const LogParts *> node_parts = node_merger->Parts();
1682
1683 results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
1684 std::make_move_iterator(node_parts.end()));
1685 }
1686
1687 return results;
1688}
1689
Austin Schuhd2f96102020-12-01 20:27:29 -08001690TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001691 : boot_merger_(std::move(parts)),
Austin Schuh79b30942021-01-24 22:32:21 -08001692 timestamp_callback_([](TimestampedMessage *) {}) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001693 for (const LogParts *part : boot_merger_.Parts()) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001694 if (!configuration_) {
1695 configuration_ = part->config;
1696 } else {
1697 CHECK_EQ(configuration_.get(), part->config.get());
1698 }
1699 }
1700 const Configuration *config = configuration_.get();
Austin Schuhd2f96102020-12-01 20:27:29 -08001701 // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
1702 // pretty simple.
1703 if (configuration::MultiNode(config)) {
1704 nodes_data_.resize(config->nodes()->size());
1705 const Node *my_node = config->nodes()->Get(node());
1706 for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
1707 const Node *node = config->nodes()->Get(node_index);
1708 NodeData *node_data = &nodes_data_[node_index];
1709 node_data->channels.resize(config->channels()->size());
1710 // We should save the channel if it is delivered to the node represented
1711 // by the NodeData, but not sent by that node. That combo means it is
1712 // forwarded.
1713 size_t channel_index = 0;
1714 node_data->any_delivered = false;
1715 for (const Channel *channel : *config->channels()) {
1716 node_data->channels[channel_index].delivered =
1717 configuration::ChannelIsReadableOnNode(channel, node) &&
Austin Schuhb3dbb6d2021-01-02 17:29:35 -08001718 configuration::ChannelIsSendableOnNode(channel, my_node) &&
1719 (my_node != node);
Austin Schuhd2f96102020-12-01 20:27:29 -08001720 node_data->any_delivered = node_data->any_delivered ||
1721 node_data->channels[channel_index].delivered;
Austin Schuh6a7358f2021-11-18 22:40:40 -08001722 if (node_data->channels[channel_index].delivered) {
1723 const Connection *connection =
1724 configuration::ConnectionToNode(channel, node);
1725 node_data->channels[channel_index].time_to_live =
1726 chrono::nanoseconds(connection->time_to_live());
1727 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001728 ++channel_index;
1729 }
1730 }
1731
1732 for (const Channel *channel : *config->channels()) {
1733 source_node_.emplace_back(configuration::GetNodeIndex(
1734 config, channel->source_node()->string_view()));
1735 }
1736 }
1737}
1738
1739void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
Austin Schuh0ca51f32020-12-25 21:51:45 -08001740 CHECK(configuration::MultiNode(configuration()));
Austin Schuhd2f96102020-12-01 20:27:29 -08001741 CHECK_NE(timestamp_mapper->node(), node());
1742 CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
1743
1744 NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001745 // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
Austin Schuhd2f96102020-12-01 20:27:29 -08001746 // we could needlessly save data.
1747 if (node_data->any_delivered) {
Austin Schuh87dd3832021-01-01 23:07:31 -08001748 VLOG(1) << "Registering on node " << node() << " for peer node "
1749 << timestamp_mapper->node();
Austin Schuhd2f96102020-12-01 20:27:29 -08001750 CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
1751
1752 timestamp_mapper->nodes_data_[node()].peer = this;
Austin Schuh36c00932021-07-19 18:13:21 -07001753
1754 node_data->save_for_peer = true;
Austin Schuhd2f96102020-12-01 20:27:29 -08001755 }
1756}
1757
Austin Schuh79b30942021-01-24 22:32:21 -08001758void TimestampMapper::QueueMessage(Message *m) {
Austin Schuh60e77942022-05-16 17:48:24 -07001759 matched_messages_.emplace_back(
1760 TimestampedMessage{.channel_index = m->channel_index,
1761 .queue_index = m->queue_index,
1762 .monotonic_event_time = m->timestamp,
1763 .realtime_event_time = m->data->realtime_sent_time,
1764 .remote_queue_index = BootQueueIndex::Invalid(),
1765 .monotonic_remote_time = BootTimestamp::min_time(),
1766 .realtime_remote_time = realtime_clock::min_time,
1767 .monotonic_timestamp_time = BootTimestamp::min_time(),
1768 .data = std::move(m->data)});
Austin Schuhd2f96102020-12-01 20:27:29 -08001769}
1770
1771TimestampedMessage *TimestampMapper::Front() {
1772 // No need to fetch anything new. A previous message still exists.
1773 switch (first_message_) {
1774 case FirstMessage::kNeedsUpdate:
1775 break;
1776 case FirstMessage::kInMessage:
Austin Schuh79b30942021-01-24 22:32:21 -08001777 return &matched_messages_.front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001778 case FirstMessage::kNullptr:
1779 return nullptr;
1780 }
1781
Austin Schuh79b30942021-01-24 22:32:21 -08001782 if (matched_messages_.empty()) {
1783 if (!QueueMatched()) {
1784 first_message_ = FirstMessage::kNullptr;
1785 return nullptr;
1786 }
1787 }
1788 first_message_ = FirstMessage::kInMessage;
1789 return &matched_messages_.front();
1790}
1791
// Matches one more message and appends it to matched_messages_, then hands
// that entry to timestamp_callback_. Returns false, appending nothing, once
// there is no more input.
bool TimestampMapper::QueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path. We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    Message *m = boot_merger_.Front();
    if (!m) {
      return false;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(m);

    // Matched output must be non-decreasing in event time.
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are a thin wrapper around boot_merger_. Call it directly.
    boot_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }

  // We need to only add messages to the list so they get processed for
  // messages which are delivered. Reuse the flow below which uses messages_
  // by just adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    // NOTE(review): Queue() is defined elsewhere; presumably it moves
    // boot_merger_.Front() into messages_, hence the PopFront below.
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return false;
    }

    // Now that it has been added (and cannibalized), forget about it
    // upstream.
    boot_merger_.PopFront();
  }

  Message *m = &(messages_.front());

  if (source_node_[m->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(m);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return
    // it.
    Message data = MatchingMessageFor(*m);

    // Return the data from the remote. The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    // The event time comes from the local message. The remote times and
    // queue index are tagged with the boots recorded in m.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = m->channel_index,
        .queue_index = m->queue_index,
        .monotonic_event_time = m->timestamp,
        .realtime_event_time = m->data->realtime_sent_time,
        .remote_queue_index =
            BootQueueIndex{.boot = m->monotonic_remote_boot,
                           .index = m->data->remote_queue_index.value()},
        .monotonic_remote_time = {m->monotonic_remote_boot,
                                  m->data->monotonic_remote_time.value()},
        .realtime_remote_time = m->data->realtime_remote_time.value(),
        .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
                                     m->data->monotonic_timestamp_time},
        .data = std::move(data.data)});
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    return true;
  }
}
1866
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001867void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
Austin Schuh79b30942021-01-24 22:32:21 -08001868 while (last_message_time_ <= queue_time) {
1869 if (!QueueMatched()) {
1870 return;
1871 }
Austin Schuhd2f96102020-12-01 20:27:29 -08001872 }
1873}
1874
Austin Schuhe639ea12021-01-25 13:00:22 -08001875void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001876 // Note: queueing for time doesn't really work well across boots. So we
1877 // just assume that if you are using this, you only care about the current
1878 // boot.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001879 //
1880 // TODO(austin): Is that the right concept?
1881 //
Austin Schuhe639ea12021-01-25 13:00:22 -08001882 // Make sure we have something queued first. This makes the end time
1883 // calculation simpler, and is typically what folks want regardless.
1884 if (matched_messages_.empty()) {
1885 if (!QueueMatched()) {
1886 return;
1887 }
1888 }
1889
1890 const aos::monotonic_clock::time_point end_queue_time =
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001891 std::max(monotonic_start_time(
1892 matched_messages_.front().monotonic_event_time.boot),
1893 matched_messages_.front().monotonic_event_time.time) +
Austin Schuhe639ea12021-01-25 13:00:22 -08001894 time_estimation_buffer;
1895
1896 // Place sorted messages on the list until we have
1897 // --time_estimation_buffer_seconds seconds queued up (but queue at least
1898 // until the log starts).
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001899 while (end_queue_time >= last_message_time_.time) {
Austin Schuhe639ea12021-01-25 13:00:22 -08001900 if (!QueueMatched()) {
1901 return;
1902 }
1903 }
1904}
1905
Austin Schuhd2f96102020-12-01 20:27:29 -08001906void TimestampMapper::PopFront() {
1907 CHECK(first_message_ != FirstMessage::kNeedsUpdate);
Austin Schuh6a7358f2021-11-18 22:40:40 -08001908 last_popped_message_time_ = Front()->monotonic_event_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001909 first_message_ = FirstMessage::kNeedsUpdate;
1910
Austin Schuh79b30942021-01-24 22:32:21 -08001911 matched_messages_.pop_front();
Austin Schuhd2f96102020-12-01 20:27:29 -08001912}
1913
1914Message TimestampMapper::MatchingMessageFor(const Message &message) {
Austin Schuhd2f96102020-12-01 20:27:29 -08001915 // Figure out what queue index we are looking for.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001916 CHECK_NOTNULL(message.data);
1917 CHECK(message.data->remote_queue_index.has_value());
Austin Schuh58646e22021-08-23 23:51:46 -07001918 const BootQueueIndex remote_queue_index =
1919 BootQueueIndex{.boot = message.monotonic_remote_boot,
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001920 .index = *message.data->remote_queue_index};
Austin Schuhd2f96102020-12-01 20:27:29 -08001921
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001922 CHECK(message.data->monotonic_remote_time.has_value());
1923 CHECK(message.data->realtime_remote_time.has_value());
Austin Schuhd2f96102020-12-01 20:27:29 -08001924
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001925 const BootTimestamp monotonic_remote_time{
Austin Schuh48507722021-07-17 17:29:24 -07001926 .boot = message.monotonic_remote_boot,
Austin Schuh826e6ce2021-11-18 20:33:10 -08001927 .time = message.data->monotonic_remote_time.value()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001928 const realtime_clock::time_point realtime_remote_time =
1929 *message.data->realtime_remote_time;
Austin Schuhd2f96102020-12-01 20:27:29 -08001930
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001931 TimestampMapper *peer =
1932 nodes_data_[source_node_[message.data->channel_index]].peer;
Austin Schuhfecf1d82020-12-19 16:57:28 -08001933
1934 // We only register the peers which we have data for. So, if we are being
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001935 // asked to pull a timestamp from a peer which doesn't exist, return an
1936 // empty message.
Austin Schuhfecf1d82020-12-19 16:57:28 -08001937 if (peer == nullptr) {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07001938 // TODO(austin): Make sure the tests hit all these paths with a boot count
1939 // of 1...
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001940 return Message{.channel_index = message.channel_index,
1941 .queue_index = remote_queue_index,
1942 .timestamp = monotonic_remote_time,
1943 .monotonic_remote_boot = 0xffffff,
1944 .monotonic_timestamp_boot = 0xffffff,
1945 .data = nullptr};
Austin Schuhfecf1d82020-12-19 16:57:28 -08001946 }
1947
1948 // The queue which will have the matching data, if available.
1949 std::deque<Message> *data_queue =
1950 &peer->nodes_data_[node()].channels[message.channel_index].messages;
1951
Austin Schuh79b30942021-01-24 22:32:21 -08001952 peer->QueueUnmatchedUntil(monotonic_remote_time);
Austin Schuhd2f96102020-12-01 20:27:29 -08001953
1954 if (data_queue->empty()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001955 return Message{.channel_index = message.channel_index,
1956 .queue_index = remote_queue_index,
1957 .timestamp = monotonic_remote_time,
1958 .monotonic_remote_boot = 0xffffff,
1959 .monotonic_timestamp_boot = 0xffffff,
1960 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001961 }
1962
Austin Schuhd2f96102020-12-01 20:27:29 -08001963 if (remote_queue_index < data_queue->front().queue_index ||
1964 remote_queue_index > data_queue->back().queue_index) {
Austin Schuh60e77942022-05-16 17:48:24 -07001965 return Message{.channel_index = message.channel_index,
1966 .queue_index = remote_queue_index,
1967 .timestamp = monotonic_remote_time,
1968 .monotonic_remote_boot = 0xffffff,
1969 .monotonic_timestamp_boot = 0xffffff,
1970 .data = nullptr};
Austin Schuhd2f96102020-12-01 20:27:29 -08001971 }
1972
Austin Schuh993ccb52020-12-12 15:59:32 -08001973 // The algorithm below is constant time with some assumptions. We need there
1974 // to be no missing messages in the data stream. This also assumes a queue
1975 // hasn't wrapped. That is conservative, but should let us get started.
Austin Schuh58646e22021-08-23 23:51:46 -07001976 if (data_queue->back().queue_index.boot ==
1977 data_queue->front().queue_index.boot &&
1978 (data_queue->back().queue_index.index -
1979 data_queue->front().queue_index.index + 1u ==
1980 data_queue->size())) {
1981 CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot);
Austin Schuh993ccb52020-12-12 15:59:32 -08001982 // Pull the data out and confirm that the timestamps match as expected.
Austin Schuh58646e22021-08-23 23:51:46 -07001983 //
1984 // TODO(austin): Move if not reliable.
1985 Message result = (*data_queue)[remote_queue_index.index -
1986 data_queue->front().queue_index.index];
Austin Schuh993ccb52020-12-12 15:59:32 -08001987
1988 CHECK_EQ(result.timestamp, monotonic_remote_time)
1989 << ": Queue index matches, but timestamp doesn't. Please investigate!";
Austin Schuh6a7358f2021-11-18 22:40:40 -08001990 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
Austin Schuh993ccb52020-12-12 15:59:32 -08001991 << ": Queue index matches, but timestamp doesn't. Please investigate!";
1992 // Now drop the data off the front. We have deduplicated timestamps, so we
1993 // are done. And all the data is in order.
Austin Schuh58646e22021-08-23 23:51:46 -07001994 data_queue->erase(
1995 data_queue->begin(),
1996 data_queue->begin() +
1997 (remote_queue_index.index - data_queue->front().queue_index.index));
Austin Schuh993ccb52020-12-12 15:59:32 -08001998 return result;
1999 } else {
Austin Schuh58646e22021-08-23 23:51:46 -07002000 // TODO(austin): Binary search.
2001 auto it = std::find_if(
2002 data_queue->begin(), data_queue->end(),
2003 [remote_queue_index,
2004 remote_boot = monotonic_remote_time.boot](const Message &m) {
2005 return m.queue_index == remote_queue_index &&
2006 m.timestamp.boot == remote_boot;
2007 });
Austin Schuh993ccb52020-12-12 15:59:32 -08002008 if (it == data_queue->end()) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002009 return Message{.channel_index = message.channel_index,
2010 .queue_index = remote_queue_index,
2011 .timestamp = monotonic_remote_time,
2012 .monotonic_remote_boot = 0xffffff,
2013 .monotonic_timestamp_boot = 0xffffff,
2014 .data = nullptr};
Austin Schuh993ccb52020-12-12 15:59:32 -08002015 }
2016
2017 Message result = std::move(*it);
2018
2019 CHECK_EQ(result.timestamp, monotonic_remote_time)
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002020 << ": Queue index matches, but timestamp doesn't. Please "
2021 "investigate!";
2022 CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
2023 << ": Queue index matches, but timestamp doesn't. Please "
2024 "investigate!";
Austin Schuh993ccb52020-12-12 15:59:32 -08002025
Austin Schuhd6b1f4c2021-11-18 20:29:00 -08002026 // Erase everything up to this message. We want to keep 1 message in the
2027 // queue so we can handle reliable messages forwarded across boots.
2028 data_queue->erase(data_queue->begin(), it);
Austin Schuh993ccb52020-12-12 15:59:32 -08002029
2030 return result;
2031 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002032}
2033
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002034void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
Austin Schuhd2f96102020-12-01 20:27:29 -08002035 if (queued_until_ > t) {
2036 return;
2037 }
2038 while (true) {
2039 if (!messages_.empty() && messages_.back().timestamp > t) {
2040 queued_until_ = std::max(queued_until_, messages_.back().timestamp);
2041 return;
2042 }
2043
2044 if (!Queue()) {
2045 // Found nothing to add, we are out of data!
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002046 queued_until_ = BootTimestamp::max_time();
Austin Schuhd2f96102020-12-01 20:27:29 -08002047 return;
2048 }
2049
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07002050 // Now that it has been added (and cannibalized), forget about it
2051 // upstream.
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002052 boot_merger_.PopFront();
Austin Schuhd2f96102020-12-01 20:27:29 -08002053 }
2054}
2055
2056bool TimestampMapper::Queue() {
Austin Schuh2dc8c7d2021-07-01 17:41:28 -07002057 Message *m = boot_merger_.Front();
Austin Schuhd2f96102020-12-01 20:27:29 -08002058 if (m == nullptr) {
2059 return false;
2060 }
2061 for (NodeData &node_data : nodes_data_) {
2062 if (!node_data.any_delivered) continue;
Austin Schuh36c00932021-07-19 18:13:21 -07002063 if (!node_data.save_for_peer) continue;
Austin Schuhd2f96102020-12-01 20:27:29 -08002064 if (node_data.channels[m->channel_index].delivered) {
Austin Schuh6a7358f2021-11-18 22:40:40 -08002065 // If we have data but no timestamps (logs where the timestamps didn't get
2066 // logged are classic), we can grow this indefinitely. We don't need to
2067 // keep anything that is older than the last message returned.
2068
2069 // We have the time on the source node.
2070 // We care to wait until we have the time on the destination node.
2071 std::deque<Message> &messages =
2072 node_data.channels[m->channel_index].messages;
2073 // Max delay over the network is the TTL, so let's take the queue time and
2074 // add TTL to it. Don't forget any messages which are reliable until
2075 // someone can come up with a good reason to forget those too.
2076 if (node_data.channels[m->channel_index].time_to_live >
2077 chrono::nanoseconds(0)) {
2078 // We need to make *some* assumptions about network delay for this to
2079 // work. We want to only look at the RX side. This means we need to
2080 // track the last time a message was popped from any channel from the
2081 // node sending this message, and compare that to the max time we expect
2082 // that a message will take to be delivered across the network. This
2083 // assumes that messages are popped in time order as a proxy for
2084 // measuring the distributed time at this layer.
2085 //
2086 // Leave at least 1 message in here so we can handle reboots and
2087 // messages getting sent twice.
2088 while (messages.size() > 1u &&
2089 messages.begin()->timestamp +
2090 node_data.channels[m->channel_index].time_to_live +
2091 chrono::duration_cast<chrono::nanoseconds>(
2092 chrono::duration<double>(FLAGS_max_network_delay)) <
2093 last_popped_message_time_) {
2094 messages.pop_front();
2095 }
2096 }
Austin Schuhd2f96102020-12-01 20:27:29 -08002097 node_data.channels[m->channel_index].messages.emplace_back(*m);
2098 }
2099 }
2100
2101 messages_.emplace_back(std::move(*m));
2102 return true;
2103}
2104
2105std::string TimestampMapper::DebugString() const {
2106 std::stringstream ss;
Austin Schuh6e014b82021-09-14 17:46:33 -07002107 ss << "node " << node() << " (" << node_name() << ") [\n";
Austin Schuhd2f96102020-12-01 20:27:29 -08002108 for (const Message &message : messages_) {
2109 ss << " " << message << "\n";
2110 }
2111 ss << "] queued_until " << queued_until_;
2112 for (const NodeData &ns : nodes_data_) {
2113 if (ns.peer == nullptr) continue;
2114 ss << "\nnode " << ns.peer->node() << " remote_data [\n";
2115 size_t channel_index = 0;
2116 for (const NodeData::ChannelData &channel_data :
2117 ns.peer->nodes_data_[node()].channels) {
2118 if (channel_data.messages.empty()) {
2119 continue;
2120 }
Austin Schuhb000de62020-12-03 22:00:40 -08002121
Austin Schuhd2f96102020-12-01 20:27:29 -08002122 ss << " channel " << channel_index << " [\n";
2123 for (const Message &m : channel_data.messages) {
2124 ss << " " << m << "\n";
2125 }
2126 ss << " ]\n";
2127 ++channel_index;
2128 }
2129 ss << "] queued_until " << ns.peer->queued_until_;
2130 }
2131 return ss.str();
2132}
2133
Austin Schuhee711052020-08-24 16:06:09 -07002134std::string MaybeNodeName(const Node *node) {
2135 if (node != nullptr) {
2136 return node->name()->str() + " ";
2137 }
2138 return "";
2139}
2140
Brian Silvermanf51499a2020-09-21 12:49:08 -07002141} // namespace aos::logger