Use log source for log reading

Change-Id: I4d6018a6117f5864cda38a5e6485c6d08a782999
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index af8a4e8..874fe43 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -598,7 +598,7 @@
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop_factory_->configuration(), logged_configuration(),
-          log_files_.front_boots(), FLAGS_skip_order_validation,
+          log_files_.boots(), FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
@@ -785,7 +785,7 @@
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop->configuration(), logged_configuration(),
-          log_files_.front_boots(), FLAGS_skip_order_validation,
+          log_files_.boots(), FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 0566120..dc7078a 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -2084,5 +2084,113 @@
   return stream;
 }
 
+// Validates that collection of log files or log parts shares the same configs.
+template <typename TCollection>
+bool CheckMatchingConfigs(const TCollection &items) {
+  const Configuration *config = nullptr;
+  for (const auto &item : items) {
+    VLOG(1) << item;
+    if (config == nullptr) {
+      config = GetConfig(item);
+    } else {
+      if (config != GetConfig(item)) {
+        LOG(ERROR) << ": Config mismatched: " << config << " vs. "
+                   << GetConfig(item);
+        return false;
+      }
+    }
+  }
+  if (config == nullptr) {
+    LOG(ERROR) << ": No configs are found";
+    return false;
+  }
+  return true;
+}
+
+// Provides unified access to config field stored in LogFile. It is used in
+// CheckMatchingConfigs.
+inline const Configuration *GetConfig(const LogFile &log_file) {
+  return log_file.config.get();
+}
+
+// Output of LogPartsAccess for debug purposes.
+std::ostream &operator<<(std::ostream &stream,
+                         const LogPartsAccess &log_parts_access) {
+  stream << log_parts_access.parts();
+  return stream;
+}
+
+SelectedLogParts::SelectedLogParts(std::string_view node_name,
+                                   size_t boot_index,
+                                   std::vector<LogPartsAccess> log_parts)
+    : node_name_(node_name),
+      boot_index_(boot_index),
+      log_parts_(std::move(log_parts)) {
+  CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
+                                  << node_name_ << " boot " << boot_index_;
+  CHECK(CheckMatchingConfigs(log_parts_));
+  config_ = log_parts_.front().config();
+
+  // Enforce that we are sorting things only from a single node from a single
+  // boot.
+  const std::string_view part0_source_boot_uuid =
+      log_parts_.front().source_boot_uuid();
+  for (const auto &part : log_parts_) {
+    CHECK_EQ(node_name_, part.node_name()) << ": Can't merge different nodes.";
+    CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid())
+        << ": Can't merge different boots.";
+    CHECK_EQ(boot_index_, part.boot_count());
+  }
+}
+
+LogFilesContainer::LogFilesContainer(
+    std::optional<const LogSource *> log_source, std::vector<LogFile> log_files)
+    : log_source_(log_source), log_files_(std::move(log_files)) {
+  CHECK_GT(log_files_.size(), 0u);
+  CHECK(CheckMatchingConfigs(log_files_));
+  config_ = log_files_.front().config.get();
+  boots_ = log_files_.front().boots;
+
+  std::unordered_set<std::string> logger_nodes;
+
+  // Scan and collect all related nodes and number of reboots per node.
+  for (const LogFile &log_file : log_files_) {
+    for (const LogParts &part : log_file.parts) {
+      auto node_item = nodes_boots_.find(part.node);
+      if (node_item != nodes_boots_.end()) {
+        node_item->second = std::max(node_item->second, part.boot_count + 1);
+      } else {
+        nodes_boots_[part.node] = part.boot_count + 1;
+      }
+    }
+    logger_nodes.insert(log_file.logger_node);
+  }
+  while (!logger_nodes.empty()) {
+    logger_nodes_.emplace_back(
+        logger_nodes.extract(logger_nodes.begin()).value());
+  }
+}
+
+size_t LogFilesContainer::BootsForNode(std::string_view node_name) const {
+  const auto &node_item = nodes_boots_.find(std::string(node_name));
+  CHECK(node_item != nodes_boots_.end())
+      << ": Missing parts associated with node " << node_name;
+  CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
+  return node_item->second;
+}
+
+SelectedLogParts LogFilesContainer::SelectParts(std::string_view node_name,
+                                                size_t boot_index) const {
+  std::vector<LogPartsAccess> result;
+  for (const LogFile &log_file : log_files_) {
+    for (const LogParts &part : log_file.parts) {
+      if (part.node == node_name && part.boot_count == boot_index) {
+        result.emplace_back(log_source_, part);
+      }
+    }
+  }
+  return SelectedLogParts(node_name, boot_index, result);
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 41a3498..7461e89 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -151,84 +151,77 @@
 // Recursively searches for logfiles in argv[1] and onward.
 std::vector<std::string> FindLogs(int argc, char **argv);
 
-// Validates that collection of log files or log parts shares the same configs.
-template <typename TCollection>
-bool CheckMatchingConfigs(const TCollection &items) {
-  const Configuration *config = nullptr;
-  for (const auto &item : items) {
-    VLOG(1) << item;
-    if (config == nullptr) {
-      config = item.config.get();
-    } else {
-      if (config != item.config.get()) {
-        LOG(ERROR) << ": Config mismatched: " << config << " vs. "
-                   << item.config.get();
-        return false;
-      }
-    }
+// Proxy container to bind log parts with log source. It helps with reading logs
+// from virtual media such as memory or S3.
+class LogPartsAccess {
+ public:
+  LogPartsAccess(std::optional<const LogSource *> log_source,
+                 LogParts log_parts)
+      : log_source_(std::move(log_source)), log_parts_(std::move(log_parts)) {
+    CHECK(!log_parts_.parts.empty());
   }
-  if (config == nullptr) {
-    LOG(ERROR) << ": No configs are found";
-    return false;
+
+  std::string_view node_name() const { return log_parts_.node; }
+
+  std::string_view source_boot_uuid() const {
+    return log_parts_.source_boot_uuid;
   }
-  return true;
+
+  size_t boot_count() const { return log_parts_.boot_count; }
+
+  const Configuration *config() const { return log_parts_.config.get(); }
+
+  std::optional<const LogSource *> log_source() const { return log_source_; }
+
+  std::string GetPartAt(size_t index) const {
+    CHECK_LT(index, log_parts_.parts.size());
+    return log_parts_.parts[index];
+  }
+
+  // TODO (Alexei): do we need to reduce it to concrete operations?
+  const LogParts &parts() const { return log_parts_; }
+
+  size_t size() const { return log_parts_.parts.size(); }
+
+ private:
+  std::optional<const LogSource *> log_source_;
+  LogParts log_parts_;
+};
+
+// Provides unified access to config field stored in LogPartsAccess. It is used
+// in CheckMatchingConfigs.
+inline const Configuration *GetConfig(const LogPartsAccess &log_parts_access) {
+  return log_parts_access.config();
 }
 
+// Output of LogPartsAccess for debug purposes.
+std::ostream &operator<<(std::ostream &stream,
+                         const LogPartsAccess &log_parts_access);
+
 // Collection of log parts that associated with pair: node and boot.
 class SelectedLogParts {
  public:
-  SelectedLogParts(std::optional<const LogSource *> log_source,
-                   std::string_view node_name, size_t boot_count,
-                   std::vector<LogParts> log_parts)
-      : log_source_(log_source),
-        node_name_(node_name),
-        boot_count_(boot_count),
-        log_parts_(std::move(log_parts)) {
-    CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
-                                    << node_name << " boot " << boot_count;
-    configs_matched_ = CheckMatchingConfigs(log_parts_);
-
-    // Enforce that we are sorting things only from a single node from a single
-    // boot.
-    const std::string_view part0_source_boot_uuid =
-        log_parts_.front().source_boot_uuid;
-    for (const auto &part : log_parts_) {
-      CHECK_EQ(node_name_, part.node) << ": Can't merge different nodes.";
-      CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid)
-          << ": Can't merge different boots.";
-      CHECK_EQ(boot_count_, part.boot_count);
-    }
-  }
+  SelectedLogParts(std::string_view node_name, size_t boot_index,
+                   std::vector<LogPartsAccess> log_parts);
 
   // Use items in fancy loops.
-  auto begin() { return log_parts_.begin(); }
-  auto end() { return log_parts_.end(); }
-  auto cbegin() const { return log_parts_.cbegin(); }
-  auto cend() const { return log_parts_.cend(); }
   auto begin() const { return log_parts_.begin(); }
   auto end() const { return log_parts_.end(); }
 
-  const Configuration *config() const {
-    // TODO (Alexei): it is a first usage of assumption that all parts have
-    // matching configs. Should it be strong requirement and validated in the
-    // constructor?
-    CHECK(configs_matched_);
-    return log_parts_.front().config.get();
-  }
+  // Config that shared across all log parts.
+  const Configuration *config() const { return config_; }
 
   const std::string &node_name() const { return node_name_; }
 
   // Number of boots found in the log parts.
-  size_t boot_count() const { return boot_count_; }
+  size_t boot_count() const { return boot_index_; }
 
  private:
-  std::optional<const LogSource *> log_source_;
   std::string node_name_;
-  size_t boot_count_;
-  std::vector<LogParts> log_parts_;
+  size_t boot_index_;
+  std::vector<LogPartsAccess> log_parts_;
 
-  // Indicates that all parts shared the same config.
-  bool configs_matched_;
+  const Configuration *config_;
 };
 
 // Container that keeps a sorted list of log files and provides functions that
@@ -252,39 +245,17 @@
   }
 
   // Returns numbers of reboots found in log files associated with the node.
-  size_t BootsForNode(std::string_view node_name) const {
-    const auto &node_item = nodes_boots_.find(std::string(node_name));
-    CHECK(node_item != nodes_boots_.end())
-        << ": Missing parts associated with node " << node_name;
-    CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
-    return node_item->second;
-  }
+  size_t BootsForNode(std::string_view node_name) const;
 
   // Get only log parts that associated with node and boot number.
   SelectedLogParts SelectParts(std::string_view node_name,
-                               size_t boot_count) const {
-    std::vector<LogParts> result;
-    for (const LogFile &log_file : log_files_) {
-      for (const LogParts &part : log_file.parts) {
-        if (part.node == node_name && part.boot_count == boot_count) {
-          result.emplace_back(part);
-        }
-      }
-    }
-    return SelectedLogParts(log_source_, node_name, boot_count, result);
-  }
+                               size_t boot_index) const;
 
-  // It provides access to boots of the first element. I'm not sure why...
-  const auto &front_boots() const { return log_files_.front().boots; }
+  // It provides access to boots logged by all log files in the container.
+  const std::shared_ptr<const Boots> &boots() const { return boots_; }
 
   // Access the configuration shared with all log files in the container.
-  const Configuration *config() const {
-    // TODO (Alexei): it is a first usage of assumption that all parts have
-    // matching configs. Should it be strong requirement and validated in the
-    // constructor?
-    CHECK(configs_matched_);
-    return log_files_.front().config.get();
-  }
+  const Configuration *config() const { return config_; }
 
   // List of logger nodes for given set of log files.
   const auto &logger_nodes() const { return logger_nodes_; }
@@ -295,40 +266,17 @@
 
  private:
   LogFilesContainer(std::optional<const LogSource *> log_source,
-                    std::vector<LogFile> log_files)
-      : log_source_(log_source), log_files_(std::move(log_files)) {
-    CHECK_GT(log_files_.size(), 0u);
-
-    std::unordered_set<std::string> logger_nodes;
-
-    // Scan and collect all related nodes and number of reboots per node.
-    for (const LogFile &log_file : log_files_) {
-      for (const LogParts &part : log_file.parts) {
-        auto node_item = nodes_boots_.find(part.node);
-        if (node_item != nodes_boots_.end()) {
-          node_item->second = std::max(node_item->second, part.boot_count + 1);
-        } else {
-          nodes_boots_[part.node] = part.boot_count + 1;
-        }
-      }
-      logger_nodes.insert(log_file.logger_node);
-    }
-    while (!logger_nodes.empty()) {
-      logger_nodes_.emplace_back(
-          logger_nodes.extract(logger_nodes.begin()).value());
-    }
-    configs_matched_ = CheckMatchingConfigs(log_files_);
-  }
+                    std::vector<LogFile> log_files);
 
   std::optional<const LogSource *> log_source_;
   std::vector<LogFile> log_files_;
 
+  const Configuration *config_;
+  std::shared_ptr<const Boots> boots_;
+
   // Keeps information about nodes and number of reboots per node.
   std::unordered_map<std::string, size_t> nodes_boots_;
   std::vector<std::string> logger_nodes_;
-
-  // Indicates that all parts shared the same config.
-  bool configs_matched_;
 };
 
 }  // namespace logger
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 62c19ae..36bac42 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1376,8 +1376,8 @@
   return result;
 }
 
-MessageReader::MessageReader(std::string_view filename)
-    : span_reader_(filename),
+MessageReader::MessageReader(SpanReader span_reader)
+    : span_reader_(std::move(span_reader)),
       raw_log_file_header_(
           SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
@@ -1387,7 +1387,8 @@
       raw_log_file_header = ReadHeader(&span_reader_);
 
   // Make sure something was read.
-  CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
+  CHECK(raw_log_file_header)
+      << ": Failed to read header from: " << span_reader_.filename();
 
   raw_log_file_header_ = std::move(*raw_log_file_header);
 
@@ -1401,7 +1402,7 @@
                 chrono::duration<double>(FLAGS_max_out_of_order))
           : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
 
-  VLOG(1) << "Opened " << filename << " as node "
+  VLOG(1) << "Opened " << span_reader_.filename() << " as node "
           << FlatbufferToJson(log_file_header()->node());
 }
 
@@ -1560,18 +1561,32 @@
                                                 &DestroyAndFree);
 }
 
-PartsMessageReader::PartsMessageReader(LogParts log_parts)
-    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
-  if (parts_.parts.size() >= 2) {
-    next_message_reader_.emplace(parts_.parts[1]);
+SpanReader PartsMessageReader::MakeSpanReader(
+    const LogPartsAccess &log_parts_access, size_t part_number) {
+  const auto part = log_parts_access.GetPartAt(part_number);
+  if (log_parts_access.log_source().has_value()) {
+    return SpanReader(part,
+                      log_parts_access.log_source().value()->GetDecoder(part));
+  } else {
+    return SpanReader(part);
+  }
+}
+
+PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
+    : log_parts_access_(std::move(log_parts_access)),
+      message_reader_(MakeSpanReader(log_parts_access_, 0)) {
+  if (log_parts_access_.size() >= 2) {
+    next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
   }
   ComputeBootCounts();
 }
 
 void PartsMessageReader::ComputeBootCounts() {
-  boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+  boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
                       std::nullopt);
 
+  const auto boots = log_parts_access_.parts().boots;
+
   // We have 3 vintages of log files with different amounts of information.
   if (log_file_header()->has_boot_uuids()) {
     // The new hotness with the boots explicitly listed out.  We can use the log
@@ -1580,10 +1595,10 @@
     size_t node_index = 0;
     for (const flatbuffers::String *boot_uuid :
          *log_file_header()->boot_uuids()) {
-      CHECK(parts_.boots);
+      CHECK(boots);
       if (boot_uuid->size() != 0) {
-        auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
-        if (it != parts_.boots->boot_count_map.end()) {
+        auto it = boots->boot_count_map.find(boot_uuid->str());
+        if (it != boots->boot_count_map.end()) {
           boot_counts_[node_index] = it->second;
         }
       } else if (parts().boots->boots[node_index].size() == 1u) {
@@ -1595,11 +1610,10 @@
     // Older multi-node logs which are guarenteed to have UUIDs logged, or
     // single node log files with boot UUIDs in the header.  We only know how to
     // order certain boots in certain circumstances.
-    if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+    if (configuration::MultiNode(log_parts_access_.config()) || boots) {
       for (size_t node_index = 0; node_index < boot_counts_.size();
            ++node_index) {
-        CHECK(parts_.boots);
-        if (parts().boots->boots[node_index].size() == 1u) {
+        if (boots->boots[node_index].size() == 1u) {
           boot_counts_[node_index] = 0;
         }
       }
@@ -1624,17 +1638,18 @@
       // start time.
       // TODO(austin): Does this work with startup when we don't know the
       // remote start time too?  Look at one of those logs to compare.
-      if (monotonic_sent_time >
-          parts_.monotonic_start_time + max_out_of_order_duration()) {
+      if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
+                                    max_out_of_order_duration()) {
         after_start_ = true;
       }
       if (after_start_) {
         CHECK_GE(monotonic_sent_time,
                  newest_timestamp_ - max_out_of_order_duration())
             << ": Max out of order of " << max_out_of_order_duration().count()
-            << "ns exceeded. " << parts_ << ", start time is "
-            << parts_.monotonic_start_time << " currently reading "
-            << filename();
+            << "ns exceeded. " << log_parts_access_.parts()
+            << ", start time is "
+            << log_parts_access_.parts().monotonic_start_time
+            << " currently reading " << filename();
       }
       return message;
     }
@@ -1645,7 +1660,7 @@
 }
 
 void PartsMessageReader::NextLog() {
-  if (next_part_index_ == parts_.parts.size()) {
+  if (next_part_index_ == log_parts_access_.size()) {
     CHECK(!next_message_reader_);
     done_ = true;
     return;
@@ -1653,8 +1668,9 @@
   CHECK(next_message_reader_);
   message_reader_ = std::move(*next_message_reader_);
   ComputeBootCounts();
-  if (next_part_index_ + 1 < parts_.parts.size()) {
-    next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
+  if (next_part_index_ + 1 < log_parts_access_.size()) {
+    next_message_reader_.emplace(
+        MakeSpanReader(log_parts_access_, next_part_index_ + 1));
   } else {
     next_message_reader_.reset();
   }
@@ -1748,8 +1764,8 @@
   return os;
 }
 
-MessageSorter::MessageSorter(LogParts log_parts)
-    : parts_message_reader_(log_parts),
+MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
+    : parts_message_reader_(log_parts_access),
       source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
 }
 
@@ -1850,7 +1866,7 @@
   const auto parts = log_files.SelectParts(node_name, boot_count);
   node_ = configuration::GetNodeIndex(parts.config(), node_name);
 
-  for (LogParts part : parts) {
+  for (LogPartsAccess part : parts) {
     message_sorters_.emplace_back(std::move(part));
   }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 3281bc8..9dc7d88 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -269,7 +269,11 @@
 // handles any per-file state left before merging below.
 class MessageReader {
  public:
-  MessageReader(std::string_view filename);
+  // TODO (Alexei): it's deprecated and needs to be removed.
+  explicit MessageReader(std::string_view filename)
+      : MessageReader(SpanReader(filename)) {}
+
+  explicit MessageReader(SpanReader span_reader);
 
   std::string_view filename() const { return span_reader_.filename(); }
 
@@ -353,12 +357,17 @@
 // A class to seamlessly read messages from a list of part files.
 class PartsMessageReader {
  public:
-  PartsMessageReader(LogParts log_parts);
+  // TODO (Alexei): it's deprecated, need to removed.
+  explicit PartsMessageReader(LogParts log_parts)
+      : PartsMessageReader(LogPartsAccess(std::nullopt, std::move(log_parts))) {
+  }
+
+  explicit PartsMessageReader(LogPartsAccess log_parts_access);
 
   std::string_view filename() const { return message_reader_.filename(); }
 
   // Returns the LogParts that holds the filenames we are reading.
-  const LogParts &parts() const { return parts_; }
+  const LogParts &parts() const { return log_parts_access_.parts(); }
 
   const LogFileHeader *log_file_header() const {
     return message_reader_.log_file_header();
@@ -389,12 +398,15 @@
   }
 
  private:
+  static SpanReader MakeSpanReader(const LogPartsAccess &log_parts_access,
+                                   size_t part_number);
+
   // Opens the next log and updates message_reader_.  Sets done_ if there is
   // nothing more to do.
   void NextLog();
   void ComputeBootCounts();
 
-  const LogParts parts_;
+  const LogPartsAccess log_parts_access_;
   size_t next_part_index_ = 1u;
   bool done_ = false;
 
@@ -544,7 +556,11 @@
 // Class to sort the resulting messages from a PartsMessageReader.
 class MessageSorter {
  public:
-  MessageSorter(LogParts log_parts);
+  // TODO (Alexei): it's deperecated and need to be removed.
+  explicit MessageSorter(LogParts log_parts)
+      : MessageSorter(LogPartsAccess(std::nullopt, std::move(log_parts))) {}
+
+  explicit MessageSorter(const LogPartsAccess log_parts_access);
 
   // Returns the parts that this is sorting messages from.
   const LogParts &parts() const { return parts_message_reader_.parts(); }
@@ -635,6 +651,7 @@
  private:
   // Unsorted list of all parts sorters.
   std::vector<MessageSorter> message_sorters_;
+
   // Pointer to the parts sorter holding the current Front message if one
   // exists, or nullptr if a new one needs to be found.
   MessageSorter *current_ = nullptr;
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index cf0ea5a..72f9de8 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -43,7 +43,7 @@
 
   // Now, build up the estimator used to solve for time.
   message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
-      config, config, log_files.front_boots(), FLAGS_skip_order_validation,
+      config, config, log_files.boots(), FLAGS_skip_order_validation,
       chrono::seconds(0));
   multinode_estimator.set_reboot_found(
       [config](distributed_clock::time_point reboot_time,