Add a LogReader-based timepoints channel to MCAP converter
This makes it so that I can visualize how different clock estimators are
doing relative to the estimator in the LogReader.
Change-Id: I4ebcc1052c6b043daecda82a08d0a3a29f85d58d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 8b2e1b8..1d5e60a 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1623,6 +1623,7 @@
aos::FlatbufferVector<reflection::Schema> schema, const aos::Node *node,
ChannelT overrides) {
overrides.name = name;
+ CHECK(schema.message().has_root_table());
overrides.type = schema.message().root_table()->name()->string_view();
if (node != nullptr) {
CHECK(node->has_name());
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 22eedc1..c4704ff 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -234,13 +234,13 @@
// args for the Main class. Returns a pointer to the class that was started
// if it was started, or nullptr.
template <class Main, class... Args>
- Main *MaybeStart(std::string_view name, Args &&...args);
+ Main *MaybeStart(std::string_view name, Args &&... args);
// Starts an application regardless of if the config says to or not. name is
// the name of the application, and args are the constructor args for the
// application. Returns a pointer to the class that was started.
template <class Main, class... Args>
- Main *AlwaysStart(std::string_view name, Args &&...args);
+ Main *AlwaysStart(std::string_view name, Args &&... args);
// Returns the simulated network delay for messages forwarded between nodes.
std::chrono::nanoseconds network_delay() const {
@@ -252,6 +252,8 @@
size_t boot_count() const { return scheduler_.boot_count(); }
+ bool is_running() const { return scheduler_.is_running(); }
+
// TODO(austin): Private for the following?
// Converts a time to the distributed clock for scheduling and cross-node time
@@ -348,7 +350,7 @@
// application.
template <class... Args>
TypedApplication(NodeEventLoopFactory *node_factory, std::string_view name,
- Args &&...args)
+ Args &&... args)
: Application(node_factory, name),
main(event_loop.get(), std::forward<Args>(args)...) {
VLOG(1) << node_factory->scheduler_.distributed_now() << " "
@@ -367,7 +369,7 @@
};
template <class Main, class... Args>
-Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&...args) {
+Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&... args) {
const aos::Application *application =
configuration::GetApplication(configuration(), node(), name);
@@ -378,7 +380,8 @@
}
template <class Main, class... Args>
-Main *NodeEventLoopFactory::AlwaysStart(std::string_view name, Args &&...args) {
+Main *NodeEventLoopFactory::AlwaysStart(std::string_view name,
+ Args &&... args) {
std::unique_ptr<TypedApplication<Main>> app =
std::make_unique<TypedApplication<Main>>(this, name,
std::forward<Args>(args)...);
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 3731965..48dc054 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -1,4 +1,5 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
load("config_validator_macro.bzl", "config_validator_rule")
package(default_visibility = ["//visibility:public"])
@@ -46,7 +47,10 @@
name = "log_to_mcap",
srcs = ["log_to_mcap.cc"],
deps = [
+ ":clock_publisher",
+ ":clock_timepoints_schema",
":mcap_logger",
+ "//aos:configuration",
"//aos:init",
"//aos/events/logging:log_reader",
"//frc971/control_loops:control_loops_fbs",
@@ -75,6 +79,29 @@
],
)
+flatbuffer_cc_library(
+ name = "clock_timepoints_fbs",
+ srcs = ["clock_timepoints.fbs"],
+ gen_reflections = True,
+ deps = ["//aos:configuration_fbs"],
+)
+
+cc_static_flatbuffer(
+ name = "clock_timepoints_schema",
+ function = "aos::ClockTimepointsSchema",
+ target = ":clock_timepoints_fbs_reflection_out",
+)
+
+cc_library(
+ name = "clock_publisher",
+ srcs = ["clock_publisher.cc"],
+ hdrs = ["clock_publisher.h"],
+ deps = [
+ ":clock_timepoints_fbs",
+ "//aos/events:simulated_event_loop",
+ ],
+)
+
cc_library(
name = "mcap_logger",
srcs = ["mcap_logger.cc"],
diff --git a/aos/util/clock_publisher.cc b/aos/util/clock_publisher.cc
new file mode 100644
index 0000000..93102f6
--- /dev/null
+++ b/aos/util/clock_publisher.cc
@@ -0,0 +1,51 @@
+#include "aos/util/clock_publisher.h"
+
+namespace aos {
+ClockPublisher::ClockPublisher(aos::SimulatedEventLoopFactory *factory,
+ aos::EventLoop *event_loop)
+ : factory_(factory),
+ timepoints_sender_(event_loop->MakeSender<ClockTimepoints>("/clocks")) {
+ aos::TimerHandler *timer_handler =
+ event_loop->AddTimer([this]() { SendTimepoints(); });
+ event_loop->OnRun([timer_handler, event_loop]() {
+ timer_handler->Setup(event_loop->context().monotonic_event_time,
+ std::chrono::seconds(1));
+ });
+}
+
+void ClockPublisher::SendTimepoints() {
+ std::vector<flatbuffers::Offset<NodeTimepoint>> timepoints;
+ auto builder = timepoints_sender_.MakeBuilder();
+ for (const aos::Node *node : factory_->nodes()) {
+ const NodeEventLoopFactory *node_factory =
+ factory_->GetNodeEventLoopFactory(node);
+ flatbuffers::Offset<flatbuffers::String> node_name =
+ (node != nullptr)
+ ? builder.fbb()->CreateString(node->name()->string_view())
+ : flatbuffers::Offset<flatbuffers::String>(0);
+ NodeTimepoint::Builder timepoint_builder =
+ builder.MakeBuilder<NodeTimepoint>();
+ if (node != nullptr) {
+ timepoint_builder.add_node(node_name);
+ }
+ if (node_factory->is_running()) {
+ timepoint_builder.add_boot_count(node_factory->boot_count());
+ timepoint_builder.add_monotonic_time(
+ node_factory->monotonic_now().time_since_epoch().count());
+ timepoint_builder.add_realtime_time(
+ node_factory->realtime_now().time_since_epoch().count());
+ }
+ timepoints.push_back(timepoint_builder.Finish());
+ }
+ const flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<NodeTimepoint>>>
+ timepoints_offset = builder.fbb()->CreateVector(timepoints);
+ ClockTimepoints::Builder timepoints_builder =
+ builder.MakeBuilder<ClockTimepoints>();
+ timepoints_builder.add_distributed_clock(
+ factory_->distributed_now().time_since_epoch().count());
+ timepoints_builder.add_clocks(timepoints_offset);
+ builder.CheckOk(builder.Send(timepoints_builder.Finish()));
+}
+
+} // namespace aos
diff --git a/aos/util/clock_publisher.h b/aos/util/clock_publisher.h
new file mode 100644
index 0000000..a6d3488
--- /dev/null
+++ b/aos/util/clock_publisher.h
@@ -0,0 +1,26 @@
+#ifndef AOS_UTIL_CLOCK_PUBLISHER_H_
+#define AOS_UTIL_CLOCK_PUBLISHER_H_
+#include "aos/events/simulated_event_loop.h"
+#include "aos/util/clock_timepoints_generated.h"
+
+namespace aos {
+// A simple class that periodically queries a SimulatedEventLoopFactory for the
+// current timestamps on all nodes and publishes a ClockTimepoints message on
+// the provided EventLoop.
+// This is used by the log_to_mcap converter to allow Foxglove users access to
+// offset estimates. In order to use this, a /clocks channel with a type of
+// aos.ClockTimepoints must be available.
+class ClockPublisher {
+ public:
+ ClockPublisher(aos::SimulatedEventLoopFactory *factory,
+ aos::EventLoop *event_loop);
+
+ private:
+ void SendTimepoints();
+
+ aos::SimulatedEventLoopFactory *const factory_;
+ aos::Sender<ClockTimepoints> timepoints_sender_;
+};
+} // namespace aos
+
+#endif // AOS_UTIL_CLOCK_PUBLISHER_H_
diff --git a/aos/util/clock_timepoints.fbs b/aos/util/clock_timepoints.fbs
new file mode 100644
index 0000000..892b1c4
--- /dev/null
+++ b/aos/util/clock_timepoints.fbs
@@ -0,0 +1,27 @@
+include "aos/configuration.fbs";
+
+namespace aos;
+
+// Current clock values on a given node. Clock values + boot count will not be populated if
+// the node is not currently running.
+table NodeTimepoint {
+ // The name of the node that this clock corresponds to.
+ node:string (id: 0);
+ // Current boot count for this node (to allow observing reboots).
+ boot_count:int (id: 1);
+ // Current monotonic time of this clock, in nanoseconds.
+ monotonic_time:int64 (id: 2);
+ // Current realtime (UNIX epoch) time of this clock, in nanoseconds.
+ realtime_time:int64 (id: 3);
+}
+
+table ClockTimepoints {
+ // Current "distributed clock" time, in nanoseconds. This will roughly correspond to the
+ // average of the monotonic clocks across all devices, and will itself be monotonic.
+ distributed_clock:int64 (id: 0);
+ // Current clock values for every node. There will be an entry for every node, and the nodes
+ // will be in the same order as they are in in the config used to generated this message.
+ clocks:[NodeTimepoint] (id: 1);
+}
+
+root_type ClockTimepoints;
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index ca2c98d..a94473e 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -1,6 +1,9 @@
+#include "aos/configuration.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/logging/log_reader.h"
#include "aos/init.h"
+#include "aos/util/clock_publisher.h"
+#include "aos/util/clock_timepoints_schema.h"
#include "aos/util/mcap_logger.h"
DEFINE_string(node, "", "Node to replay from the perspective of.");
@@ -11,6 +14,9 @@
"If set, use full channel names; by default, will shorten names to be the "
"shortest possible version of the name (e.g., /aos instead of /pi/aos).");
DEFINE_bool(compress, true, "Whether to use LZ4 compression in MCAP file.");
+DEFINE_bool(include_clocks, true,
+ "Whether to add a /clocks channel that publishes all nodes' clock "
+ "offsets.");
// Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
// out, run:
@@ -40,7 +46,22 @@
replay_node = logger_node;
}
- aos::logger::LogReader reader(logfiles);
+ std::optional<aos::FlatbufferDetachedBuffer<aos::Configuration>> config;
+
+ if (FLAGS_include_clocks) {
+ aos::logger::LogReader config_reader(logfiles);
+
+ const aos::Configuration *raw_config = config_reader.configuration();
+ config = aos::configuration::AddChannelToConfiguration(
+ raw_config, "/clocks",
+ aos::FlatbufferSpan<reflection::Schema>(aos::ClockTimepointsSchema()),
+ replay_node.empty()
+ ? nullptr
+ : aos::configuration::GetNode(raw_config, replay_node));
+ }
+
+ aos::logger::LogReader reader(
+ logfiles, config.has_value() ? &config.value().message() : nullptr);
aos::SimulatedEventLoopFactory factory(reader.configuration());
reader.RegisterWithoutStarting(&factory);
@@ -50,6 +71,17 @@
? nullptr
: aos::configuration::GetNode(reader.configuration(), replay_node);
+ std::unique_ptr<aos::EventLoop> clock_event_loop;
+ std::unique_ptr<aos::ClockPublisher> clock_publisher;
+ if (FLAGS_include_clocks) {
+ // TODO(james): Currently, because of RegisterWithoutStarting, this ends up
+ // running from t=0.0 rather than the start of the logfile. Fix that.
+ clock_event_loop =
+ reader.event_loop_factory()->MakeEventLoop("clock", node);
+ clock_publisher =
+ std::make_unique<aos::ClockPublisher>(&factory, clock_event_loop.get());
+ }
+
std::unique_ptr<aos::EventLoop> mcap_event_loop =
reader.event_loop_factory()->MakeEventLoop("mcap", node);
CHECK(!FLAGS_output_path.empty());