blob: c67abdac3363bbe9a9184384ab93ee3b188e5847 [file] [log] [blame]
#ifndef AOS_UTIL_MCAP_LOGGER_H_
#define AOS_UTIL_MCAP_LOGGER_H_
#include <stddef.h>
#include <stdint.h>

#include <fstream>
#include <iosfwd>
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "nlohmann/json_fwd.hpp"

#include "aos/configuration_generated.h"
#include "aos/events/context.h"
#include "aos/events/event_loop.h"
#include "aos/fast_string_builder.h"
#include "aos/flatbuffer_utils.h"
#include "aos/flatbuffers.h"
#include "aos/time/time.h"
namespace aos {
// Tells JsonSchemaForFlatbuffer() whether it is producing the outermost schema
// (kTopLevel, the default) or a schema embedded inside an enclosing one
// (kNested).
// NOTE(review): the role of each value is inferred from the names and from the
// default argument below; confirm against the implementation.
enum class JsonSchemaRecursion {
  kTopLevel,
  kNested,
};

// Produces a JSON Schema (https://json-schema.org/) for the given flatbuffer
// type.
//
// recursion_level controls whether the returned schema carries a "$schema"
// attribute naming the schema definition in use (this is what allows for
// recursion).
// NOTE(review): the previous comment only said "if recursion_level is set";
// presumably the attribute is emitted for kTopLevel and omitted for kNested --
// confirm in the .cc.
//
// This is deliberately bare-bones: e.g., structs and tables are not
// distinguished when generating the JSON schema, so struct fields are not
// marked as required.
nlohmann::json JsonSchemaForFlatbuffer(
    const FlatbufferType &type,
    JsonSchemaRecursion recursion_level = JsonSchemaRecursion::kTopLevel);
// Computes the shortest alias under which `channel` can be referred to by the
// application named `application_name` running on `node`.
//
//   config:           configuration that `channel` belongs to (presumably the
//                     source of the alias/map definitions -- confirm).
//   channel:          channel to find an alias for.
//   application_name: application from whose point of view the alias is
//                     resolved.
//   node:             node from whose point of view the alias is resolved.
//
// Returns the alias as an owned string.
std::string ShortenedChannelName(
    const aos::Configuration *config,
    const aos::Channel *channel,
    std::string_view application_name,
    const aos::Node *node);
// Generates an MCAP file, per the specification at
// https://github.com/foxglove/mcap/tree/main/docs/specification
// The file is written with full message indexing available, to be able to
// support Foxglove fully. Chunks are either left uncompressed or compressed
// with LZ4, depending on the Compression passed to the constructor.
class McapLogger {
 public:
  // Whether to serialize the messages into the MCAP file as JSON or
  // flatbuffers.
  enum class Serialization {
    kJson,
    kFlatbuffer,
  };
  // Whether to attempt to shorten channel names.
  enum class CanonicalChannelNames {
    // Just use the full, unambiguous, channel names.
    kCanonical,
    // Use GetChannelAliases() to determine the shortest possible name for the
    // channel for the current node, and use that in the MCAP file. This makes
    // it so that the channels in the resulting file are more likely to match
    // the channel names that are used in "real" applications.
    kShortened,
  };
  // Chunk compression to use in the MCAP file.
  enum class Compression {
    // Chunk records are stored as-is.
    kNone,
    // Chunk records are LZ4-compressed.
    kLz4,
  };

  // event_loop:         event loop the logger attaches to. NOTE(review):
  //                     presumably the channels visible on it are the ones
  //                     logged; confirm in the .cc.
  // output_path:        path of the MCAP file to write.
  // serialization:      encoding of logged messages (JSON or flatbuffer).
  // canonical_channels: whether channel names are written in full or
  //                     shortened.
  // compression:        compression applied to each chunk.
  McapLogger(EventLoop *event_loop, const std::string &output_path,
             Serialization serialization,
             CanonicalChannelNames canonical_channels, Compression compression);
  // NOTE(review): presumably flushes in-progress chunks and writes the summary
  // section and footer; confirm in the .cc.
  ~McapLogger();

  // Not copyable: the logger owns the output file stream and its fetchers.
  // (The std::ofstream member already implicitly deleted these; this just
  // makes the intent explicit at the API.)
  McapLogger(const McapLogger &) = delete;
  McapLogger &operator=(const McapLogger &) = delete;

 private:
  // Record opcodes, with the values assigned by the MCAP specification.
  enum class OpCode {
    kHeader = 0x01,
    kFooter = 0x02,
    kSchema = 0x03,
    kChannel = 0x04,
    kMessage = 0x05,
    kChunk = 0x06,
    kMessageIndex = 0x07,
    kChunkIndex = 0x08,
    kAttachment = 0x09,
    kAttachmentIndex = 0x0A,
    kStatistics = 0x0B,
    kMetadata = 0x0C,
    kMetadataIndex = 0x0D,
    kSummaryOffset = 0x0E,
    kDataEnd = 0x0F,
  };

  // Stores information associated with a SummaryOffset entry (an offset to the
  // start of a section within the Summary section, which allows readers to
  // quickly find all the indices/channel definitions/etc. for a given log).
  struct SummaryOffset {
    OpCode op_code;
    // Offset from the start of the file.
    uint64_t offset;
    // Total length of the section, in bytes.
    uint64_t size;
  };

  // Information needed to build a ChunkIndex entry.
  struct ChunkIndex {
    // Earliest and latest message times within the Chunk being referenced.
    aos::monotonic_clock::time_point start_time;
    aos::monotonic_clock::time_point end_time;
    // Offset from the start of the file to the start of the relevant Chunk.
    uint64_t offset;
    // Total size of the Chunk, in bytes.
    uint64_t chunk_size;
    // Total uncompressed size of the records portion of the Chunk, in bytes.
    uint64_t records_size;
    // Total size of the records portion of the Chunk, when compressed, in
    // bytes.
    uint64_t records_size_compressed;
    // Mapping of channel IDs to the MessageIndex entry for that channel within
    // the referenced Chunk. The MessageIndex is referenced by an offset from
    // the start of the file.
    std::map<uint16_t, uint64_t> message_index_offsets;
    // Total size, in bytes, of all the MessageIndex entries for this Chunk
    // together (note that they are required to be contiguous).
    uint64_t message_index_size;
    // Compression used in this Chunk.
    Compression compression;
  };

  // Maintains the state of a single Chunk. In order to maximize read
  // performance, we currently maintain separate chunks for each channel so
  // that, in order to read a given channel, only data associated with that
  // channel needs to be read.
  struct ChunkStatus {
    // Buffer containing serialized message data for the currently-being-built
    // chunk.
    std::stringstream data;
    // Earliest message observed in this chunk.
    std::optional<aos::monotonic_clock::time_point> earliest_message;
    // Latest message observed in this chunk.
    std::optional<aos::monotonic_clock::time_point> latest_message;
    // MessageIndex entries for each message. The std::map is indexed by
    // channel ID. The vector is then a series of pairs of (timestamp, offset
    // from start of data).
    // Note that currently this will only ever have one entry, for the channel
    // that this chunk corresponds to. However, the standard provides for there
    // being more than one channel per chunk and so we still have some code that
    // supports that.
    std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
        message_indices;
  };

  // Passed to WriteSchemasAndChannels(). NOTE(review): presumably selects
  // whether per-channel message handlers are registered in addition to writing
  // the records; confirm in the .cc.
  enum class RegisterHandlers { kYes, kNo };

  // Helpers to write each type of relevant record.
  void WriteMagic();
  void WriteHeader();
  void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
  void WriteDataEnd();
  void WriteSchema(const uint16_t id, const aos::Channel *channel);
  // override_name: when non-empty, presumably written in place of the
  // channel's own name (e.g. for shortened names) -- confirm in the .cc.
  void WriteChannel(const uint16_t id, const uint16_t schema_id,
                    const aos::Channel *channel,
                    std::string_view override_name = "");
  void WriteMessage(uint16_t channel_id, const Channel *channel,
                    const Context &context, ChunkStatus *chunk);
  void WriteChunk(ChunkStatus *chunk);

  // Writes out the special configuration channel. This gets called right before
  // the first actual message is written so that we can have a reasonable
  // monotonic clock time.
  void WriteConfigurationMessage();

  // The helpers for writing records which appear in the Summary section
  // return SummaryOffset entries so that they can be referenced in the
  // SummaryOffset section.
  SummaryOffset WriteChunkIndices();
  SummaryOffset WriteStatistics();
  std::vector<SummaryOffset> WriteSchemasAndChannels(
      RegisterHandlers register_handlers);
  void WriteSummaryOffset(const SummaryOffset &offset);

  // Writes an MCAP record with the given opcode to ostream.
  void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
  // Writes an MCAP record with the given opcode to the output file.
  void WriteRecord(OpCode op, std::string_view record) {
    WriteRecord(op, record, &output_);
  }

  // Adds an MCAP-spec string/byte-array/map/array of pairs/fixed-size integer
  // to a buffer.
  static void AppendString(FastStringBuilder *builder, std::string_view string);
  static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
  static void AppendChannelMap(FastStringBuilder *builder,
                               const std::map<uint16_t, uint64_t> &map);
  static void AppendMessageIndices(
      FastStringBuilder *builder,
      const std::vector<std::pair<uint64_t, uint64_t>> &messages);
  static void AppendInt16(FastStringBuilder *builder, uint16_t val);
  static void AppendInt32(FastStringBuilder *builder, uint32_t val);
  static void AppendInt64(FastStringBuilder *builder, uint64_t val);

  aos::EventLoop *event_loop_;
  // Destination MCAP file.
  std::ofstream output_;
  const Serialization serialization_;
  const CanonicalChannelNames canonical_channels_;
  const Compression compression_;
  size_t total_message_bytes_ = 0;
  std::map<const Channel *, size_t> total_channel_bytes_;
  FastStringBuilder string_builder_;

  // Earliest message observed in this logfile.
  std::optional<aos::monotonic_clock::time_point> earliest_message_;
  // Latest message observed in this logfile.
  aos::monotonic_clock::time_point latest_message_ =
      aos::monotonic_clock::min_time;
  // Count of all messages on each channel, indexed by channel ID.
  std::map<uint16_t, uint64_t> message_counts_;
  std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
  // All currently-being-built chunks. Indexed by channel ID. This is used to
  // segregate channels into separate chunks to support more efficient reading.
  std::map<uint16_t, ChunkStatus> current_chunks_;
  // ChunkIndex entries for all fully written Chunks.
  std::vector<ChunkIndex> chunk_indices_;

  // Metadata associated with the fake "configuration" channel that we create in
  // order to ensure that foxglove extensions/users have access to the full
  // configuration.
  uint16_t configuration_id_ = 0;
  FlatbufferDetachedBuffer<Channel> configuration_channel_;
  FlatbufferDetachedBuffer<Configuration> configuration_;
  bool wrote_configuration_ = false;

  // Memory buffer to use for compressing data.
  std::vector<uint8_t> compression_buffer_;
};
} // namespace aos
#endif // AOS_UTIL_MCAP_LOGGER_H_