Merge changes I211ba096,I83085561
* changes:
Make sure everything is up before starting message_bridge_client
Practice and comp robot calibration for MTTD
diff --git a/WORKSPACE b/WORKSPACE
index 6ef1eb9..172e7bb 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -1266,11 +1266,22 @@
url = "https://www.frc971.org/Build-Dependencies/aws_sdk-19.0.0-RC1.tar.gz",
)
+# The LZ4 source code (files under lib/) is under the BSD 2-Clause license.
+# The rest of the repository (build information, documentation, etc.) is under GPLv2.
+# We only care about the lib/ subfolder anyway, and strip out all other files.
+http_archive(
+ name = "com_github_lz4_lz4",
+ build_file = "//debian:BUILD.lz4.bazel",
+ sha256 = "0b0e3aa07c8c063ddf40b082bdf7e37a1562bda40a0ff5272957f3e987e0e54b",
+ strip_prefix = "lz4-1.9.4/lib",
+ url = "https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz",
+)
+
http_file(
name = "com_github_foxglove_mcap_mcap",
executable = True,
- sha256 = "cf4dfcf71e20a60406aaded03a165312c1ca535b509ead90eb1846fc598137d2",
- urls = ["https://github.com/foxglove/mcap/releases/download/releases%2Fmcap-cli%2Fv0.0.5/mcap-linux-amd64"],
+ sha256 = "ae745dd09cf4c9570c1c038a72630c07b073f0ed4b05983d64108ff748a40d3f",
+ urls = ["https://github.com/foxglove/mcap/releases/download/releases%2Fmcap-cli%2Fv0.0.22/mcap-linux-amd64"],
)
http_archive(
diff --git a/aos/events/event_loop_runtime.rs b/aos/events/event_loop_runtime.rs
index 360c931..023cfb6 100644
--- a/aos/events/event_loop_runtime.rs
+++ b/aos/events/event_loop_runtime.rs
@@ -370,6 +370,9 @@
MonotonicInstant(self.0.monotonic_now())
}
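+
+    /// Returns the current wall-clock time, as reported by the realtime
+    /// clock. Unlike the monotonic clock, this may jump when the system
+    /// clock is adjusted.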
+ pub fn realtime_now(&self) -> RealtimeInstant {
+ RealtimeInstant(self.0.realtime_now())
+ }
/// Note that the `'event_loop` input lifetime is intentional. The C++ API requires that it is
/// part of `self.configuration()`, which will always have this lifetime.
///
@@ -711,7 +714,6 @@
where
T: Follow<'a> + 'a;
-// TODO(Brian): Add the realtime timestamps here.
impl<'a, T> TypedContext<'a, T>
where
T: Follow<'a> + 'a,
@@ -730,6 +732,12 @@
pub fn monotonic_remote_time(&self) -> MonotonicInstant {
self.0.monotonic_remote_time()
}
+ pub fn realtime_event_time(&self) -> RealtimeInstant {
+ self.0.realtime_event_time()
+ }
+ pub fn realtime_remote_time(&self) -> RealtimeInstant {
+ self.0.realtime_remote_time()
+ }
pub fn queue_index(&self) -> u32 {
self.0.queue_index()
}
@@ -750,10 +758,11 @@
T::Inner: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- // TODO(Brian): Add the realtime timestamps here.
f.debug_struct("TypedContext")
.field("monotonic_event_time", &self.monotonic_event_time())
.field("monotonic_remote_time", &self.monotonic_remote_time())
+ .field("realtime_event_time", &self.realtime_event_time())
+ .field("realtime_remote_time", &self.realtime_remote_time())
.field("queue_index", &self.queue_index())
.field("remote_queue_index", &self.remote_queue_index())
.field("message", &self.message())
@@ -1020,10 +1029,11 @@
impl fmt::Debug for Context<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- // TODO(Brian): Add the realtime timestamps here.
f.debug_struct("Context")
.field("monotonic_event_time", &self.monotonic_event_time())
.field("monotonic_remote_time", &self.monotonic_remote_time())
+ .field("realtime_event_time", &self.realtime_event_time())
+ .field("realtime_remote_time", &self.realtime_remote_time())
.field("queue_index", &self.queue_index())
.field("remote_queue_index", &self.remote_queue_index())
.field("size", &self.data().map(|data| data.len()))
@@ -1033,7 +1043,6 @@
}
}
-// TODO(Brian): Add the realtime timestamps here.
impl<'context> Context<'context> {
pub fn monotonic_event_time(self) -> MonotonicInstant {
MonotonicInstant(self.0.monotonic_event_time)
@@ -1043,6 +1052,14 @@
MonotonicInstant(self.0.monotonic_remote_time)
}
+ pub fn realtime_event_time(self) -> RealtimeInstant {
+ RealtimeInstant(self.0.realtime_event_time)
+ }
+
+ pub fn realtime_remote_time(self) -> RealtimeInstant {
+ RealtimeInstant(self.0.realtime_remote_time)
+ }
+
pub fn queue_index(self) -> u32 {
self.0.queue_index
}
@@ -1093,9 +1110,6 @@
/// Represents an `aos::monotonic_clock::time_point` in a natural Rust way. This
/// is intended to have the same API as [`std::time::Instant`], any missing
/// functionality can be added if useful.
-///
-/// TODO(Brian): Do RealtimeInstant too. Use a macro? Integer as a generic
-/// parameter to distinguish them? Or just copy/paste?
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct MonotonicInstant(i64);
@@ -1125,6 +1139,34 @@
}
}
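+
+/// Represents an `aos::realtime_clock::time_point` in a natural Rust way, with
+/// the same API as [`MonotonicInstant`].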
+#[repr(transparent)]
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct RealtimeInstant(i64);
+
+impl RealtimeInstant {
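+    /// Sentinel value indicating "no time available", matching
+    /// `aos::realtime_clock::min_time`.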
+ pub const MIN_TIME: Self = Self(i64::MIN);
+
+ pub fn is_min_time(self) -> bool {
+ self == Self::MIN_TIME
+ }
+
+ pub fn duration_since_epoch(self) -> Option<Duration> {
+ if self.is_min_time() {
+ None
+ } else {
+ Some(Duration::from_nanos(self.0.try_into().expect(
+                "realtime_clock::time_point should always be after the epoch",
+ )))
+ }
+ }
+}
+
+impl fmt::Debug for RealtimeInstant {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.duration_since_epoch().fmt(f)
+ }
+}
+
mod panic_waker {
use std::task::{RawWaker, RawWakerVTable, Waker};
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 19a9172..f8eb09f 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -86,6 +86,7 @@
"//aos:fast_string_builder",
"//aos:flatbuffer_utils",
"//aos/events:event_loop",
+ "@com_github_lz4_lz4//:lz4",
"@com_github_nlohmann_json//:json",
],
)
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index 5330c60..e669658 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -5,7 +5,12 @@
DEFINE_string(node, "", "Node to replay from the perspective of.");
DEFINE_string(output_path, "/tmp/log.mcap", "Log to output.");
-DEFINE_string(mode, "json", "json or flatbuffer serialization.");
+DEFINE_string(mode, "flatbuffer", "json or flatbuffer serialization.");
+DEFINE_bool(
+ canonical_channel_names, false,
+    "If set, use the full, canonical channel names; by default, shorten each "
+    "name to the shortest valid alias (e.g., /aos instead of /pi/aos).");
+DEFINE_bool(compress, true, "Whether to use LZ4 compression in the MCAP file.");
// Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
// out, run:
@@ -19,21 +24,43 @@
const std::vector<aos::logger::LogFile> logfiles =
aos::logger::SortParts(aos::logger::FindLogs(argc, argv));
+ CHECK(!logfiles.empty());
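+  // If every log part was recorded by the same node, we can safely default
+  // --node to that node.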
+ const std::string logger_node = logfiles.at(0).logger_node;
+ bool all_logs_from_same_node = true;
+ for (const aos::logger::LogFile &log : logfiles) {
+ if (log.logger_node != logger_node) {
+ all_logs_from_same_node = false;
+ break;
+ }
+ }
+ std::string replay_node = FLAGS_node;
+ if (replay_node.empty() && all_logs_from_same_node) {
+ LOG(INFO) << "Guessing \"" << logger_node
+ << "\" as node given that --node was not specified.";
+ replay_node = logger_node;
+ }
aos::logger::LogReader reader(logfiles);
reader.Register();
const aos::Node *node =
- FLAGS_node.empty()
+ (replay_node.empty() ||
+ !aos::configuration::MultiNode(reader.configuration()))
? nullptr
- : aos::configuration::GetNode(reader.configuration(), FLAGS_node);
+ : aos::configuration::GetNode(reader.configuration(), replay_node);
+
std::unique_ptr<aos::EventLoop> mcap_event_loop =
reader.event_loop_factory()->MakeEventLoop("mcap", node);
CHECK(!FLAGS_output_path.empty());
- aos::McapLogger relogger(mcap_event_loop.get(), FLAGS_output_path,
- FLAGS_mode == "flatbuffer"
- ? aos::McapLogger::Serialization::kFlatbuffer
- : aos::McapLogger::Serialization::kJson);
+ aos::McapLogger relogger(
+ mcap_event_loop.get(), FLAGS_output_path,
+ FLAGS_mode == "flatbuffer" ? aos::McapLogger::Serialization::kFlatbuffer
+ : aos::McapLogger::Serialization::kJson,
+ FLAGS_canonical_channel_names
+ ? aos::McapLogger::CanonicalChannelNames::kCanonical
+ : aos::McapLogger::CanonicalChannelNames::kShortened,
+ FLAGS_compress ? aos::McapLogger::Compression::kLz4
+ : aos::McapLogger::Compression::kNone);
reader.event_loop_factory()->Run();
}
diff --git a/aos/util/log_to_mcap_test.py b/aos/util/log_to_mcap_test.py
index 7bdffe4..36f8de0 100644
--- a/aos/util/log_to_mcap_test.py
+++ b/aos/util/log_to_mcap_test.py
@@ -10,6 +10,27 @@
from typing import Sequence, Text
+def make_permutations(options):
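+    """Returns the cartesian product of the given lists of options."""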
+ if len(options) == 0:
+ return [[]]
+ permutations = []
+ for option in options[0]:
+ for sub_permutations in make_permutations(options[1:]):
+ permutations.append([option] + sub_permutations)
+ return permutations
+
+
+def generate_argument_permutations():
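+    """Generates every combination of the log_to_mcap flag values under test."""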
+ arg_sets = [["--compress", "--nocompress"],
+ ["--mode=flatbuffer", "--mode=json"],
+ ["--canonical_channel_names", "--nocanonical_channel_names"],
+ ["--mcap_chunk_size=1000", "--mcap_chunk_size=10000000"],
+ ["--fetch", "--nofetch"]]
+ permutations = make_permutations(arg_sets)
+ print(permutations)
+ return permutations
+
+
def main(argv: Sequence[Text]):
parser = argparse.ArgumentParser()
parser.add_argument("--log_to_mcap",
@@ -20,35 +41,38 @@
required=True,
help="Path to logfile generator.")
args = parser.parse_args(argv)
- with tempfile.TemporaryDirectory() as tmpdir:
- log_name = tmpdir + "/test_log/"
- mcap_name = tmpdir + "/log.mcap"
- subprocess.run([args.generate_log, "--output_folder",
- log_name]).check_returncode()
- # Run with a really small chunk size, to force a multi-chunk file.
- subprocess.run([
- args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size",
- "1000", "--mode", "json", log_name
- ]).check_returncode()
- # MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
- # give it an arbitrary config location (it seems to be fine with a non-existent config).
- doctor_result = subprocess.run([
- args.mcap, "doctor", mcap_name, "--config", tmpdir + "/.mcap.yaml"
- ],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding='utf-8')
- print(doctor_result.stdout)
- print(doctor_result.stderr)
- # mcap doctor doesn't actually return a non-zero exit code on certain failures...
- # See https://github.com/foxglove/mcap/issues/356
- if len(doctor_result.stderr) != 0:
- print("Didn't expect any stderr output.")
- return 1
- if doctor_result.stdout != f"Examining {mcap_name}\n":
- print("Only expected one line of stdout.")
- return 1
- doctor_result.check_returncode()
+ log_to_mcap_argument_permutations = generate_argument_permutations()
+ for log_to_mcap_args in log_to_mcap_argument_permutations:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ log_name = tmpdir + "/test_log/"
+ mcap_name = tmpdir + "/log.mcap"
+ subprocess.run([args.generate_log, "--output_folder",
+ log_name]).check_returncode()
+ # Run with a really small chunk size, to force a multi-chunk file.
+ subprocess.run([
+ args.log_to_mcap, "--output_path", mcap_name,
+ "--mcap_chunk_size", "1000", "--mode", "json", log_name
+ ] + log_to_mcap_args).check_returncode()
+ # MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
+ # give it an arbitrary config location (it seems to be fine with a non-existent config).
+ doctor_result = subprocess.run([
+ args.mcap, "doctor", mcap_name, "--config",
+ tmpdir + "/.mcap.yaml"
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='utf-8')
+ print(doctor_result.stdout)
+ print(doctor_result.stderr)
+ # mcap doctor doesn't actually return a non-zero exit code on certain failures...
+ # See https://github.com/foxglove/mcap/issues/356
+ if len(doctor_result.stderr) != 0:
+ print("Didn't expect any stderr output.")
+ return 1
+ if doctor_result.stdout != f"Examining {mcap_name}\nHeader.profile field \"x-aos\" is not a well-known profile.\n":
+ print("Only expected two lines of stdout.")
+ return 1
+ doctor_result.check_returncode()
return 0
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index dc27504..742f284 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -3,6 +3,8 @@
#include "absl/strings/str_replace.h"
#include "aos/configuration_schema.h"
#include "aos/flatbuffer_merge.h"
+#include "lz4/lz4.h"
+#include "lz4/lz4frame.h"
#include "single_include/nlohmann/json.hpp"
DEFINE_uint64(mcap_chunk_size, 10'000'000,
@@ -82,11 +84,27 @@
return schema;
}
+namespace {
+std::string_view CompressionName(McapLogger::Compression compression) {
+ switch (compression) {
+ case McapLogger::Compression::kNone:
+ return "";
+ case McapLogger::Compression::kLz4:
+ return "lz4";
+ }
+ LOG(FATAL) << "Unreachable.";
+}
+} // namespace
+
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
- Serialization serialization)
+ Serialization serialization,
+ CanonicalChannelNames canonical_channels,
+ Compression compression)
: event_loop_(event_loop),
output_(output_path),
serialization_(serialization),
+ canonical_channels_(canonical_channels),
+ compression_(compression),
configuration_channel_([]() {
// Setup a fake Channel for providing the configuration in the MCAP
// file. This is included for convenience so that consumers of the MCAP
@@ -121,8 +139,10 @@
McapLogger::~McapLogger() {
// If we have any data remaining, write one last chunk.
- if (current_chunk_.tellp() > 0) {
- WriteChunk();
+ for (auto &pair : current_chunks_) {
+ if (pair.second.data.tellp() > 0) {
+ WriteChunk(&pair.second);
+ }
}
WriteDataEnd();
@@ -189,16 +209,18 @@
message_counts_[id] = 0;
event_loop_->MakeRawWatcher(
channel, [this, id, channel](const Context &context, const void *) {
-          WriteMessage(id, channel, context, &current_chunk_);
- if (static_cast<uint64_t>(current_chunk_.tellp()) >
+          ChunkStatus *chunk = &current_chunks_[id];
+ WriteMessage(id, channel, context, chunk);
+ if (static_cast<uint64_t>(chunk->data.tellp()) >
FLAGS_mcap_chunk_size) {
- WriteChunk();
+ WriteChunk(chunk);
}
});
fetchers_[id] = event_loop_->MakeRawFetcher(channel);
event_loop_->OnRun([this, id, channel]() {
if (FLAGS_fetch && fetchers_[id]->Fetch()) {
-      WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+ WriteMessage(id, channel, fetchers_[id]->context(),
+                   &current_chunks_[id]);
}
});
}
@@ -214,7 +236,7 @@
config_context.size = configuration_.span().size();
config_context.data = configuration_.span().data();
WriteMessage(configuration_id_, &configuration_channel_.message(),
-               config_context, &current_chunk_);
+               config_context, &current_chunks_[configuration_id_]);
});
}
@@ -321,11 +343,34 @@
// Schema ID
AppendInt16(&string_builder_, schema_id);
// Topic name
- AppendString(&string_builder_,
- override_name.empty()
- ? absl::StrCat(channel->name()->string_view(), " ",
- channel->type()->string_view())
- : override_name);
+ std::string topic_name(override_name);
+ if (topic_name.empty()) {
+ switch (canonical_channels_) {
+ case CanonicalChannelNames::kCanonical:
+ topic_name = absl::StrCat(channel->name()->string_view(), " ",
+ channel->type()->string_view());
+ break;
+ case CanonicalChannelNames::kShortened: {
+ std::set<std::string> names = configuration::GetChannelAliases(
+ event_loop_->configuration(), channel, event_loop_->name(),
+ event_loop_->node());
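+        // Pick the shortest alias under which this channel is visible to the
+        // current node.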
+ std::string_view shortest_name;
+ for (const std::string &name : names) {
+ if (shortest_name.empty() || name.size() < shortest_name.size()) {
+ shortest_name = name;
+ }
+ }
+ if (shortest_name != channel->name()->string_view()) {
+ VLOG(1) << "Shortening " << channel->name()->string_view() << " "
+ << channel->type()->string_view() << " to " << shortest_name;
+ }
+ topic_name =
+ absl::StrCat(shortest_name, " ", channel->type()->string_view());
+ break;
+ }
+ }
+ }
+ AppendString(&string_builder_, topic_name);
// Encoding
switch (serialization_) {
case Serialization::kJson:
@@ -342,7 +387,7 @@
}
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context, std::ostream *output) {
+ const Context &context, ChunkStatus *chunk) {
CHECK_NOTNULL(context.data);
message_counts_[channel_id]++;
@@ -353,12 +398,13 @@
earliest_message_ =
std::min(context.monotonic_event_time, earliest_message_.value());
}
- if (!earliest_chunk_message_.has_value()) {
- earliest_chunk_message_ = context.monotonic_event_time;
+ if (!chunk->earliest_message.has_value()) {
+ chunk->earliest_message = context.monotonic_event_time;
} else {
- earliest_chunk_message_ =
- std::min(context.monotonic_event_time, earliest_chunk_message_.value());
+ chunk->earliest_message =
+ std::min(context.monotonic_event_time, chunk->earliest_message.value());
}
+ chunk->latest_message = context.monotonic_event_time;
latest_message_ = context.monotonic_event_time;
string_builder_.Reset();
@@ -396,11 +442,12 @@
total_message_bytes_ += context.size;
total_channel_bytes_[channel] += context.size;
- message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
- context.monotonic_event_time.time_since_epoch().count(),
- output->tellp()));
+ chunk->message_indices[channel_id].push_back(
+ std::make_pair<uint64_t, uint64_t>(
+ context.monotonic_event_time.time_since_epoch().count(),
+ chunk->data.tellp()));
- WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
+ WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
}
void McapLogger::WriteRecord(OpCode op, std::string_view record,
@@ -412,43 +459,77 @@
*ostream << record;
}
-void McapLogger::WriteChunk() {
+void McapLogger::WriteChunk(ChunkStatus *chunk) {
string_builder_.Reset();
- CHECK(earliest_chunk_message_.has_value());
+ CHECK(chunk->earliest_message.has_value());
const uint64_t chunk_offset = output_.tellp();
AppendInt64(&string_builder_,
- earliest_chunk_message_->time_since_epoch().count());
- AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+ chunk->earliest_message->time_since_epoch().count());
+ CHECK(chunk->latest_message.has_value());
+ AppendInt64(&string_builder_,
+ chunk->latest_message.value().time_since_epoch().count());
- std::string chunk_records = current_chunk_.str();
+ std::string chunk_records = chunk->data.str();
// Reset the chunk buffer.
- current_chunk_.str("");
+ chunk->data.str("");
const uint64_t records_size = chunk_records.size();
// Uncompressed chunk size.
AppendInt64(&string_builder_, records_size);
// Uncompressed CRC (unpopulated).
AppendInt32(&string_builder_, 0);
- AppendString(&string_builder_, "");
- AppendBytes(&string_builder_, chunk_records);
+ // Compression
+ AppendString(&string_builder_, CompressionName(compression_));
+ uint64_t records_size_compressed = records_size;
+ switch (compression_) {
+ case Compression::kNone:
+ AppendBytes(&string_builder_, chunk_records);
+ break;
+ case Compression::kLz4: {
+ // Default preferences.
+ LZ4F_preferences_t *lz4_preferences = nullptr;
+ const uint64_t max_size =
+ LZ4F_compressFrameBound(records_size, lz4_preferences);
+ CHECK_NE(0u, max_size);
+ if (max_size > compression_buffer_.size()) {
+ compression_buffer_.resize(max_size);
+ }
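+      // LZ4F_compressFrame() returns the number of bytes written to the
+      // destination buffer, or an error code (distinguishable with
+      // LZ4F_isError()).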
+ records_size_compressed = LZ4F_compressFrame(
+ compression_buffer_.data(), compression_buffer_.size(),
+ reinterpret_cast<const char *>(chunk_records.data()),
+ chunk_records.size(), lz4_preferences);
+ CHECK(!LZ4F_isError(records_size_compressed));
+ AppendBytes(&string_builder_,
+ {reinterpret_cast<const char *>(compression_buffer_.data()),
+ static_cast<size_t>(records_size_compressed)});
+ break;
+ }
+ }
WriteRecord(OpCode::kChunk, string_builder_.Result());
std::map<uint16_t, uint64_t> index_offsets;
const uint64_t message_index_start = output_.tellp();
- for (const auto &indices : message_indices_) {
+ for (const auto &indices : chunk->message_indices) {
index_offsets[indices.first] = output_.tellp();
string_builder_.Reset();
AppendInt16(&string_builder_, indices.first);
AppendMessageIndices(&string_builder_, indices.second);
WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
}
- message_indices_.clear();
+ chunk->message_indices.clear();
chunk_indices_.push_back(ChunkIndex{
- earliest_chunk_message_.value(), latest_message_, chunk_offset,
- message_index_start - chunk_offset, records_size, index_offsets,
- static_cast<uint64_t>(output_.tellp()) - message_index_start});
- earliest_chunk_message_.reset();
+ .start_time = chunk->earliest_message.value(),
+ .end_time = chunk->latest_message.value(),
+ .offset = chunk_offset,
+ .chunk_size = message_index_start - chunk_offset,
+ .records_size = records_size,
+ .records_size_compressed = records_size_compressed,
+ .message_index_offsets = index_offsets,
+ .message_index_size =
+ static_cast<uint64_t>(output_.tellp()) - message_index_start,
+ .compression = compression_});
+ chunk->earliest_message.reset();
}
McapLogger::SummaryOffset McapLogger::WriteStatistics() {
@@ -491,9 +572,9 @@
AppendChannelMap(&string_builder_, index.message_index_offsets);
AppendInt64(&string_builder_, index.message_index_size);
// Compression used.
- AppendString(&string_builder_, "");
+ AppendString(&string_builder_, CompressionName(index.compression));
// Compressed and uncompressed records size.
- AppendInt64(&string_builder_, index.records_size);
+ AppendInt64(&string_builder_, index.records_size_compressed);
AppendInt64(&string_builder_, index.records_size);
WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
}
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index d7409fb..70a1328 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -36,8 +36,24 @@
kJson,
kFlatbuffer,
};
+ // Whether to attempt to shorten channel names.
+ enum class CanonicalChannelNames {
+    // Just use the full, unambiguous channel names.
+ kCanonical,
+  // Use GetChannelAliases() to determine the shortest possible name for the
+  // channel on the current node, and use that in the MCAP file. This makes
+  // the channel names in the resulting file more likely to match those used
+  // in "real" applications.
+ kShortened,
+ };
+ // Chunk compression to use in the MCAP file.
+ enum class Compression {
+ kNone,
+ kLz4,
+ };
McapLogger(EventLoop *event_loop, const std::string &output_path,
- Serialization serialization);
+ Serialization serialization,
+ CanonicalChannelNames canonical_channels, Compression compression);
~McapLogger();
private:
@@ -77,8 +93,10 @@
uint64_t offset;
// Total size of the Chunk, in bytes.
uint64_t chunk_size;
- // Total size of the records portion of the Chunk, in bytes.
+ // Total uncompressed size of the records portion of the Chunk, in bytes.
uint64_t records_size;
+    // Total compressed size of the records portion of the Chunk, in bytes.
+ uint64_t records_size_compressed;
// Mapping of channel IDs to the MessageIndex entry for that channel within
// the referenced Chunk. The MessageIndex is referenced by an offset from
// the start of the file.
@@ -86,6 +104,30 @@
// Total size, in bytes, of all the MessageIndex entries for this Chunk
// together (note that they are required to be contiguous).
uint64_t message_index_size;
+ // Compression used in this Chunk.
+ Compression compression;
+ };
+  // Maintains the state of a single Chunk. To maximize read performance, we
+  // currently maintain a separate chunk for each channel, so that reading a
+  // given channel only requires reading the data associated with that channel.
+ struct ChunkStatus {
+ // Buffer containing serialized message data for the currently-being-built
+ // chunk.
+ std::stringstream data;
+ // Earliest message observed in this chunk.
+ std::optional<aos::monotonic_clock::time_point> earliest_message;
+ // Latest message observed in this chunk.
+ std::optional<aos::monotonic_clock::time_point> latest_message;
+ // MessageIndex's for each message. The std::map is indexed by channel ID.
+ // The vector is then a series of pairs of (timestamp, offset from start of
+ // data).
+    // Note that this will currently only ever have one entry, for the channel
+    // that this chunk corresponds to. However, the MCAP standard permits
+    // multiple channels per chunk, so we retain code that supports that.
+ std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+ message_indices;
};
enum class RegisterHandlers { kYes, kNo };
// Helpers to write each type of relevant record.
@@ -98,8 +140,8 @@
const aos::Channel *channel,
std::string_view override_name = "");
void WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context, std::ostream *output);
- void WriteChunk();
+ const Context &context, ChunkStatus *chunk);
+ void WriteChunk(ChunkStatus *chunk);
// The helpers for writing records which appear in the Summary section will
// return SummaryOffset's so that they can be referenced in the SummaryOffset
@@ -131,28 +173,23 @@
aos::EventLoop *event_loop_;
std::ofstream output_;
const Serialization serialization_;
+ const CanonicalChannelNames canonical_channels_;
+ const Compression compression_;
size_t total_message_bytes_ = 0;
std::map<const Channel *, size_t> total_channel_bytes_;
- // Buffer containing serialized message data for the currently-being-built
- // chunk.
- std::stringstream current_chunk_;
FastStringBuilder string_builder_;
// Earliest message observed in this logfile.
std::optional<aos::monotonic_clock::time_point> earliest_message_;
- // Earliest message observed in the current chunk.
- std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
- // Latest message observed.
+ // Latest message observed in this logfile.
aos::monotonic_clock::time_point latest_message_ =
aos::monotonic_clock::min_time;
// Count of all messages on each channel, indexed by channel ID.
std::map<uint16_t, uint64_t> message_counts_;
std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
- // MessageIndex's for each message. The std::map is indexed by channel ID. The
- // vector is then a series of pairs of (timestamp, offset from start of
- // current_chunk_).
- std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
- message_indices_;
+ // All currently-being-built chunks. Indexed by channel ID. This is used to
+ // segregate channels into separate chunks to support more efficient reading.
+ std::map<uint16_t, ChunkStatus> current_chunks_;
// ChunkIndex's for all fully written Chunks.
std::vector<ChunkIndex> chunk_indices_;
@@ -162,6 +199,9 @@
uint16_t configuration_id_ = 0;
FlatbufferDetachedBuffer<Channel> configuration_channel_;
FlatbufferDetachedBuffer<Configuration> configuration_;
+
+ // Memory buffer to use for compressing data.
+ std::vector<uint8_t> compression_buffer_;
};
} // namespace aos
#endif // AOS_UTIL_MCAP_LOGGER_H_
diff --git a/debian/BUILD.lz4.bazel b/debian/BUILD.lz4.bazel
new file mode 100644
index 0000000..7aa8def
--- /dev/null
+++ b/debian/BUILD.lz4.bazel
@@ -0,0 +1,22 @@
+licenses(["notice"])
+
+cc_library(
+ name = "lz4",
+ srcs = [
+ "lz4.c",
+ "lz4frame.c",
+ "lz4hc.c",
+ "xxhash.c",
+ ],
+ hdrs = [
+ # lz4hc.c tries to #include lz4.c....
+ "lz4.c",
+ "lz4.h",
+ "lz4frame.h",
+ "lz4hc.h",
+ "xxhash.h",
+ ],
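+    # Expose the headers as "lz4/lz4.h", "lz4/lz4frame.h", etc., to match the
+    # #include paths used in aos.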
+ include_prefix = "lz4",
+ includes = ["."],
+ visibility = ["//visibility:public"],
+)