Add a LogReader-based timepoints channel to MCAP converter

This publishes the clocks that the LogReader estimates for every node on a
/clocks channel, so that other clock estimators can be visualized and
compared against the estimator in the LogReader.

Change-Id: I4ebcc1052c6b043daecda82a08d0a3a29f85d58d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 8b2e1b8..1d5e60a 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -1623,6 +1623,7 @@
     aos::FlatbufferVector<reflection::Schema> schema, const aos::Node *node,
     ChannelT overrides) {
   overrides.name = name;
+  CHECK(schema.message().has_root_table());
   overrides.type = schema.message().root_table()->name()->string_view();
   if (node != nullptr) {
     CHECK(node->has_name());
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 22eedc1..c4704ff 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -234,13 +234,13 @@
   // args for the Main class.  Returns a pointer to the class that was started
   // if it was started, or nullptr.
   template <class Main, class... Args>
-  Main *MaybeStart(std::string_view name, Args &&...args);
+  Main *MaybeStart(std::string_view name, Args &&... args);
 
   // Starts an application regardless of if the config says to or not.  name is
   // the name of the application, and args are the constructor args for the
   // application.  Returns a pointer to the class that was started.
   template <class Main, class... Args>
-  Main *AlwaysStart(std::string_view name, Args &&...args);
+  Main *AlwaysStart(std::string_view name, Args &&... args);
 
   // Returns the simulated network delay for messages forwarded between nodes.
   std::chrono::nanoseconds network_delay() const {
@@ -252,6 +252,8 @@
 
   size_t boot_count() const { return scheduler_.boot_count(); }
 
+  bool is_running() const { return scheduler_.is_running(); }
+
   // TODO(austin): Private for the following?
 
   // Converts a time to the distributed clock for scheduling and cross-node time
@@ -348,7 +350,7 @@
     // application.
     template <class... Args>
     TypedApplication(NodeEventLoopFactory *node_factory, std::string_view name,
-                     Args &&...args)
+                     Args &&... args)
         : Application(node_factory, name),
           main(event_loop.get(), std::forward<Args>(args)...) {
       VLOG(1) << node_factory->scheduler_.distributed_now() << " "
@@ -367,7 +369,7 @@
 };
 
 template <class Main, class... Args>
-Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&...args) {
+Main *NodeEventLoopFactory::MaybeStart(std::string_view name, Args &&... args) {
   const aos::Application *application =
       configuration::GetApplication(configuration(), node(), name);
 
@@ -378,7 +380,8 @@
 }
 
 template <class Main, class... Args>
-Main *NodeEventLoopFactory::AlwaysStart(std::string_view name, Args &&...args) {
+Main *NodeEventLoopFactory::AlwaysStart(std::string_view name,
+                                        Args &&... args) {
   std::unique_ptr<TypedApplication<Main>> app =
       std::make_unique<TypedApplication<Main>>(this, name,
                                                std::forward<Args>(args)...);
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 3731965..48dc054 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -1,4 +1,5 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
 load("config_validator_macro.bzl", "config_validator_rule")
 
 package(default_visibility = ["//visibility:public"])
@@ -46,7 +47,10 @@
     name = "log_to_mcap",
     srcs = ["log_to_mcap.cc"],
     deps = [
+        ":clock_publisher",
+        ":clock_timepoints_schema",
         ":mcap_logger",
+        "//aos:configuration",
         "//aos:init",
         "//aos/events/logging:log_reader",
         "//frc971/control_loops:control_loops_fbs",
@@ -75,6 +79,29 @@
     ],
 )
 
+flatbuffer_cc_library(
+    name = "clock_timepoints_fbs",
+    srcs = ["clock_timepoints.fbs"],
+    gen_reflections = True,  # Presumably emits the ..._reflection_out target; confirm.
+    deps = ["//aos:configuration_fbs"],  # The schema includes aos/configuration.fbs.
+)
+
+cc_static_flatbuffer(
+    name = "clock_timepoints_schema",
+    function = "aos::ClockTimepointsSchema",  # Called from log_to_mcap.cc.
+    target = ":clock_timepoints_fbs_reflection_out",
+)
+
+cc_library(
+    name = "clock_publisher",
+    srcs = ["clock_publisher.cc"],
+    hdrs = ["clock_publisher.h"],
+    deps = [
+        ":clock_timepoints_fbs",  # For clock_timepoints_generated.h.
+        "//aos/events:simulated_event_loop",
+    ],
+)
+
 cc_library(
     name = "mcap_logger",
     srcs = ["mcap_logger.cc"],
diff --git a/aos/util/clock_publisher.cc b/aos/util/clock_publisher.cc
new file mode 100644
index 0000000..93102f6
--- /dev/null
+++ b/aos/util/clock_publisher.cc
@@ -0,0 +1,51 @@
+#include "aos/util/clock_publisher.h"
+
+namespace aos {
+ClockPublisher::ClockPublisher(aos::SimulatedEventLoopFactory *factory,
+                               aos::EventLoop *event_loop)
+    : factory_(factory),
+      timepoints_sender_(event_loop->MakeSender<ClockTimepoints>("/clocks")) {
+  aos::TimerHandler *timer_handler =
+      event_loop->AddTimer([this]() { SendTimepoints(); });
+  event_loop->OnRun([timer_handler, event_loop]() {  // Arm timer at startup.
+    timer_handler->Setup(event_loop->context().monotonic_event_time,
+                         std::chrono::seconds(1));  // Then every 1 s.
+  });
+}
+
+void ClockPublisher::SendTimepoints() {  // Invoked by the timer.
+  std::vector<flatbuffers::Offset<NodeTimepoint>> timepoints;
+  auto builder = timepoints_sender_.MakeBuilder();
+  for (const aos::Node *node : factory_->nodes()) {  // One entry per node.
+    const NodeEventLoopFactory *node_factory =
+        factory_->GetNodeEventLoopFactory(node);
+    flatbuffers::Offset<flatbuffers::String> node_name =
+        (node != nullptr)
+            ? builder.fbb()->CreateString(node->name()->string_view())
+            : flatbuffers::Offset<flatbuffers::String>(0);
+    NodeTimepoint::Builder timepoint_builder =
+        builder.MakeBuilder<NodeTimepoint>();  // Strings go before tables.
+    if (node != nullptr) {  // Presumably null for single-node configs.
+      timepoint_builder.add_node(node_name);
+    }
+    if (node_factory->is_running()) {  // Else leave clocks/boot unset.
+      timepoint_builder.add_boot_count(node_factory->boot_count());
+      timepoint_builder.add_monotonic_time(
+          node_factory->monotonic_now().time_since_epoch().count());
+      timepoint_builder.add_realtime_time(
+          node_factory->realtime_now().time_since_epoch().count());
+    }
+    timepoints.push_back(timepoint_builder.Finish());
+  }
+  const flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<NodeTimepoint>>>
+      timepoints_offset = builder.fbb()->CreateVector(timepoints);
+  ClockTimepoints::Builder timepoints_builder =
+      builder.MakeBuilder<ClockTimepoints>();
+  timepoints_builder.add_distributed_clock(
+      factory_->distributed_now().time_since_epoch().count());
+  timepoints_builder.add_clocks(timepoints_offset);
+  builder.CheckOk(builder.Send(timepoints_builder.Finish()));
+}
+
+}  // namespace aos
diff --git a/aos/util/clock_publisher.h b/aos/util/clock_publisher.h
new file mode 100644
index 0000000..a6d3488
--- /dev/null
+++ b/aos/util/clock_publisher.h
@@ -0,0 +1,26 @@
+#ifndef AOS_UTIL_CLOCK_PUBLISHER_H_
+#define AOS_UTIL_CLOCK_PUBLISHER_H_
+#include "aos/events/simulated_event_loop.h"
+#include "aos/util/clock_timepoints_generated.h"
+
+namespace aos {
+// A simple class that queries a SimulatedEventLoopFactory once per second for
+// the current timestamps on all nodes and publishes a ClockTimepoints message
+// on the provided EventLoop, which must offer a /clocks channel of type
+// aos.ClockTimepoints. log_to_mcap uses this to give Foxglove users access to
+// offset estimates. Neither pointer is owned; this object must stay alive
+// while event_loop runs because the timer callback captures `this`.
+class ClockPublisher {
+ public:
+  ClockPublisher(aos::SimulatedEventLoopFactory *factory,
+                 aos::EventLoop *event_loop);
+
+ private:
+  void SendTimepoints();  // Builds and sends one ClockTimepoints message.
+
+  aos::SimulatedEventLoopFactory *const factory_;  // Source of all clocks.
+  aos::Sender<ClockTimepoints> timepoints_sender_;  // Sends on /clocks.
+};
+}  // namespace aos
+
+#endif  // AOS_UTIL_CLOCK_PUBLISHER_H_
diff --git a/aos/util/clock_timepoints.fbs b/aos/util/clock_timepoints.fbs
new file mode 100644
index 0000000..892b1c4
--- /dev/null
+++ b/aos/util/clock_timepoints.fbs
@@ -0,0 +1,27 @@
+include "aos/configuration.fbs";
+
+namespace aos;
+
+// Current clock values on a given node. The boot count and both clock values are left
+// unpopulated if the node is not currently running.
+table NodeTimepoint {
+  // The name of the node that this clock corresponds to.
+  node:string (id: 0);
+  // Current boot count for this node (to allow observing reboots).
+  boot_count:int (id: 1);
+  // Current monotonic time of this clock, in nanoseconds.
+  monotonic_time:int64 (id: 2);
+  // Current realtime (UNIX epoch) time of this clock, in nanoseconds.
+  realtime_time:int64 (id: 3);
+}
+
+table ClockTimepoints {
+  // Current "distributed clock" time, in nanoseconds. This will roughly correspond to the
+  // average of the monotonic clocks across all devices, and will itself be monotonic.
+  distributed_clock:int64 (id: 0);
+  // Current clock values for every node. There will be an entry for every node, and the nodes
+  // will be in the same order as they are in the config used to generate this message.
+  clocks:[NodeTimepoint] (id: 1);
+}
+
+root_type ClockTimepoints;
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index ca2c98d..a94473e 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -1,6 +1,9 @@
+#include "aos/configuration.h"
 #include "aos/events/event_loop_generated.h"
 #include "aos/events/logging/log_reader.h"
 #include "aos/init.h"
+#include "aos/util/clock_publisher.h"
+#include "aos/util/clock_timepoints_schema.h"
 #include "aos/util/mcap_logger.h"
 
 DEFINE_string(node, "", "Node to replay from the perspective of.");
@@ -11,6 +14,9 @@
     "If set, use full channel names; by default, will shorten names to be the "
     "shortest possible version of the name (e.g., /aos instead of /pi/aos).");
 DEFINE_bool(compress, true, "Whether to use LZ4 compression in MCAP file.");
+DEFINE_bool(include_clocks, true,
+            "Whether to add a /clocks channel that publishes all nodes' clock "
+            "offsets.");
 
 // Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
 // out, run:
@@ -40,7 +46,22 @@
     replay_node = logger_node;
   }
 
-  aos::logger::LogReader reader(logfiles);
+  std::optional<aos::FlatbufferDetachedBuffer<aos::Configuration>> config;
+
+  if (FLAGS_include_clocks) {
+    aos::logger::LogReader config_reader(logfiles);
+
+    const aos::Configuration *raw_config = config_reader.configuration();
+    config = aos::configuration::AddChannelToConfiguration(
+        raw_config, "/clocks",
+        aos::FlatbufferSpan<reflection::Schema>(aos::ClockTimepointsSchema()),
+        replay_node.empty()
+            ? nullptr
+            : aos::configuration::GetNode(raw_config, replay_node));
+  }
+
+  aos::logger::LogReader reader(
+      logfiles, config.has_value() ? &config.value().message() : nullptr);
   aos::SimulatedEventLoopFactory factory(reader.configuration());
   reader.RegisterWithoutStarting(&factory);
 
@@ -50,6 +71,17 @@
           ? nullptr
           : aos::configuration::GetNode(reader.configuration(), replay_node);
 
+  std::unique_ptr<aos::EventLoop> clock_event_loop;
+  std::unique_ptr<aos::ClockPublisher> clock_publisher;
+  if (FLAGS_include_clocks) {
+    // TODO(james): Currently, because of RegisterWithoutStarting, this ends up
+    // running from t=0.0 rather than the start of the logfile. Fix that.
+    clock_event_loop =
+        reader.event_loop_factory()->MakeEventLoop("clock", node);
+    clock_publisher =
+        std::make_unique<aos::ClockPublisher>(&factory, clock_event_loop.get());
+  }
+
   std::unique_ptr<aos::EventLoop> mcap_event_loop =
       reader.event_loop_factory()->MakeEventLoop("mcap", node);
   CHECK(!FLAGS_output_path.empty());