Fix handling of special channels in log_to_mcap
Because RegisterWithoutStarting starts the clocks at 0.0, and because
the various options for creating startup handlers didn't obviously help
in this situation, fix the special configuration and timestamp channels
to not produce messages at t=0.0.
Change-Id: Ifaab480c7f37a6b9fea0317d3a5a529bdb3c3c86
Signed-off-by: James Kuszmaul <jabukuszmaul@gmail.com>
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index a94473e..38a3be7 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -51,7 +51,7 @@
if (FLAGS_include_clocks) {
aos::logger::LogReader config_reader(logfiles);
- const aos::Configuration *raw_config = config_reader.configuration();
+ const aos::Configuration *raw_config = config_reader.logged_configuration();
config = aos::configuration::AddChannelToConfiguration(
raw_config, "/clocks",
aos::FlatbufferSpan<reflection::Schema>(aos::ClockTimepointsSchema()),
@@ -74,26 +74,33 @@
std::unique_ptr<aos::EventLoop> clock_event_loop;
std::unique_ptr<aos::ClockPublisher> clock_publisher;
if (FLAGS_include_clocks) {
- // TODO(james): Currently, because of RegisterWithoutStarting, this ends up
- // running from t=0.0 rather than the start of the logfile. Fix that.
- clock_event_loop =
- reader.event_loop_factory()->MakeEventLoop("clock", node);
- clock_publisher =
- std::make_unique<aos::ClockPublisher>(&factory, clock_event_loop.get());
+ reader.OnStart(node, [&clock_event_loop, &reader, &clock_publisher,
+ &factory, node]() {
+ clock_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("clock", node);
+ clock_publisher = std::make_unique<aos::ClockPublisher>(
+ &factory, clock_event_loop.get());
+ });
}
- std::unique_ptr<aos::EventLoop> mcap_event_loop =
- reader.event_loop_factory()->MakeEventLoop("mcap", node);
+ std::unique_ptr<aos::EventLoop> mcap_event_loop;
CHECK(!FLAGS_output_path.empty());
- aos::McapLogger relogger(
- mcap_event_loop.get(), FLAGS_output_path,
- FLAGS_mode == "flatbuffer" ? aos::McapLogger::Serialization::kFlatbuffer
- : aos::McapLogger::Serialization::kJson,
- FLAGS_canonical_channel_names
- ? aos::McapLogger::CanonicalChannelNames::kCanonical
- : aos::McapLogger::CanonicalChannelNames::kShortened,
- FLAGS_compress ? aos::McapLogger::Compression::kLz4
- : aos::McapLogger::Compression::kNone);
+ std::unique_ptr<aos::McapLogger> relogger;
+ factory.GetNodeEventLoopFactory(node)
+ ->OnStartup([&relogger, &mcap_event_loop, &reader, node]() {
+ mcap_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("mcap", node);
+ relogger = std::make_unique<aos::McapLogger>(
+ mcap_event_loop.get(), FLAGS_output_path,
+ FLAGS_mode == "flatbuffer"
+ ? aos::McapLogger::Serialization::kFlatbuffer
+ : aos::McapLogger::Serialization::kJson,
+ FLAGS_canonical_channel_names
+ ? aos::McapLogger::CanonicalChannelNames::kCanonical
+ : aos::McapLogger::CanonicalChannelNames::kShortened,
+ FLAGS_compress ? aos::McapLogger::Compression::kLz4
+ : aos::McapLogger::Compression::kNone);
+ });
reader.event_loop_factory()->Run();
reader.Deregister();
}
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 88476ed..96a9b60 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -229,17 +229,6 @@
// Manually add in a special /configuration channel.
if (register_handlers == RegisterHandlers::kYes) {
configuration_id_ = ++id;
- event_loop_->OnRun([this]() {
- // TODO(james): Make it so that the timestamp for the configuration
- // message is not 0.0.
- Context config_context;
- config_context.monotonic_event_time = event_loop_->monotonic_now();
- config_context.queue_index = 0;
- config_context.size = configuration_.span().size();
- config_context.data = configuration_.span().data();
- WriteMessage(configuration_id_, &configuration_channel_.message(),
- config_context, ¤t_chunks_[configuration_id_]);
- });
}
std::vector<SummaryOffset> offsets;
@@ -276,6 +265,18 @@
return offsets;
}
+void McapLogger::WriteConfigurationMessage() {
+ Context config_context;
+ config_context.monotonic_event_time = event_loop_->monotonic_now();
+ config_context.queue_index = 0;
+ config_context.size = configuration_.span().size();
+ config_context.data = configuration_.span().data();
+ // Avoid infinite recursion...
+ wrote_configuration_ = true;
+ WriteMessage(configuration_id_, &configuration_channel_.message(),
+ config_context, ¤t_chunks_[configuration_id_]);
+}
+
void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
void McapLogger::WriteHeader() {
@@ -390,6 +391,9 @@
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
const Context &context, ChunkStatus *chunk) {
+ if (!wrote_configuration_) {
+ WriteConfigurationMessage();
+ }
CHECK_NOTNULL(context.data);
message_counts_[channel_id]++;
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 70a1328..c3fdda1 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -143,6 +143,11 @@
const Context &context, ChunkStatus *chunk);
void WriteChunk(ChunkStatus *chunk);
+ // Writes out the special configuration channel. This gets called right before
+ // the first actual message is written so that we can have a reasonable
+ // monotonic clock time.
+ void WriteConfigurationMessage();
+
// The helpers for writing records which appear in the Summary section will
// return SummaryOffset's so that they can be referenced in the SummaryOffset
// section.
@@ -199,6 +204,7 @@
uint16_t configuration_id_ = 0;
FlatbufferDetachedBuffer<Channel> configuration_channel_;
FlatbufferDetachedBuffer<Configuration> configuration_;
+ bool wrote_configuration_ = false;
// Memory buffer to use for compressing data.
std::vector<uint8_t> compression_buffer_;