| #include "aos/events/logging/logfile_utils.h" |
| |
| #include <fcntl.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <sys/uio.h> |
| |
| #include <algorithm> |
| #include <climits> |
| #include <filesystem> |
| |
| #include "absl/strings/escaping.h" |
| #include "flatbuffers/flatbuffers.h" |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| |
| #include "aos/configuration.h" |
| #include "aos/events/logging/snappy_encoder.h" |
| #include "aos/flatbuffer_merge.h" |
| #include "aos/util/file.h" |
| |
| #if defined(__x86_64__) |
| #define ENABLE_LZMA (!__has_feature(memory_sanitizer)) |
| #elif defined(__aarch64__) |
| #define ENABLE_LZMA (!__has_feature(memory_sanitizer)) |
| #else |
| #define ENABLE_LZMA 0 |
| #endif |
| |
| #if ENABLE_LZMA |
| #include "aos/events/logging/lzma_encoder.h" |
| #endif |
| #if ENABLE_S3 |
| #include "aos/events/logging/s3_fetcher.h" |
| #endif |
| |
// Tuning knobs controlling how aggressively DetachedBufferWriter flushes
// queued, encoded data out to its log sink.
DEFINE_int32(flush_size, 128 * 1024,
             "Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
    flush_period, 5.0,
    "Max time to let data sit in the queue before flushing in seconds.");

DEFINE_double(
    max_network_delay, 1.0,
    "Max time to assume a message takes to cross the network before we are "
    "willing to drop it from our buffers and assume it didn't make it. "
    "Increasing this number can increase memory usage depending on the packet "
    "loss of your network or if the timestamps aren't logged for a message.");

DEFINE_double(
    max_out_of_order, -1,
    "If set, this overrides the max out of order duration for a log file.");

// Compatibility and corruption-handling knobs for reading old or damaged log
// files.
DEFINE_bool(workaround_double_headers, true,
            "Some old log files have two headers at the beginning. Use the "
            "last header as the actual header.");

DEFINE_bool(crash_on_corrupt_message, true,
            "When true, MessageReader will crash the first time a message "
            "with corrupted format is found. When false, the crash will be "
            "suppressed, and any remaining readable messages will be "
            "evaluated to present verified vs corrupted stats.");

DEFINE_bool(ignore_corrupt_messages, false,
            "When true, and crash_on_corrupt_message is false, then any "
            "corrupt message found by MessageReader be silently ignored, "
            "providing access to all uncorrupted messages in a logfile.");

// Defined in another translation unit; presumably quiets sorting-related
// output — see the defining file for details.
DECLARE_bool(quiet_sorting);
| |
| namespace aos::logger { |
| namespace { |
| namespace chrono = std::chrono; |
| |
| std::unique_ptr<DataDecoder> ResolveDecoder(std::string_view filename, |
| bool quiet) { |
| static constexpr std::string_view kS3 = "s3:"; |
| |
| std::unique_ptr<DataDecoder> decoder; |
| |
| if (filename.substr(0, kS3.size()) == kS3) { |
| #if ENABLE_S3 |
| decoder = std::make_unique<S3Fetcher>(filename); |
| #else |
| LOG(FATAL) << "Reading files from S3 not supported on this platform"; |
| #endif |
| } else { |
| decoder = std::make_unique<DummyDecoder>(filename); |
| } |
| |
| static constexpr std::string_view kXz = ".xz"; |
| static constexpr std::string_view kSnappy = SnappyDecoder::kExtension; |
| if (filename.substr(filename.size() - kXz.size()) == kXz) { |
| #if ENABLE_LZMA |
| decoder = std::make_unique<ThreadedLzmaDecoder>(std::move(decoder), quiet); |
| #else |
| (void)quiet; |
| LOG(FATAL) << "Reading xz-compressed files not supported on this platform"; |
| #endif |
| } else if (filename.substr(filename.size() - kSnappy.size()) == kSnappy) { |
| decoder = std::make_unique<SnappyDecoder>(std::move(decoder)); |
| } |
| return decoder; |
| } |
| |
// Streams the contained value of |t| to |os|, or the literal text "null"
// when |t| is empty.
template <typename T>
void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
  if (!t.has_value()) {
    *os << "null";
    return;
  }
  *os << *t;
}
| |
| // A dummy LogSink implementation that handles the special case when we create |
| // a DetachedBufferWriter when there's no space left on the system. The |
| // DetachedBufferWriter frequently dereferences log_sink_, so we want a class |
| // here that effectively refuses to do anything meaningful. |
| class OutOfDiskSpaceLogSink : public LogSink { |
| public: |
| WriteCode OpenForWrite() override { return WriteCode::kOutOfSpace; } |
| WriteCode Close() override { return WriteCode::kOk; } |
| bool is_open() const override { return false; } |
| WriteResult Write( |
| const absl::Span<const absl::Span<const uint8_t>> &) override { |
| return WriteResult{ |
| .code = WriteCode::kOutOfSpace, |
| .messages_written = 0, |
| }; |
| } |
| std::string_view name() const override { return "<out_of_disk_space>"; } |
| }; |
| |
| } // namespace |
| |
| DetachedBufferWriter::DetachedBufferWriter(std::unique_ptr<LogSink> log_sink, |
| std::unique_ptr<DataEncoder> encoder) |
| : log_sink_(std::move(log_sink)), encoder_(std::move(encoder)) { |
| CHECK(log_sink_); |
| ran_out_of_space_ = log_sink_->OpenForWrite() == WriteCode::kOutOfSpace; |
| if (ran_out_of_space_) { |
| LOG(WARNING) << "And we are out of space"; |
| } |
| } |
| |
// Constructs a writer that is already out of space: delegates to the main
// constructor with a dummy sink that rejects everything with kOutOfSpace.
DetachedBufferWriter::DetachedBufferWriter(already_out_of_space_t)
    : DetachedBufferWriter(std::make_unique<OutOfDiskSpaceLogSink>(), nullptr) {
}
| |
| DetachedBufferWriter::~DetachedBufferWriter() { |
| Close(); |
| if (ran_out_of_space_) { |
| CHECK(acknowledge_ran_out_of_space_) |
| << ": Unacknowledged out of disk space, log file was not completed"; |
| } |
| } |
| |
// Move construction delegates to the move-assignment operator below, which
// swaps all state with the (default-initialized) new object.
DetachedBufferWriter::DetachedBufferWriter(DetachedBufferWriter &&other) {
  *this = std::move(other);
}
| |
| // When other is destroyed "soon" (which it should be because we're getting an |
| // rvalue reference to it), it will flush etc all the data we have queued up |
| // (because that data will then be its data). |
| DetachedBufferWriter &DetachedBufferWriter::operator=( |
| DetachedBufferWriter &&other) { |
| std::swap(log_sink_, other.log_sink_); |
| std::swap(encoder_, other.encoder_); |
| std::swap(ran_out_of_space_, other.ran_out_of_space_); |
| std::swap(acknowledge_ran_out_of_space_, other.acknowledge_ran_out_of_space_); |
| std::swap(last_flush_time_, other.last_flush_time_); |
| return *this; |
| } |
| |
// Encodes the message described by |copier| into the encoder's buffers,
// flushing to the sink whenever a chunk only partially fits.  Returns the
// total time spent encoding (zero if the encoder cannot measure it).
std::chrono::nanoseconds DetachedBufferWriter::CopyMessage(
    DataEncoder::Copier *copier, aos::monotonic_clock::time_point now) {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes
    // available, so refuse to write anything more once we've dropped data
    // because we ran out of space.
    return std::chrono::nanoseconds::zero();
  }

  const size_t message_size = copier->size();
  size_t overall_bytes_written = 0;

  std::chrono::nanoseconds total_encode_duration =
      std::chrono::nanoseconds::zero();

  // Keep writing chunks until we've written it all.  If we end up with a
  // partial write, this means we need to flush to disk.
  do {
    // Initialize encode_duration for the case that the encoder cannot measure
    // encode duration for a single message.
    std::chrono::nanoseconds encode_duration = std::chrono::nanoseconds::zero();
    const size_t bytes_written =
        encoder_->Encode(copier, overall_bytes_written, &encode_duration);

    // The encoder must always make progress, otherwise this loop would never
    // terminate.
    CHECK(bytes_written != 0);

    overall_bytes_written += bytes_written;
    if (overall_bytes_written < message_size) {
      VLOG(1) << "Flushing because of a partial write, tried to write "
              << message_size << " wrote " << overall_bytes_written;
      Flush(now);
    }
    total_encode_duration += encode_duration;
  } while (overall_bytes_written < message_size);

  WriteStatistics()->UpdateEncodeDuration(total_encode_duration);

  // Flush opportunistically if enough data or time has accumulated.
  FlushAtThreshold(now);
  return total_encode_duration;
}
| |
| void DetachedBufferWriter::Close() { |
| if (!log_sink_->is_open()) { |
| return; |
| } |
| // Initialize encode_duration for the case that the encoder cannot measure |
| // encode duration for a single message. |
| std::chrono::nanoseconds encode_duration = std::chrono::nanoseconds::zero(); |
| encoder_->Finish(&encode_duration); |
| WriteStats().UpdateEncodeDuration(encode_duration); |
| while (encoder_->queue_size() > 0) { |
| Flush(monotonic_clock::max_time); |
| } |
| encoder_.reset(); |
| ran_out_of_space_ = log_sink_->Close() == WriteCode::kOutOfSpace; |
| } |
| |
| void DetachedBufferWriter::Flush(aos::monotonic_clock::time_point now) { |
| last_flush_time_ = now; |
| if (ran_out_of_space_) { |
| // We don't want any later data to be written after space becomes available, |
| // so refuse to write anything more once we've dropped data because we ran |
| // out of space. |
| if (encoder_) { |
| VLOG(1) << "Ignoring queue: " << encoder_->queue().size(); |
| encoder_->Clear(encoder_->queue().size()); |
| } else { |
| VLOG(1) << "No queue to ignore"; |
| } |
| return; |
| } |
| |
| const auto queue = encoder_->queue(); |
| if (queue.empty()) { |
| return; |
| } |
| |
| const WriteResult result = log_sink_->Write(queue); |
| encoder_->Clear(result.messages_written); |
| ran_out_of_space_ = result.code == WriteCode::kOutOfSpace; |
| } |
| |
// Flushes repeatedly while any of the flush thresholds (encoder space, queued
// bytes, iov count, or elapsed time) is exceeded.  Called after each message
// is copied in.
void DetachedBufferWriter::FlushAtThreshold(
    aos::monotonic_clock::time_point now) {
  if (ran_out_of_space_) {
    // We don't want any later data to be written after space becomes available,
    // so refuse to write anything more once we've dropped data because we ran
    // out of space.  Mirrors the handling in Flush().
    if (encoder_) {
      VLOG(1) << "Ignoring queue: " << encoder_->queue().size();
      encoder_->Clear(encoder_->queue().size());
    } else {
      VLOG(1) << "No queue to ignore";
    }
    return;
  }

  // We don't want to flush the first time through.  Otherwise we will flush as
  // the log file header might be compressing, defeating any parallelism and
  // queueing there.
  if (last_flush_time_ == aos::monotonic_clock::min_time) {
    last_flush_time_ = now;
  }

  // Flush if we are at the max number of iovs per writev, because there's no
  // point queueing up any more data in memory. Also flush once we have enough
  // data queued up or if it has been long enough.
  while (encoder_->space() == 0 ||
         encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
         encoder_->queue_size() >= IOV_MAX ||
         (now > last_flush_time_ +
                    chrono::duration_cast<chrono::nanoseconds>(
                        chrono::duration<double>(FLAGS_flush_period)) &&
          encoder_->queued_bytes() != 0)) {
    VLOG(1) << "Chose to flush at " << now << ", last " << last_flush_time_
            << " queued bytes " << encoder_->queued_bytes();
    Flush(now);
  }
}
| |
| // Do the magic dance to convert the endianness of the data and append it to the |
| // buffer. |
| namespace { |
| |
| // TODO(austin): Look at the generated code to see if building the header is |
| // efficient or not. |
| template <typename T> |
| uint8_t *Push(uint8_t *buffer, const T data) { |
| const T endian_data = flatbuffers::EndianScalar<T>(data); |
| std::memcpy(buffer, &endian_data, sizeof(T)); |
| return buffer + sizeof(T); |
| } |
| |
// Appends |size| raw bytes from |data| to |buffer| and returns the advanced
// write cursor.
uint8_t *PushBytes(uint8_t *buffer, const void *data, size_t size) {
  uint8_t *const end = buffer + size;
  std::memcpy(buffer, data, size);
  return end;
}
| |
// Zero-fills |padding| bytes at |buffer| and returns the advanced write
// cursor.
uint8_t *Pad(uint8_t *buffer, size_t padding) {
  uint8_t *const end = buffer + padding;
  std::memset(buffer, 0, padding);
  return end;
}
| } // namespace |
| |
// Builds a logger::MessageHeader flatbuffer from a received RemoteMessage
// timestamp, tagging it with |channel_index| and the time the timestamp
// itself arrived (|monotonic_timestamp_time|).
flatbuffers::Offset<MessageHeader> PackRemoteMessage(
    flatbuffers::FlatBufferBuilder *fbb,
    const message_bridge::RemoteMessage *msg, int channel_index,
    const aos::monotonic_clock::time_point monotonic_timestamp_time) {
  logger::MessageHeader::Builder message_header_builder(*fbb);
  // Note: this must match the same order as MessageBridgeServer and
  // PackMessage. We want identical headers to have identical
  // on-the-wire formats to make comparing them easier.

  message_header_builder.add_channel_index(channel_index);

  message_header_builder.add_queue_index(msg->queue_index());
  message_header_builder.add_monotonic_sent_time(msg->monotonic_sent_time());
  message_header_builder.add_realtime_sent_time(msg->realtime_sent_time());

  message_header_builder.add_monotonic_timestamp_time(
      monotonic_timestamp_time.time_since_epoch().count());

  message_header_builder.add_monotonic_remote_transmit_time(
      msg->monotonic_remote_transmit_time());

  message_header_builder.add_monotonic_remote_time(
      msg->monotonic_remote_time());
  message_header_builder.add_realtime_remote_time(msg->realtime_remote_time());
  message_header_builder.add_remote_queue_index(msg->remote_queue_index());

  return message_header_builder.Finish();
}
| |
// Hand-packs the same MessageHeader that PackRemoteMessage() would build
// directly into |buffer|, writing only the [start_byte, end_byte) slice of
// the serialized message.  Both bounds must be 8-byte aligned.  The
// annotated hex offsets below document the expected wire layout; returns the
// number of bytes written.
size_t PackRemoteMessageInline(
    uint8_t *buffer, const message_bridge::RemoteMessage *msg,
    int channel_index,
    const aos::monotonic_clock::time_point monotonic_timestamp_time,
    size_t start_byte, size_t end_byte) {
  const flatbuffers::uoffset_t message_size = PackRemoteMessageSize();
  DCHECK_EQ((start_byte % 8u), 0u);
  DCHECK_EQ((end_byte % 8u), 0u);
  DCHECK_LE(start_byte, end_byte);
  DCHECK_LE(end_byte, message_size);

  // Each case resumes packing at an 8-byte boundary and falls through until
  // end_byte is reached.
  switch (start_byte) {
    case 0x00u:
      if ((end_byte) == 0x00u) {
        break;
      }
      // clang-format off
      // header:
      //   +0x00 | 5C 00 00 00             | UOffset32  | 0x0000005C (92) Loc: +0x5C                | size prefix

      buffer = Push<flatbuffers::uoffset_t>(
          buffer, message_size - sizeof(flatbuffers::uoffset_t));
      //   +0x04 | 1C 00 00 00             | UOffset32  | 0x0000001C (28) Loc: +0x20                | offset to root table `aos.logger.MessageHeader`
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1C);
      [[fallthrough]];
    case 0x08u:
      if ((end_byte) == 0x08u) {
        break;
      }
      //
      // vtable (aos.logger.MessageHeader):
      //   +0x08 | 18 00                   | uint16_t   | 0x0018 (24)                               | size of this vtable
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
      //   +0x0A | 40 00                   | uint16_t   | 0x0040 (64)                               | size of referring table
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x40);
      //   +0x0C | 3C 00                   | VOffset16  | 0x003C (60)                               | offset to field `channel_index` (id: 0)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x3c);
      //   +0x0E | 30 00                   | VOffset16  | 0x0030 (48)                               | offset to field `monotonic_sent_time` (id: 1)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x30);
      [[fallthrough]];
    case 0x10u:
      if ((end_byte) == 0x10u) {
        break;
      }
      //   +0x10 | 28 00                   | VOffset16  | 0x0028 (40)                               | offset to field `realtime_sent_time` (id: 2)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x28);
      //   +0x12 | 38 00                   | VOffset16  | 0x0038 (56)                               | offset to field `queue_index` (id: 3)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x38);
      //   +0x14 | 00 00                   | VOffset16  | 0x0000 (0)                                | offset to field `data` (id: 4) <null> (Vector)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x00);
      //   +0x16 | 10 00                   | VOffset16  | 0x0010 (16)                               | offset to field `monotonic_remote_time` (id: 5)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x10);
      [[fallthrough]];
    case 0x18u:
      if ((end_byte) == 0x18u) {
        break;
      }
      //   +0x18 | 08 00                   | VOffset16  | 0x0008 (8)                                | offset to field `realtime_remote_time` (id: 6)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x08);
      //   +0x1A | 04 00                   | VOffset16  | 0x0004 (4)                                | offset to field `remote_queue_index` (id: 7)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x04);
      //   +0x1C | 20 00                   | VOffset16  | 0x0020 (32)                               | offset to field `monotonic_timestamp_time` (id: 8)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x20);
      //   +0x1E | 18 00                   | VOffset16  | 0x0018 (24)                               | offset to field `monotonic_remote_transmit_time` (id: 9)
      buffer = Push<flatbuffers::voffset_t>(buffer, 0x18);
      [[fallthrough]];
    case 0x20u:
      if ((end_byte) == 0x20u) {
        break;
      }

      // root_table (aos.logger.MessageHeader):
      //   +0x20 | 18 00 00 00             | SOffset32  | 0x00000018 (24) Loc: +0x08                | offset to vtable
      buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18);
      //   +0x24 | 8B 00 00 00             | uint32_t   | 0x0000008B (139)                          | table field `remote_queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->remote_queue_index());
      [[fallthrough]];
    case 0x28u:
      if ((end_byte) == 0x28u) {
        break;
      }
      //   +0x28 | D4 C9 48 86 92 8B 6A AF | int64_t    | 0xAF6A8B928648C9D4 (-5806675308106429996) | table field `realtime_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_remote_time());
      [[fallthrough]];
    case 0x30u:
      if ((end_byte) == 0x30u) {
        break;
      }
      //   +0x30 | 65 B1 32 50 FE 54 50 6B | int64_t    | 0x6B5054FE5032B165 (7732774011439067493)  | table field `monotonic_remote_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_remote_time());
      [[fallthrough]];
    case 0x38u:
      if ((end_byte) == 0x38u) {
        break;
      }
      //   +0x38 | EA 4D CC E0 FC 20 86 71 | int64_t    | 0x718620FCE0CC4DEA (8180262043640417770)  | table field `monotonic_remote_transmit_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_remote_transmit_time());
      [[fallthrough]];
    case 0x40u:
      if ((end_byte) == 0x40u) {
        break;
      }
      //   +0x40 | 8E 59 CF 88 9D DF 02 07 | int64_t    | 0x0702DF9D88CF598E (505211975917066638)   | table field `monotonic_timestamp_time` (Long)
      buffer = Push<int64_t>(buffer,
                             monotonic_timestamp_time.time_since_epoch().count());
      [[fallthrough]];
    case 0x48u:
      if ((end_byte) == 0x48u) {
        break;
      }
      //   +0x48 | 14 D5 A7 D8 B2 E4 EF 89 | int64_t    | 0x89EFE4B2D8A7D514 (-8507329714289388268) | table field `realtime_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->realtime_sent_time());
      [[fallthrough]];
    case 0x50u:
      if ((end_byte) == 0x50u) {
        break;
      }
      //   +0x50 | 19 7D 7F EF 86 8D 92 65 | int64_t    | 0x65928D86EF7F7D19 (7319067955113721113)  | table field `monotonic_sent_time` (Long)
      buffer = Push<int64_t>(buffer, msg->monotonic_sent_time());
      [[fallthrough]];
    case 0x58u:
      if ((end_byte) == 0x58u) {
        break;
      }
      //   +0x58 | FC 00 00 00             | uint32_t   | 0x000000FC (252)                          | table field `queue_index` (UInt)
      buffer = Push<uint32_t>(buffer, msg->queue_index());
      //   +0x5C | 9C 00 00 00             | uint32_t   | 0x0000009C (156)                          | table field `channel_index` (UInt)
      buffer = Push<uint32_t>(buffer, channel_index);
      // clang-format on
      [[fallthrough]];
    case 0x60u:
      if ((end_byte) == 0x60u) {
        break;
      }
  }

  // Number of bytes written into |buffer|.
  return end_byte - start_byte;
}
| |
// Builds a MessageHeader flatbuffer for |context| on |channel_index|.
// Which timestamps (and whether the payload) are included depends on
// |log_type|.
flatbuffers::Offset<MessageHeader> PackMessage(
    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
    int channel_index, LogType log_type) {
  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;

  switch (log_type) {
    case LogType::kLogMessage:
    case LogType::kLogRemoteMessage:
      // Since the timestamps are 8 byte aligned, we are going to end up adding
      // padding in the middle of the message to pad everything out to 8 byte
      // alignment.  That's rather wasteful.  To make things efficient to mmap
      // while reading uncompressed logs, we'd actually rather the message be
      // aligned.  So, force 8 byte alignment (enough to preserve alignment
      // inside the nested message so that we can read it without moving it)
      // here.
      fbb->ForceVectorAlignment(context.size, sizeof(uint8_t), 8);
      data_offset = fbb->CreateVector(
          static_cast<const uint8_t *>(context.data), context.size);
      break;

    case LogType::kLogDeliveryTimeOnly:
      // Timestamp-only log entries carry no payload.
      break;
  }

  MessageHeader::Builder message_header_builder(*fbb);
  message_header_builder.add_channel_index(channel_index);

  // These are split out into very explicit serialization calls because the
  // order here changes the order things are written out on the wire, and we
  // want to control and understand it here.  Changing the order can increase
  // the amount of padding bytes in the middle.
  //
  // It is also easier to follow...  And doesn't actually make things much
  // bigger.
  switch (log_type) {
    case LogType::kLogRemoteMessage:
      // Remote logs record the remote timestamps as the "sent" times.
      message_header_builder.add_queue_index(context.remote_queue_index);
      message_header_builder.add_data(data_offset);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_remote_time.time_since_epoch().count());
      break;

    case LogType::kLogDeliveryTimeOnly:
      // All the timestamps, but no payload.
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      message_header_builder.add_monotonic_remote_time(
          context.monotonic_remote_time.time_since_epoch().count());
      message_header_builder.add_realtime_remote_time(
          context.realtime_remote_time.time_since_epoch().count());
      message_header_builder.add_monotonic_remote_transmit_time(
          context.monotonic_remote_transmit_time.time_since_epoch().count());
      message_header_builder.add_remote_queue_index(context.remote_queue_index);
      break;

    case LogType::kLogMessage:
      // Local logs record the local event timestamps plus the payload.
      message_header_builder.add_queue_index(context.queue_index);
      message_header_builder.add_data(data_offset);
      message_header_builder.add_monotonic_sent_time(
          context.monotonic_event_time.time_since_epoch().count());
      message_header_builder.add_realtime_sent_time(
          context.realtime_event_time.time_since_epoch().count());
      break;
  }

  return message_header_builder.Finish();
}
| |
// Returns the serialized size in bytes of a MessageHeader (excluding any data
// vector) for the given |log_type|, matching the layouts hand-packed in
// PackMessageInline().
flatbuffers::uoffset_t PackMessageHeaderSize(LogType log_type) {
  switch (log_type) {
    case LogType::kLogMessage:
      return
          // Root table size + offset.
          sizeof(flatbuffers::uoffset_t) * 2 +
          // 6 padding bytes to pad the header out properly.
          6 +
          // vtable header (size + size of table)
          sizeof(flatbuffers::voffset_t) * 2 +
          // offsets to all the fields.
          sizeof(flatbuffers::voffset_t) * 5 +
          // pointer to vtable
          sizeof(flatbuffers::soffset_t) +
          // pointer to data
          sizeof(flatbuffers::uoffset_t) +
          // realtime_sent_time, monotonic_sent_time
          sizeof(int64_t) * 2 +
          // queue_index, channel_index
          sizeof(uint32_t) * 2;

    case LogType::kLogDeliveryTimeOnly:
      return
          // Root table size + offset.
          sizeof(flatbuffers::uoffset_t) * 2 +
          // vtable header (size + size of table)
          sizeof(flatbuffers::voffset_t) * 2 +
          // offsets to all the fields.
          sizeof(flatbuffers::voffset_t) * 10 +
          // pointer to vtable
          sizeof(flatbuffers::soffset_t) +
          // remote_queue_index
          sizeof(uint32_t) +
          // monotonic_remote_transmit_time, realtime_remote_time,
          // monotonic_remote_time, realtime_sent_time, monotonic_sent_time
          sizeof(int64_t) * 5 +
          // queue_index, channel_index
          sizeof(uint32_t) * 2;

    case LogType::kLogRemoteMessage:
      return
          // Root table size + offset.
          sizeof(flatbuffers::uoffset_t) * 2 +
          // 6 padding bytes to pad the header out properly.
          6 +
          // vtable header (size + size of table)
          sizeof(flatbuffers::voffset_t) * 2 +
          // offsets to all the fields.
          sizeof(flatbuffers::voffset_t) * 5 +
          // pointer to vtable
          sizeof(flatbuffers::soffset_t) +
          // realtime_sent_time, monotonic_sent_time
          sizeof(int64_t) * 2 +
          // pointer to data
          sizeof(flatbuffers::uoffset_t) +
          // queue_index, channel_index
          sizeof(uint32_t) * 2;
  }
  // Unreachable: all LogType values are handled above.
  LOG(FATAL);
}
| |
| flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size) { |
| static_assert(sizeof(flatbuffers::uoffset_t) == 4u, |
| "Update size logic please."); |
| const flatbuffers::uoffset_t aligned_data_length = |
| ((data_size + 7) & 0xfffffff8u); |
| switch (log_type) { |
| case LogType::kLogDeliveryTimeOnly: |
| return PackMessageHeaderSize(log_type); |
| |
| case LogType::kLogMessage: |
| case LogType::kLogRemoteMessage: |
| return PackMessageHeaderSize(log_type) + |
| // Vector... |
| sizeof(flatbuffers::uoffset_t) + aligned_data_length; |
| } |
| LOG(FATAL); |
| } |
| |
| size_t PackMessageInline(uint8_t *buffer, const Context &context, |
| int channel_index, LogType log_type, size_t start_byte, |
| size_t end_byte) { |
| // TODO(austin): Figure out how to copy directly from shared memory instead of |
| // first into the fetcher's memory and then into here. That would save a lot |
| // of memory. |
| const flatbuffers::uoffset_t message_size = |
| PackMessageSize(log_type, context.size); |
| DCHECK_EQ((message_size % 8), 0u) << ": Non 8 byte length..."; |
| DCHECK_EQ((start_byte % 8u), 0u); |
| DCHECK_EQ((end_byte % 8u), 0u); |
| DCHECK_LE(start_byte, end_byte); |
| DCHECK_LE(end_byte, message_size); |
| |
| // Pack all the data in. This is brittle but easy to change. Use the |
| // InlinePackMessage.Equivilent unit test to verify everything matches. |
| switch (log_type) { |
| case LogType::kLogMessage: |
| switch (start_byte) { |
| case 0x00u: |
| if ((end_byte) == 0x00u) { |
| break; |
| } |
| // clang-format off |
| // header: |
| // +0x00 | 4C 00 00 00 | UOffset32 | 0x0000004C (76) Loc: +0x4C | size prefix |
| buffer = Push<flatbuffers::uoffset_t>( |
| buffer, message_size - sizeof(flatbuffers::uoffset_t)); |
| |
| // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader` |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18); |
| [[fallthrough]]; |
| case 0x08u: |
| if ((end_byte) == 0x08u) { |
| break; |
| } |
| // |
| // padding: |
| // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding |
| buffer = Pad(buffer, 6); |
| // |
| // vtable (aos.logger.MessageHeader): |
| // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0xe); |
| [[fallthrough]]; |
| case 0x10u: |
| if ((end_byte) == 0x10u) { |
| break; |
| } |
| // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x20); |
| // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c); |
| // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c); |
| // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x04); |
| [[fallthrough]]; |
| case 0x18u: |
| if ((end_byte) == 0x18u) { |
| break; |
| } |
| // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x18); |
| // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x14); |
| // |
| // root_table (aos.logger.MessageHeader): |
| // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0e); |
| [[fallthrough]]; |
| case 0x20u: |
| if ((end_byte) == 0x20u) { |
| break; |
| } |
| // +0x20 | B2 E4 EF 89 19 7D 7F 6F | int64_t | 0x6F7F7D1989EFE4B2 (8034277808894108850) | table field `realtime_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x28u: |
| if ((end_byte) == 0x28u) { |
| break; |
| } |
| // +0x28 | 86 8D 92 65 FC 79 74 2B | int64_t | 0x2B7479FC65928D86 (3131261765872160134) | table field `monotonic_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x30u: |
| if ((end_byte) == 0x30u) { |
| break; |
| } |
| // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector) |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0c); |
| // +0x34 | 86 00 00 00 | uint32_t | 0x00000086 (134) | table field `queue_index` (UInt) |
| buffer = Push<uint32_t>(buffer, context.queue_index); |
| [[fallthrough]]; |
| case 0x38u: |
| if ((end_byte) == 0x38u) { |
| break; |
| } |
| // +0x38 | 71 00 00 00 | uint32_t | 0x00000071 (113) | table field `channel_index` (UInt) |
| buffer = Push<uint32_t>(buffer, channel_index); |
| // |
| // vector (aos.logger.MessageHeader.data): |
| // +0x3C | 0E 00 00 00 | uint32_t | 0x0000000E (14) | length of vector (# items) |
| buffer = Push<flatbuffers::uoffset_t>(buffer, context.size); |
| [[fallthrough]]; |
| case 0x40u: |
| if ((end_byte) == 0x40u) { |
| break; |
| } |
| [[fallthrough]]; |
| default: |
| // +0x40 | FF | uint8_t | 0xFF (255) | value[0] |
| // +0x41 | B8 | uint8_t | 0xB8 (184) | value[1] |
| // +0x42 | EE | uint8_t | 0xEE (238) | value[2] |
| // +0x43 | 00 | uint8_t | 0x00 (0) | value[3] |
| // +0x44 | 20 | uint8_t | 0x20 (32) | value[4] |
| // +0x45 | 4D | uint8_t | 0x4D (77) | value[5] |
| // +0x46 | FF | uint8_t | 0xFF (255) | value[6] |
| // +0x47 | 25 | uint8_t | 0x25 (37) | value[7] |
| // +0x48 | 3C | uint8_t | 0x3C (60) | value[8] |
| // +0x49 | 17 | uint8_t | 0x17 (23) | value[9] |
| // +0x4A | 65 | uint8_t | 0x65 (101) | value[10] |
| // +0x4B | 2F | uint8_t | 0x2F (47) | value[11] |
| // +0x4C | 63 | uint8_t | 0x63 (99) | value[12] |
| // +0x4D | 58 | uint8_t | 0x58 (88) | value[13] |
| // |
| // padding: |
| // +0x4E | 00 00 | uint8_t[2] | .. | padding |
| // clang-format on |
| if (start_byte <= 0x40 && end_byte == message_size) { |
| // The easy one, slap it all down. |
| buffer = PushBytes(buffer, context.data, context.size); |
| buffer = |
| Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size); |
| } else { |
| const size_t data_start_byte = |
| start_byte < 0x40 ? 0x0u : (start_byte - 0x40); |
| const size_t data_end_byte = end_byte - 0x40; |
| const size_t padded_size = ((context.size + 7) & 0xfffffff8u); |
| if (data_start_byte < padded_size) { |
| buffer = PushBytes( |
| buffer, |
| reinterpret_cast<const uint8_t *>(context.data) + |
| data_start_byte, |
| std::min(context.size, data_end_byte) - data_start_byte); |
| if (data_end_byte == padded_size) { |
| // We can only pad the last 7 bytes, so this only gets written |
| // if we write the last byte. |
| buffer = Pad(buffer, |
| ((context.size + 7) & 0xfffffff8u) - context.size); |
| } |
| } |
| } |
| break; |
| } |
| break; |
| |
| case LogType::kLogDeliveryTimeOnly: |
| switch (start_byte) { |
| case 0x00u: |
| if ((end_byte) == 0x00u) { |
| break; |
| } |
| // clang-format off |
| // header: |
| // +0x00 | 54 00 00 00 | UOffset32 | 0x00000054 (84) Loc: +0x54 | size prefix |
| buffer = Push<flatbuffers::uoffset_t>( |
| buffer, message_size - sizeof(flatbuffers::uoffset_t)); |
| // +0x04 | 1C 00 00 00 | UOffset32 | 0x0000001C (28) Loc: +0x20 | offset to root table `aos.logger.MessageHeader` |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x1c); |
| |
| [[fallthrough]]; |
| case 0x08u: |
| if ((end_byte) == 0x08u) { |
| break; |
| } |
| // |
| // vtable (aos.logger.MessageHeader): |
| // +0x08 | 18 00 | uint16_t | 0x0018 (24) | size of this vtable |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x18); |
| // +0x0A | 38 00 | uint16_t | 0x0038 (56) | size of referring table |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x38); |
| // +0x0C | 34 00 | VOffset16 | 0x0034 (52) | offset to field `channel_index` (id: 0) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x34); |
| // +0x0E | 28 00 | VOffset16 | 0x0028 (40) | offset to field `monotonic_sent_time` (id: 1) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x28); |
| [[fallthrough]]; |
| case 0x10u: |
| if ((end_byte) == 0x10u) { |
| break; |
| } |
| // +0x10 | 20 00 | VOffset16 | 0x0020 (32) | offset to field `realtime_sent_time` (id: 2) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x20); |
| // +0x12 | 30 00 | VOffset16 | 0x0030 (48) | offset to field `queue_index` (id: 3) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x30); |
| // +0x14 | 00 00 | VOffset16 | 0x0000 (0) | offset to field `data` (id: 4) <null> (Vector) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x00); |
| // +0x16 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `monotonic_remote_time` (id: 5) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x18); |
| [[fallthrough]]; |
| case 0x18u: |
| if ((end_byte) == 0x18u) { |
| break; |
| } |
| // +0x18 | 10 00 | VOffset16 | 0x0010 (16) | offset to field `realtime_remote_time` (id: 6) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x10); |
| // +0x1A | 04 00 | VOffset16 | 0x0004 (4) | offset to field `remote_queue_index` (id: 7) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x04); |
| // +0x1C | 00 00 | VOffset16 | 0x0000 (0) | offset to field `monotonic_timestamp_time` (id: 8) <defaults to -9223372036854775808> (Long) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x00); |
| // +0x1E | 08 00 | VOffset16 | 0x0008 (8) | offset to field `monotonic_remote_transmit_time` (id: 9) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x08); |
| [[fallthrough]]; |
| case 0x20u: |
| if ((end_byte) == 0x20u) { |
| break; |
| } |
| // root_table (aos.logger.MessageHeader): |
| // +0x20 | 18 00 00 00 | SOffset32 | 0x00000018 (24) Loc: +0x08 | offset to vtable |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18); |
| // +0x24 | 3F 9A 69 37 | uint32_t | 0x37699A3F (929667647) | table field `remote_queue_index` (UInt) |
| buffer = Push<uint32_t>(buffer, context.remote_queue_index); |
| [[fallthrough]]; |
| case 0x28u: |
| if ((end_byte) == 0x28u) { |
| break; |
| } |
| // +0x28 | 00 00 00 00 00 00 00 80 | int64_t | 0x8000000000000000 (-9223372036854775808) | table field `monotonic_remote_transmit_time` (Long) |
| buffer = Push<int64_t>(buffer, context.monotonic_remote_transmit_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x30u: |
| if ((end_byte) == 0x30u) { |
| break; |
| } |
| // +0x30 | 1D CE 4A 38 54 33 C9 F8 | int64_t | 0xF8C93354384ACE1D (-519827845169885667) | table field `realtime_remote_time` (Long) |
| buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x38u: |
| if ((end_byte) == 0x38u) { |
| break; |
| } |
| // +0x38 | FE EA DF 1D C7 3F C6 03 | int64_t | 0x03C63FC71DDFEAFE (271974951934749438) | table field `monotonic_remote_time` (Long) |
| buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x40u: |
| if ((end_byte) == 0x40u) { |
| break; |
| } |
| // +0x40 | 4E 0C 96 6E FB B5 CE 12 | int64_t | 0x12CEB5FB6E960C4E (1355220629381844046) | table field `realtime_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.realtime_event_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x48u: |
| if ((end_byte) == 0x48u) { |
| break; |
| } |
| // +0x48 | 51 56 56 F9 0A 0B 0F 12 | int64_t | 0x120F0B0AF9565651 (1301270959094126161) | table field `monotonic_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.monotonic_event_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x50u: |
| if ((end_byte) == 0x50u) { |
| break; |
| } |
| // +0x50 | 0C A5 42 18 | uint32_t | 0x1842A50C (407020812) | table field `queue_index` (UInt) |
| buffer = Push<uint32_t>(buffer, context.queue_index); |
| // +0x54 | 87 10 7C D7 | uint32_t | 0xD77C1087 (3615232135) | table field `channel_index` (UInt) |
| buffer = Push<uint32_t>(buffer, channel_index); |
| |
| // clang-format on |
| } |
| break; |
| |
| case LogType::kLogRemoteMessage: |
| switch (start_byte) { |
| case 0x00u: |
| if ((end_byte) == 0x00u) { |
| break; |
| } |
| // This is the message we need to recreate. |
| // |
| // clang-format off |
| // header: |
| // +0x00 | 5C 00 00 00 | UOffset32 | 0x0000005C (92) Loc: +0x5C | size prefix |
| buffer = Push<flatbuffers::uoffset_t>( |
| buffer, message_size - sizeof(flatbuffers::uoffset_t)); |
| // +0x04 | 18 00 00 00 | UOffset32 | 0x00000018 (24) Loc: +0x1C | offset to root table `aos.logger.MessageHeader` |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x18); |
| [[fallthrough]]; |
| case 0x08u: |
| if ((end_byte) == 0x08u) { |
| break; |
| } |
| // |
| // padding: |
| // +0x08 | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding |
| buffer = Pad(buffer, 6); |
| // |
| // vtable (aos.logger.MessageHeader): |
| // +0x0E | 0E 00 | uint16_t | 0x000E (14) | size of this vtable |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x0e); |
| [[fallthrough]]; |
| case 0x10u: |
| if ((end_byte) == 0x10u) { |
| break; |
| } |
| // +0x10 | 20 00 | uint16_t | 0x0020 (32) | size of referring table |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x20); |
| // +0x12 | 1C 00 | VOffset16 | 0x001C (28) | offset to field `channel_index` (id: 0) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x1c); |
| // +0x14 | 0C 00 | VOffset16 | 0x000C (12) | offset to field `monotonic_sent_time` (id: 1) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x0c); |
| // +0x16 | 04 00 | VOffset16 | 0x0004 (4) | offset to field `realtime_sent_time` (id: 2) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x04); |
| [[fallthrough]]; |
| case 0x18u: |
| if ((end_byte) == 0x18u) { |
| break; |
| } |
| // +0x18 | 18 00 | VOffset16 | 0x0018 (24) | offset to field `queue_index` (id: 3) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x18); |
| // +0x1A | 14 00 | VOffset16 | 0x0014 (20) | offset to field `data` (id: 4) |
| buffer = Push<flatbuffers::voffset_t>(buffer, 0x14); |
| // |
| // root_table (aos.logger.MessageHeader): |
| // +0x1C | 0E 00 00 00 | SOffset32 | 0x0000000E (14) Loc: +0x0E | offset to vtable |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0E); |
| [[fallthrough]]; |
| case 0x20u: |
| if ((end_byte) == 0x20u) { |
| break; |
| } |
| // +0x20 | D8 96 32 1A A0 D3 23 BB | int64_t | 0xBB23D3A01A3296D8 (-4961889679844403496) | table field `realtime_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.realtime_remote_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x28u: |
| if ((end_byte) == 0x28u) { |
| break; |
| } |
| // +0x28 | 2E 5D 23 B3 BE 84 CF C2 | int64_t | 0xC2CF84BEB3235D2E (-4409159555588334290) | table field `monotonic_sent_time` (Long) |
| buffer = Push<int64_t>(buffer, context.monotonic_remote_time.time_since_epoch().count()); |
| [[fallthrough]]; |
| case 0x30u: |
| if ((end_byte) == 0x30u) { |
| break; |
| } |
| // +0x30 | 0C 00 00 00 | UOffset32 | 0x0000000C (12) Loc: +0x3C | offset to field `data` (vector) |
| buffer = Push<flatbuffers::uoffset_t>(buffer, 0x0C); |
| // +0x34 | 69 00 00 00 | uint32_t | 0x00000069 (105) | table field `queue_index` (UInt) |
| buffer = Push<uint32_t>(buffer, context.remote_queue_index); |
| [[fallthrough]]; |
| case 0x38u: |
| if ((end_byte) == 0x38u) { |
| break; |
| } |
| // +0x38 | F3 00 00 00 | uint32_t | 0x000000F3 (243) | table field `channel_index` (UInt) |
| buffer = Push<uint32_t>(buffer, channel_index); |
| // |
| // vector (aos.logger.MessageHeader.data): |
| // +0x3C | 1A 00 00 00 | uint32_t | 0x0000001A (26) | length of vector (# items) |
| buffer = Push<flatbuffers::uoffset_t>(buffer, context.size); |
| [[fallthrough]]; |
| case 0x40u: |
| if ((end_byte) == 0x40u) { |
| break; |
| } |
| [[fallthrough]]; |
| default: |
| // +0x40 | 38 | uint8_t | 0x38 (56) | value[0] |
| // +0x41 | 1A | uint8_t | 0x1A (26) | value[1] |
| // ... |
| // +0x58 | 90 | uint8_t | 0x90 (144) | value[24] |
| // +0x59 | 92 | uint8_t | 0x92 (146) | value[25] |
| // |
| // padding: |
| // +0x5A | 00 00 00 00 00 00 | uint8_t[6] | ...... | padding |
| // clang-format on |
| if (start_byte <= 0x40 && end_byte == message_size) { |
| // The easy one, slap it all down. |
| buffer = PushBytes(buffer, context.data, context.size); |
| buffer = |
| Pad(buffer, ((context.size + 7) & 0xfffffff8u) - context.size); |
| } else { |
| const size_t data_start_byte = |
| start_byte < 0x40 ? 0x0u : (start_byte - 0x40); |
| const size_t data_end_byte = end_byte - 0x40; |
| const size_t padded_size = ((context.size + 7) & 0xfffffff8u); |
| if (data_start_byte < padded_size) { |
| buffer = PushBytes( |
| buffer, |
| reinterpret_cast<const uint8_t *>(context.data) + |
| data_start_byte, |
| std::min(context.size, data_end_byte) - data_start_byte); |
| if (data_end_byte == padded_size) { |
| // We can only pad the last 7 bytes, so this only gets written |
| // if we write the last byte. |
| buffer = Pad(buffer, |
| ((context.size + 7) & 0xfffffff8u) - context.size); |
| } |
| } |
| } |
| break; |
| } |
| } |
| |
| return end_byte - start_byte; |
| } |
| |
// Convenience constructor: selects a decoder for `filename` via
// ResolveDecoder (passing `quiet` through) and delegates to the main
// constructor below.
SpanReader::SpanReader(std::string_view filename, bool quiet)
    : SpanReader(filename, ResolveDecoder(filename, quiet)) {}
| |
// Constructs a SpanReader which pulls bytes for `filename` through the
// provided DataDecoder.  The filename is retained for diagnostics only.
SpanReader::SpanReader(std::string_view filename,
                       std::unique_ptr<DataDecoder> decoder)
    : filename_(filename), decoder_(std::move(decoder)) {}
| |
| absl::Span<const uint8_t> SpanReader::PeekMessage() { |
| // Make sure we have enough for the size. |
| if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) { |
| if (!ReadBlock()) { |
| return absl::Span<const uint8_t>(); |
| } |
| } |
| |
| // Now make sure we have enough for the message. |
| const size_t data_size = |
| flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) + |
| sizeof(flatbuffers::uoffset_t); |
| if (data_size == sizeof(flatbuffers::uoffset_t)) { |
| LOG(ERROR) << "Size of data is zero. Log file end is corrupted, skipping."; |
| LOG(ERROR) << " Rest of log file is " |
| << absl::BytesToHexString(std::string_view( |
| reinterpret_cast<const char *>(data_.data() + |
| consumed_data_), |
| data_.size() - consumed_data_)); |
| return absl::Span<const uint8_t>(); |
| } |
| while (data_.size() < consumed_data_ + data_size) { |
| if (!ReadBlock()) { |
| return absl::Span<const uint8_t>(); |
| } |
| } |
| |
| // And return it, consuming the data. |
| const uint8_t *data_ptr = data_.data() + consumed_data_; |
| |
| return absl::Span<const uint8_t>(data_ptr, data_size); |
| } |
| |
| void SpanReader::ConsumeMessage() { |
| size_t consumed_size = |
| flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) + |
| sizeof(flatbuffers::uoffset_t); |
| consumed_data_ += consumed_size; |
| total_consumed_ += consumed_size; |
| } |
| |
| absl::Span<const uint8_t> SpanReader::ReadMessage() { |
| absl::Span<const uint8_t> result = PeekMessage(); |
| if (!result.empty()) { |
| ConsumeMessage(); |
| } else { |
| is_finished_ = true; |
| } |
| return result; |
| } |
| |
| bool SpanReader::ReadBlock() { |
| // This is the amount of data we grab at a time. Doing larger chunks minimizes |
| // syscalls and helps decompressors batch things more efficiently. |
| constexpr size_t kReadSize = 256 * 1024; |
| |
| // Strip off any unused data at the front. |
| if (consumed_data_ != 0) { |
| data_.erase_front(consumed_data_); |
| consumed_data_ = 0; |
| } |
| |
| const size_t starting_size = data_.size(); |
| |
| // This should automatically grow the backing store. It won't shrink if we |
| // get a small chunk later. This reduces allocations when we want to append |
| // more data. |
| data_.resize(starting_size + kReadSize); |
| |
| const size_t count = |
| decoder_->Read(data_.begin() + starting_size, data_.end()); |
| data_.resize(starting_size + count); |
| if (count == 0) { |
| return false; |
| } |
| |
| total_read_ += count; |
| |
| return true; |
| } |
| |
// Constructs a pool of SpanReaders backed by `log_source` (nullptr means the
// readers open plain files directly); `pool_size` bounds how many readers
// accumulate before the pool is recycled in BorrowReader().
LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
    : log_source_(log_source), pool_size_(pool_size) {}
| |
| SpanReader *LogReadersPool::BorrowReader(std::string_view id) { |
| if (part_readers_.size() > pool_size_) { |
| // Don't leave arbitrary numbers of readers open, because they each take |
| // resources, so close a big batch at once periodically. |
| part_readers_.clear(); |
| } |
| if (log_source_ == nullptr) { |
| part_readers_.emplace_back(id, FLAGS_quiet_sorting); |
| } else { |
| part_readers_.emplace_back(id, log_source_->GetDecoder(id)); |
| } |
| return &part_readers_.back(); |
| } |
| |
// Reads the LogFileHeader from the front of `span_reader`.  Returns nullopt
// if nothing could be read or the header fails verification.  For old log
// files that were written with duplicated headers back to back, the last
// header wins (see the workaround below).
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
    SpanReader *span_reader) {
  absl::Span<const uint8_t> config_data = span_reader->ReadMessage();

  // Make sure something was read.
  if (config_data.empty()) {
    return std::nullopt;
  }

  // And copy the config so we have it forever, removing the size prefix.
  SizePrefixedFlatbufferVector<LogFileHeader> result(config_data);
  if (!result.Verify()) {
    return std::nullopt;
  }

  // We only know of busted headers in the versions of the log file header
  // *before* the logger_sha1 field was added. At some point before that point,
  // the logic to track when a header has been written was rewritten in such a
  // way that it can't happen anymore. We've seen some logs where the body
  // parses as a header recently, so the simple solution of always looking is
  // failing us.
  if (FLAGS_workaround_double_headers && !result.message().has_logger_sha1()) {
    while (true) {
      // Peek (rather than consume) so that, if the next message is real data
      // and not a duplicate header, it stays in place for the caller.
      absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
      if (maybe_header_data.empty()) {
        break;
      }

      aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
          maybe_header_data);
      if (maybe_header.Verify()) {
        LOG(WARNING) << "Found duplicate LogFileHeader in "
                     << span_reader->filename();
        // Deep-copy the duplicate header and adopt it as the real header.
        ResizeableBuffer header_data_copy;
        header_data_copy.resize(maybe_header_data.size());
        memcpy(header_data_copy.data(), maybe_header_data.begin(),
               header_data_copy.size());
        result = SizePrefixedFlatbufferVector<LogFileHeader>(
            std::move(header_data_copy));

        span_reader->ConsumeMessage();
      } else {
        break;
      }
    }
  }
  return result;
}
| |
| std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader( |
| std::string_view filename) { |
| SpanReader span_reader(filename); |
| return ReadHeader(&span_reader); |
| } |
| |
| std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage( |
| std::string_view filename, size_t n) { |
| SpanReader span_reader(filename); |
| absl::Span<const uint8_t> data_span = span_reader.ReadMessage(); |
| for (size_t i = 0; i < n + 1; ++i) { |
| data_span = span_reader.ReadMessage(); |
| |
| // Make sure something was read. |
| if (data_span.empty()) { |
| return std::nullopt; |
| } |
| } |
| |
| // And copy the config so we have it forever, removing the size prefix. |
| SizePrefixedFlatbufferVector<MessageHeader> result(data_span); |
| if (!result.Verify()) { |
| return std::nullopt; |
| } |
| return result; |
| } |
| |
// Takes ownership of `span_reader`, reads and verifies the log file header,
// and configures corruption handling plus the max-out-of-order duration from
// command-line flags.
MessageReader::MessageReader(SpanReader span_reader)
    : span_reader_(std::move(span_reader)),
      raw_log_file_header_(
          SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
  set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
  set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);

  std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
      raw_log_file_header = ReadHeader(&span_reader_);

  // Make sure something was read.
  CHECK(raw_log_file_header)
      << ": Failed to read header from: " << span_reader_.filename();

  raw_log_file_header_ = std::move(*raw_log_file_header);

  CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";

  // Record how many bytes precede the first message so corruption offsets
  // reported by ReadMessage() are meaningful.
  total_verified_before_ = span_reader_.TotalConsumed();

  // A positive --max_out_of_order flag overrides the duration stored in the
  // log file header.
  max_out_of_order_duration_ =
      FLAGS_max_out_of_order > 0
          ? chrono::duration_cast<chrono::nanoseconds>(
                chrono::duration<double>(FLAGS_max_out_of_order))
          : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());

  VLOG(1) << "Opened " << span_reader_.filename() << " as node "
          << FlatbufferToJson(log_file_header()->node());
}
| |
// Reads the next MessageHeader from the log and unpacks it.  Returns nullptr
// at end of file.  On a corrupted message, behavior depends on the flags set
// in the constructor: crash_on_corrupt_message CHECK-fails immediately;
// otherwise the rest of the file is scanned and tallied, and if
// ignore_corrupt_messages is set, reading resumes at the next message which
// verifies.
std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
  absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
  if (msg_data.empty()) {
    // End of file.  If corruption was seen along the way, summarize it.
    if (is_corrupted()) {
      LOG(ERROR) << "Total corrupted volumes: before = "
                 << total_verified_before_
                 << " | corrupted = " << total_corrupted_
                 << " | during = " << total_verified_during_
                 << " | after = " << total_verified_after_ << std::endl;
    }

    if (span_reader_.IsIncomplete()) {
      LOG(ERROR) << "Unable to access some messages in " << filename() << " : "
                 << span_reader_.TotalRead() << " bytes read, "
                 << span_reader_.TotalConsumed() << " bytes usable."
                 << std::endl;
    }
    return nullptr;
  }

  SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);

  if (crash_on_corrupt_message_flag_) {
    CHECK(msg.Verify()) << "Corrupted message at offset "
                        << total_verified_before_ << " found within "
                        << filename()
                        << "; set --nocrash_on_corrupt_message to see summary;"
                        << " also set --ignore_corrupt_messages to process"
                        << " anyway";

  } else if (!msg.Verify()) {
    LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
               << " from " << filename() << std::endl;

    total_corrupted_ += msg_data.size();

    // Scan forward through the remainder of the file, tallying corrupted vs
    // verifiable bytes.  With ignore_corrupt_messages set, stop at the first
    // message that verifies and return it instead.
    while (true) {
      absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();

      if (msg_data.empty()) {
        if (!ignore_corrupt_messages_flag_) {
          LOG(ERROR) << "Total corrupted volumes: before = "
                     << total_verified_before_
                     << " | corrupted = " << total_corrupted_
                     << " | during = " << total_verified_during_
                     << " | after = " << total_verified_after_ << std::endl;

          if (span_reader_.IsIncomplete()) {
            LOG(ERROR) << "Unable to access some messages in " << filename()
                       << " : " << span_reader_.TotalRead() << " bytes read, "
                       << span_reader_.TotalConsumed() << " bytes usable."
                       << std::endl;
          }
          return nullptr;
        }
        break;
      }

      SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);

      if (!next_msg.Verify()) {
        // Another corrupted message: bytes that verified since the previous
        // corruption now count as "during" rather than "after".
        total_corrupted_ += msg_data.size();
        total_verified_during_ += total_verified_after_;
        total_verified_after_ = 0;

      } else {
        total_verified_after_ += msg_data.size();
        if (ignore_corrupt_messages_flag_) {
          msg = next_msg;
          break;
        }
      }
    }
  }

  // Attribute this message's bytes to the appropriate tally bucket.
  if (is_corrupted()) {
    total_verified_after_ += msg_data.size();
  } else {
    total_verified_before_ += msg_data.size();
  }

  auto result = UnpackedMessageHeader::MakeMessage(msg.message());

  const monotonic_clock::time_point timestamp = result->monotonic_sent_time;

  // Track the newest timestamp seen so far; callers use this to bound
  // sorting.
  newest_timestamp_ = std::max(newest_timestamp_, timestamp);

  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
  } else if (VLOG_IS_ON(2)) {
    // At -v2, log the message with its (potentially large) payload stripped.
    SizePrefixedFlatbufferVector<MessageHeader> msg_copy = msg;
    msg_copy.mutable_message()->clear_data();
    VLOG(2) << "Read from " << filename() << " data "
            << FlatbufferToJson(msg_copy);
  }

  return result;
}
| |
// Builds a heap-allocated UnpackedMessageHeader from a MessageHeader
// flatbuffer.  The header struct and an aligned copy of the payload are
// carved out of a single malloc'd region; the returned shared_ptr releases
// it through DestroyAndFree.
std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
    const MessageHeader &message) {
  const size_t data_size = message.has_data() ? message.data()->size() : 0;

  // Over-allocate by kChannelDataAlignment - 1 bytes so the payload can be
  // shifted to an aligned address by RoundChannelData below.
  UnpackedMessageHeader *const unpacked_message =
      reinterpret_cast<UnpackedMessageHeader *>(
          malloc(sizeof(UnpackedMessageHeader) + data_size +
                 kChannelDataAlignment - 1));

  CHECK(message.has_channel_index());
  CHECK(message.has_monotonic_sent_time());

  absl::Span<uint8_t> span;
  if (data_size > 0) {
    span =
        absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
                                &unpacked_message->actual_data[0], data_size)),
                            data_size);
  }

  // Remote times and the remote queue index are optional in the flatbuffer;
  // absent fields are represented with std::nullopt.
  std::optional<aos::monotonic_clock::time_point> monotonic_remote_time;
  if (message.has_monotonic_remote_time()) {
    monotonic_remote_time = aos::monotonic_clock::time_point(
        std::chrono::nanoseconds(message.monotonic_remote_time()));
  }
  std::optional<realtime_clock::time_point> realtime_remote_time;
  if (message.has_realtime_remote_time()) {
    realtime_remote_time = realtime_clock::time_point(
        chrono::nanoseconds(message.realtime_remote_time()));
  }
  aos::monotonic_clock::time_point monotonic_remote_transmit_time =
      aos::monotonic_clock::time_point(
          std::chrono::nanoseconds(message.monotonic_remote_transmit_time()));

  std::optional<uint32_t> remote_queue_index;
  if (message.has_remote_queue_index()) {
    remote_queue_index = message.remote_queue_index();
  }

  // Construct the header in place on top of the raw allocation.
  new (unpacked_message) UnpackedMessageHeader(
      message.channel_index(),
      monotonic_clock::time_point(
          chrono::nanoseconds(message.monotonic_sent_time())),
      realtime_clock::time_point(
          chrono::nanoseconds(message.realtime_sent_time())),
      message.queue_index(), monotonic_remote_time, realtime_remote_time,
      monotonic_remote_transmit_time, remote_queue_index,
      monotonic_clock::time_point(
          std::chrono::nanoseconds(message.monotonic_timestamp_time())),
      message.has_monotonic_timestamp_time(), span);

  // Copy the payload after construction so it lands in the aligned region
  // that `span` points into.
  if (data_size > 0) {
    memcpy(span.data(), message.data()->data(), data_size);
  }

  return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
                                                &DestroyAndFree);
}
| |
| SpanReader PartsMessageReader::MakeSpanReader( |
| const LogPartsAccess &log_parts_access, size_t part_number) { |
| const auto part = log_parts_access.GetPartAt(part_number); |
| if (log_parts_access.log_source().has_value()) { |
| return SpanReader(part, |
| log_parts_access.log_source().value()->GetDecoder(part)); |
| } else { |
| return SpanReader(part); |
| } |
| } |
| |
// Starts reading at the first part in `log_parts_access`, and pre-opens the
// second part (if there is one) so the hand-off in NextLog() is cheap.
PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
    : log_parts_access_(std::move(log_parts_access)),
      message_reader_(MakeSpanReader(log_parts_access_, 0)),
      max_out_of_order_duration_(
          log_parts_access_.max_out_of_order_duration()) {
  if (log_parts_access_.size() >= 2) {
    next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
  }
  ComputeBootCounts();
}
| |
// Populates boot_counts_ with the boot count (when determinable) for every
// node, using whichever of the three log-file vintages of boot information
// is available.
void PartsMessageReader::ComputeBootCounts() {
  boot_counts_.assign(
      configuration::NodesCount(log_parts_access_.config().get()),
      std::nullopt);

  const auto boots = log_parts_access_.parts().boots;

  // We have 3 vintages of log files with different amounts of information.
  if (log_file_header()->has_boot_uuids()) {
    // The new hotness with the boots explicitly listed out. We can use the log
    // file header to compute the boot count of all relevant nodes.
    CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
    size_t node_index = 0;
    for (const flatbuffers::String *boot_uuid :
         *log_file_header()->boot_uuids()) {
      CHECK(boots);
      if (boot_uuid->size() != 0) {
        auto it = boots->boot_count_map.find(boot_uuid->str());
        if (it != boots->boot_count_map.end()) {
          boot_counts_[node_index] = it->second;
        }
      } else if (parts().boots->boots[node_index].size() == 1u) {
        // No UUID recorded for this node, but it only ever booted once, so
        // the count is unambiguous.
        boot_counts_[node_index] = 0;
      }
      ++node_index;
    }
  } else {
    // Older multi-node logs which are guaranteed to have UUIDs logged, or
    // single node log files with boot UUIDs in the header. We only know how to
    // order certain boots in certain circumstances.
    if (configuration::MultiNode(log_parts_access_.config().get()) || boots) {
      // NOTE(review): this branch dereferences `boots` unconditionally, so it
      // assumes `boots` is non-null whenever the config is multi-node --
      // confirm against the code that populates parts().boots.
      for (size_t node_index = 0; node_index < boot_counts_.size();
           ++node_index) {
        if (boots->boots[node_index].size() == 1u) {
          boot_counts_[node_index] = 0;
        }
      }
    } else {
      // Really old single node logs without any UUIDs. They can't reboot.
      CHECK_EQ(boot_counts_.size(), 1u);
      boot_counts_[0] = 0u;
    }
  }
}
| |
// Returns the next message across all parts of this log, rotating to the
// next part file whenever the current one is exhausted.  Returns nullptr
// once every part has been fully read.
std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
  while (!done_) {
    std::shared_ptr<UnpackedMessageHeader> message =
        message_reader_.ReadMessage();
    if (message) {
      newest_timestamp_ = message_reader_.newest_timestamp();
      const monotonic_clock::time_point monotonic_sent_time =
          message->monotonic_sent_time;

      // TODO(austin): Does this work with startup?  Might need to use the
      // start time.
      // TODO(austin): Does this work with startup when we don't know the
      // remote start time too?  Look at one of those logs to compare.
      // Once a message lands more than the out-of-order window past the
      // start time, we're clearly past startup and can enforce ordering.
      if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
                                    max_out_of_order_duration()) {
        after_start_ = true;
      }
      if (after_start_) {
        CHECK_GE(monotonic_sent_time,
                 newest_timestamp_ - max_out_of_order_duration())
            << ": Max out of order of " << max_out_of_order_duration().count()
            << "ns exceeded. " << log_parts_access_.parts()
            << ", start time is "
            << log_parts_access_.parts().monotonic_start_time
            << " currently reading " << filename();
      }
      return message;
    }
    NextLog();
  }
  // Every part is done; nothing newer can ever appear.
  newest_timestamp_ = monotonic_clock::max_time;
  return nullptr;
}
| |
// Rotates to the next part file: message_reader_ takes over the pre-opened
// next_message_reader_, and a new reader is staged for the part after that.
// Sets done_ once all parts have been consumed.
void PartsMessageReader::NextLog() {
  if (next_part_index_ == log_parts_access_.size()) {
    CHECK(!next_message_reader_);
    done_ = true;
    return;
  }
  CHECK(next_message_reader_);
  message_reader_ = std::move(*next_message_reader_);
  // Boot information is recomputed per part.
  ComputeBootCounts();
  if (next_part_index_ + 1 < log_parts_access_.size()) {
    next_message_reader_.emplace(
        MakeSpanReader(log_parts_access_, next_part_index_ + 1));
  } else {
    next_message_reader_.reset();
  }
  ++next_part_index_;
}
| |
| bool Message::operator<(const Message &m2) const { |
| if (this->timestamp < m2.timestamp) { |
| return true; |
| } else if (this->timestamp > m2.timestamp) { |
| return false; |
| } |
| |
| if (this->channel_index < m2.channel_index) { |
| return true; |
| } else if (this->channel_index > m2.channel_index) { |
| return false; |
| } |
| |
| return this->queue_index < m2.queue_index; |
| } |
| |
// Derived from operator<: a message is >= another iff it is not strictly less.
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
| bool Message::operator==(const Message &m2) const { |
| return timestamp == m2.timestamp && channel_index == m2.channel_index && |
| queue_index == m2.queue_index; |
| } |
| |
| bool Message::operator<=(const Message &m2) const { |
| return *this == m2 || *this < m2; |
| } |
| |
// Debug printer for UnpackedMessageHeader.  An unset monotonic_remote_time
// or monotonic_timestamp_time is omitted entirely, while
// realtime_remote_time and remote_queue_index are always printed, going
// through PrintOptionalOrNull when absent.
std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &msg) {
  os << "{.channel_index=" << msg.channel_index
     << ", .monotonic_sent_time=" << msg.monotonic_sent_time
     << ", .realtime_sent_time=" << msg.realtime_sent_time
     << ", .queue_index=" << msg.queue_index;
  if (msg.monotonic_remote_time) {
    os << ", .monotonic_remote_time=" << *msg.monotonic_remote_time;
  }
  os << ", .realtime_remote_time=";
  PrintOptionalOrNull(&os, msg.realtime_remote_time);
  os << ", .remote_queue_index=";
  PrintOptionalOrNull(&os, msg.remote_queue_index);
  if (msg.has_monotonic_timestamp_time) {
    os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
  }
  os << "}";
  return os;
}
| |
| std::ostream &operator<<(std::ostream &os, const Message &msg) { |
| os << "{.channel_index=" << msg.channel_index |
| << ", .queue_index=" << msg.queue_index |
| << ", .timestamp=" << msg.timestamp; |
| if (msg.data != nullptr) { |
| if (msg.data->remote_queue_index.has_value()) { |
| os << ", .remote_queue_index=" << *msg.data->remote_queue_index; |
| } |
| if (msg.data->monotonic_remote_time.has_value()) { |
| os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time; |
| } |
| os << ", .data=" << msg.data; |
| } |
| os << "}"; |
| return os; |
| } |
| |
// Debug printer for TimestampedMessage.  Fields holding their sentinel
// "unset" values (Invalid queue index, min_time timestamps) are omitted;
// .data prints as "nullptr" when no message body is attached.
std::ostream &operator<<(std::ostream &os, const TimestampedMessage &msg) {
  os << "{.channel_index=" << msg.channel_index
     << ", .queue_index=" << msg.queue_index
     << ", .monotonic_event_time=" << msg.monotonic_event_time
     << ", .realtime_event_time=" << msg.realtime_event_time;
  if (msg.remote_queue_index != BootQueueIndex::Invalid()) {
    os << ", .remote_queue_index=" << msg.remote_queue_index;
  }
  if (msg.monotonic_remote_time != BootTimestamp::min_time()) {
    os << ", .monotonic_remote_time=" << msg.monotonic_remote_time;
  }
  if (msg.realtime_remote_time != realtime_clock::min_time) {
    os << ", .realtime_remote_time=" << msg.realtime_remote_time;
  }
  if (msg.monotonic_remote_transmit_time != BootTimestamp::min_time()) {
    os << ", .monotonic_remote_transmit_time="
       << msg.monotonic_remote_transmit_time;
  }
  if (msg.monotonic_timestamp_time != BootTimestamp::min_time()) {
    os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
  }
  if (msg.data != nullptr) {
    os << ", .data=" << *msg.data;
  } else {
    os << ", .data=nullptr";
  }
  os << "}";
  return os;
}
| |
// Wraps the given log parts in a PartsMessageReader, and caches the
// channel-index -> source-node-index mapping used for boot lookups in
// Front().
MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
    : parts_message_reader_(log_parts_access),
      source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
}
| |
// Returns a pointer to the oldest message available, or nullptr once the log
// is exhausted.  Messages are read and buffered until the front of the
// sorted set is older than sorted_until_ (which trails the newest timestamp
// read by the max-out-of-order duration), so nothing still unread can
// displace it.
const Message *MessageSorter::Front() {
  // Queue up data until enough data has been queued that the front message is
  // sorted enough to be safe to pop. This may do nothing, so we should make
  // sure the nothing path is checked quickly.
  if (sorted_until() != monotonic_clock::max_time) {
    while (true) {
      if (!messages_.empty() &&
          messages_.begin()->timestamp.time < sorted_until() &&
          sorted_until() >= monotonic_start_time()) {
        break;
      }

      std::shared_ptr<UnpackedMessageHeader> msg =
          parts_message_reader_.ReadMessage();
      // No data left, sorted forever, work through what is left.
      if (!msg) {
        sorted_until_ = monotonic_clock::max_time;
        break;
      }

      // Timestamp-time messages are stamped on the logger's boot.
      size_t monotonic_timestamp_boot = 0;
      if (msg->has_monotonic_timestamp_time) {
        monotonic_timestamp_boot = parts().logger_boot_count;
      }
      size_t monotonic_remote_boot = 0xffffff;

      // For forwarded messages, look up which boot of the sending node the
      // remote time belongs to.
      if (msg->monotonic_remote_time.has_value()) {
        CHECK_LT(msg->channel_index, source_node_index_.size());
        const Node *node = parts().config->nodes()->Get(
            source_node_index_[msg->channel_index]);

        std::optional<size_t> boot = parts_message_reader_.boot_count(
            source_node_index_[msg->channel_index]);
        CHECK(boot) << ": Failed to find boot for node '" << MaybeNodeName(node)
                    << "', with index "
                    << source_node_index_[msg->channel_index];
        monotonic_remote_boot = *boot;
      }

      messages_.insert(
          Message{.channel_index = msg->channel_index,
                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
                                                .index = msg->queue_index},
                  .timestamp = BootTimestamp{.boot = parts().boot_count,
                                             .time = msg->monotonic_sent_time},
                  .monotonic_remote_boot = monotonic_remote_boot,
                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
                  .data = std::move(msg)});

      // Now, update sorted_until_ to match the new message.
      if (parts_message_reader_.newest_timestamp() >
          monotonic_clock::min_time +
              parts_message_reader_.max_out_of_order_duration()) {
        sorted_until_ = parts_message_reader_.newest_timestamp() -
                        parts_message_reader_.max_out_of_order_duration();
      } else {
        sorted_until_ = monotonic_clock::min_time;
      }
    }
  }

  // Now that we have enough data queued, return a pointer to the oldest piece
  // of data if it exists.
  if (messages_.empty()) {
    last_message_time_ = monotonic_clock::max_time;
    return nullptr;
  }

  // Enforce that we hand out messages in nondecreasing timestamp order.
  CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
      << DebugString() << " reading " << parts_message_reader_.filename();
  last_message_time_ = messages_.begin()->timestamp.time;
  VLOG(1) << this << " Front, sorted until " << sorted_until_ << " for "
          << (*messages_.begin()) << " on " << parts_message_reader_.filename();
  return &(*messages_.begin());
}
| |
// Discards the sorted message most recently returned by Front().
void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
| |
| std::string MessageSorter::DebugString() const { |
| std::stringstream ss; |
| ss << "messages: [\n"; |
| int count = 0; |
| bool no_dots = true; |
| for (const Message &msg : messages_) { |
| if (count < 15 || count > static_cast<int>(messages_.size()) - 15) { |
| ss << msg << "\n"; |
| } else if (no_dots) { |
| ss << "...\n"; |
| no_dots = false; |
| } |
| ++count; |
| } |
| ss << "] <- " << parts_message_reader_.filename(); |
| return ss.str(); |
| } |
| |
| // Class to merge start times cleanly, reusably, and incrementally. |
| class StartTimes { |
| public: |
| void Update(monotonic_clock::time_point new_monotonic_start_time, |
| realtime_clock::time_point new_realtime_start_time) { |
| // We want to capture the earliest meaningful start time here. The start |
| // time defaults to min_time when there's no meaningful value to report, so |
| // let's ignore those. |
| if (new_monotonic_start_time != monotonic_clock::min_time) { |
| bool accept = false; |
| // We want to prioritize start times from the logger node. Really, we |
| // want to prioritize start times with a valid realtime_clock time. So, |
| // if we have a start time without a RT clock, prefer a start time with a |
| // RT clock, even it if is later. |
| if (new_realtime_start_time != realtime_clock::min_time) { |
| // We've got a good one. See if the current start time has a good RT |
| // clock, or if we should use this one instead. |
| if (new_monotonic_start_time < monotonic_start_time_ || |
| monotonic_start_time_ == monotonic_clock::min_time) { |
| accept = true; |
| } else if (realtime_start_time_ == realtime_clock::min_time) { |
| // The previous start time doesn't have a good RT time, so it is very |
| // likely the start time from a remote part file. We just found a |
| // better start time with a real RT time, so switch to that instead. |
| accept = true; |
| } |
| } else if (realtime_start_time_ == realtime_clock::min_time) { |
| // We don't have a RT time, so take the oldest. |
| if (new_monotonic_start_time < monotonic_start_time_ || |
| monotonic_start_time_ == monotonic_clock::min_time) { |
| accept = true; |
| } |
| } |
| |
| if (accept) { |
| monotonic_start_time_ = new_monotonic_start_time; |
| realtime_start_time_ = new_realtime_start_time; |
| } |
| } |
| } |
| |
| monotonic_clock::time_point monotonic_start_time() const { |
| return monotonic_start_time_; |
| } |
| realtime_clock::time_point realtime_start_time() const { |
| return realtime_start_time_; |
| } |
| |
| private: |
| monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::min_time; |
| realtime_clock::time_point realtime_start_time_ = realtime_clock::min_time; |
| }; |
| |
| PartsMerger::PartsMerger(SelectedLogParts &&parts) { |
| node_ = configuration::GetNodeIndex(parts.config().get(), parts.node_name()); |
| |
| for (LogPartsAccess part : parts) { |
| message_sorters_.emplace_back(std::move(part)); |
| } |
| |
| StartTimes start_times; |
| for (const MessageSorter &message_sorter : message_sorters_) { |
| start_times.Update(message_sorter.monotonic_start_time(), |
| message_sorter.realtime_start_time()); |
| } |
| monotonic_start_time_ = start_times.monotonic_start_time(); |
| realtime_start_time_ = start_times.realtime_start_time(); |
| } |
| |
| std::vector<const LogParts *> PartsMerger::Parts() const { |
| std::vector<const LogParts *> p; |
| p.reserve(message_sorters_.size()); |
| for (const MessageSorter &message_sorter : message_sorters_) { |
| p.emplace_back(&message_sorter.parts()); |
| } |
| return p; |
| } |
| |
// Returns the oldest message across all part-file sorters, deduplicating
// identical messages as it goes, or nullptr when everything is exhausted.
// Leaves current_ pointing at the sorter which owns the returned message so
// PopFront() knows what to pop.
const Message *PartsMerger::Front() {
  // Return the current Front if we have one, otherwise go compute one.
  if (current_ != nullptr) {
    const Message *result = current_->Front();
    CHECK_GE(result->timestamp.time, last_message_time_);
    VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
            << *result;
    return result;
  }

  // Otherwise, do a simple search for the oldest message, deduplicating any
  // duplicates.
  const Message *oldest = nullptr;
  sorted_until_ = monotonic_clock::max_time;
  for (MessageSorter &message_sorter : message_sorters_) {
    const Message *msg = message_sorter.Front();
    if (!msg) {
      // This sorter is empty; it still constrains how far we are sorted.
      sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
      continue;
    }
    if (oldest == nullptr || *msg < *oldest) {
      oldest = msg;
      current_ = &message_sorter;
    } else if (*msg == *oldest) {
      // Found a duplicate. If there is a choice, we want the one which has
      // the timestamp time.
      if (!msg->data->has_monotonic_timestamp_time) {
        message_sorter.PopFront();
      } else if (!oldest->data->has_monotonic_timestamp_time) {
        // The new copy has a timestamp time and the old one doesn't; swap.
        current_->PopFront();
        current_ = &message_sorter;
        oldest = msg;
      } else {
        // Both have timestamp times; they'd better agree.
        CHECK_EQ(msg->data->monotonic_timestamp_time,
                 oldest->data->monotonic_timestamp_time);
        message_sorter.PopFront();
      }
    }

    // PopFront may change this, so compute it down here.
    sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
  }

  if (oldest) {
    CHECK_GE(oldest->timestamp.time, last_message_time_);
    last_message_time_ = oldest->timestamp.time;
    if (monotonic_oldest_time_ > oldest->timestamp.time) {
      VLOG(1) << this << " Updating oldest to " << oldest->timestamp.time
              << " for node " << node_name() << " with a start time of "
              << monotonic_start_time_ << " " << *oldest;
    }
    monotonic_oldest_time_ =
        std::min(monotonic_oldest_time_, oldest->timestamp.time);
  } else {
    last_message_time_ = monotonic_clock::max_time;
  }

  // Return the oldest message found.  This will be nullptr if nothing was
  // found, indicating there is nothing left.
  if (oldest) {
    VLOG(1) << this << " PartsMerger::Front for node " << node_name() << " "
            << *oldest;
  } else {
    VLOG(1) << this << " PartsMerger::Front for node " << node_name();
  }
  return oldest;
}
| |
| void PartsMerger::PopFront() { |
| CHECK(current_ != nullptr) << "Popping before calling Front()"; |
| current_->PopFront(); |
| current_ = nullptr; |
| } |
| |
// Builds one PartsMerger per boot of node_name, restricted to the requested
// stored data types.  Boots with no matching parts get a nullptr slot.
BootMerger::BootMerger(std::string_view node_name,
                       const LogFilesContainer &log_files,
                       const std::vector<StoredDataType> &types)
    : configuration_(log_files.config()),
      node_(configuration::GetNodeIndex(configuration_.get(), node_name)) {
  size_t number_of_boots = log_files.BootsForNode(node_name);
  parts_mergers_.reserve(number_of_boots);
  for (size_t i = 0; i < number_of_boots; ++i) {
    VLOG(2) << "Boot " << i;
    SelectedLogParts selected_parts =
        log_files.SelectParts(node_name, i, types);
    // We are guaranteed to have something each boot, but not guaranteed to
    // have both timestamps and data for each boot.  If we don't have anything,
    // don't create a parts merger.  The rest of this class will detect that
    // and ignore it as required.
    if (selected_parts.empty()) {
      parts_mergers_.emplace_back(nullptr);
    } else {
      parts_mergers_.emplace_back(
          std::make_unique<PartsMerger>(std::move(selected_parts)));
    }
  }
}
| |
// Resolves our node index back to its configured name.
std::string_view BootMerger::node_name() const {
  return configuration::NodeName(configuration().get(), node());
}
| |
| const Message *BootMerger::Front() { |
| if (parts_mergers_[index_].get() != nullptr) { |
| const Message *result = parts_mergers_[index_]->Front(); |
| |
| if (result != nullptr) { |
| VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result; |
| return result; |
| } |
| } |
| |
| if (index_ + 1u == parts_mergers_.size()) { |
| // At the end of the last node merger, just return. |
| VLOG(1) << this << " BootMerger::Front " << node_name() << " nullptr"; |
| return nullptr; |
| } else { |
| ++index_; |
| const Message *result = Front(); |
| VLOG(1) << this << " BootMerger::Front " << node_name() << " " << *result; |
| return result; |
| } |
| } |
| |
| void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); } |
| |
| std::vector<const LogParts *> BootMerger::Parts() const { |
| std::vector<const LogParts *> results; |
| for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) { |
| if (!parts_merger) continue; |
| |
| std::vector<const LogParts *> node_parts = parts_merger->Parts(); |
| |
| results.insert(results.end(), std::make_move_iterator(node_parts.begin()), |
| std::make_move_iterator(node_parts.end())); |
| } |
| |
| return results; |
| } |
| |
| monotonic_clock::time_point BootMerger::monotonic_start_time( |
| size_t boot) const { |
| CHECK_LT(boot, parts_mergers_.size()); |
| if (parts_mergers_[boot]) { |
| return parts_mergers_[boot]->monotonic_start_time(); |
| } |
| return monotonic_clock::min_time; |
| } |
| |
| realtime_clock::time_point BootMerger::realtime_start_time(size_t boot) const { |
| CHECK_LT(boot, parts_mergers_.size()); |
| if (parts_mergers_[boot]) { |
| return parts_mergers_[boot]->realtime_start_time(); |
| } |
| return realtime_clock::min_time; |
| } |
| |
| monotonic_clock::time_point BootMerger::monotonic_oldest_time( |
| size_t boot) const { |
| CHECK_LT(boot, parts_mergers_.size()); |
| if (parts_mergers_[boot]) { |
| return parts_mergers_[boot]->monotonic_oldest_time(); |
| } |
| return monotonic_clock::max_time; |
| } |
| |
| bool BootMerger::started() const { |
| if (index_ == 0) { |
| if (!parts_mergers_[0]) { |
| return false; |
| } |
| return parts_mergers_[index_]->sorted_until() != monotonic_clock::min_time; |
| } |
| return true; |
| } |
| |
// Builds the data boot merger and, when the strategy asks for timestamps to be
// queued at startup (and timestamp files exist), a second merger dedicated to
// timestamps.  Per-boot start times are then merged from both sources.
SplitTimestampBootMerger::SplitTimestampBootMerger(
    std::string_view node_name, const LogFilesContainer &log_files,
    TimestampQueueStrategy timestamp_queue_strategy)
    : boot_merger_(node_name, log_files,
                   // When timestamps are queued separately up front, the main
                   // merger only needs DATA; otherwise it reads everything.
                   (timestamp_queue_strategy ==
                    TimestampQueueStrategy::kQueueTimestampsAtStartup)
                       ? std::vector<StoredDataType>{StoredDataType::DATA}
                       : std::vector<StoredDataType>{
                             StoredDataType::DATA, StoredDataType::TIMESTAMPS,
                             StoredDataType::REMOTE_TIMESTAMPS}) {
  // Make the timestamp_boot_merger_ only if we are asked to, and if there are
  // files to put in it.  We don't need it for a data only log.
  if (timestamp_queue_strategy ==
          TimestampQueueStrategy::kQueueTimestampsAtStartup &&
      log_files.HasTimestamps(node_name)) {
    timestamp_boot_merger_ = std::make_unique<BootMerger>(
        node_name, log_files,
        std::vector<StoredDataType>{StoredDataType::TIMESTAMPS,
                                    StoredDataType::REMOTE_TIMESTAMPS});
  }

  size_t number_of_boots = log_files.BootsForNode(node_name);
  monotonic_start_time_.reserve(number_of_boots);
  realtime_start_time_.reserve(number_of_boots);

  // Start times are split across the timestamp boot merger, and data boot
  // merger.  Pull from both and combine them to get the same answer as before.
  for (size_t i = 0u; i < number_of_boots; ++i) {
    StartTimes start_times;

    if (timestamp_boot_merger_) {
      start_times.Update(timestamp_boot_merger_->monotonic_start_time(i),
                         timestamp_boot_merger_->realtime_start_time(i));
    }

    start_times.Update(boot_merger_.monotonic_start_time(i),
                       boot_merger_.realtime_start_time(i));

    monotonic_start_time_.push_back(start_times.monotonic_start_time());
    realtime_start_time_.push_back(start_times.realtime_start_time());
  }
}
| |
// Drains timestamp_boot_merger_ completely.  Timestamps for messages sent by
// other nodes (per source_node) are converted to TimestampedMessage, saved in
// timestamp_messages_ for later replay through Front(), and passed to fn.
// Anything sourced from this node is data, and is dropped here; it will be
// read again through boot_merger_.
void SplitTimestampBootMerger::QueueTimestamps(
    std::function<void(TimestampedMessage *)> fn,
    const std::vector<size_t> &source_node) {
  // Nothing to do when timestamps aren't being queued separately.
  if (!timestamp_boot_merger_) {
    return;
  }

  while (true) {
    // Load all the timestamps.  If we find data, ignore it and drop it on the
    // floor.  It will be read when boot_merger_ is used.
    const Message *msg = timestamp_boot_merger_->Front();
    if (!msg) {
      queue_timestamps_ran_ = true;
      return;
    }
    CHECK_LT(msg->channel_index, source_node.size());
    if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
      // Forwarded message -> this is a timestamp.  Convert and keep it.
      timestamp_messages_.emplace_back(TimestampedMessage{
          .channel_index = msg->channel_index,
          .queue_index = msg->queue_index,
          .monotonic_event_time = msg->timestamp,
          .realtime_event_time = msg->data->realtime_sent_time,
          .remote_queue_index =
              BootQueueIndex{.boot = msg->monotonic_remote_boot,
                             .index = msg->data->remote_queue_index.value()},
          .monotonic_remote_time = {msg->monotonic_remote_boot,
                                    msg->data->monotonic_remote_time.value()},
          .realtime_remote_time = msg->data->realtime_remote_time.value(),
          .monotonic_remote_transmit_time =
              {msg->monotonic_remote_boot,
               msg->data->monotonic_remote_transmit_time},
          .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
                                       msg->data->monotonic_timestamp_time},
          .data = std::move(msg->data)});

      VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
      fn(&timestamp_messages_.back());
    } else {
      VLOG(2) << this << " Dropped data";
    }
    timestamp_boot_merger_->PopFront();
  }

  // TODO(austin): Push the queue into TimestampMapper instead.  Have it pull
  // all the timestamps.  That will also make it so we don't have to clear the
  // function.
}
| |
// Resolves our node index back to its configured name.
std::string_view SplitTimestampBootMerger::node_name() const {
  return configuration::NodeName(configuration().get(), node());
}
| |
// Merged (data + timestamp) monotonic start time for the given boot.
monotonic_clock::time_point SplitTimestampBootMerger::monotonic_start_time(
    size_t boot) const {
  CHECK_LT(boot, monotonic_start_time_.size());
  return monotonic_start_time_[boot];
}
| |
// Merged (data + timestamp) realtime start time for the given boot.
realtime_clock::time_point SplitTimestampBootMerger::realtime_start_time(
    size_t boot) const {
  CHECK_LT(boot, realtime_start_time_.size());
  return realtime_start_time_[boot];
}
| |
| monotonic_clock::time_point SplitTimestampBootMerger::monotonic_oldest_time( |
| size_t boot) const { |
| if (!timestamp_boot_merger_) { |
| return boot_merger_.monotonic_oldest_time(boot); |
| } |
| return std::min(boot_merger_.monotonic_oldest_time(boot), |
| timestamp_boot_merger_->monotonic_oldest_time(boot)); |
| } |
| |
| const Message *SplitTimestampBootMerger::Front() { |
| const Message *boot_merger_front = boot_merger_.Front(); |
| |
| if (timestamp_boot_merger_) { |
| CHECK(queue_timestamps_ran_); |
| } |
| |
| // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed |
| // to return a Message. We need to convert the first message in the list |
| // before returning it (and comparing, honestly). Fill next_timestamp_ in if |
| // it is empty so the rest of the logic here can just look at next_timestamp_ |
| // and use that instead. |
| if (!next_timestamp_ && !timestamp_messages_.empty()) { |
| auto &front = timestamp_messages_.front(); |
| next_timestamp_ = Message{ |
| .channel_index = front.channel_index, |
| .queue_index = front.queue_index, |
| .timestamp = front.monotonic_event_time, |
| .monotonic_remote_boot = front.remote_queue_index.boot, |
| .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot, |
| .data = std::move(front.data), |
| }; |
| timestamp_messages_.pop_front(); |
| } |
| |
| if (!next_timestamp_) { |
| message_source_ = MessageSource::kBootMerger; |
| if (boot_merger_front != nullptr) { |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() |
| << " " << *boot_merger_front; |
| } else { |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() |
| << " nullptr"; |
| } |
| return boot_merger_front; |
| } |
| |
| if (boot_merger_front == nullptr) { |
| message_source_ = MessageSource::kTimestampMessage; |
| |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " " |
| << next_timestamp_.value(); |
| return &next_timestamp_.value(); |
| } |
| |
| if (*boot_merger_front <= next_timestamp_.value()) { |
| if (*boot_merger_front == next_timestamp_.value()) { |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() |
| << " Dropping duplicate timestamp."; |
| next_timestamp_.reset(); |
| } |
| message_source_ = MessageSource::kBootMerger; |
| if (boot_merger_front != nullptr) { |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() |
| << " " << *boot_merger_front; |
| } else { |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() |
| << " nullptr"; |
| } |
| return boot_merger_front; |
| } else { |
| message_source_ = MessageSource::kTimestampMessage; |
| VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " " |
| << next_timestamp_.value(); |
| return &next_timestamp_.value(); |
| } |
| } |
| |
| void SplitTimestampBootMerger::PopFront() { |
| switch (message_source_) { |
| case MessageSource::kTimestampMessage: |
| CHECK(next_timestamp_.has_value()); |
| next_timestamp_.reset(); |
| break; |
| case MessageSource::kBootMerger: |
| boot_merger_.PopFront(); |
| break; |
| } |
| } |
| |
// Builds the mapper for one node.  For multi-node configs, precomputes per
// (peer node, channel) whether messages from this node are forwarded there
// (delivered), along with the connection TTL, and records each channel's
// source node index.  Single-node configs skip all of that.
TimestampMapper::TimestampMapper(
    std::string_view node_name, const LogFilesContainer &log_files,
    TimestampQueueStrategy timestamp_queue_strategy)
    : boot_merger_(node_name, log_files, timestamp_queue_strategy),
      timestamp_callback_([](TimestampedMessage *) {}) {
  configuration_ = boot_merger_.configuration();

  const Configuration *config = configuration_.get();
  // Only fill out nodes_data_ if there are nodes.  Otherwise, everything is
  // pretty simple.
  if (configuration::MultiNode(config)) {
    nodes_data_.resize(config->nodes()->size());
    const Node *my_node = config->nodes()->Get(node());
    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
      const Node *node = config->nodes()->Get(node_index);
      NodeData *node_data = &nodes_data_[node_index];
      node_data->channels.resize(config->channels()->size());
      // We should save the channel if it is delivered to the node represented
      // by the NodeData, but not sent by that node.  That combo means it is
      // forwarded.
      size_t channel_index = 0;
      node_data->any_delivered = false;
      for (const Channel *channel : *config->channels()) {
        node_data->channels[channel_index].delivered =
            configuration::ChannelIsReadableOnNode(channel, node) &&
            configuration::ChannelIsSendableOnNode(channel, my_node) &&
            (my_node != node);
        node_data->any_delivered = node_data->any_delivered ||
                                   node_data->channels[channel_index].delivered;
        if (node_data->channels[channel_index].delivered) {
          // Remember the TTL so Queue() can later bound how long to keep
          // messages around waiting for their timestamps.
          const Connection *connection =
              configuration::ConnectionToNode(channel, node);
          node_data->channels[channel_index].time_to_live =
              chrono::nanoseconds(connection->time_to_live());
        }
        ++channel_index;
      }
    }

    // Map each channel to the index of the node which sends it.
    for (const Channel *channel : *config->channels()) {
      source_node_.emplace_back(configuration::GetNodeIndex(
          config, channel->source_node()->string_view()));
    }
  } else {
    // The node index for single-node logs is always 0.
    source_node_.resize(config->channels()->size(), 0);
  }
}
| |
| void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) { |
| CHECK(configuration::MultiNode(configuration())); |
| CHECK_NE(timestamp_mapper->node(), node()); |
| CHECK_LT(timestamp_mapper->node(), nodes_data_.size()); |
| |
| NodeData *node_data = &nodes_data_[timestamp_mapper->node()]; |
| // Only set it if this node delivers to the peer timestamp_mapper. Otherwise |
| // we could needlessly save data. |
| if (node_data->any_delivered) { |
| VLOG(1) << "Registering on node " << node() << " for peer node " |
| << timestamp_mapper->node(); |
| CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr); |
| |
| timestamp_mapper->nodes_data_[node()].peer = this; |
| |
| node_data->save_for_peer = true; |
| } |
| } |
| |
// Appends msg to matched_messages_ as a TimestampedMessage with all remote
// fields set to sentinel "invalid" values (used for messages which originate
// on this node, so there is no remote side to fill in).
void TimestampMapper::QueueMessage(const Message *msg) {
  matched_messages_.emplace_back(TimestampedMessage{
      .channel_index = msg->channel_index,
      .queue_index = msg->queue_index,
      .monotonic_event_time = msg->timestamp,
      .realtime_event_time = msg->data->realtime_sent_time,
      .remote_queue_index = BootQueueIndex::Invalid(),
      .monotonic_remote_time = BootTimestamp::min_time(),
      .realtime_remote_time = realtime_clock::min_time,
      .monotonic_remote_transmit_time = BootTimestamp::min_time(),
      .monotonic_timestamp_time = BootTimestamp::min_time(),
      // NOTE(review): msg is const here, so this std::move binds to a const
      // member and actually copies the data handle — confirm whether a real
      // move was intended.
      .data = std::move(msg->data)});
  VLOG(1) << node_name() << " Inserted " << matched_messages_.back();
}
| |
| TimestampedMessage *TimestampMapper::Front() { |
| // No need to fetch anything new. A previous message still exists. |
| switch (first_message_) { |
| case FirstMessage::kNeedsUpdate: |
| break; |
| case FirstMessage::kInMessage: |
| VLOG(1) << this << " TimestampMapper::Front " << node_name() << " " |
| << matched_messages_.front(); |
| return &matched_messages_.front(); |
| case FirstMessage::kNullptr: |
| VLOG(1) << this << " TimestampMapper::Front " << node_name() |
| << " nullptr"; |
| return nullptr; |
| } |
| |
| if (matched_messages_.empty()) { |
| if (!QueueMatched()) { |
| first_message_ = FirstMessage::kNullptr; |
| VLOG(1) << this << " TimestampMapper::Front " << node_name() |
| << " nullptr"; |
| return nullptr; |
| } |
| } |
| first_message_ = FirstMessage::kInMessage; |
| VLOG(1) << this << " TimestampMapper::Front " << node_name() << " " |
| << matched_messages_.front(); |
| return &matched_messages_.front(); |
| } |
| |
| bool TimestampMapper::QueueMatched() { |
| MatchResult result = MatchResult::kEndOfFile; |
| do { |
| result = MaybeQueueMatched(); |
| } while (result == MatchResult::kSkipped); |
| return result == MatchResult::kQueued; |
| } |
| |
| bool TimestampMapper::CheckReplayChannelsAndMaybePop( |
| const TimestampedMessage & /*message*/) { |
| if (replay_channels_callback_ && |
| !replay_channels_callback_(matched_messages_.back())) { |
| VLOG(1) << node_name() << " Popped " << matched_messages_.back(); |
| matched_messages_.pop_back(); |
| return true; |
| } |
| return false; |
| } |
| |
// Attempts to move one message into matched_messages_.  Returns kQueued on
// success, kEndOfFile when the input is exhausted, and kSkipped when a message
// was consumed but filtered out by the replay-channels callback.
TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
  if (nodes_data_.empty()) {
    // Simple path.  We are single node, so there are no timestamps to match!
    CHECK_EQ(messages_.size(), 0u);
    const Message *msg = boot_merger_.Front();
    if (!msg) {
      return MatchResult::kEndOfFile;
    }
    // Enqueue this message into matched_messages_ so we have a place to
    // associate remote timestamps, and return it.
    QueueMessage(msg);

    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
        << " on " << node_name();
    last_message_time_ = matched_messages_.back().monotonic_event_time;

    // We are thin wrapper around parts_merger.  Call it directly.
    boot_merger_.PopFront();
    timestamp_callback_(&matched_messages_.back());
    if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
      return MatchResult::kSkipped;
    }
    return MatchResult::kQueued;
  }

  // We need to only add messages to the list so they get processed for
  // messages which are delivered.  Reuse the flow below which uses messages_
  // by just adding the new message to messages_ and continuing.
  if (messages_.empty()) {
    if (!Queue()) {
      // Found nothing to add, we are out of data!
      return MatchResult::kEndOfFile;
    }

    // Now that it has been added (and cannibalized), forget about it
    // upstream.
    boot_merger_.PopFront();
  }

  Message *msg = &(messages_.front());

  if (source_node_[msg->channel_index] == node()) {
    // From us, just forward it on, filling the remote data in as invalid.
    QueueMessage(msg);
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
        << " on " << node_name();
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
      return MatchResult::kSkipped;
    }
    return MatchResult::kQueued;
  } else {
    // Got a timestamp, find the matching remote data, match it, and return
    // it.
    Message data = MatchingMessageFor(*msg);

    // Return the data from the remote.  The local message only has timestamp
    // info which isn't relevant anymore once extracted.
    matched_messages_.emplace_back(TimestampedMessage{
        .channel_index = msg->channel_index,
        .queue_index = msg->queue_index,
        .monotonic_event_time = msg->timestamp,
        .realtime_event_time = msg->data->realtime_sent_time,
        .remote_queue_index =
            BootQueueIndex{.boot = msg->monotonic_remote_boot,
                           .index = msg->data->remote_queue_index.value()},
        .monotonic_remote_time = {msg->monotonic_remote_boot,
                                  msg->data->monotonic_remote_time.value()},
        .realtime_remote_time = msg->data->realtime_remote_time.value(),
        .monotonic_remote_transmit_time =
            {msg->monotonic_remote_boot,
             msg->data->monotonic_remote_transmit_time},
        .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
                                     msg->data->monotonic_timestamp_time},
        .data = std::move(data.data)});
    VLOG(1) << node_name() << " Inserted timestamp "
            << matched_messages_.back();
    CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_)
        << " on " << node_name() << " " << matched_messages_.back();
    last_message_time_ = matched_messages_.back().monotonic_event_time;
    // Since messages_ holds the data, drop it.
    messages_.pop_front();
    timestamp_callback_(&matched_messages_.back());
    if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
      return MatchResult::kSkipped;
    }
    return MatchResult::kQueued;
  }
}
| |
| void TimestampMapper::QueueUntil(BootTimestamp queue_time) { |
| while (last_message_time_ <= queue_time) { |
| if (!QueueMatched()) { |
| return; |
| } |
| } |
| } |
| |
| void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) { |
| // Note: queueing for time doesn't really work well across boots. So we |
| // just assume that if you are using this, you only care about the current |
| // boot. |
| // |
| // TODO(austin): Is that the right concept? |
| // |
| // Make sure we have something queued first. This makes the end time |
| // calculation simpler, and is typically what folks want regardless. |
| if (matched_messages_.empty()) { |
| if (!QueueMatched()) { |
| return; |
| } |
| } |
| |
| const aos::monotonic_clock::time_point end_queue_time = |
| std::max(monotonic_start_time( |
| matched_messages_.front().monotonic_event_time.boot), |
| matched_messages_.front().monotonic_event_time.time) + |
| time_estimation_buffer; |
| |
| // Place sorted messages on the list until we have |
| // --time_estimation_buffer_seconds seconds queued up (but queue at least |
| // until the log starts). |
| while (end_queue_time >= last_message_time_.time) { |
| if (!QueueMatched()) { |
| return; |
| } |
| } |
| } |
| |
| void TimestampMapper::PopFront() { |
| CHECK(first_message_ != FirstMessage::kNeedsUpdate); |
| last_popped_message_time_ = Front()->monotonic_event_time; |
| first_message_ = FirstMessage::kNeedsUpdate; |
| |
| VLOG(1) << node_name() << " Popped " << matched_messages_.back(); |
| matched_messages_.pop_front(); |
| } |
| |
| Message TimestampMapper::MatchingMessageFor(const Message &message) { |
| // Figure out what queue index we are looking for. |
| CHECK_NOTNULL(message.data); |
| CHECK(message.data->remote_queue_index.has_value()); |
| const BootQueueIndex remote_queue_index = |
| BootQueueIndex{.boot = message.monotonic_remote_boot, |
| .index = *message.data->remote_queue_index}; |
| |
| CHECK(message.data->monotonic_remote_time.has_value()); |
| CHECK(message.data->realtime_remote_time.has_value()); |
| |
| const BootTimestamp monotonic_remote_time{ |
| .boot = message.monotonic_remote_boot, |
| .time = message.data->monotonic_remote_time.value()}; |
| const realtime_clock::time_point realtime_remote_time = |
| *message.data->realtime_remote_time; |
| |
| TimestampMapper *peer = |
| nodes_data_[source_node_[message.data->channel_index]].peer; |
| |
| // We only register the peers which we have data for. So, if we are being |
| // asked to pull a timestamp from a peer which doesn't exist, return an |
| // empty message. |
| if (peer == nullptr) { |
| // TODO(austin): Make sure the tests hit all these paths with a boot count |
| // of 1... |
| return Message{.channel_index = message.channel_index, |
| .queue_index = remote_queue_index, |
| .timestamp = monotonic_remote_time, |
| .monotonic_remote_boot = 0xffffff, |
| .monotonic_timestamp_boot = 0xffffff, |
| .data = nullptr}; |
| } |
| |
| // The queue which will have the matching data, if available. |
| std::deque<Message> *data_queue = |
| &peer->nodes_data_[node()].channels[message.channel_index].messages; |
| |
| peer->QueueUnmatchedUntil(monotonic_remote_time); |
| |
| if (data_queue->empty()) { |
| return Message{.channel_index = message.channel_index, |
| .queue_index = remote_queue_index, |
| .timestamp = monotonic_remote_time, |
| .monotonic_remote_boot = 0xffffff, |
| .monotonic_timestamp_boot = 0xffffff, |
| .data = nullptr}; |
| } |
| |
| if (remote_queue_index < data_queue->front().queue_index || |
| remote_queue_index > data_queue->back().queue_index) { |
| return Message{.channel_index = message.channel_index, |
| .queue_index = remote_queue_index, |
| .timestamp = monotonic_remote_time, |
| .monotonic_remote_boot = 0xffffff, |
| .monotonic_timestamp_boot = 0xffffff, |
| .data = nullptr}; |
| } |
| |
| // The algorithm below is constant time with some assumptions. We need there |
| // to be no missing messages in the data stream. This also assumes a queue |
| // hasn't wrapped. That is conservative, but should let us get started. |
| if (data_queue->back().queue_index.boot == |
| data_queue->front().queue_index.boot && |
| (data_queue->back().queue_index.index - |
| data_queue->front().queue_index.index + 1u == |
| data_queue->size())) { |
| CHECK_EQ(remote_queue_index.boot, data_queue->front().queue_index.boot); |
| // Pull the data out and confirm that the timestamps match as expected. |
| // |
| // TODO(austin): Move if not reliable. |
| Message result = (*data_queue)[remote_queue_index.index - |
| data_queue->front().queue_index.index]; |
| |
| CHECK_EQ(result.timestamp, monotonic_remote_time) |
| << ": Queue index matches, but timestamp doesn't. Please investigate!"; |
| CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time) |
| << ": Queue index matches, but timestamp doesn't. Please investigate!"; |
| // Now drop the data off the front. We have deduplicated timestamps, so we |
| // are done. And all the data is in order. |
| data_queue->erase( |
| data_queue->begin(), |
| data_queue->begin() + |
| (remote_queue_index.index - data_queue->front().queue_index.index)); |
| return result; |
| } else { |
| // TODO(austin): Binary search. |
| auto it = std::find_if( |
| data_queue->begin(), data_queue->end(), |
| [remote_queue_index, |
| remote_boot = monotonic_remote_time.boot](const Message &msg) { |
| return msg.queue_index == remote_queue_index && |
| msg.timestamp.boot == remote_boot; |
| }); |
| if (it == data_queue->end()) { |
| return Message{.channel_index = message.channel_index, |
| .queue_index = remote_queue_index, |
| .timestamp = monotonic_remote_time, |
| .monotonic_remote_boot = 0xffffff, |
| .monotonic_timestamp_boot = 0xffffff, |
| .data = nullptr}; |
| } |
| |
| Message result = std::move(*it); |
| |
| CHECK_EQ(result.timestamp, monotonic_remote_time) |
| << ": Queue index matches, but timestamp doesn't. Please " |
| "investigate!"; |
| CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time) |
| << ": Queue index matches, but timestamp doesn't. Please " |
| "investigate!"; |
| |
| // Erase everything up to this message. We want to keep 1 message in the |
| // queue so we can handle reliable messages forwarded across boots. |
| data_queue->erase(data_queue->begin(), it); |
| |
| return result; |
| } |
| } |
| |
| void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) { |
| if (queued_until_ > t) { |
| return; |
| } |
| while (true) { |
| if (!messages_.empty() && messages_.back().timestamp > t) { |
| queued_until_ = std::max(queued_until_, messages_.back().timestamp); |
| return; |
| } |
| |
| if (!Queue()) { |
| // Found nothing to add, we are out of data! |
| queued_until_ = BootTimestamp::max_time(); |
| return; |
| } |
| |
| // Now that it has been added (and cannibalized), forget about it |
| // upstream. |
| boot_merger_.PopFront(); |
| } |
| } |
| |
| bool TimestampMapper::Queue() { |
| const Message *msg = boot_merger_.Front(); |
| if (msg == nullptr) { |
| return false; |
| } |
| for (NodeData &node_data : nodes_data_) { |
| if (!node_data.any_delivered) continue; |
| if (!node_data.save_for_peer) continue; |
| if (node_data.channels[msg->channel_index].delivered) { |
| // If we have data but no timestamps (logs where the timestamps didn't get |
| // logged are classic), we can grow this indefinitely. We don't need to |
| // keep anything that is older than the last message returned. |
| |
| // We have the time on the source node. |
| // We care to wait until we have the time on the destination node. |
| std::deque<Message> &messages = |
| node_data.channels[msg->channel_index].messages; |
| // Max delay over the network is the TTL, so let's take the queue time and |
| // add TTL to it. Don't forget any messages which are reliable until |
| // someone can come up with a good reason to forget those too. |
| if (node_data.channels[msg->channel_index].time_to_live > |
| chrono::nanoseconds(0)) { |
| // We need to make *some* assumptions about network delay for this to |
| // work. We want to only look at the RX side. This means we need to |
| // track the last time a message was popped from any channel from the |
| // node sending this message, and compare that to the max time we expect |
| // that a message will take to be delivered across the network. This |
| // assumes that messages are popped in time order as a proxy for |
| // measuring the distributed time at this layer. |
| // |
| // Leave at least 1 message in here so we can handle reboots and |
| // messages getting sent twice. |
| while (messages.size() > 1u && |
| messages.begin()->timestamp + |
| node_data.channels[msg->channel_index].time_to_live + |
| chrono::duration_cast<chrono::nanoseconds>( |
| chrono::duration<double>(FLAGS_max_network_delay)) < |
| last_popped_message_time_) { |
| messages.pop_front(); |
| } |
| } |
| node_data.channels[msg->channel_index].messages.emplace_back(*msg); |
| } |
| } |
| |
| messages_.emplace_back(std::move(*msg)); |
| return true; |
| } |
| |
// Delegates timestamp queueing to the underlying boot merger, passing along
// our timestamp callback (by reference, so the merger never copies it) and
// the node we are sourcing from.
// NOTE(review): the exact per-message semantics live in
// BootMerger::QueueTimestamps — presumably timestamp_callback_ is invoked for
// each timestamp queued; confirm against that implementation.
void TimestampMapper::QueueTimestamps() {
  boot_merger_.QueueTimestamps(std::ref(timestamp_callback_), source_node_);
}
| |
| std::string TimestampMapper::DebugString() const { |
| std::stringstream ss; |
| ss << "node " << node() << " (" << node_name() << ") [\n"; |
| for (const Message &message : messages_) { |
| ss << " " << message << "\n"; |
| } |
| ss << "] queued_until " << queued_until_; |
| for (const NodeData &ns : nodes_data_) { |
| if (ns.peer == nullptr) continue; |
| ss << "\nnode " << ns.peer->node() << " remote_data [\n"; |
| size_t channel_index = 0; |
| for (const NodeData::ChannelData &channel_data : |
| ns.peer->nodes_data_[node()].channels) { |
| if (channel_data.messages.empty()) { |
| continue; |
| } |
| |
| ss << " channel " << channel_index << " [\n"; |
| for (const Message &msg : channel_data.messages) { |
| ss << " " << msg << "\n"; |
| } |
| ss << " ]\n"; |
| ++channel_index; |
| } |
| ss << "] queued_until " << ns.peer->queued_until_; |
| } |
| return ss.str(); |
| } |
| |
| } // namespace aos::logger |