Merge "Add redzones around lockless queue data"
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index ac82b95..ef61f6b 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -14,6 +14,9 @@
"If true, fetch the current message on the channel first");
DEFINE_bool(pretty, false,
"If true, pretty print the messages on multiple lines");
+DEFINE_bool(print_timestamps, true, "If true, timestamps are printed.");
+DEFINE_uint64(count, 0,
+ "If >0, aos_dump will exit after printing this many messages.");
namespace {
@@ -29,14 +32,18 @@
builder, channel->schema(), static_cast<const uint8_t *>(context.data),
{FLAGS_pretty, static_cast<size_t>(FLAGS_max_vector_size)});
- if (context.monotonic_remote_time != context.monotonic_event_time) {
- std::cout << context.realtime_remote_time << " ("
- << context.monotonic_remote_time << ") delivered "
- << context.realtime_event_time << " ("
- << context.monotonic_event_time << "): " << *builder << '\n';
+ if (FLAGS_print_timestamps) {
+ if (context.monotonic_remote_time != context.monotonic_event_time) {
+ std::cout << context.realtime_remote_time << " ("
+ << context.monotonic_remote_time << ") delivered "
+ << context.realtime_event_time << " ("
+ << context.monotonic_event_time << "): " << *builder << '\n';
+ } else {
+ std::cout << context.realtime_event_time << " ("
+ << context.monotonic_event_time << "): " << *builder << '\n';
+ }
} else {
- std::cout << context.realtime_event_time << " ("
- << context.monotonic_event_time << "): " << *builder << '\n';
+ std::cout << *builder << '\n';
}
}
@@ -58,7 +65,7 @@
aos::configuration::ReadConfig(FLAGS_config);
const aos::Configuration *config_msg = &config.message();
- ::aos::ShmEventLoop event_loop(config_msg);
+ aos::ShmEventLoop event_loop(config_msg);
event_loop.SkipTimingReport();
event_loop.SkipAosLog();
@@ -98,6 +105,8 @@
LOG(FATAL) << "Multiple channels found with same type";
}
+ uint64_t message_count = 0;
+
aos::FastStringBuilder str_builder;
for (const aos::Channel *channel : found_channels) {
@@ -106,13 +115,22 @@
event_loop.MakeRawFetcher(channel);
if (fetcher->Fetch()) {
PrintMessage(channel, fetcher->context(), &str_builder);
+ ++message_count;
}
}
+ if (FLAGS_count > 0 && message_count >= FLAGS_count) {
+ return 0;
+ }
+
event_loop.MakeRawWatcher(
- channel, [channel, &str_builder](const aos::Context &context,
- const void * /*message*/) {
+ channel, [channel, &str_builder, &event_loop, &message_count](
+ const aos::Context &context, const void * /*message*/) {
PrintMessage(channel, context, &str_builder);
+ ++message_count;
+ if (FLAGS_count > 0 && message_count >= FLAGS_count) {
+ event_loop.Exit();
+ }
});
}
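
The two new flags combine naturally for scripting. A hypothetical invocation (the channel name and type are placeholders, passed as positional arguments per the channel lookup above):

    aos_dump --count=5 --print_timestamps=false /test aos.examples.Ping

With --fetch, the initially fetched message counts toward the limit; otherwise the watcher callback calls event_loop.Exit() once --count messages have printed.
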
diff --git a/aos/configuration.h b/aos/configuration.h
index 4ada459..fbd9cff 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -57,6 +57,14 @@
const Node *node) {
return GetChannel(&config.message(), name, type, application_name, node);
}
+template <typename T>
+inline const Channel *GetChannel(const Configuration *config,
+ const std::string_view name,
+ const std::string_view application_name,
+ const Node *node) {
+ return GetChannel(config, name, T::GetFullyQualifiedName(), application_name,
+ node);
+}
// Convenience wrapper for getting a channel from a specified config if you
 // already have the name/type in a Channel object--this is useful if the
 // Channel object you have does not point to memory within config.
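
A minimal usage sketch of the new typed overload, assuming a flatbuffers-generated message type (aos::examples::Ping here, hypothetical) whose generated code provides GetFullyQualifiedName():

    // Equivalent to:
    //   GetChannel(config, "/test", "aos.examples.Ping", "my_app", node);
    const aos::Channel *channel =
        aos::configuration::GetChannel<aos::examples::Ping>(
            config, "/test", "my_app", node);
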
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 902ca76..5912ac6 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -143,6 +143,7 @@
":ping_fbs",
":pong_fbs",
"//aos/network:message_bridge_client_fbs",
+ "//aos/events/logging:logger_fbs",
"//aos/network:timestamp_fbs",
"//aos/network:message_bridge_server_fbs",
],
@@ -306,6 +307,7 @@
":aos_logging",
":event_loop",
":simple_channel",
+ "//aos/events/logging:logger_fbs",
"//aos/ipc_lib:index",
"//aos/network:message_bridge_client_status",
"//aos/network:message_bridge_server_status",
diff --git a/aos/events/aos_logging.cc b/aos/events/aos_logging.cc
index f831071..d312925 100644
--- a/aos/events/aos_logging.cc
+++ b/aos/events/aos_logging.cc
@@ -19,7 +19,7 @@
builder.add_message(message_str);
builder.add_level(
static_cast<::aos::logging::Level>(message_data.level));
- builder.add_source(message_data.source);
+ builder.add_source_pid(message_data.source);
builder.add_name(name_str);
message.Send(builder.Finish());
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index c922f2e..de397e2 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -527,7 +527,10 @@
CHECK(channel != nullptr)
<< ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
<< T::GetFullyQualifiedName() << "\" } not found in config for "
- << name() << ".";
+ << name()
+ << (configuration::MultiNode(configuration_)
+ ? absl::StrCat(" on node ", node()->name()->string_view())
+ : ".");
if (!configuration::ChannelIsSendableOnNode(channel, node())) {
LOG(FATAL) << "Channel { \"name\": \"" << channel_name
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 202c772..f5d6c92 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -94,9 +94,12 @@
// We get to pick our tradeoffs here. Either we assume that there are no
// backward step changes in our time function for each node, or we have to
- // let time go backwards. This backwards time jump should be small, so we
- // can check for it and bound it.
- CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ // let time go backwards. We currently only really see this happen when 2
+ // events are scheduled for "now", time changes, and there is a nanosecond
+ // or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
now_ = std::get<0>(oldest_event);
@@ -120,9 +123,12 @@
// We get to pick our tradeoffs here. Either we assume that there are no
// backward step changes in our time function for each node, or we have to
- // let time go backwards. This backwards time jump should be small, so we
- // can check for it and bound it.
- CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ // let time go backwards. We currently only really see this happen when 2
+ // events are scheduled for "now", time changes, and there is a nanosecond
+ // or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
now_ = std::get<0>(oldest_event);
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 468d904..c067048 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -196,9 +196,16 @@
inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
// Make sure we stay in sync.
if (monotonic_now_valid_) {
+ // We want time to be smooth, so confirm that it doesn't change too much
+ // while handling an event.
+ //
+ // There are 2 sources of error: numerical precision and integer rounding
+ // problems going from the monotonic clock to the distributed clock and
+ // back again, and a slight jump when we update the time function to
+ // transition between line segments.
CHECK_NEAR(monotonic_now_,
FromDistributedClock(scheduler_scheduler_->distributed_now()),
- std::chrono::nanoseconds(1));
+ std::chrono::nanoseconds(2));
return monotonic_now_;
} else {
return FromDistributedClock(scheduler_scheduler_->distributed_now());
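
The widened tolerance above exists because the monotonic-to-distributed conversion is done in integer nanoseconds in both directions. A self-contained sketch of the failure mode (the slope is made up; the real conversion lives in FromDistributedClock and its inverse):

    #include <cstdint>

    // Illustrative only: a non-unity clock slope applied with integer math
    // does not round-trip exactly.
    int64_t ToDistributed(int64_t mono_ns) { return mono_ns * 3 / 7; }
    int64_t FromDistributed(int64_t dist_ns) { return dist_ns * 7 / 3; }

    // FromDistributed(ToDistributed(10)) == 9: one nanosecond is lost, and a
    // line-segment transition can add another, hence the 2ns bound.
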
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 0fb4708..56bef43 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -36,22 +36,29 @@
cc_library(
name = "logger",
srcs = [
+ "log_namer.cc",
"logger.cc",
"logger_math.cc",
],
hdrs = [
+ "eigen_mpq.h",
+ "log_namer.h",
"logger.h",
],
visibility = ["//visibility:public"],
deps = [
":logfile_utils",
":logger_fbs",
+ ":uuid",
"//aos/events:event_loop",
"//aos/events:simulated_event_loop",
+ "//aos/network:message_bridge_server_fbs",
"//aos/network:team_number",
"//aos/network:timestamp_filter",
"//aos/time",
+ "//third_party/gmp",
"@com_github_google_flatbuffers//:flatbuffers",
+ "@com_google_absl//absl/strings",
"@com_google_absl//absl/types:span",
"@org_tuxfamily_eigen//:eigen",
],
@@ -129,6 +136,7 @@
name = "multinode_pingpong_config",
src = "multinode_pingpong.json",
flatbuffers = [
+ ":logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
@@ -153,3 +161,18 @@
"//aos/testing:googletest",
],
)
+
+cc_library(
+ name = "uuid",
+ srcs = ["uuid.cc"],
+ hdrs = ["uuid.h"],
+)
+
+cc_test(
+ name = "uuid_test",
+ srcs = ["uuid_test.cc"],
+ deps = [
+ ":uuid",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/events/logging/eigen_mpq.h b/aos/events/logging/eigen_mpq.h
new file mode 100644
index 0000000..1463648
--- /dev/null
+++ b/aos/events/logging/eigen_mpq.h
@@ -0,0 +1,35 @@
+#ifndef AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+#define AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+
+#include "Eigen/Dense"
+#include "third_party/gmp/gmpxx.h"
+
+namespace Eigen {
+
+// TypeTraits for mpq_class. This is only really enough to use inverse().
+template <>
+struct NumTraits<mpq_class>
+ : GenericNumTraits<mpq_class> {
+ typedef mpq_class Real;
+ typedef mpq_class Literal;
+ typedef mpq_class NonInteger;
+ typedef mpq_class Nested;
+
+ enum {
+ IsComplex = 0,
+ IsInteger = 0,
+ IsSigned = 1,
+ RequireInitialization = 1,
+ ReadCost = 1,
+ AddCost = 3,
+ MulCost = 9
+ };
+
+ static inline Real dummy_precision() { return mpq_class(0); }
+ static inline Real epsilon() { return mpq_class(0); }
+ static inline int digits10() { return 0; }
+};
+
+} // namespace Eigen
+
+#endif // AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
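
A minimal sketch of what this specialization enables, assuming an invertible matrix: mpq_class arithmetic is exact, so inverse() involves no floating point rounding.

    #include <iostream>

    #include "Eigen/Dense"
    #include "aos/events/logging/eigen_mpq.h"

    int main() {
      Eigen::Matrix<mpq_class, 2, 2> m;
      m << mpq_class(1, 3), mpq_class(1, 4),
           mpq_class(1, 5), mpq_class(1, 6);
      // det = 1/18 - 1/20 = 1/180, so the inverse is exactly representable.
      const Eigen::Matrix<mpq_class, 2, 2> m_inv = m.inverse();
      std::cout << m_inv(0, 0) << "\n";  // Prints "30".
      return 0;
    }
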
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index b9d53ff..f406d24 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -78,6 +78,11 @@
LOG(FATAL) << "Expected 1 logfile as an argument.";
}
aos::logger::MessageReader reader(argv[1]);
+ std::cout << aos::FlatbufferToJson(reader.log_file_header(),
+ {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
+ << std::endl;
while (true) {
std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
@@ -117,12 +122,14 @@
LOG(FATAL) << "Expected at least 1 logfile as an argument.";
}
- std::vector<std::vector<std::string>> logfiles;
-
+ std::vector<std::string> unsorted_logfiles;
for (int i = 1; i < argc; ++i) {
- logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+ unsorted_logfiles.emplace_back(std::string(argv[i]));
}
+ std::vector<std::vector<std::string>> logfiles =
+ aos::logger::SortParts(unsorted_logfiles);
+
aos::logger::LogReader reader(logfiles);
aos::FastStringBuilder builder;
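
SortParts is only visible here through its signature: it takes the flat file list and returns one vector of part files per log, in an order LogReader accepts (presumably grouped by the parts_uuid and ordered by the parts_index written into each header). A sketch with hypothetical file names:

    std::vector<std::vector<std::string>> logfiles = aos::logger::SortParts(
        {"log_pi1_data.part1.bfbs", "log_pi1_data.part0.bfbs",
         "log_pi2_data.part0.bfbs"});
    // => {{"log_pi1_data.part0.bfbs", "log_pi1_data.part1.bfbs"},
    //     {"log_pi2_data.part0.bfbs"}}
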
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
new file mode 100644
index 0000000..def0842
--- /dev/null
+++ b/aos/events/logging/log_namer.cc
@@ -0,0 +1,237 @@
+#include "aos/events/logging/log_namer.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace logger {
+
+void LogNamer::UpdateHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const UUID &uuid, int parts_index) const {
+ header->mutable_message()->mutate_parts_index(parts_index);
+ CHECK_EQ(uuid.string_view().size(),
+ header->mutable_message()->mutable_parts_uuid()->size());
+ std::copy(uuid.string_view().begin(), uuid.string_view().end(),
+ reinterpret_cast<char *>(
+ header->mutable_message()->mutable_parts_uuid()->Data()));
+}
+
+void LocalLogNamer::WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) {
+ CHECK_EQ(node, this->node());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
+ CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+ return data_writer_.get();
+}
+
+void LocalLogNamer::Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ CHECK(node == this->node());
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
+ const Channel *channel) {
+ CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+ << ": Message is not delivered to this node.";
+ CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+ CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+ node_))
+ << ": Delivery times aren't logged for this channel on this node.";
+ return data_writer_.get();
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
+ const Channel * /*channel*/, const Node * /*node*/) {
+ LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
+ return nullptr;
+}
+
+MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
+ const Configuration *configuration,
+ const Node *node)
+ : LogNamer(node),
+ base_name_(base_name),
+ configuration_(configuration),
+ uuid_(UUID::Random()),
+ data_writer_(OpenDataWriter()) {}
+
+void MultiNodeLogNamer::WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) {
+ if (node == this->node()) {
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ UpdateHeader(header, data_writer.second.uuid,
+ data_writer.second.part_number);
+ data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ }
+ }
+ }
+}
+
+void MultiNodeLogNamer::Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+ if (node == this->node()) {
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ UpdateHeader(header, uuid_, part_number_);
+ data_writer_->WriteSizedFlatbuffer(header->full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ ++data_writer.second.part_number;
+ data_writer.second.rotate(data_writer.first, &data_writer.second);
+ UpdateHeader(header, data_writer.second.uuid,
+ data_writer.second.part_number);
+ data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+ }
+ }
+ }
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ if (!is_readable) {
+ return nullptr;
+ }
+
+ // Then, see if we are supposed to log the data here.
+ const bool log_message =
+ configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+ if (!log_message) {
+ return nullptr;
+ }
+
+ // Now, sort out if this is data generated on this node, or not. It is
+ // generated if it is sendable on this node.
+ if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+ return data_writer_.get();
+ }
+
+ // Ok, we have data that is being forwarded to us that we are supposed to
+ // log. It needs to be logged with send timestamps, but it also needs to
+ // stay sorted enough to be processed.
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ // Track that this node is being logged.
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+
+ if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+ nodes_.emplace_back(source_node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = source_node;
+ data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+ OpenWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
+
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+ nodes_.emplace_back(node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = node;
+ data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+ OpenForwardedTimestampWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
+ const Channel *channel) {
+ const bool log_delivery_times =
+ (this->node() == nullptr)
+ ? false
+ : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, this->node(), this->node());
+ if (!log_delivery_times) {
+ return nullptr;
+ }
+
+ return data_writer_.get();
+}
+
+void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
+ DataWriter *data_writer) {
+ std::string filename =
+ absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+ "/", channel->type()->string_view(), ".part",
+ data_writer->part_number, ".bfbs");
+
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+}
+
+void MultiNodeLogNamer::OpenWriter(const Channel *channel,
+ DataWriter *data_writer) {
+ const std::string filename = absl::StrCat(
+ base_name_, "_", channel->source_node()->string_view(), "_data",
+ channel->name()->string_view(), "/", channel->type()->string_view(),
+ ".part", data_writer->part_number, ".bfbs");
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+}
+
+std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
+ part_number_, ".bfbs"));
+}
+
+} // namespace logger
+} // namespace aos
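
For reference, the three Open*Writer functions above produce names like the following, given a hypothetical base name of /tmp/log, logging node pi1, remote node pi2, and a channel /test of type aos.examples.Ping:

    /tmp/log_pi1_data.part0.bfbs                            (OpenDataWriter)
    /tmp/log_pi2_data/test/aos.examples.Ping.part0.bfbs     (OpenWriter)
    /tmp/log_timestamps/test/aos.examples.Ping.part0.bfbs   (OpenForwardedTimestampWriter)
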
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
new file mode 100644
index 0000000..631016b
--- /dev/null
+++ b/aos/events/logging/log_namer.h
@@ -0,0 +1,168 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_NAMER_H_
+#define AOS_EVENTS_LOGGING_LOG_NAMER_H_
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Interface describing how to name, track, and add headers to log file parts.
+class LogNamer {
+ public:
+ // Constructs a LogNamer with the primary node (i.e. the one the logger runs on)
+ // being node.
+ LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+ virtual ~LogNamer() {}
+
+ // Writes the header to all log files for a specific node. This function
+ // needs to be called after all the writers are created.
+ //
+ // Modifies header to contain the uuid and part number for each writer as it
+ // writes it. Since this is done unconditionally, it does not restore the
+ // previous value at the end.
+ virtual void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) = 0;
+
+ // Returns a writer for writing data logged on this channel (on the node
+ // provided in the constructor).
+ virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+ // Returns a writer for writing timestamps logged on this channel (on the node
+ // provided in the constructor).
+ virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+
+ // Returns a writer for writing timestamps delivered over the special
+ // /aos/remote_timestamps/* channels. node is the node that the timestamps
+ // are forwarded back from.
+ virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) = 0;
+
+ // Rotates all log files for the provided node. The provided header will be
+ // modified and written per WriteHeader above.
+ virtual void Rotate(
+ const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
+ // Returns all the nodes that data is being written for.
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ // Returns the node the logger is running on.
+ const Node *node() const { return node_; }
+
+ protected:
+ // Modifies the header to have the provided UUID and part id.
+ void UpdateHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const UUID &uuid, int part_id) const;
+
+ const Node *const node_;
+ std::vector<const Node *> nodes_;
+};
+
+// Local log namer is a simple version which only names things
+// "base_name.part#.bfbs" and increments the part number. It doesn't support
+// any other log type.
+class LocalLogNamer : public LogNamer {
+ public:
+ LocalLogNamer(std::string_view base_name, const Node *node)
+ : LogNamer(node),
+ base_name_(base_name),
+ uuid_(UUID::Random()),
+ data_writer_(OpenDataWriter()) {}
+
+ void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) override;
+
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+ void Rotate(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel * /*channel*/, const Node * /*node*/) override;
+
+ private:
+ // Creates a new data writer with the new part number.
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+ }
+
+ const std::string base_name_;
+ const UUID uuid_;
+ size_t part_number_ = 0;
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
+};
+
+// Log namer which uses a config and a base name to name a bunch of files.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+ MultiNodeLogNamer(std::string_view base_name,
+ const Configuration *configuration, const Node *node);
+
+ void WriteHeader(
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+ const Node *node) override;
+
+ void Rotate(const Node *node,
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+ override;
+
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) override;
+
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ private:
+ // Files to write remote data to. We want one per channel. Maps the channel
+ // to the writer, Node, and part number.
+ struct DataWriter {
+ std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+ const Node *node;
+ size_t part_number = 0;
+ UUID uuid = UUID::Random();
+ std::function<void(const Channel *, DataWriter *)> rotate;
+ };
+
+ // Opens up a writer for timestamps forwarded back.
+ void OpenForwardedTimestampWriter(const Channel *channel,
+ DataWriter *data_writer);
+
+ // Opens up a writer for remote data.
+ void OpenWriter(const Channel *channel, DataWriter *data_writer);
+
+ // Opens the main data writer file for this node, which backs data_writer_.
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
+
+ const std::string base_name_;
+ const Configuration *const configuration_;
+ const UUID uuid_;
+
+ size_t part_number_ = 0;
+
+ // File to write both delivery timestamps and local data to.
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
+
+ std::map<const Channel *, DataWriter> data_writers_;
+};
+
+} // namespace logger
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_LOG_NAMER_H_
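
A sketch of how the two namers plug into Logger (constructors are the ones added in logger.cc below; the paths, the event_loop variable, and the explicit polling period are illustrative):

    // Single node: writes /tmp/log.part0.bfbs, .part1.bfbs, ... on rotation.
    aos::logger::Logger local_logger("/tmp/log", &event_loop,
                                     std::chrono::milliseconds(100));

    // Multi node: one data file per remote node and channel, plus timestamps.
    aos::logger::Logger multinode_logger(
        std::make_unique<aos::logger::MultiNodeLogNamer>(
            "/tmp/log", event_loop.configuration(), event_loop.node()),
        &event_loop, std::chrono::milliseconds(100));
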
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ade82f9..7f53577 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -37,6 +37,24 @@
DetachedBufferWriter::~DetachedBufferWriter() {
Flush();
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+ VLOG(1) << "Closed " << filename_;
+}
+
+DetachedBufferWriter::DetachedBufferWriter(
+ DetachedBufferWriter &&other) {
+ *this = std::move(other);
+}
+
+DetachedBufferWriter &DetachedBufferWriter::operator=(
+ DetachedBufferWriter &&other) {
+ Flush();
+ std::swap(filename_, other.filename_);
+ std::swap(fd_, other.fd_);
+ std::swap(queued_size_, other.queued_size_);
+ std::swap(written_size_, other.written_size_);
+ std::swap(queue_, other.queue_);
+ std::swap(iovec_, other.iovec_);
+ return *this;
}
void DetachedBufferWriter::QueueSizedFlatbuffer(
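
The flush-then-swap move assignment is what lets the namers rotate in place; a sketch (file names hypothetical):

    DetachedBufferWriter writer("/tmp/log.part0.bfbs");
    // ... write part 0 ...
    // Flushes part 0, adopts part 1's state, and closes part 0's fd when the
    // temporary is destroyed.
    writer = DetachedBufferWriter("/tmp/log.part1.bfbs");
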
@@ -260,19 +278,36 @@
FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
SpanReader span_reader(filename);
- // Make sure we have enough to read the size.
absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
// Make sure something was read.
CHECK(config_data != absl::Span<const uint8_t>())
<< ": Failed to read header from: " << filename;
- // And copy the config so we have it forever.
+ // And copy the config so we have it forever, removing the size prefix.
std::vector<uint8_t> data(
config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
return FlatbufferVector<LogFileHeader>(std::move(data));
}
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+ size_t n) {
+ SpanReader span_reader(filename);
+ absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
+ for (size_t i = 0; i < n + 1; ++i) {
+ data_span = span_reader.ReadMessage();
+
+ // Make sure something was read.
+ CHECK(data_span != absl::Span<const uint8_t>())
+ << ": Failed to read data from: " << filename;
+ }
+
+ // And copy the data so we have it forever.
+ std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
+ data_span.end());
+ return FlatbufferVector<MessageHeader>(std::move(data));
+}
+
MessageReader::MessageReader(std::string_view filename)
: span_reader_(filename),
raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
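
ReadNthMessage skips the size-prefixed header message and then returns message n; a usage sketch (file name hypothetical):

    // n = 0 is the first data message after the log file header.
    aos::FlatbufferVector<aos::logger::MessageHeader> message =
        aos::logger::ReadNthMessage("/tmp/log_pi1_data.part0.bfbs", 0);
    LOG(INFO) << aos::FlatbufferToJson(&message.message());
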
@@ -290,7 +325,7 @@
FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
max_out_of_order_duration_ =
- std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+ chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
VLOG(1) << "Opened " << filename << " as node "
<< FlatbufferToJson(log_file_header()->node());
@@ -323,6 +358,81 @@
// open more of them).
log_file_header_ = message_reader_->raw_log_file_header();
+ for (size_t i = 1; i < filenames_.size(); ++i) {
+ MessageReader message_reader(filenames_[i]);
+
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->realtime_start_time()));
+
+ // There are 2 types of part files. Part files from before time estimation
+ // has started, and part files after. We don't declare a log file "started"
+ // until time estimation is up. And once a log file has started, it should
+ // never stop again, and its start time should remain constant.
+ // To compare both types of headers, we mutate our saved copy of the header
+ // to match the next chunk by updating time if we detect a stopped ->
+ // started transition.
+ if (monotonic_start_time() == monotonic_clock::min_time) {
+ CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
+ // We should only be missing the monotonic start time when logging data
+ // for remote nodes. We don't have a good way to determine the remote
+ // realtime offset, so it shouldn't be filled out.
+ // TODO(austin): If we have a good way, feel free to fill it out. It
+ // probably won't be better than we could do in post though with the same
+ // data.
+ CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
+ if (new_monotonic_start_time != monotonic_clock::min_time) {
+ // If we finally found our start time, update the header. Do this once
+ // because it should never change again.
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ }
+ }
+
+ // We don't have a good way to set the realtime start time on remote nodes.
+ // Confirm it remains consistent.
+ CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
+ message_reader.log_file_header()->has_realtime_start_time());
+
+ // The parts index will *not* match unless we set it to match. We only want
+ // to accept the start time and parts index mismatching, so set it to match.
+ log_file_header_.mutable_message()->mutate_parts_index(
+ message_reader.log_file_header()->parts_index());
+
+ // Now compare that the headers match.
+ if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
+ log_file_header_)) {
+ if (message_reader.log_file_header()->has_logger_uuid() &&
+ log_file_header_.message().has_logger_uuid() &&
+ message_reader.log_file_header()->logger_uuid()->string_view() !=
+ log_file_header_.message().logger_uuid()->string_view()) {
+ LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
+ << filenames_[0] << " and " << filenames_[i]
+ << ", this is not supported.";
+ }
+ if (message_reader.log_file_header()->has_parts_uuid() &&
+ log_file_header_.message().has_parts_uuid() &&
+ message_reader.log_file_header()->parts_uuid()->string_view() !=
+ log_file_header_.message().parts_uuid()->string_view()) {
+ LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
+ << filenames_[0] << " and " << filenames_[i]
+ << ", this is not supported.";
+ }
+
+ LOG(FATAL) << "Header is different between log file chunks "
+ << filenames_[0] << " and " << filenames_[i]
+ << ", this is not supported.";
+ }
+ }
+ // Put the parts index back to the first log file chunk.
+ log_file_header_.mutable_message()->mutate_parts_index(
+ message_reader_->log_file_header()->parts_index());
+
// Setup per channel state.
channels_.resize(configuration()->channels()->size());
for (ChannelData &channel_data : channels_) {
@@ -368,11 +478,22 @@
// We can't support the config diverging between two log file headers. See if
// they are the same.
if (next_filename_index_ != 0) {
+ // In order for the headers to identically compare, they need to have the
+ // same parts_index. Rewrite the saved header with the new parts_index,
+ // compare, and then restore.
+ const int32_t original_parts_index =
+ log_file_header_.message().parts_index();
+ log_file_header_.mutable_message()->mutate_parts_index(
+ message_reader_->log_file_header()->parts_index());
+
CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
log_file_header_))
<< ": Header is different between log file chunks "
<< filenames_[next_filename_index_] << " and "
<< filenames_[next_filename_index_ - 1] << ", this is not supported.";
+
+ log_file_header_.mutable_message()->mutate_parts_index(
+ original_parts_index);
}
++next_filename_index_;
@@ -545,10 +666,13 @@
timestamp = channels_[channel_index].data.front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
- channels_[channel_index].data.pop_front();
+ channels_[channel_index].data.PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
- << std::get<0>(timestamp) << " for " << channel_index;
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
+ << std::get<0>(timestamp) << " for "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -558,19 +682,21 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel, int node_index) {
+SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel].timestamps[node_index].front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
- channels_[channel].timestamps[node_index].pop_front();
+ channels_[channel].timestamps[node_index].PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
<< std::get<0>(timestamp) << " for "
<< configuration::StrippedChannelToString(
configuration()->channels()->Get(channel))
- << " on " << node_index;
+ << " on "
+ << configuration()->nodes()->Get(node_index)->name()->string_view()
+ << " (" << node_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -607,7 +733,7 @@
return true;
}
-void SplitMessageReader::MessageHeaderQueue::pop_front() {
+void SplitMessageReader::MessageHeaderQueue::PopFront() {
data_.pop_front();
if (data_.size() != 0u) {
// Yup, new data.
@@ -616,6 +742,15 @@
} else {
timestamp_merger->Update(split_reader, front_timestamp());
}
+ } else {
+ // Poke anyways to update the heap.
+ if (timestamps) {
+ timestamp_merger->UpdateTimestamp(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ } else {
+ timestamp_merger->Update(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ }
}
}
@@ -687,25 +822,32 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == message_heap_.end())
- << ": Pushing message when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == message_heap_.end())
+ << ": Pushing message when it is already in the heap.";
- message_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ message_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
+ std::push_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ }
// If we are just a data merger, don't wait for timestamps.
if (!has_timestamps_) {
- channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!message_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -730,26 +872,33 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == timestamp_heap_.end())
- << ": Pushing timestamp when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == timestamp_heap_.end())
+ << ": Pushing timestamp when it is already in the heap.";
- timestamp_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ timestamp_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- SplitMessageReaderHeapCompare);
+ std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ SplitMessageReaderHeapCompare);
+ }
// If we are a timestamp merger, don't wait for data. Missing data will be
// caught at read time.
if (has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!timestamp_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -832,50 +981,73 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->PopOldest(channel_index_, node_index_);
+ ->PopOldestTimestamp(channel_index_, node_index_);
// Confirm that the time we have recorded matches.
CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
- // TODO(austin): What if we get duplicate timestamps?
+ // Now, keep reading until we have found all duplicates.
+ while (!timestamp_heap_.empty()) {
+ // See if it is a duplicate.
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ next_oldest_timestamp_reader = timestamp_heap_.front();
- return oldest_timestamp;
-}
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ next_oldest_timestamp_time =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->oldest_message(channel_index_, node_index_);
-TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
- if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
- return TimestampMerger::DeliveryTimestamp{};
+ if (std::get<0>(next_oldest_timestamp_time) ==
+ std::get<0>(oldest_timestamp) &&
+ std::get<1>(next_oldest_timestamp_time) ==
+ std::get<1>(oldest_timestamp)) {
+ // Pop the timestamp reader pointer.
+ std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ timestamp_heap_.pop_back();
+
+ // Pop the next oldest timestamp. This re-pushes any messages from the
+ // reader.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ next_oldest_timestamp =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->PopOldestTimestamp(channel_index_, node_index_);
+
+ // And make sure the contents match in their entirety.
+ CHECK(std::get<2>(oldest_timestamp).span() ==
+ std::get<2>(next_oldest_timestamp).span())
+ << ": Data at the same timestamp doesn't match, "
+ << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
+ << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(oldest_timestamp).span().data()),
+ std::get<2>(oldest_timestamp).span().size()))
+ << " vs "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(next_oldest_timestamp).span().data()),
+ std::get<2>(next_oldest_timestamp).span().size()));
+
+ } else {
+ break;
+ }
}
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->oldest_message(channel_index_, node_index_);
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_sent_time()));
- timestamp.realtime_event_time = realtime_clock::time_point(
- chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
-
- timestamp.monotonic_remote_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_remote_time()));
- timestamp.realtime_remote_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->realtime_remote_time()));
-
- timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
- return timestamp;
+ return oldest_timestamp;
}
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
+ VLOG(1) << "Looking for matching timestamp for "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ") "
+ << " at " << std::get<0>(oldest_timestamp());
+
// Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
@@ -944,7 +1116,8 @@
<< " on channel "
<< configuration::StrippedChannelToString(
configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
+ << " (" << channel_index_ << ")"
+ << (VLOG_IS_ON(1) ? DebugString() : "");
return std::make_tuple(timestamp,
std::move(std::get<2>(oldest_timestamp)));
}
@@ -952,6 +1125,10 @@
timestamp.monotonic_remote_time = remote_monotonic_time;
}
+ VLOG(1) << "Found matching data "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ")";
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
@@ -1062,10 +1239,27 @@
<< FlatbufferToJson(reader->node()) << " start_time "
<< monotonic_start_time();
} else {
- // And then make sure all the other files have matching headers.
- CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
- << ": " << FlatbufferToJson(log_file_header()) << " reader "
- << FlatbufferToJson(reader->log_file_header());
+ // Find the earliest start time. That way, if we get a full log file
+ // directly from the node and a partial one later, we start with the
+ // full one. Update our header to match that.
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->realtime_start_time()));
+
+ if (monotonic_start_time() == monotonic_clock::min_time ||
+ (new_monotonic_start_time != monotonic_clock::min_time &&
+ new_monotonic_start_time < monotonic_start_time())) {
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ VLOG(1) << "Updated log file " << reader->filename()
+ << " with node " << FlatbufferToJson(reader->node())
+ << " start_time " << new_monotonic_start_time;
+ }
}
}
}
@@ -1105,24 +1299,6 @@
return channel_heap_.front().first;
}
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
- if (timestamp_heap_.empty()) {
- return TimestampMerger::DeliveryTimestamp{};
- }
- return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
-}
-
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
- int channel) const {
- // If we didn't find any data for this node, we won't have any mergers. Return
- // an invalid timestamp in that case.
- if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
- TimestampMerger::DeliveryTimestamp result;
- return result;
- }
- return timestamp_mergers_[channel].OldestTimestamp();
-}
-
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index) {
// Pop and recreate the heap if it has already been pushed. And since we are
@@ -1142,23 +1318,11 @@
channel_heap_.erase(channel_iterator);
std::make_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
+ }
- if (timestamp_mergers_[channel_index].has_timestamps()) {
- const auto timestamp_iterator = std::find_if(
- timestamp_heap_.begin(), timestamp_heap_.end(),
- [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
- return x.second == channel_index;
- });
- DCHECK(timestamp_iterator != timestamp_heap_.end());
- if (std::get<0>(*timestamp_iterator) == timestamp) {
- // It's already in the heap, in the correct spot, so nothing
- // more for us to do here.
- return;
- }
- timestamp_heap_.erase(timestamp_iterator);
- std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- ChannelHeapCompare);
- }
+ if (timestamp == monotonic_clock::min_time) {
+ timestamp_mergers_[channel_index].set_pushed(false);
+ return;
}
channel_heap_.push_back(std::make_pair(timestamp, channel_index));
@@ -1167,11 +1331,18 @@
// put the oldest message first.
std::push_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
+}
- if (timestamp_mergers_[channel_index].has_timestamps()) {
- timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- ChannelHeapCompare);
+void ChannelMerger::VerifyHeaps() {
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+ channel_heap_;
+ std::make_heap(channel_heap.begin(), channel_heap.end(), &ChannelHeapCompare);
+
+ for (size_t i = 0; i < channel_heap_.size(); ++i) {
+ CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
+ CHECK_EQ(
+ std::get<0>(channel_heap[i]),
+ timestamp_mergers_[std::get<1>(channel_heap[i])].channel_merger_time());
}
}
@@ -1190,17 +1361,6 @@
TimestampMerger *merger = &timestamp_mergers_[channel_index];
- if (merger->has_timestamps()) {
- CHECK_GT(timestamp_heap_.size(), 0u);
- std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
- timestamp_heap_.front();
- CHECK(oldest_timestamp_data == oldest_channel_data)
- << ": Timestamp heap out of sync.";
- std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- &ChannelHeapCompare);
- timestamp_heap_.pop_back();
- }
-
// Merger handles any queueing needed from here.
std::tuple<TimestampMerger::DeliveryTimestamp,
FlatbufferVector<MessageHeader>>
@@ -1210,6 +1370,16 @@
<< ": channel_heap_ was corrupted for " << channel_index << ": "
<< DebugString();
+ CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
+ << ": " << MaybeNodeName(log_file_header()->node())
+ << "Messages came off the queue out of order. " << DebugString();
+ last_popped_time_ = std::get<0>(message).monotonic_event_time;
+
+ VLOG(1) << "Popped " << last_popped_time_ << " "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
+
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
}
@@ -1217,27 +1387,31 @@
std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
std::stringstream ss;
for (size_t i = 0; i < data_.size(); ++i) {
- if (timestamps) {
- ss << " msg: ";
- } else {
- ss << " timestamp: ";
- }
- ss << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_sent_time()))
- << " ("
- << realtime_clock::time_point(
- std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
- << ") " << data_[i].message().queue_index();
- if (timestamps) {
- ss << " <- remote "
- << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_remote_time()))
+ if (i < 5 || i + 5 > data_.size()) {
+ if (timestamps) {
+ ss << " msg: ";
+ } else {
+ ss << " timestamp: ";
+ }
+ ss << monotonic_clock::time_point(
+ chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
<< " ("
- << realtime_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().realtime_remote_time()))
- << ")";
+ << realtime_clock::time_point(
+ chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+ << ") " << data_[i].message().queue_index();
+ if (timestamps) {
+ ss << " <- remote "
+ << monotonic_clock::time_point(chrono::nanoseconds(
+ data_[i].message().monotonic_remote_time()))
+ << " ("
+ << realtime_clock::time_point(chrono::nanoseconds(
+ data_[i].message().realtime_remote_time()))
+ << ")";
+ }
+ ss << "\n";
+ } else if (i == 5) {
+ ss << " ...\n";
}
- ss << "\n";
}
return ss.str();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 2b08b59..9534860 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -41,13 +41,20 @@
class DetachedBufferWriter {
public:
DetachedBufferWriter(std::string_view filename);
+ DetachedBufferWriter(DetachedBufferWriter &&other);
+ DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+
~DetachedBufferWriter();
- DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+ DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
std::string_view filename() const { return filename_; }
+ // Rewrites a location in a file (relative to the start) to have new data in
+ // it. The main use case is updating start times after a log file starts.
+ void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+
// TODO(austin): Snappy compress the log file if it ends with .snappy!
// Queues up a finished FlatBufferBuilder to be written. Steals the detached
@@ -68,7 +75,7 @@
size_t total_size() const { return written_size_ + queued_size_; }
private:
- const std::string filename_;
+ std::string filename_;
int fd_ = -1;
@@ -89,6 +96,8 @@
int channel_index, LogType log_type);
FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+ size_t n);
// Class to read chunks out of a log file.
class SpanReader {
@@ -236,15 +245,15 @@
void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
const Node *target_node);
- // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
- // max_time if there is nothing in the channel.
+ // Returns the (timestamp, queue_index, message_header) for the oldest message
+ // in a channel, or max_time if there is nothing in the channel.
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
oldest_message(int channel) {
return channels_[channel].data.front_timestamp();
}
- // Returns the (timestamp, queue_index) for the oldest delivery time in a
- // channel, or max_time if there is nothing in the channel.
+ // Returns the (timestamp, queue_index, message_header) for the oldest
+ // delivery time in a channel, or max_time if there is nothing in the channel.
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
oldest_message(int channel, int destination_node) {
return channels_[channel].timestamps[destination_node].front_timestamp();
@@ -260,7 +269,7 @@
// a channel delivered to a node. Requeues data as needed.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
- PopOldest(int channel, int node_index);
+ PopOldestTimestamp(int channel, int node_index);
// Returns the header for the log files.
const LogFileHeader *log_file_header() const {
@@ -367,7 +376,7 @@
bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
// Drops the front message. Invalidates the front() reference.
- void pop_front();
+ void PopFront();
// The size of the queue.
size_t size() { return data_.size(); }
@@ -375,15 +384,16 @@
// Returns a debug string with info about each message in the queue.
std::string DebugString() const;
- // Returns the (timestamp, queue_index) for the oldest message.
+ // Returns the (timestamp, queue_index, message_header) for the oldest
+ // message.
const std::tuple<monotonic_clock::time_point, uint32_t,
const MessageHeader *>
front_timestamp() {
- CHECK_GT(data_.size(), 0u);
+ const MessageHeader &message = front().message();
return std::make_tuple(
- monotonic_clock::time_point(std::chrono::nanoseconds(
- front().message().monotonic_sent_time())),
- front().message().queue_index(), &front().message());
+ monotonic_clock::time_point(
+ std::chrono::nanoseconds(message.monotonic_sent_time())),
+ message.queue_index(), &message);
}
// Pointer to the timestamp merger for this queue if available.
@@ -471,9 +481,6 @@
// The caller can determine what the appropriate action is to recover.
std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
- // Returns the oldest forwarding timestamp.
- DeliveryTimestamp OldestTimestamp() const;
-
// Tracks if the channel merger has pushed this onto its heap or not.
bool pushed() { return pushed_; }
// Sets if this has been pushed to the channel merger heap. Should only be
@@ -490,6 +497,14 @@
// called by a SplitMessageReader.
void NoticeAtEnd();
+ aos::monotonic_clock::time_point channel_merger_time() {
+ if (has_timestamps_) {
+ return std::get<0>(timestamp_heap_[0]);
+ } else {
+ return std::get<0>(message_heap_[0]);
+ }
+ }
+
private:
// Pushes messages and timestamps to the corresponding heaps.
void PushMessageHeap(
@@ -576,12 +591,6 @@
FlatbufferVector<MessageHeader>>
PopOldest();
- // Returns the oldest timestamp in the timestamp heap.
- TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
- // Returns the oldest timestamp in the timestamp heap for a specific channel.
- TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
- int channel) const;
-
// Returns the config for this set of log files.
const Configuration *configuration() const {
return log_file_header()->configuration();
@@ -628,6 +637,9 @@
void PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index);
+ // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
+ void VerifyHeaps();
+
// All the message readers.
std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
@@ -639,9 +651,6 @@
// A heap of the channel readers and timestamps for the oldest data in each.
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
- // A heap of just the timestamp channel readers and timestamps for the oldest
- // data in each.
- std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
// Configured node.
const Node *node_;
@@ -650,6 +659,9 @@
// Cached copy of the list of nodes.
std::vector<const Node *> nodes_;
+
+ // Last time popped. Used to detect events being returned out of order.
+ monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
};
// Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 242638c..caccf03 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -8,13 +8,16 @@
#include <vector>
#include "Eigen/Dense"
+#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
#include "aos/flatbuffer_merge.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
DEFINE_bool(skip_missing_forwarding_entries, false,
"If true, drop any forwarding entries with missing data. If "
@@ -25,26 +28,98 @@
"of CSV files in /tmp/. This should only be needed when debugging "
"time synchronization.");
+DEFINE_bool(skip_order_validation, false,
+ "If true, ignore any out of orderness in replay");
+
namespace aos {
namespace logger {
-
namespace chrono = std::chrono;
-Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
- std::chrono::milliseconds polling_period)
- : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
- event_loop, polling_period) {}
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period)
+ : Logger(base_name, event_loop, event_loop->configuration(),
+ polling_period) {}
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
+ const Configuration *configuration,
+ std::chrono::milliseconds polling_period)
+ : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
+ event_loop, configuration, polling_period) {}
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
+ : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
+ polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+ const Configuration *configuration,
+ std::chrono::milliseconds polling_period)
: event_loop_(event_loop),
+ uuid_(UUID::Random()),
log_namer_(std::move(log_namer)),
+ configuration_(configuration),
+ name_(network::GetHostname()),
timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
- polling_period_(polling_period) {
+ polling_period_(polling_period),
+ server_statistics_fetcher_(
+ configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+ "/aos")
+ : aos::Fetcher<message_bridge::ServerStatistics>()) {
VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
int channel_index = 0;
- for (const Channel *channel : *event_loop_->configuration()->channels()) {
+
+ // Find all the nodes which are logging timestamps on our node.
+ std::set<const Node *> timestamp_logger_nodes;
+ for (const Channel *channel : *configuration_->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
+ !channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node = configuration::GetNode(
+ configuration_, connection->name()->string_view());
+
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop_->node())) {
+ VLOG(1) << "Timestamps are logged from "
+ << FlatbufferToJson(other_node);
+ timestamp_logger_nodes.insert(other_node);
+ }
+ }
+ }
+
+ std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+ // Now that we have all the nodes accumulated, make remote timestamp loggers
+ // for them.
+ for (const Node *node : timestamp_logger_nodes) {
+ const Channel *channel = configuration::GetChannel(
+ configuration_,
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ event_loop_->node());
+
+ CHECK(channel != nullptr)
+ << ": Remote timestamps are logged on "
+ << event_loop_->node()->name()->string_view()
+ << " but can't find channel /aos/remote_timestamps/"
+ << node->name()->string_view();
+ timestamp_logger_channels.insert(std::make_pair(channel, node));
+ }
+
+ const size_t our_node_index = configuration::GetNodeIndex(
+ configuration_, event_loop_->node());
+
+ for (const Channel *config_channel : *configuration_->channels()) {
+ // The MakeRawFetcher method needs a channel which is in the event loop
+ // configuration() object, not the configuration_ object. Go look that up
+ // from the config.
+ const Channel *channel = aos::configuration::GetChannel(
+ event_loop_->configuration(), config_channel->name()->string_view(),
+ config_channel->type()->string_view(), "", event_loop_->node());
+
FetcherStruct fs;
+ fs.node_index = our_node_index;
const bool is_local =
configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
@@ -60,7 +135,15 @@
: configuration::ConnectionDeliveryTimeIsLoggedOnNode(
channel, event_loop_->node(), event_loop_->node());
- if (log_message || log_delivery_times) {
+ // Now, detect a MessageHeader timestamp logger where we should just log the
+ // contents to a file directly.
+ const bool log_contents = timestamp_logger_channels.find(channel) !=
+ timestamp_logger_channels.end();
+ const Node *timestamp_node =
+ log_contents ? timestamp_logger_channels.find(channel)->second
+ : nullptr;
+
+ if (log_message || log_delivery_times || log_contents) {
fs.fetcher = event_loop->MakeRawFetcher(channel);
VLOG(1) << "Logging channel "
<< configuration::CleanedChannelToString(channel);
@@ -76,6 +159,14 @@
fs.log_type = LogType::kLogRemoteMessage;
}
}
+ if (log_contents) {
+ VLOG(1) << "Timestamp logger channel "
+ << configuration::CleanedChannelToString(channel);
+ fs.contents_writer =
+ log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+ fs.node_index =
+ configuration::GetNodeIndex(configuration_, timestamp_node);
+ }
fs.channel_index = channel_index;
fs.written = false;
fetchers_.emplace_back(std::move(fs));
@@ -83,63 +174,207 @@
++channel_index;
}
- // When things start, we want to log the header, then the most recent messages
- // available on each fetcher to capture the previous state, then start
- // polling.
- event_loop_->OnRun([this, polling_period]() {
- // Grab data from each channel right before we declare the log file started
- // so we can capture the latest message on each channel. This lets us have
- // non periodic messages with configuration that now get logged.
- for (FetcherStruct &f : fetchers_) {
- f.written = !f.fetcher->Fetch();
- }
+ node_state_.resize(configuration::MultiNode(configuration_)
+ ? configuration_->nodes()->size()
+ : 1u);
- // We need to pick a point in time to declare the log file "started". This
- // starts here. It needs to be after everything is fetched so that the
- // fetchers are all pointed at the most recent message before the start
- // time.
- monotonic_start_time_ = event_loop_->monotonic_now();
- realtime_start_time_ = event_loop_->realtime_now();
- last_synchronized_time_ = monotonic_start_time_;
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(configuration_, node);
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
- << " start_time " << monotonic_start_time_;
+ node_state_[node_index].log_file_header = MakeHeader(node);
+ }
- WriteHeader();
-
- timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
- polling_period);
- });
+ // When things start, we want to log the header, then the most recent
+ // messages available on each fetcher to capture the previous state, then
+ // start polling.
+ event_loop_->OnRun([this]() { StartLogging(); });
}
-// TODO(austin): Set the remote start time to the first time we see a remote
-// message when we are logging those messages separate? Need to signal what to
-// do, or how to get a good timestamp.
+Logger::~Logger() {
+ // If we are replaying a log file, or in simulation, we want to force the last
+ // bit of data to be logged. The easiest way to deal with this is to poll
+ // everything as we go to destroy the class, i.e., shut down the logger, and
+ // write it to disk.
+ DoLogData();
+}
+
+void Logger::StartLogging() {
+ // Grab data from each channel right before we declare the log file started
+ // so we can capture the latest message on each channel. This lets us have
+ // non periodic messages with configuration that now get logged.
+ for (FetcherStruct &f : fetchers_) {
+ f.written = !f.fetcher->Fetch();
+ }
+
+ // Clear out any old timestamps in case we are re-starting logging.
+ for (size_t i = 0; i < node_state_.size(); ++i) {
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ }
+
+ WriteHeader();
+
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+ << " start_time " << last_synchronized_time_;
+
+ timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+ polling_period_);
+}
+
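The `f.written = !f.fetcher->Fetch();` line above encodes the logger's pending-message convention: a successful Fetch() leaves behind a message that still needs writing. A small sketch of that convention, using a stand-in fetcher type rather than the real aos::RawFetcher:

    #include <iostream>

    // Stand-in for aos::RawFetcher: Fetch() returns true when it grabbed a
    // new latest message.
    struct FakeFetcher {
      bool has_data;
      bool Fetch() { return has_data; }
    };

    int main() {
      FakeFetcher with_data{true};
      FakeFetcher empty{false};
      // written == false means "there is a fetched message still to log".
      bool written_a = !with_data.Fetch();  // false: log this message.
      bool written_b = !empty.Fetch();      // true: nothing pending yet.
      std::cout << written_a << " " << written_b << "\n";  // Prints "0 1".
    }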
void Logger::WriteHeader() {
+ if (configuration::MultiNode(configuration_)) {
+ server_statistics_fetcher_.Fetch();
+ }
+
+ aos::monotonic_clock::time_point monotonic_start_time =
+ event_loop_->monotonic_now();
+ aos::realtime_clock::time_point realtime_start_time =
+ event_loop_->realtime_now();
+
+ // We need to pick a point in time to declare the log file "started". This
+ // starts here. It needs to be after everything is fetched so that the
+ // fetchers are all pointed at the most recent message before the start
+ // time.
+ last_synchronized_time_ = monotonic_start_time;
+
for (const Node *node : log_namer_->nodes()) {
- WriteHeader(node);
+ const int node_index =
+ configuration::GetNodeIndex(configuration_, node);
+ MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+ realtime_start_time);
+ log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
}
}
-void Logger::WriteHeader(const Node *node) {
+void Logger::WriteMissingTimestamps() {
+ if (configuration::MultiNode(configuration_)) {
+ server_statistics_fetcher_.Fetch();
+ } else {
+ return;
+ }
+
+ if (server_statistics_fetcher_.get() == nullptr) {
+ return;
+ }
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(configuration_, node);
+ if (MaybeUpdateTimestamp(
+ node, node_index,
+ server_statistics_fetcher_.context().monotonic_event_time,
+ server_statistics_fetcher_.context().realtime_event_time)) {
+ log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ }
+ }
+}
+
+void Logger::SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ node_state_[node_index].monotonic_start_time = monotonic_start_time;
+ node_state_[node_index].realtime_start_time = realtime_start_time;
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_start_time.time_since_epoch())
+ .count());
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_start_time.time_since_epoch())
+ .count());
+ }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ // Bail early if the start times are already set.
+ if (node_state_[node_index].monotonic_start_time !=
+ monotonic_clock::min_time) {
+ return false;
+ }
+ if (configuration::MultiNode(configuration_)) {
+ if (event_loop_->node() == node) {
+ // There are no offsets to compute for ourselves, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
+
+ // Found it and it is connected. Compensate and go.
+ monotonic_start_time +=
+ std::chrono::nanoseconds(connection->monotonic_offset());
+
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ }
+ } else {
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ return false;
+}
+
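The compensation step above translates this node's start time into the remote node's monotonic clock by adding the offset reported via ServerStatistics. A toy version of the arithmetic, with made-up values:

    #include <chrono>
    #include <iostream>

    int main() {
      using std::chrono::nanoseconds;
      // Local monotonic start time and the bridge-reported offset to the
      // remote clock (both values hypothetical).
      nanoseconds monotonic_start_time(1'000'000'000);
      const nanoseconds monotonic_offset(-2'500);
      monotonic_start_time += monotonic_offset;
      std::cout << monotonic_start_time.count() << "ns\n";  // 999997500ns.
    }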
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+ const Node *node) {
// Now write the header with this timestamp in it.
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
+ // TODO(austin): Compress this much more efficiently. There are a bunch of
+ // duplicated schemas.
flatbuffers::Offset<aos::Configuration> configuration_offset =
- CopyFlatBuffer(event_loop_->configuration(), &fbb);
+ CopyFlatBuffer(configuration_, &fbb);
- flatbuffers::Offset<flatbuffers::String> string_offset =
- fbb.CreateString(network::GetHostname());
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb.CreateString(name_);
+
+ flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
+ fbb.CreateString(uuid_.string_view());
+
+ flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+ fbb.CreateString("00000000-0000-4000-8000-000000000000");
flatbuffers::Offset<Node> node_offset;
- if (event_loop_->node() != nullptr) {
+
+ if (configuration::MultiNode(configuration_)) {
node_offset = CopyFlatBuffer(node, &fbb);
}
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
- log_file_header_builder.add_name(string_offset);
+ log_file_header_builder.add_name(name_offset);
// Only add the node if we are running in a multinode configuration.
if (node != nullptr) {
@@ -158,41 +393,138 @@
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_start_time_.time_since_epoch())
+ monotonic_clock::min_time.time_since_epoch())
.count());
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_start_time_.time_since_epoch())
- .count());
-
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- log_namer_->WriteHeader(&fbb, node);
-}
-
-void Logger::Rotate(DetachedBufferWriter *writer) {
- Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
-}
-
-void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
- // Force data up until now to be written.
- DoLogData();
-
- // Swap the writer out, and re-write the header.
- log_namer_ = std::move(log_namer);
-
- // And then update the writers.
- for (FetcherStruct &f : fetchers_) {
- const Channel *channel =
- event_loop_->configuration()->channels()->Get(f.channel_index);
- if (f.timestamp_writer != nullptr) {
- f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
- }
- if (f.writer != nullptr) {
- f.writer = log_namer_->MakeWriter(channel);
- }
+ if (node == event_loop_->node()) {
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
- WriteHeader();
+ log_file_header_builder.add_logger_uuid(logger_uuid_offset);
+
+ log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+ log_file_header_builder.add_parts_index(0);
+
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ return fbb.Release();
+}
+
+void Logger::Rotate() {
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(configuration_, node);
+ log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+ }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+ WriteMissingTimestamps();
+
+ // Write each channel to disk, one at a time.
+ for (FetcherStruct &f : fetchers_) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
+ }
+
+ CHECK(!f.written);
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time < t) {
+ if (f.writer != nullptr) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index, f.log_type));
+
+ VLOG(2) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.writer->filename() << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ f.writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+
+ VLOG(2) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename() << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.contents_writer != nullptr) {
+ // And now handle the special message contents channel. Copy the
+ // message into a FlatBufferBuilder and save it to disk.
+ // TODO(austin): We can be more efficient here when we start to
+ // care...
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ const MessageHeader *msg =
+ flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ // Note: this must use the same field order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+ message_header_builder.add_channel_index(msg->channel_index());
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(
+ msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(
+ msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(
+ msg->remote_queue_index());
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+
+ f.contents_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ } else {
+ break;
+ }
+ }
+ }
+ last_synchronized_time_ = t;
}
void Logger::DoLogData() {
@@ -205,83 +537,107 @@
do {
// Move the sync point up by at most polling_period. This forces one sync
// per iteration, even if it is small.
- last_synchronized_time_ =
- std::min(last_synchronized_time_ + polling_period_, monotonic_now);
- // Write each channel to disk, one at a time.
- for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- } else {
- f.written = false;
- }
- }
-
- CHECK(!f.written);
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
- if (f.writer != nullptr) {
- // Write!
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
-
- VLOG(2) << "Writing data as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- max_header_size_ = std::max(
- max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- f.writer->QueueSizedFlatbuffer(&fbb);
- }
-
- if (f.timestamp_writer != nullptr) {
- // And now handle timestamps.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
-
- VLOG(2) << "Writing timestamps as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
- }
-
- f.written = true;
- } else {
- break;
- }
- }
- }
+ LogUntil(
+ std::min(last_synchronized_time_ + polling_period_, monotonic_now));
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
} while (last_synchronized_time_ + polling_period_ < monotonic_now);
}
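The loop above bounds each sync step to one polling_period, so a stalled logger catches up through several small LogUntil() calls rather than one unbounded one. A standalone sketch of that control flow:

    #include <algorithm>
    #include <chrono>
    #include <iostream>

    int main() {
      using namespace std::chrono;
      nanoseconds last_synchronized(0);
      const nanoseconds polling_period = milliseconds(100);
      const nanoseconds monotonic_now = milliseconds(350);  // 3.5 periods behind.

      do {
        last_synchronized =
            std::min(last_synchronized + polling_period, monotonic_now);
        // Stand-in for LogUntil(last_synchronized).
        std::cout << "LogUntil(" << last_synchronized.count() << ")\n";
      } while (last_synchronized + polling_period < monotonic_now);
      // Prints LogUntil(100000000), (200000000), (300000000); the remaining
      // 50ms is picked up on the next timer tick.
    }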
+std::vector<std::vector<std::string>> SortParts(
+ const std::vector<std::string> &parts) {
+ // Start by grouping all parts by UUID, and extracting the part index.
+ std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+
+ // Sort part files without UUIDs and part indexes as well. Extract everything
+ // useful from the log in the first pass, then sort later.
+ struct LogPart {
+ std::string filename;
+ monotonic_clock::time_point start_time;
+ monotonic_clock::time_point first_message_time;
+ };
+
+ std::vector<LogPart> old_parts;
+
+ for (const std::string &part : parts) {
+ FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+
+ // Looks like an old log: no UUID, no index, and also single node. There
+ // are few to no multi-node log files in the wild without part UUIDs and
+ // indexes that we care much about.
+ if (!log_header.message().has_parts_uuid() &&
+ !log_header.message().has_parts_index() &&
+ !log_header.message().has_node()) {
+ LogPart log_part;
+ log_part.filename = part;
+ log_part.start_time = monotonic_clock::time_point(
+ chrono::nanoseconds(log_header.message().monotonic_start_time()));
+ FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+ log_part.first_message_time = monotonic_clock::time_point(
+ chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+ old_parts.emplace_back(std::move(log_part));
+ continue;
+ }
+
+ CHECK(log_header.message().has_parts_uuid());
+ CHECK(log_header.message().has_parts_index());
+
+ const std::string parts_uuid = log_header.message().parts_uuid()->str();
+ auto it = parts_list.find(parts_uuid);
+ if (it == parts_list.end()) {
+ it = parts_list
+ .insert(std::make_pair(
+ parts_uuid, std::vector<std::pair<std::string, int>>{}))
+ .first;
+ }
+ it->second.emplace_back(
+ std::make_pair(part, log_header.message().parts_index()));
+ }
+
+ CHECK_NE(old_parts.empty(), parts_list.empty())
+ << ": Can't have a mix of old and new parts.";
+
+ if (!old_parts.empty()) {
+ // Confirm they all have the same start time. Old loggers always used the
+ // same start time.
+ for (const LogPart &p : old_parts) {
+ CHECK_EQ(old_parts[0].start_time, p.start_time);
+ }
+ // Sort by the oldest message in each file.
+ std::sort(old_parts.begin(), old_parts.end(),
+ [](const LogPart &a, const LogPart &b) {
+ return a.first_message_time < b.first_message_time;
+ });
+
+ // Produce the final form.
+ std::vector<std::string> sorted_old_parts;
+ sorted_old_parts.reserve(old_parts.size());
+ for (LogPart &p : old_parts) {
+ sorted_old_parts.emplace_back(std::move(p.filename));
+ }
+ return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+ }
+
+ // Now, sort them and produce the final vector form.
+ std::vector<std::vector<std::string>> result;
+ result.reserve(parts_list.size());
+ for (auto &part : parts_list) {
+ std::sort(part.second.begin(), part.second.end(),
+ [](const std::pair<std::string, int> &a,
+ const std::pair<std::string, int> &b) {
+ return a.second < b.second;
+ });
+ std::vector<std::string> result_line;
+ result_line.reserve(part.second.size());
+ for (std::pair<std::string, int> &p : part.second) {
+ result_line.emplace_back(std::move(p.first));
+ }
+ result.emplace_back(std::move(result_line));
+ }
+ return result;
+}
+
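A standalone sketch of the grouping-and-sorting strategy SortParts() applies to new-style logs: bucket filenames by parts_uuid, then order each bucket by parts_index. The real function reads these fields from each file's LogFileHeader; the triples here are hypothetical:

    #include <algorithm>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
      struct Part {
        std::string uuid, filename;
        int index;
      };
      const std::vector<Part> parts = {
          {"uuid-a", "a.part1.bfbs", 1},
          {"uuid-b", "b.part0.bfbs", 0},
          {"uuid-a", "a.part0.bfbs", 0},
      };

      // Group by parts_uuid, keeping (filename, parts_index) pairs.
      std::map<std::string, std::vector<std::pair<std::string, int>>> grouped;
      for (const Part &p : parts) {
        grouped[p.uuid].emplace_back(p.filename, p.index);
      }
      // Sort each group by parts_index and print one series per line.
      for (auto &group : grouped) {
        std::sort(
            group.second.begin(), group.second.end(),
            [](const auto &a, const auto &b) { return a.second < b.second; });
        for (const auto &p : group.second) std::cout << p.first << " ";
        std::cout << "\n";  // "a.part0.bfbs a.part1.bfbs" then "b.part0.bfbs".
      }
    }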
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
: LogReader(std::vector<std::string>{std::string(filename)},
@@ -302,7 +658,8 @@
if (replay_configuration) {
CHECK_EQ(configuration::MultiNode(configuration()),
configuration::MultiNode(replay_configuration))
- << ": Log file and replay config need to both be multi or single node.";
+ << ": Log file and replay config need to both be multi or single "
+ "node.";
}
if (!configuration::MultiNode(configuration())) {
@@ -312,12 +669,13 @@
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
replay_configuration->nodes()->size())
- << ": Log file and replay config need to have matching nodes lists.";
+ << ": Log file and replay config need to have matching nodes "
+ "lists.";
for (const Node *node : *logged_configuration()->nodes()) {
if (configuration::GetNode(replay_configuration, node) == nullptr) {
- LOG(FATAL)
- << "Found node " << FlatbufferToJson(node)
- << " in logged config that is not present in the replay config.";
+ LOG(FATAL) << "Found node " << FlatbufferToJson(node)
+ << " in logged config that is not present in the replay "
+ "config.";
}
}
}
@@ -335,8 +693,8 @@
if (offset_fp_ != nullptr) {
fclose(offset_fp_);
}
- // Zero out some buffers. It's easy to do use-after-frees on these, so make it
- // more obvious.
+ // Zero out some buffers. It's easy to do use-after-frees on these, so make
+ // it more obvious.
if (remapped_configuration_buffer_) {
remapped_configuration_buffer_->Wipe();
}
@@ -352,8 +710,8 @@
}
std::vector<const Node *> LogReader::Nodes() const {
- // Because the Node pointer will only be valid if it actually points to memory
- // owned by remapped_configuration_, we need to wait for the
+ // Because the Node pointer will only be valid if it actually points to
+ // memory owned by remapped_configuration_, we need to wait for the
// remapped_configuration_ to be populated before accessing it.
//
// Also note that whenever a map is changed, the nodes in here are
@@ -404,38 +762,45 @@
"you sure that the replay config matches the original config?";
}
- // We need to now seed our per-node time offsets and get everything set up to
- // run.
- const size_t num_nodes = !configuration::MultiNode(logged_configuration())
- ? 1u
- : logged_configuration()->nodes()->size();
+ // We need to now seed our per-node time offsets and get everything set up
+ // to run.
+ const size_t num_nodes = nodes_count();
// It is easiest to solve for per node offsets with a matrix rather than
// trying to solve the equations by hand. So let's get after it.
//
// Now, build up the map matrix.
//
- // sample_matrix_ = map_matrix_ * offset_matrix_
- map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
+ // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+ map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+ slope_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
- sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
- offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ offset_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ last_valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
- // And the base offset matrix, which will be a copy of the initial offset
- // matrix.
- base_offset_matrix_ =
- Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
- num_nodes);
+ time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
- // All offsets should sum to 0. Add that as the first constraint in our least
- // squares.
- map_matrix_.row(0).setOnes();
+ // All times should average out to the distributed clock.
+ for (int i = 0; i < map_matrix_.cols(); ++i) {
+ // 1/num_nodes.
+ map_matrix_(0, i) = mpq_class(1, num_nodes);
+ }
+ valid_matrix_(0) = true;
{
// Now, add the a - b -> sample elements.
size_t i = 1;
for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
const Node *const node_a = std::get<0>(filter.first);
const Node *const node_b = std::get<1>(filter.first);
@@ -444,136 +809,68 @@
const size_t node_b_index =
configuration::GetNodeIndex(configuration(), node_b);
- // +a
- map_matrix_(i, node_a_index) = 1.0;
- // -b
- map_matrix_(i, node_b_index) = -1.0;
+ // -a
+ map_matrix_(i, node_a_index) = mpq_class(-1);
+ // +b
+ map_matrix_(i, node_b_index) = mpq_class(1);
// -> sample
- filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+ std::get<0>(filter.second)
+ .set_slope_pointer(&slope_matrix_(i, node_a_index));
+ std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+ valid_matrix_(i) = false;
+ std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
++i;
}
}
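A toy two-node, slope-free version of the constraint system being assembled above, using doubles in place of mpq_class: row 0 says the node times average to the distributed clock, and each filter row says tb - ta equals the measured offset. Solving recovers both node times:

    #include <Eigen/Dense>
    #include <iostream>

    int main() {
      Eigen::Matrix2d map;
      map << 0.5, 0.5,  // (ta + tb) / 2 == distributed time.
          -1.0, 1.0;    // -ta + tb     == measured offset b - a.
      const Eigen::Vector2d rhs(100.0, 4.0);  // distributed = 100, offset = +4.
      const Eigen::Vector2d t = map.fullPivLu().solve(rhs);
      std::cout << "ta=" << t(0) << " tb=" << t(1) << "\n";  // ta=98 tb=102.
    }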
- // Rank of the map matrix tells you if all the nodes are in communication with
- // each other, which tells you if the offsets are observable.
- const size_t connected_nodes =
- Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
- map_matrix_)
- .rank();
-
- // We don't need to support isolated nodes until someone has a real use case.
- CHECK_EQ(connected_nodes, num_nodes)
- << ": There is a node which isn't communicating with the rest.";
-
- // Now, iterate through all the timestamps from all the nodes and seed
- // everything.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
- // Make sure all the samples have been seeded.
- for (int i = 1; i < sample_matrix_.cols(); ++i) {
- // The seeding logic is pretty basic right now because we don't have great
- // use cases yet. It wants to see data from every node. Blow up for now,
- // and once we have a reason to do something different, update this logic.
- // Maybe read further in the log file? Or seed off the realtime time?
- CHECK_NE(sample_matrix_(i, 0), 0.0)
- << ": Sample " << i << " is not seeded.";
- }
-
- // And solve.
- offset_matrix_ = SolveOffsets();
-
- // Save off the base offsets so we can work in deltas from here out. That
- // will significantly simplify the numerical precision problems.
- for (size_t i = 0; i < num_nodes; ++i) {
- base_offset_matrix_(i, 0) =
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(offset_matrix_(i, 0)));
- }
-
- {
- // Shift everything so we never could (reasonably) require the distributed
- // clock to have a large backwards jump in time. This makes it so the boot
- // time on the node up the longest will essentially start matching the
- // distributed clock.
- const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- base_offset_matrix_(i, 0) += offset;
- }
- }
-
- {
- // Re-compute the samples and setup all the filters so that they
- // subtract this base offset.
-
- size_t i = 1;
- for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
- CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
-
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- const size_t node_a_index =
- configuration::GetNodeIndex(configuration(), node_a);
- const size_t node_b_index =
- configuration::GetNodeIndex(configuration(), node_b);
-
- filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
- base_offset_matrix_(node_b_index));
-
- ++i;
- }
- }
-
- // Now, iterate again through all the offsets now that we have set the base
- // offset to something sane. This will seed everything with an accurate
- // initial offset.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
for (std::unique_ptr<State> &state : states_) {
state->SeedSortedMessages();
}
+ // Rank of the map matrix tells you if all the nodes are in communication
+ // with each other, which tells you if the offsets are observable.
+ const size_t connected_nodes =
+ Eigen::FullPivLU<
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
+ .rank();
+
+ // We don't need to support isolated nodes until someone has a real use
+ // case.
+ CHECK_EQ(connected_nodes, num_nodes)
+ << ": There is a node which isn't communicating with the rest.";
+
+ // And solve.
UpdateOffsets();
- // We want to start the log file at the last start time of the log files from
- // all the nodes. Compute how long each node's simulation needs to run to
- // move time to this point.
+ // We want to start the log file at the last start time of the log files
+ // from all the nodes. Compute how long each node's simulation needs to run
+ // to move time to this point.
distributed_clock::time_point start_time = distributed_clock::min_time;
+ // TODO(austin): We want an "OnStart" callback for each node rather than
+ // running until the last node.
+
for (std::unique_ptr<State> &state : states_) {
- // Setup the realtime clock to have something sane in it now.
- state->SetRealtimeOffset(state->monotonic_start_time(),
- state->realtime_start_time());
- // And start computing the start time on the distributed clock now that that
- // works.
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ // And start computing the start time on the distributed clock now that
+ // that works.
start_time = std::max(
start_time, state->ToDistributedClock(state->monotonic_start_time()));
}
- CHECK_GE(start_time, distributed_clock::epoch());
+
+ CHECK_GE(start_time, distributed_clock::epoch())
+ << ": Hmm, we have a node starting before the start of time. Offset "
+ "everything.";
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
- // nodes, and also replayed on the other nodes. This may not satisfy all our
- // users, but it'll start the discussion.
+ // nodes, and also replayed on the other nodes. This may not satisfy all
+ // our users, but it'll start the discussion.
if (configuration::MultiNode(event_loop_factory_->configuration())) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel = logged_configuration()->channels()->Get(i);
@@ -598,34 +895,177 @@
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
ignore_missing_data_ = true;
- VLOG(1) << "Running until start time: " << start_time;
+ VLOG(1) << "Running until " << start_time << " in Register";
event_loop_factory_->RunFor(start_time.time_since_epoch());
VLOG(1) << "At start time";
// Now that we are running for real, missing data means that the log file is
// corrupted or went wrong.
ignore_missing_data_ = false;
-}
-void LogReader::UpdateOffsets() {
- // TODO(austin): Evaluate less accurate inverses. We might be able to
- // do some tricks to keep the accuracy up.
- offset_matrix_ = SolveOffsets();
-
- size_t node_index = 0;
for (std::unique_ptr<State> &state : states_) {
- state->SetDistributedOffset(-offset(node_index), 1.0);
- ++node_index;
+ // Make the RT clock be correct before handing it to the user.
+ if (state->realtime_start_time() != realtime_clock::min_time) {
+ state->SetRealtimeOffset(state->monotonic_start_time(),
+ state->realtime_start_time());
+ }
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ }
+
+ if (FLAGS_timestamps_to_csv) {
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ std::get<0>(filter.second)
+ .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
+ ->monotonic_now());
+ std::get<0>(filter.second)
+ .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
+ ->monotonic_now());
+ }
}
}
-std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+void LogReader::UpdateOffsets() {
+ VLOG(2) << "Samples are " << offset_matrix_;
+ VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+ VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
+ << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
+
+ size_t node_index = 0;
+ for (std::unique_ptr<State> &state : states_) {
+ state->SetDistributedOffset(offset(node_index), slope(node_index));
+ VLOG(1) << "Offset for node " << node_index << " "
+ << MaybeNodeName(state->event_loop()->node()) << "is "
+ << aos::distributed_clock::time_point(offset(node_index))
+ << " slope " << std::setprecision(9) << std::fixed
+ << slope(node_index);
+ ++node_index;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset is");
+ }
+}
+
+void LogReader::LogFit(std::string_view prefix) {
+ for (std::unique_ptr<State> &state : states_) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
+ << state->monotonic_now() << " distributed "
+ << event_loop_factory_->distributed_now();
+ }
+
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ message_bridge::NoncausalOffsetEstimator *estimator =
+ &std::get<0>(filter.second);
+
+ if (estimator->a_timestamps().size() == 0 &&
+ estimator->b_timestamps().size() == 0) {
+ continue;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ estimator->LogFit(prefix);
+ }
+
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ const double recovered_slope =
+ slope(node_b_index) / slope(node_a_index) - 1.0;
+ const int64_t recovered_offset =
+ offset(node_b_index).count() - offset(node_a_index).count() *
+ slope(node_b_index) /
+ slope(node_a_index);
+
+ VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
+ << " (error " << recovered_slope - estimator->fit().slope() << ") "
+ << " offset " << std::setprecision(20) << recovered_offset
+ << " (error "
+ << recovered_offset - estimator->fit().offset().count() << ")";
+
+ const aos::distributed_clock::time_point a0 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[0]));
+ const aos::distributed_clock::time_point a1 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[1]));
+
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a0) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[0])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((a0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a1) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[1])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((event_loop_factory_->distributed_now() <= a1)
+ ? ""
+ : " Before now, investigate");
+
+ const aos::distributed_clock::time_point b0 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[0]));
+ const aos::distributed_clock::time_point b1 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[1]));
+
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b0)
+ << ((b0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b1)
+ << ((event_loop_factory_->distributed_now() <= b1)
+ ? ""
+ : " Before now, investigate");
+ }
+}
+
+message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
const Node *node_a, const Node *node_b) {
CHECK_NE(node_a, node_b);
CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
if (node_a > node_b) {
- return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
+ return GetFilter(node_b, node_a);
}
auto tuple = std::make_tuple(node_a, node_b);
@@ -633,53 +1073,27 @@
auto it = filters_.find(tuple);
if (it == filters_.end()) {
- auto &x = filters_
- .insert(std::make_pair(
- tuple, message_bridge::ClippedAverageFilter()))
- .first->second;
+ auto &x =
+ filters_
+ .insert(std::make_pair(
+ tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+ node_a, node_b))))
+ .first->second;
if (FLAGS_timestamps_to_csv) {
- std::string fwd_name =
- absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
- node_b->name()->string_view());
- x.SetFwdCsvFileName(fwd_name);
- std::string rev_name =
- absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
- node_a->name()->string_view());
- x.SetRevCsvFileName(rev_name);
+ std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+ node_b->name()->string_view()));
+ std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+ node_a->name()->string_view()));
}
- return std::make_tuple(&x, true);
+ return &std::get<0>(x);
} else {
- return std::make_tuple(&(it->second), true);
+ return &std::get<0>(it->second);
}
}
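The pointer comparison above canonicalizes the node pair so that a (b, a) lookup lands on the same estimator as (a, b). The same trick in miniature, with strings standing in for Node pointers:

    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>

    int main() {
      std::map<std::pair<std::string, std::string>, int> filters;
      auto get = [&filters](std::string a, std::string b) -> int & {
        if (a > b) std::swap(a, b);  // Canonical order before the lookup.
        return filters[{a, b}];
      };
      get("pi2", "pi1") = 7;
      std::cout << get("pi1", "pi2") << "\n";  // Prints 7: one shared entry.
    }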
-bool LogReader::State::MaybeUpdateTimestamp(
- const TimestampMerger::DeliveryTimestamp &channel_timestamp,
- int channel_index) {
- if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
- CHECK(std::get<0>(filters_[channel_index]) == nullptr);
- return false;
- }
-
- // Got a forwarding timestamp!
- CHECK(std::get<0>(filters_[channel_index]) != nullptr);
-
- // Call the correct method depending on if we are the forward or reverse
- // direction here.
- if (std::get<1>(filters_[channel_index])) {
- std::get<0>(filters_[channel_index])
- ->FwdSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- } else {
- std::get<0>(filters_[channel_index])
- ->RevSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- }
- return true;
-}
void LogReader::Register(EventLoop *event_loop) {
State *state =
@@ -702,10 +1116,8 @@
const Channel *channel =
RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
- std::make_tuple(nullptr, false);
-
NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
@@ -730,31 +1142,48 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
+ VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
+ << "at " << state->event_loop()->context().monotonic_event_time
+ << " now " << state->monotonic_now();
if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
- VLOG(1) << "Node down!";
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (live_nodes_ == 0) {
event_loop_factory_->Exit();
}
return;
}
- bool update_offsets = false;
TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
- bool dummy_update_time = false;
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset was");
+ }
+
+ bool update_time;
std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&dummy_update_time);
+ state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
- << state->DebugString();
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+ LOG(WARNING) << "Check failed: monotonic_now == "
+ "channel_timestamp.monotonic_event_time) ("
+ << monotonic_now << " vs. "
+ << channel_timestamp.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
if (channel_timestamp.monotonic_event_time >
state->monotonic_start_time() ||
@@ -764,17 +1193,39 @@
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
- "not matched? Use --skip_missing_forwarding_entries to ignore "
+ "not matched? Use --skip_missing_forwarding_entries to "
+ "ignore "
"this.";
- if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+ if (update_time) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index));
-
- update_offsets = true;
+ if (!FLAGS_skip_order_validation) {
+ CHECK_LT(channel_timestamp.monotonic_remote_time,
+ state->monotonic_remote_now(channel_index))
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " " << state->DebugString();
+ } else if (channel_timestamp.monotonic_remote_time >=
+ state->monotonic_remote_now(channel_index)) {
+ LOG(WARNING)
+ << "Check failed: channel_timestamp.monotonic_remote_time < "
+ "state->monotonic_remote_now(channel_index) ("
+ << channel_timestamp.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(channel_index) << ") "
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " currently " << channel_timestamp.monotonic_event_time
+ << " ("
+ << state->ToDistributedClock(
+ channel_timestamp.monotonic_event_time)
+ << ") remote event time "
+ << channel_timestamp.monotonic_remote_time << " ("
+ << state->RemoteToDistributedClock(
+ channel_index, channel_timestamp.monotonic_remote_time)
+ << ") " << state->DebugString();
+ }
if (FLAGS_timestamps_to_csv) {
if (offset_fp_ == nullptr) {
@@ -789,13 +1240,14 @@
std::chrono::duration_cast<std::chrono::duration<double>>(
channel_timestamp.realtime_event_time - first_time_)
.count());
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- fprintf(
- offset_fp_, ", %.9f",
- offset_matrix_(i, 0) +
- std::chrono::duration_cast<std::chrono::duration<double>>(
- base_offset_matrix_(i, 0))
- .count());
+ for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
+ fprintf(offset_fp_, ", %.9f",
+ time_offset_matrix_(i, 0) +
+ time_slope_matrix_(i, 0) *
+ chrono::duration<double>(
+ event_loop_factory_->distributed_now()
+ .time_since_epoch())
+ .count());
}
fprintf(offset_fp_, "\n");
}
@@ -805,21 +1257,26 @@
state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
channel_timestamp.realtime_event_time);
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
+ << channel_timestamp.monotonic_event_time;
+ // TODO(austin): std::move channel_data in and make that efficient in
+ // simulation.
state->Send(channel_index, channel_data.message().data()->Data(),
channel_data.message().data()->size(),
channel_timestamp.monotonic_remote_time,
channel_timestamp.realtime_remote_time,
channel_timestamp.remote_queue_index);
- } else if (state->at_end()) {
+ } else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
- // reading the rest of the log file and call it quits. We don't want to
- // replay partial data.
+ // reading the rest of the log file and call it quits. We don't want
+ // to replay partial data.
while (state->OldestMessageTime() != monotonic_clock::max_time) {
bool update_time_dummy;
state->PopOldest(&update_time_dummy);
}
+ } else {
+ CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
}
-
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
@@ -830,22 +1287,100 @@
const monotonic_clock::time_point next_time = state->OldestMessageTime();
if (next_time != monotonic_clock::max_time) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time << "("
+ << state->ToDistributedClock(next_time)
+ << " distributed), now is " << state->monotonic_now();
state->Setup(next_time);
} else {
- // Set a timer up immediately after now to die. If we don't do this, then
- // the senders waiting on the message we just read will never get called.
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "No next message, scheduling shutdown";
+ // Set a timer up immediately after now to die. If we don't do this,
+ // then the senders waiting on the message we just read will never get
+ // called.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
}
}
- // Once we make this call, the current time changes. So do everything which
- // involves time before changing it. That especially includes sending the
- // message.
- if (update_offsets) {
+ // Once we make this call, the current time changes. So do everything
+ // which involves time before changing it. That especially includes
+ // sending the message.
+ if (update_time) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "updating offsets";
+
+ std::vector<aos::monotonic_clock::time_point> before_times;
+ before_times.resize(states_.size());
+ std::transform(states_.begin(), states_.end(), before_times.begin(),
+ [](const std::unique_ptr<State> &state) {
+ return state->monotonic_now();
+ });
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "before " << states_[i]->monotonic_now();
+ }
+
UpdateOffsets();
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
+ << state->monotonic_now();
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "after " << states_[i]->monotonic_now();
+ }
+
+ // TODO(austin): We should be perfect.
+ const std::chrono::nanoseconds kTolerance{3};
+ if (!FLAGS_skip_order_validation) {
+ CHECK_GE(next_time, state->monotonic_now())
+ << ": Time skipped the next event.";
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
+ << ": Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
+ << ": Time changed too much on node "
+ << states_[i]->event_loop()->node()->name()->string_view();
+ }
+ } else {
+ if (next_time < state->monotonic_now()) {
+ LOG(WARNING) << "Check failed: next_time >= "
+ "state->monotonic_now() ("
+ << next_time << " vs. " << state->monotonic_now()
+ << "): Time skipped the next event.";
+ }
+ for (size_t i = 0; i < states_.size(); ++i) {
+ if (states_[i]->monotonic_now() < before_times[i] - kTolerance) {
+ LOG(WARNING) << "Check failed: "
+ "states_[i]->monotonic_now() "
+ ">= before_times[i] - kTolerance ("
+ << states_[i]->monotonic_now() << " vs. "
+ << before_times[i] - kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+ if (states_[i]->monotonic_now() > before_times[i] + kTolerance) {
+ LOG(WARNING) << "Check failed: "
+ "states_[i]->monotonic_now() "
+ "<= before_times[i] + kTolerance ("
+ << states_[i]->monotonic_now() << " vs. "
+ << before_times[i] + kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+ }
+ }
}
+
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
+ << state->event_loop()->context().monotonic_event_time << " now "
+ << state->monotonic_now();
}));
++live_nodes_;
@@ -942,8 +1477,8 @@
new_name_builder.add_name(name_offset);
new_name_fbb.Finish(new_name_builder.Finish());
const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is actually
- // present in base_config.
+ // Retrieve the channel that we want to copy, confirming that it is
+ // actually present in base_config.
const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
@@ -1020,7 +1555,7 @@
void LogReader::State::SetChannel(
size_t channel, std::unique_ptr<RawSender> sender,
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ message_bridge::NoncausalOffsetEstimator *filter,
NodeEventLoopFactory *channel_target_event_loop_factory) {
channels_[channel] = std::move(sender);
filters_[channel] = filter;
@@ -1034,21 +1569,27 @@
CHECK_GT(sorted_messages_.size(), 0u);
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ FlatbufferVector<MessageHeader>,
+ message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- *update_time = false;
+ if (std::get<3>(result) != nullptr) {
+ *update_time = std::get<3>(result)->Pop(
+ event_loop_->node(), std::get<0>(result).monotonic_event_time);
+ } else {
+ *update_time = false;
+ }
return std::make_tuple(std::get<0>(result), std::get<1>(result),
std::move(std::get<2>(result)));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
if (sorted_messages_.size() > 0) {
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
<< std::get<0>(sorted_messages_.front()).monotonic_event_time;
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
@@ -1081,11 +1622,26 @@
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+
std::tie(channel_timestamp, channel_index, channel_data) =
channel_merger_->PopOldest();
+ // Skip any messages without forwarding information.
+ if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ // Got a forwarding timestamp!
+ filter = filters_[channel_index];
+
+ CHECK(filter != nullptr);
+
+ // Call the correct method depending on if we are the forward or
+ // reverse direction here.
+ filter->Sample(event_loop_->node(),
+ channel_timestamp.monotonic_event_time,
+ channel_timestamp.monotonic_remote_time);
+ }
sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data));
+ std::move(channel_data), filter);
}
}
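The estimator plumbing above is deliberately symmetric: Sample() feeds each forwarded (event time, remote time) pair in while messages are being seeded into sorted_messages_, and Pop() retires that sample when the message is replayed, returning true when the offset solution needs recomputing. A minimal sketch of the lifecycle, using only the two calls this diff makes (ReplaySample is a hypothetical helper name):

    // Sketch only: mirrors the Sample()/Pop() pairing used above.
    void ReplaySample(message_bridge::NoncausalOffsetEstimator *filter,
                      const Node *node,
                      const TimestampMerger::DeliveryTimestamp &timestamp) {
      // While seeding: record the observed (event, remote) time pair.
      filter->Sample(node, timestamp.monotonic_event_time,
                     timestamp.monotonic_remote_time);
      // While replaying: retire the sample.  A true return means the time
      // solution should be updated before handling the next event.
      const bool update_time =
          filter->Pop(node, timestamp.monotonic_event_time);
      (void)update_time;
    }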
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index f1af17e..2905dfa 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -15,9 +15,11 @@
table LogFileHeader {
// Time this log file started on the monotonic clock in nanoseconds.
- monotonic_start_time:long;
+ // If this isn't known (the log file is being recorded from another node
+ // where we don't know the time offset), both timestamps will be min_time.
+ monotonic_start_time:int64 = -9223372036854775808;
// Time this log file started on the realtime clock in nanoseconds.
- realtime_start_time:long;
+ realtime_start_time:int64 = -9223372036854775808;
// Messages are not written to disk in order. They will be out of order by
// at most this duration (in nanoseconds). If the log reader buffers until
@@ -33,6 +35,36 @@
// The current node, if known and running in a multi-node configuration.
node:Node;
+
+ // All UUIDs are uuid4.
+
+ // A log is made up of a bunch of log files and parts. These build up
+ // a tree. Every .bfbs file has a LogFileHeader at the start.
+ //
+ // /-- basename_pi1_data.part0.bfbs, basename_pi1_data.part1.bfbs, etc.
+ // ---- basename_timestamps/pi1/aos/remote_timestamps/pi2/aos.logger.MessageHeader.part0.bfbs, etc.
+ // \-- basename_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs, etc.
+
+ // All log files and parts from a single run of a logger executable will have
+ // the same uuid. This should be all the files generated on a single node.
+ // Used to correlate files recorded together.
+ logger_uuid:string;
+
+ // Part files which go together all have headers. When creating a log file
+ // with multiple parts, the logger should stop writing to part n-1 as soon
+ // as it starts writing to part n, and write messages as though there were
+ // just one big file. Therefore, part files won't be self-standing, since
+ // they won't have data fetched at the beginning.
+
+ // If data is logged before the time offset can be established to the other
+ // node, the start time will be monotonic_clock::min_time, and a new part file
+ // will be created when the start time is known.
+
+ // All the parts which go together have the same uuid.
+ parts_uuid:string;
+ // The parts_index indicates which part this is in the sequence. The
+ // index starts at 0.
+ parts_index:int32;
}
// Table holding a message.
@@ -58,10 +90,12 @@
// Time this message was sent on the monotonic clock of the remote node in
// nanoseconds.
- monotonic_remote_time:long = -9223372036854775808;
+ monotonic_remote_time:int64 = -9223372036854775808;
// Time this message was sent on the realtime clock of the remote node in
// nanoseconds.
- realtime_remote_time:long = -9223372036854775808;
+ realtime_remote_time:int64 = -9223372036854775808;
// Queue index of this message on the remote node.
- remote_queue_index:uint = 4294967295;
+ remote_queue_index:uint32 = 4294967295;
}
+
+root_type MessageHeader;
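Taken together, logger_uuid, parts_uuid, and parts_index let a reader reassemble split logs without trusting file names. A minimal grouping sketch, assuming a ReadHeader() helper like the one in logger_test.cc (GroupParts is a hypothetical name; SortParts in logger.h is the real entry point):

    #include <cstdint>
    #include <map>
    #include <string>
    #include <vector>

    std::vector<std::vector<std::string>> GroupParts(
        const std::vector<std::string> &filenames) {
      // parts_uuid -> (parts_index -> filename); the inner map keeps each
      // part list sorted by parts_index.
      std::map<std::string, std::map<int32_t, std::string>> parts;
      for (const std::string &filename : filenames) {
        const FlatbufferVector<LogFileHeader> header = ReadHeader(filename);
        parts[header.message().parts_uuid()->str()]
             [header.message().parts_index()] = filename;
      }
      std::vector<std::vector<std::string>> result;
      for (const auto &[uuid, indexed_parts] : parts) {
        std::vector<std::string> sorted;
        for (const auto &[index, filename] : indexed_parts) {
          sorted.push_back(filename);
        }
        result.emplace_back(std::move(sorted));
      }
      return result;
    }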
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a123dcc..32c2022 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -4,207 +4,94 @@
#include <chrono>
#include <deque>
#include <string_view>
+#include <tuple>
#include <vector>
#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/eigen_mpq.h"
+#include "aos/events/logging/log_namer.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
namespace aos {
namespace logger {
-class LogNamer {
- public:
- LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
- virtual ~LogNamer() {}
-
- virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
- const Node *node) = 0;
- virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
-
- virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
- const std::vector<const Node *> &nodes() const { return nodes_; }
-
- const Node *node() const { return node_; }
-
- protected:
- const Node *const node_;
- std::vector<const Node *> nodes_;
-};
-
-class LocalLogNamer : public LogNamer {
- public:
- LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
- : LogNamer(node), writer_(writer) {}
-
- ~LocalLogNamer() override { writer_->Flush(); }
-
- void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
- const Node *node) override {
- CHECK_EQ(node, this->node());
- writer_->WriteSizedFlatbuffer(
- absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
- }
-
- DetachedBufferWriter *MakeWriter(const Channel *channel) override {
- CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
- return writer_;
- }
-
- DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
- CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
- << ": Message is not delivered to this node.";
- CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
- CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
- node_))
- << ": Delivery times aren't logged for this channel on this node.";
- return writer_;
- }
-
- private:
- DetachedBufferWriter *writer_;
-};
-
-// TODO(austin): Split naming files from making files so we can re-use the
-// naming code to predict the log file names for a provided base name.
-class MultiNodeLogNamer : public LogNamer {
- public:
- MultiNodeLogNamer(std::string_view base_name,
- const Configuration *configuration, const Node *node)
- : LogNamer(node),
- base_name_(base_name),
- configuration_(configuration),
- data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
- base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
-
- // Writes the header to all log files for a specific node. This function
- // needs to be called after all the writers are created.
- void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
- if (node == this->node()) {
- data_writer_->WriteSizedFlatbuffer(
- absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
- } else {
- for (std::pair<const Channel *const,
- std::unique_ptr<DetachedBufferWriter>> &data_writer :
- data_writers_) {
- if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
- data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
- fbb->GetBufferPointer(), fbb->GetSize()));
- }
- }
- }
- }
-
- // Makes a data logger for a specific channel.
- DetachedBufferWriter *MakeWriter(const Channel *channel) {
- // See if we can read the data on this node at all.
- const bool is_readable =
- configuration::ChannelIsReadableOnNode(channel, this->node());
- if (!is_readable) {
- return nullptr;
- }
-
- // Then, see if we are supposed to log the data here.
- const bool log_message =
- configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
-
- if (!log_message) {
- return nullptr;
- }
-
- // Now, sort out if this is data generated on this node, or not. It is
- // generated if it is sendable on this node.
- if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
- return data_writer_.get();
- } else {
- // Ok, we have data that is being forwarded to us that we are supposed to
- // log. It needs to be logged with send timestamps, but be sorted enough
- // to be able to be processed.
- CHECK(data_writers_.find(channel) == data_writers_.end());
-
- // Track that this node is being logged.
- if (configuration::MultiNode(configuration_)) {
- const Node *source_node = configuration::GetNode(
- configuration_, channel->source_node()->string_view());
- if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
- nodes_.end()) {
- nodes_.emplace_back(source_node);
- }
- }
-
- return data_writers_
- .insert(std::make_pair(
- channel,
- std::make_unique<DetachedBufferWriter>(absl::StrCat(
- base_name_, "_", channel->source_node()->string_view(),
- "_data", channel->name()->string_view(), "/",
- channel->type()->string_view(), ".bfbs"))))
- .first->second.get();
- }
- }
-
- // Makes a timestamp (or timestamp and data) logger for a channel and
- // forwarding connection.
- DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
- const bool log_delivery_times =
- (this->node() == nullptr)
- ? false
- : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- channel, this->node(), this->node());
- if (!log_delivery_times) {
- return nullptr;
- }
-
- return data_writer_.get();
- }
-
- const std::vector<const Node *> &nodes() const { return nodes_; }
-
- private:
- const std::string base_name_;
- const Configuration *const configuration_;
-
- // File to write both delivery timestamps and local data to.
- std::unique_ptr<DetachedBufferWriter> data_writer_;
- // Files to write remote data to. We want one per channel.
- std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
- data_writers_;
-};
-
-
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rarely on a channel and would affect execution.
class Logger {
public:
- Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+ // Constructs a logger.
+ // base_name/log_namer: Object used to write data to disk in one or more log
+ // files. If a base_name is passed in, a LocalLogNamer is wrapped
+ // around it.
+ // event_loop: The event loop used to read the messages.
+ // polling_period: The period used to poll the data.
+ // configuration: When provided, this is the configuration to log, and the
+ // configuration to use for the channel list to log. If not provided,
+ // this becomes the configuration from the event loop.
+ Logger(std::string_view base_name, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period =
+ std::chrono::milliseconds(100));
+ Logger(std::string_view base_name, EventLoop *event_loop,
+ const Configuration *configuration,
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
+ Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+ const Configuration *configuration,
+ std::chrono::milliseconds polling_period =
+ std::chrono::milliseconds(100));
+ ~Logger();
- // Rotates the log file with the new writer. This writes out the header
- // again, but keeps going as if nothing else happened.
- void Rotate(DetachedBufferWriter *writer);
- void Rotate(std::unique_ptr<LogNamer> log_namer);
+ // Overrides the name in the log file header.
+ void set_name(std::string_view name) { name_ = name; }
+
+ // Rotates the log file(s), triggering new part files to be written for each
+ // log file.
+ void Rotate();
private:
void WriteHeader();
- void WriteHeader(const Node *node);
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const Node *node);
+
+ bool MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
void DoLogData();
+ void WriteMissingTimestamps();
+
+ void StartLogging();
+
+ // Fetches from each channel until all the data is logged.
+ void LogUntil(monotonic_clock::time_point t);
+
EventLoop *event_loop_;
+ const UUID uuid_;
std::unique_ptr<LogNamer> log_namer_;
+ // The configuration to place at the top of the log file.
+ const Configuration *configuration_;
+
+ // Name to save in the log file. Defaults to hostname.
+ std::string name_;
+
// Structure to track both a fetcher, and if the data fetched has been
// written. We may want to delay writing data to disk so that we don't let
// data get too far out of order when written to disk so we can avoid making
@@ -219,6 +106,10 @@
DetachedBufferWriter *writer = nullptr;
DetachedBufferWriter *timestamp_writer = nullptr;
+ DetachedBufferWriter *contents_writer = nullptr;
+ const Node *writer_node = nullptr;
+ const Node *timestamp_node = nullptr;
+ int node_index = 0;
};
std::vector<FetcherStruct> fetchers_;
@@ -236,8 +127,31 @@
// Max size that the header has consumed. This much extra data will be
// reserved in the builder to avoid reallocating.
size_t max_header_size_ = 0;
+
+ // Fetcher for all the statistics from all the nodes.
+ aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+ // Sets the start time for a specific node.
+ void SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
+
+ struct NodeState {
+ aos::monotonic_clock::time_point monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point realtime_start_time =
+ aos::realtime_clock::min_time;
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+ };
+ std::vector<NodeState> node_state_;
};
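A minimal usage sketch for the new base_name constructor. The config path, the "example" name, and the aos::InitGoogle call are assumptions for illustration, not part of this change:

    #include <chrono>

    #include "aos/events/logging/logger.h"
    #include "aos/events/shm_event_loop.h"
    #include "aos/init.h"

    int main(int argc, char **argv) {
      aos::InitGoogle(&argc, &argv);

      aos::FlatbufferDetachedBuffer<aos::Configuration> config =
          aos::configuration::ReadConfig("config.json");  // Assumed path.
      aos::ShmEventLoop event_loop(&config.message());

      // Log files land at /tmp/fbs_log.part0.bfbs, .part1.bfbs, ...
      aos::logger::Logger logger("/tmp/fbs_log", &event_loop,
                                 std::chrono::milliseconds(100));
      logger.set_name("example");  // Otherwise defaults to the hostname.

      event_loop.Run();
      return 0;
    }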
+// Takes a bunch of parts and sorts them based on parts_uuid and parts_index.
+std::vector<std::vector<std::string>> SortParts(
+ const std::vector<std::string> &parts);
+
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
@@ -313,7 +227,11 @@
// gets called.
void Deregister();
- // Returns the configuration from the log file.
+ // Returns the configuration being used for replay from the log file.
+ // Note that this may be different from the configuration actually used for
+ // handling events. You should generally only use this to create a
+ // SimulatedEventLoopFactory, and then get the configuration from there for
+ // everything else.
const Configuration *logged_configuration() const;
// Returns the configuration being used for replay.
// The pointer is invalidated whenever RemapLoggedChannel is called.
@@ -355,6 +273,10 @@
return &log_file_header_.message();
}
+ std::string_view name() const {
+ return log_file_header()->name()->string_view();
+ }
+
private:
const Channel *RemapChannel(const EventLoop *event_loop,
const Channel *channel);
@@ -365,13 +287,25 @@
// channels from calls to RemapLoggedChannel.
void MakeRemappedConfig();
+ // Returns the number of nodes.
+ size_t nodes_count() const {
+ return !configuration::MultiNode(logged_configuration())
+ ? 1u
+ : logged_configuration()->nodes()->size();
+ }
+
const std::vector<std::vector<std::string>> filenames_;
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
FlatbufferVector<LogFileHeader> log_file_header_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+ // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ SolveOffsets();
+
+ void LogFit(std::string_view prefix);
// State per node.
class State {
@@ -392,13 +326,6 @@
// OldestMessageTime.
void SeedSortedMessages();
- // Updates the timestamp filter with the timestamp. Returns true if the
- // provided timestamp was actually a forwarding timestamp and used, and
- // false otherwise.
- bool MaybeUpdateTimestamp(
- const TimestampMerger::DeliveryTimestamp &channel_timestamp,
- int channel_index);
-
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time() const {
return channel_merger_->monotonic_start_time();
@@ -416,13 +343,6 @@
void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
EventLoop *event_loop() { return event_loop_; }
- // Returns the oldest timestamp for the provided channel. This should only
- // be called before SeedSortedMessages();
- TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
- size_t channel) {
- return channel_merger_->OldestTimestampForChannel(channel);
- }
-
// Sets the current realtime offset from the monotonic clock for this node
// (if we are on a simulated event loop).
void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -440,6 +360,11 @@
return node_event_loop_factory_->ToDistributedClock(time);
}
+ monotonic_clock::time_point FromDistributedClock(
+ distributed_clock::time_point time) {
+ return node_event_loop_factory_->FromDistributedClock(time);
+ }
+
// Sets the offset (and slope) from the distributed clock.
void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
double distributed_slope) {
@@ -453,6 +378,20 @@
return channel_target_event_loop_factory_[channel_index]->monotonic_now();
}
+ distributed_clock::time_point RemoteToDistributedClock(
+ size_t channel_index, monotonic_clock::time_point time) {
+ return channel_target_event_loop_factory_[channel_index]
+ ->ToDistributedClock(time);
+ }
+
+ const Node *remote_node(size_t channel_index) {
+ return channel_target_event_loop_factory_[channel_index]->node();
+ }
+
+ monotonic_clock::time_point monotonic_now() {
+ return node_event_loop_factory_->monotonic_now();
+ }
+
// Sets the node we will be merging as, and returns true if there is any
// data on it.
bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
@@ -461,10 +400,9 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(
- size_t channel, std::unique_ptr<RawSender> sender,
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
- NodeEventLoopFactory *channel_target_event_loop_factory);
+ void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ NodeEventLoopFactory *channel_target_event_loop_factory);
// Returns if we have read all the messages from all the logs.
bool at_end() const { return channel_merger_->at_end(); }
@@ -493,14 +431,32 @@
}
// Returns a debug string for the sorted messages and the channel merger.
- std::string DebugString() const { return channel_merger_->DebugString(); }
+ std::string DebugString() const {
+ std::stringstream messages;
+ size_t i = 0;
+ for (const auto &message : sorted_messages_) {
+ if (i < 7 || i + 7 > sorted_messages_.size()) {
+ messages << "sorted_messages[" << i
+ << "]: " << std::get<0>(message).monotonic_event_time << " "
+ << configuration::StrippedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ std::get<2>(message).message().channel_index()))
+ << "\n";
+ } else if (i == 7) {
+ messages << "...\n";
+ }
+ ++i;
+ }
+ return messages.str() + channel_merger_->DebugString();
+ }
private:
// Log file.
std::unique_ptr<ChannelMerger> channel_merger_;
std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>>
+ FlatbufferVector<MessageHeader>,
+ message_bridge::NoncausalOffsetEstimator *>>
sorted_messages_;
// Senders.
@@ -518,8 +474,7 @@
// This corresponds to the object which is shared among all the channels
// going between 2 nodes.
- std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
- filters_;
+ std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
@@ -532,8 +487,8 @@
// Creates the requested filter if it doesn't exist, regardless of whether
// these nodes can actually communicate directly.
- std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
- const Node *node_a, const Node *node_b);
+ message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+ const Node *node_b);
// FILE to write offsets to (if populated).
FILE *offset_fp_ = nullptr;
@@ -544,32 +499,79 @@
// List of filters for a connection. The pointer to the first node will be
// less than the second node.
std::map<std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter>
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
filters_;
// Returns the offset from the monotonic clock for a node to the distributed
- // clock. distributed = monotonic + offset;
- std::chrono::nanoseconds offset(int node_index) const {
- CHECK_LT(node_index, offset_matrix_.rows())
+ // clock. monotonic = distributed * slope() + offset();
+ double slope(int node_index) const {
+ CHECK_LT(node_index, time_slope_matrix_.rows())
<< ": Got too high of a node index.";
- return -std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(offset_matrix_(node_index))) -
- base_offset_matrix_(node_index);
+ return time_slope_matrix_(node_index);
+ }
+ std::chrono::nanoseconds offset(int node_index) const {
+ CHECK_LT(node_index, time_offset_matrix_.rows())
+ << ": Got too high of a node index.";
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(time_offset_matrix_(node_index)));
}
// Updates the offset matrix solution and sets the per-node distributed
// offsets in the factory.
void UpdateOffsets();
- // sample_matrix_ = map_matrix_ * offset_matrix_
- Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+ // We have 2 types of equations to do a least squares regression over to fully
+ // constrain our time function.
+ //
+ // One is simple. The distributed clock is the average of all the clocks.
+ // (ta + tb + tc + td) / num_nodes = t_distributed
+ //
+ // The second is a bit more complicated. Our basic time conversion function
+ // is:
+ // tb = ta + (ta * slope + offset)
+ // We can rewrite this as follows
+ // tb - (1 + slope) * ta = offset
+ //
+ // From here, we have enough equations to solve for t{a,b,c,...}. We want to
+ // take as an input the offsets and slope, and solve for the per-node times as
+ // a function of the distributed clock.
+ //
+ // We need to massage our equations to make this work. If we solve for the
+ // per-node times at two set distributed clock times, we will be able to
+ // recreate the linear function (we know it is linear). We can do a similar
+ // thing by breaking our equation up into:
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ //
+ // This solves to:
+ //
+ // [ta] [ a00 a01 a02] [t_distributed]
+ // [tb] = [ a10 a11 a12] * [oab]
+ // [tc] [ a20 a21 a22] [oac]
+ //
+ // and can be split into:
+ //
+ // [ta] [ a00 ] [a01 a02]
+ // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+ // [tc] [ a20 ] [a21 a22] [oac]
+ //
+ // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+ // offset_matrix_ will be in nanoseconds.
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+ // Matrix tracking which offsets are valid.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+ // Matrix tracking the last valid matrix we used to determine connected nodes.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+ size_t cached_valid_node_count_ = 0;
- // Base offsets. The actual offset is the sum of this and the offset matrix.
- // This removes some of the dynamic range challenges from the double above.
- Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
- base_offset_matrix_;
+ // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
+ // t is in seconds.
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;
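To make the block comment above concrete: the 3-node system, solved numerically at two distributed times to recover the per-node linear functions. Plain Eigen doubles with made-up slopes and offsets; the real implementation does this in exact rationals:

    #include <iostream>
    #include "Eigen/Dense"

    int main() {
      const double m1 = 1e-6, m2 = -2e-6;       // Illustrative slopes.
      const double oab = 1500.0, oac = -800.0;  // Illustrative offsets (ns).

      Eigen::Matrix3d map;
      map << 1.0 / 3, 1.0 / 3, 1.0 / 3,
             1.0, -1.0 - m1, 0.0,
             1.0, 0.0, -1.0 - m2;

      // Solving at two distributed times recreates the linear function
      // [ta; tb; tc] = slope * t_distributed + offset.
      const Eigen::Vector3d at_zero =
          map.fullPivLu().solve(Eigen::Vector3d(0.0, oab, oac));
      const Eigen::Vector3d at_one =
          map.fullPivLu().solve(Eigen::Vector3d(1.0, oab, oac));

      std::cout << "slope:  " << (at_one - at_zero).transpose() << "\n";
      std::cout << "offset: " << at_zero.transpose() << "\n";
      return 0;
    }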
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 2575e85..f95fa17 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -25,11 +25,8 @@
std::unique_ptr<aos::logger::DetachedBufferWriter> writer;
std::unique_ptr<aos::logger::LogNamer> log_namer;
if (event_loop.node() == nullptr) {
- writer = std::make_unique<aos::logger::DetachedBufferWriter>(
- aos::logging::GetLogName("fbs_log"));
-
- log_namer = std::make_unique<aos::logger::LocalLogNamer>(writer.get(),
- event_loop.node());
+ log_namer = std::make_unique<aos::logger::LocalLogNamer>(
+ aos::logging::GetLogName("fbs_log"), event_loop.node());
} else {
log_namer = std::make_unique<aos::logger::MultiNodeLogNamer>(
aos::logging::GetLogName("fbs_log"), event_loop.configuration(),
@@ -41,5 +38,7 @@
event_loop.Run();
+ LOG(INFO) << "Shutting down";
+
return 0;
}
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
index 313894b..c333f55 100644
--- a/aos/events/logging/logger_math.cc
+++ b/aos/events/logging/logger_math.cc
@@ -2,14 +2,187 @@
#include "Eigen/Dense"
+#include "third_party/gmp/gmpxx.h"
+
namespace aos {
namespace logger {
+namespace {
+Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> in) {
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> result =
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Zero(in.rows(),
+ in.cols());
+ for (int i = 0; i < in.rows(); ++i) {
+ for (int j = 0; j < in.cols(); ++j) {
+ result(i, j) = in(i, j).get_d();
+ }
+ }
+ return result;
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
+ aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
+ // Least squares solve for the slopes and offsets.
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
+ (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
+ aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
+
+ VLOG(1) << "Took "
+ << std::chrono::duration<double>(end_time - start_time).count()
+ << " seconds to invert";
+
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
+ inv.block(0, 0, inv.rows(), 1);
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
+ inv.block(0, 1, inv.rows(), inv.cols() - 1) *
+ mpq_offsets.block(1, 0, inv.cols() - 1, 1);
+
+ mpq_solution_offset *= mpq_class(1, 1000000000);
+
+ return std::make_tuple(ToDouble(mpq_solution_slope),
+ ToDouble(mpq_solution_offset));
+}
+} // namespace
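Solve() above computes the pseudo-inverse (AᵀA)⁻¹Aᵀ exactly in rationals before converting to doubles. For intuition, the same least-squares formula in plain doubles, on illustrative numbers:

    #include <iostream>
    #include "Eigen/Dense"

    int main() {
      // Fit b ~= offset + slope * t at t = 0, 1, 2.
      Eigen::MatrixXd A(3, 2);
      A << 1, 0,
           1, 1,
           1, 2;
      const Eigen::Vector3d b(0.0, 0.9, 2.1);
      const Eigen::Vector2d x =
          (A.transpose() * A).inverse() * A.transpose() * b;
      std::cout << x.transpose() << "\n";  // ~[-0.05, 1.05]: offset, slope.
      return 0;
    }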
+
// This is slow to compile, so we put it in a separate file. More parallelism
// in the build, and less recompilation when other logger code changes.
-Eigen::Matrix<double, Eigen::Dynamic, 1> LogReader::SolveOffsets() {
- return map_matrix_.bdcSvd(Eigen::ComputeThinU | Eigen::ComputeThinV)
- .solve(sample_matrix_);
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+LogReader::SolveOffsets() {
+ // TODO(austin): Split this out and unit test it a bit better. A good time
+ // would be when we do partial node subsets and try to optimize this again.
+ //
+ // TODO(austin): CHECK that the number doesn't change over time. We can freak
+ // out if that happens.
+
+ // Start by counting how many node pairs we have an offset estimated for.
+ int nonzero_offset_count = 1;
+ for (int i = 1; i < valid_matrix_.rows(); ++i) {
+ if (valid_matrix_(i) != 0) {
+ ++nonzero_offset_count;
+ }
+ }
+
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+
+ // If there are missing rows, we can't solve the original problem and instead
+ // need to filter the matrix to remove the missing rows and solve a simplified
+ // problem. What this means practically is that we might have pairs of nodes
+ // which are communicating, but have no timestamps between them. But we can
+ // have multiple paths in our graph between 2 nodes, so we can still solve
+ // time without the missing timestamp.
+ //
+ // In the following example, we can drop any of the last 3 rows, and still
+ // solve.
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ // [ 0 1 -1-m2 ] [obc]
+ if (nonzero_offset_count != offset_matrix_.rows()) {
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, map_matrix_.cols());
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
+
+ std::vector<bool> valid_nodes(nodes_count(), false);
+
+ size_t destination_row = 0;
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
+ }
+ mpq_offsets(0) = mpq_class(0);
+ ++destination_row;
+
+ for (int i = 1; i < offset_matrix_.rows(); ++i) {
+ // The first row (which says that all times average to the distributed
+ // time) was copied above; here we copy over the remaining valid rows.
+ if (valid_matrix_(i)) {
+ mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
+
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
+ if (mpq_map(destination_row, j) != 0) {
+ valid_nodes[j] = true;
+ }
+ }
+
+ ++destination_row;
+ }
+ }
+
+ VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
+ VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Compute (and cache) the current connectivity. If we have N nodes
+ // configured, but logs only from one of them, we want to assume that the
+ // rest of the nodes match the distributed clock exactly.
+ //
+ // If data shows up later for them, we will CHECK when time jumps.
+ //
+ // TODO(austin): Once we have more info on what cases are reasonable, we can
+ // open up the restrictions.
+ if (valid_matrix_ != last_valid_matrix_) {
+ Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
+ full_piv(mpq_map);
+ const size_t connected_nodes = full_piv.rank();
+
+ size_t valid_node_count = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ const bool valid_node = valid_nodes[i];
+ if (valid_node) {
+ ++valid_node_count;
+ } else {
+ LOG(WARNING)
+ << "Node "
+ << logged_configuration()->nodes()->Get(i)->name()->string_view()
+ << " has no observations, setting to distributed clock.";
+ }
+ }
+
+ // Confirm that the set of nodes we have connected matches the rank.
+ // Otherwise a<->b and c<->d would count as 4 nodes, but the rank is only 3.
+ CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
+ connected_nodes)
+ << ": Ambiguous nodes.";
+
+ last_valid_matrix_ = valid_matrix_;
+ cached_valid_node_count_ = valid_node_count;
+ }
+
+ // There are 2 cases. Either all the nodes are connected with each other by
+ // actual data, or we have isolated nodes. We want to force the isolated
+ // nodes to match the distributed clock exactly, and to solve for the other
+ // nodes.
+ if (cached_valid_node_count_ == 0) {
+ // Cheat. If there are no valid nodes, the slopes are 1, and offset is 0,
+ // i.e., just match the distributed clock.
+ return std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+ } else {
+ // TODO(austin): Solve just the nodes we know about. This is harder and
+ // there are no logs which require this yet to test on.
+ CHECK_EQ(cached_valid_node_count_, nodes_count())
+ << ": TODO(austin): Handle partial valid nodes";
+
+ return Solve(mpq_map, mpq_offsets);
+ }
+ } else {
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ map_matrix_ + slope_matrix_;
+ VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
+ VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+
+ return Solve(mpq_map, offset_matrix_);
+ }
}
} // namespace logger
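The FullPivLU rank check in SolveOffsets() is what catches ambiguous node graphs. A standalone illustration in doubles: four nodes with observations only between a<->b and c<->d yield a constraint matrix of rank 3, not 4:

    #include <iostream>
    #include "Eigen/Dense"

    int main() {
      Eigen::MatrixXd map(3, 4);
      map << 0.25, 0.25, 0.25, 0.25,  // All clocks average to distributed.
             1.0, -1.0, 0.0, 0.0,     // a <-> b observation (slope ~ 0).
             0.0, 0.0, 1.0, -1.0;     // c <-> d observation.
      const Eigen::FullPivLU<Eigen::MatrixXd> lu(map);
      std::cout << "rank = " << lu.rank() << " for 4 nodes\n";  // rank = 3
      return 0;
    }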
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3b10f9d..47fe9e2 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/util/file.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -42,20 +43,20 @@
// the config.
TEST_F(LoggerTest, Starts) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove it.
unlink(logfile.c_str());
LOG(INFO) << "Logging data to " << logfile;
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -101,8 +102,9 @@
// Tests that we can read and write rotated log files.
TEST_F(LoggerTest, RotatedLogFile) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
- const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile0 = base_name + ".part0.bfbs";
+ const ::std::string logfile1 = base_name + ".part1.bfbs";
// Remove it.
unlink(logfile0.c_str());
unlink(logfile1.c_str());
@@ -110,20 +112,36 @@
LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
{
- DetachedBufferWriter writer0(logfile0);
- DetachedBufferWriter writer1(logfile1);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer0, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger logger(
+ std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
+ logger_event_loop.get(), std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- logger.Rotate(&writer1);
+ logger.Rotate();
event_loop_factory_.RunFor(chrono::milliseconds(10000));
}
+ {
+ // Confirm that the UUIDs match for both the parts and the logger, and the
+ // parts_index increments.
+ std::vector<FlatbufferVector<LogFileHeader>> log_header;
+ for (std::string_view f : {logfile0, logfile1}) {
+ log_header.emplace_back(ReadHeader(f));
+ }
+
+ EXPECT_EQ(log_header[0].message().logger_uuid()->string_view(),
+ log_header[1].message().logger_uuid()->string_view());
+ EXPECT_EQ(log_header[0].message().parts_uuid()->string_view(),
+ log_header[1].message().parts_uuid()->string_view());
+
+ EXPECT_EQ(log_header[0].message().parts_index(), 0);
+ EXPECT_EQ(log_header[1].message().parts_index(), 1);
+ }
+
// Even though it doesn't make any difference here, exercise the logic for
// passing in a separate config.
LogReader reader(std::vector<std::string>{logfile0, logfile1},
@@ -166,7 +184,8 @@
// Tests that a large number of messages per second doesn't overwhelm writev.
TEST_F(LoggerTest, ManyMessages) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove the log file.
unlink(logfile.c_str());
@@ -174,7 +193,6 @@
ping_.set_quiet(true);
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
@@ -198,7 +216,7 @@
chrono::microseconds(50));
});
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(1000));
@@ -217,9 +235,35 @@
configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
tmp_dir_(getenv("TEST_TMPDIR")),
logfile_base_(tmp_dir_ + "/multi_logfile"),
- logfiles_({logfile_base_ + "_pi1_data.bfbs",
- logfile_base_ + "_pi2_data/test/aos.examples.Pong.bfbs",
- logfile_base_ + "_pi2_data.bfbs"}),
+ logfiles_(
+ {logfile_base_ + "_pi1_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+ logfile_base_ + "_pi2_data.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+ structured_logfiles_{
+ std::vector<std::string>{logfiles_[0]},
+ std::vector<std::string>{logfiles_[1], logfiles_[2]},
+ std::vector<std::string>{logfiles_[3]},
+ std::vector<std::string>{logfiles_[4], logfiles_[5]},
+ std::vector<std::string>{logfiles_[6], logfiles_[7]},
+ std::vector<std::string>{logfiles_[8], logfiles_[9]},
+ std::vector<std::string>{logfiles_[10], logfiles_[11]}},
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
ping_(ping_event_loop_.get()),
pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
@@ -259,7 +303,9 @@
std::string tmp_dir_;
std::string logfile_base_;
- std::array<std::string, 3> logfiles_;
+ std::vector<std::string> logfiles_;
+
+ std::vector<std::vector<std::string>> structured_logfiles_;
std::unique_ptr<EventLoop> ping_event_loop_;
Ping ping_;
@@ -346,47 +392,137 @@
}
{
- // Confirm that the headers are all for the correct nodes.
- FlatbufferVector<LogFileHeader> logheader1 = ReadHeader(logfiles_[0]);
- EXPECT_EQ(logheader1.message().node()->name()->string_view(), "pi1");
- FlatbufferVector<LogFileHeader> logheader2 = ReadHeader(logfiles_[1]);
- EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
- FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfiles_[2]);
- EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
+ std::set<std::string> logfile_uuids;
+ std::set<std::string> parts_uuids;
+ // Confirm that we have the expected number of UUIDs for both the logfile
+ // UUIDs and parts UUIDs.
+ std::vector<FlatbufferVector<LogFileHeader>> log_header;
+ for (std::string_view f : logfiles_) {
+ log_header.emplace_back(ReadHeader(f));
+ logfile_uuids.insert(log_header.back().message().logger_uuid()->str());
+ parts_uuids.insert(log_header.back().message().parts_uuid()->str());
+ }
+ EXPECT_EQ(logfile_uuids.size(), 2u);
+ EXPECT_EQ(parts_uuids.size(), 7u);
+
+ // And confirm everything is on the correct node.
+ EXPECT_EQ(log_header[0].message().node()->name()->string_view(), "pi1");
+ EXPECT_EQ(log_header[1].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi1");
+ EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi1");
+ EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
+ EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
+ EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
+ EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
+
+ // And the parts index matches.
+ EXPECT_EQ(log_header[0].message().parts_index(), 0);
+ EXPECT_EQ(log_header[1].message().parts_index(), 0);
+ EXPECT_EQ(log_header[2].message().parts_index(), 1);
+ EXPECT_EQ(log_header[3].message().parts_index(), 0);
+ EXPECT_EQ(log_header[4].message().parts_index(), 0);
+ EXPECT_EQ(log_header[5].message().parts_index(), 1);
+ EXPECT_EQ(log_header[6].message().parts_index(), 0);
+ EXPECT_EQ(log_header[7].message().parts_index(), 1);
+ EXPECT_EQ(log_header[8].message().parts_index(), 0);
+ EXPECT_EQ(log_header[9].message().parts_index(), 1);
+ EXPECT_EQ(log_header[10].message().parts_index(), 0);
+ EXPECT_EQ(log_header[11].message().parts_index(), 1);
+ }
+
+ {
using ::testing::UnorderedElementsAre;
// Timing reports, pings
- EXPECT_THAT(CountChannelsData(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Ping", 2001)));
// Timestamps for pong
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 2001),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)));
// Pong data.
EXPECT_THAT(CountChannelsData(logfiles_[1]),
UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ std::make_tuple("/test", "aos.examples.Pong", 101)));
+ EXPECT_THAT(CountChannelsData(logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 1900)));
// No timestamps
EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre());
// Timing reports and pongs.
- EXPECT_THAT(CountChannelsData(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Pong", 2001)));
// And ping timestamps.
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)));
+
+ // Timestamps from pi2 on pi1, and the other way.
+ EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre());
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[4]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 10),
+ std::make_tuple("/test", "aos.examples.Ping", 101)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[5]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 190),
+ std::make_tuple("/test", "aos.examples.Ping", 1900)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[6]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[7]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+
+ // And then test that the remotely logged timestamp data files only have
+ // timestamps in them.
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre());
+
+ EXPECT_THAT(CountChannelsData(logfiles_[8]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[9]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 190)));
+
+ EXPECT_THAT(CountChannelsData(logfiles_[10]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[11]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -400,6 +536,13 @@
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
@@ -417,7 +560,7 @@
// Confirm that the ping value matches.
pi1_event_loop->MakeWatcher(
"/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
<< pi1_event_loop->context().monotonic_remote_time << " -> "
<< pi1_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi1_ping_count + 1);
@@ -436,7 +579,7 @@
});
pi2_event_loop->MakeWatcher(
"/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
<< pi2_event_loop->context().monotonic_remote_time << " -> "
<< pi2_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi2_ping_count + 1);
@@ -555,11 +698,8 @@
}
)");
- EXPECT_DEATH(LogReader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}},
- &extra_nodes_config.message()),
+ EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
- ;
}
// Tests that we can read log files where they don't start at the same monotonic
@@ -580,8 +720,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -721,21 +860,27 @@
event_loop_factory_.RunFor(chrono::milliseconds(400));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
- // This sends out the fetched messages and advances time to the start of the
- // log file.
- reader.Register(&log_reader_factory);
-
const Node *pi1 =
configuration::GetNode(log_reader_factory.configuration(), "pi1");
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
LOG(INFO) << "Done registering (pi1) "
<< log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
<< " "
@@ -813,6 +958,31 @@
reader.Deregister();
}
+// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
+TEST_F(MultinodeLoggerTest, SortParts) {
+ // Make a bunch of parts.
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(2000));
+ }
+
+ const std::vector<std::vector<std::string>> sorted_parts =
+ SortParts(logfiles_);
+
+  // Test that each list of parts is in order; the ordering between part
+  // file lists doesn't matter.
+ EXPECT_THAT(sorted_parts,
+ ::testing::UnorderedElementsAreArray(structured_logfiles_));
+}
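For reference, the sorted output slots directly into LogReader; a short sketch, where all_files is a hypothetical vector holding every part file found on disk:

    // all_files: std::vector<std::string> of every discovered part file.
    const std::vector<std::vector<std::string>> sorted_parts =
        aos::logger::SortParts(all_files);
    aos::logger::LogReader reader(sorted_parts);
    // reader.Register(...) then proceeds as with hand-structured lists.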
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index a78b42c..6ac51e7 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -60,27 +60,43 @@
{
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
- "logger": "NOT_LOGGED",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi2"],
"source_node": "pi1",
"destination_nodes": [
{
"name": "pi2",
- "timestamp_logger": "NOT_LOGGED"
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
{
"name": "/pi2/aos",
"type": "aos.message_bridge.Timestamp",
- "logger": "NOT_LOGGED",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
"source_node": "pi2",
"destination_nodes": [
{
"name": "pi1",
- "timestamp_logger": "NOT_LOGGED"
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
/* Forwarded to pi2 */
{
"name": "/test",
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
new file mode 100644
index 0000000..9298c8b
--- /dev/null
+++ b/aos/events/logging/uuid.cc
@@ -0,0 +1,60 @@
+#include "aos/events/logging/uuid.h"
+
+#include <array>
+#include <random>
+#include <string_view>
+
+namespace aos {
+namespace {
+char ToHex(int val) {
+ if (val < 10) {
+ return val + '0';
+ } else {
+ return val - 10 + 'a';
+ }
+}
+} // namespace
+
+UUID UUID::Random() {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+
+ std::uniform_int_distribution<> dis(0, 15);
+ std::uniform_int_distribution<> dis2(8, 11);
+
+ UUID result;
+
+ // UUID4 is implemented per https://www.cryptosys.net/pki/uuid-rfc4122.html
+ int i;
+ for (i = 0; i < 8; i++) {
+ result.data_[i] = ToHex(dis(gen));
+ }
+ result.data_[i] = '-';
+ ++i;
+ for (; i < 13; i++) {
+ result.data_[i] = ToHex(dis(gen));
+ }
+ result.data_[i] = '-';
+ ++i;
+ result.data_[i] = '4';
+ ++i;
+ for (; i < 18; i++) {
+ result.data_[i] = ToHex(dis(gen));
+ }
+ result.data_[i] = '-';
+ ++i;
+ result.data_[i] = ToHex(dis2(gen));
+ ++i;
+ for (; i < 23; i++) {
+ result.data_[i] = ToHex(dis(gen));
+ }
+ result.data_[i] = '-';
+ ++i;
+ for (; i < 36; i++) {
+ result.data_[i] = ToHex(dis(gen));
+ }
+
+ return result;
+}
+
+} // namespace aos
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
new file mode 100644
index 0000000..b81b811
--- /dev/null
+++ b/aos/events/logging/uuid.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_LOGGING_UUID_H_
+#define AOS_EVENTS_LOGGING_UUID_H_
+
+#include <array>
+#include <random>
+#include <string_view>
+
+namespace aos {
+
+// Class to generate and hold a UUID.
+class UUID {
+ public:
+ // Returns a randomly generated UUID. This is known as a UUID4.
+ static UUID Random();
+
+ std::string_view string_view() const {
+ return std::string_view(data_.data(), data_.size());
+ }
+
+ bool operator==(const UUID &other) const {
+ return other.string_view() == string_view();
+ }
+ bool operator!=(const UUID &other) const {
+ return other.string_view() != string_view();
+ }
+
+ private:
+ UUID() {}
+
+ // Fixed size storage for the data. Non-null terminated.
+ std::array<char, 36> data_;
+};
+
+} // namespace aos
+
+#endif // AOS_EVENTS_LOGGING_UUID_H_
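Since data_ is not null terminated, all consumers must go through string_view(); a short sketch of safe usage (Example is an invented name):

    #include <string>
    #include "aos/events/logging/uuid.h"

    void Example() {
      const aos::UUID uuid = aos::UUID::Random();
      // Safe: the length travels with the string_view.
      const std::string as_string(uuid.string_view());
      // Unsafe: uuid.string_view().data() is not null terminated, so
      // handing it to printf("%s", ...) would read past the 36 chars.
    }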
diff --git a/aos/events/logging/uuid_test.cc b/aos/events/logging/uuid_test.cc
new file mode 100644
index 0000000..d0320de
--- /dev/null
+++ b/aos/events/logging/uuid_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/uuid.h"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+// Tests that random UUIDs are actually random, and we can convert them to a
+// string. Not very exhaustive, but it is a good smoke test.
+TEST(UUIDTest, GetOne) {
+ LOG(INFO) << UUID::Random().string_view();
+
+ EXPECT_NE(UUID::Random(), UUID::Random());
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index e56d331..970b7e3 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -35,7 +35,9 @@
{
"name": "pi2",
"priority": 1,
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
},
{
"name": "pi3",
@@ -55,7 +57,9 @@
{
"name": "pi1",
"priority": 1,
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
@@ -111,6 +115,18 @@
"frequency": 2
},
{
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -142,8 +158,9 @@
{
"name": "pi2",
"priority": 1,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
@@ -155,8 +172,9 @@
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 1bb0313..29f01a4 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -118,10 +118,10 @@
return config;
}
-class MMapedQueue {
+class MMappedQueue {
public:
- MMapedQueue(std::string_view shm_base, const Channel *channel,
- std::chrono::seconds channel_storage_duration)
+ MMappedQueue(std::string_view shm_base, const Channel *channel,
+ std::chrono::seconds channel_storage_duration)
: config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
std::string path = ShmPath(shm_base, channel);
@@ -172,7 +172,7 @@
ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
}
- ~MMapedQueue() {
+ ~MMappedQueue() {
PCHECK(munmap(data_, size_) == 0);
PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
}
@@ -450,7 +450,7 @@
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
- MMapedQueue lockless_queue_memory_;
+ MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueReader reader_;
// This being nullopt indicates we're not looking for wakeups right now.
std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
@@ -567,7 +567,7 @@
int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
private:
- MMapedQueue lockless_queue_memory_;
+ MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
};
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index b2e54bb..a7301bf 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -741,20 +741,27 @@
DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
msgs_.pop_front();
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
if (msgs_.size() != 0) {
event_.set_event_time(msgs_.front()->context.monotonic_event_time);
simulated_event_loop_->AddEvent(&event_);
DoSchedule(event_.event_time());
- } else {
- token_ = scheduler_->InvalidToken();
}
}
void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
- token_ =
- scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ CHECK(token_ == scheduler_->InvalidToken())
+ << ": May not schedule multiple times";
+ token_ = scheduler_->Schedule(
+ event_time + simulated_event_loop_->send_delay(), [this]() {
+ DCHECK(token_ != scheduler_->InvalidToken());
+ token_ = scheduler_->InvalidToken();
+ simulated_event_loop_->HandleEvent();
+ });
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -817,13 +824,11 @@
simulated_event_loop_->monotonic_now();
base_ = base;
repeat_offset_ = repeat_offset;
- if (base < monotonic_now) {
- token_ = scheduler_->Schedule(
- monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
- } else {
- token_ = scheduler_->Schedule(
- base, [this]() { simulated_event_loop_->HandleEvent(); });
- }
+ token_ = scheduler_->Schedule(std::max(base, monotonic_now), [this]() {
+ DCHECK(token_ != scheduler_->InvalidToken());
+ token_ = scheduler_->InvalidToken();
+ simulated_event_loop_->HandleEvent();
+ });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
}
@@ -835,15 +840,20 @@
if (simulated_event_loop_->log_impl_ != nullptr) {
logging::SetImplementation(simulated_event_loop_->log_impl_);
}
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
- token_ = scheduler_->Schedule(
- base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ token_ = scheduler_->Schedule(base_, [this]() {
+ DCHECK(token_ != scheduler_->InvalidToken());
+ token_ = scheduler_->InvalidToken();
+ simulated_event_loop_->HandleEvent();
+ });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
- } else {
- token_ = scheduler_->InvalidToken();
}
Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
@@ -889,8 +899,15 @@
void SimulatedPhasedLoopHandler::Schedule(
monotonic_clock::time_point sleep_time) {
- token_ = scheduler_->Schedule(
- sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
+ token_ = scheduler_->Schedule(sleep_time, [this]() {
+ DCHECK(token_ != scheduler_->InvalidToken());
+ token_ = scheduler_->InvalidToken();
+ simulated_event_loop_->HandleEvent();
+ });
event_.set_event_time(sleep_time);
simulated_event_loop_->AddEvent(&event_);
}
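
Editor's note: the three hunks above all enforce the same token discipline: a handler owns at most one live scheduler token, any still-pending token is descheduled before a new Schedule call, and the scheduled callback clears token_ before running so that re-scheduling from inside the callback starts clean. A condensed sketch of that discipline, with a toy Scheduler/Token pair standing in for the real EventScheduler:

#include <functional>
#include <utility>

struct Token {
  int id = -1;  // -1 is the invalid token.
};

// Toy stand-in for EventScheduler: holds at most one pending callback.
struct Scheduler {
  static Token InvalidToken() { return Token{}; }
  Token Schedule(std::function<void()> fn) {
    pending_ = std::move(fn);
    return Token{next_id_++};
  }
  void Deschedule(Token) { pending_ = nullptr; }
  void Run() {  // Test hook: fire the pending callback, if any.
    if (pending_) {
      std::function<void()> fn = std::move(pending_);
      pending_ = nullptr;
      fn();
    }
  }
  std::function<void()> pending_;
  int next_id_ = 1;
};

class Handler {
 public:
  explicit Handler(Scheduler *scheduler) : scheduler_(scheduler) {}

  void DoSchedule() {
    // Invariant: never hold two live tokens for one event.
    if (token_.id != Scheduler::InvalidToken().id) {
      scheduler_->Deschedule(token_);
      token_ = Scheduler::InvalidToken();
    }
    token_ = scheduler_->Schedule([this]() {
      // The scheduler consumed the token; clear it before the handler body
      // runs so that body may safely call DoSchedule() again.
      token_ = Scheduler::InvalidToken();
      HandleEvent();
    });
  }

 private:
  void HandleEvent() { /* may re-enter DoSchedule() */ }
  Scheduler *const scheduler_;
  Token token_ = Scheduler::InvalidToken();
};

This is why the CHECK in DoSchedule and the DCHECKs in the callbacks hold: every path that consumes or cancels a token resets it to InvalidToken before anything else can schedule again.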
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 44ba3e5..7de0540 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/logging/logger_generated.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
@@ -290,6 +291,11 @@
simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
MessageCounter<examples::Pong> pi2_pong_counter(
pi2_pong_counter_event_loop.get(), "/test");
+ aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
+ pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+ "/pi1/aos");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
@@ -298,6 +304,14 @@
simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
MessageCounter<examples::Pong> pi1_pong_counter(
pi1_pong_counter_event_loop.get(), "/test");
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
+ pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+ "/aos");
+
+ std::unique_ptr<EventLoop> pi1_remote_timestamp =
+ simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
// Count timestamps.
MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
@@ -315,6 +329,12 @@
MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
pi3_pong_counter_event_loop.get(), "/pi3/aos");
+ // Count remote timestamps
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
// Wait to let timestamp estimation start up before looking for the results.
simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
@@ -429,6 +449,98 @@
++pi3_client_statistics_count;
});
+ // Find the channel indices for the /pi1/aos Timestamp channel and the
+ // Ping channel.
+ const size_t pi1_timestamp_channel =
+ configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+ pi1_on_pi2_timestamp_fetcher.channel());
+ const size_t ping_timestamp_channel =
+ configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+ ping_on_pi2_fetcher.channel());
+
+ for (const Channel *channel :
+ *pi1_pong_counter_event_loop->configuration()->channels()) {
+ VLOG(1) << "Channel "
+ << configuration::ChannelIndex(
+ pi1_pong_counter_event_loop->configuration(), channel)
+ << " " << configuration::CleanedChannelToString(channel);
+ }
+
+ // For each remote timestamp we get back, confirm that it is either a ping
+ // message or a timestamp we sent out. Also confirm that the timestamps are
+ // correct.
+ pi1_remote_timestamp->MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
+ }
+
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+
simulated_event_loop_factory.RunFor(chrono::seconds(10) -
chrono::milliseconds(500) +
chrono::milliseconds(5));
@@ -451,6 +563,10 @@
EXPECT_EQ(pi1_client_statistics_count, 95);
EXPECT_EQ(pi2_client_statistics_count, 95);
EXPECT_EQ(pi3_client_statistics_count, 95);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1101);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1101);
}
// Tests that an offset between nodes can be recovered and shows up in
@@ -605,6 +721,12 @@
MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
pi3_pong_counter_event_loop.get(), "/pi3/aos");
+ // Count remote timestamps
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
MessageCounter<message_bridge::ServerStatistics>
pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
"/pi1/aos");
@@ -646,6 +768,10 @@
EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1001);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
}
} // namespace testing
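
Editor's note: both branches of the watcher above use the same "catch the fetcher up" idiom: call FetchNext() until the fetcher's context reaches the time named in the header. The generic shape, as a sketch (FetchUntil is a hypothetical helper, not in the patch):

#include "aos/events/event_loop.h"
#include "glog/logging.h"

// Hypothetical helper: advance a fetcher until it is positioned at or past
// the given monotonic time. The headers only reference messages that were
// really sent, so running out of messages is a failure.
template <typename FetcherT>
void FetchUntil(FetcherT *fetcher, aos::monotonic_clock::time_point t) {
  while (fetcher->context().monotonic_event_time < t) {
    CHECK(fetcher->FetchNext());
  }
}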
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0f6e8bc..825c830 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -19,7 +19,9 @@
std::unique_ptr<aos::RawFetcher> fetcher,
std::unique_ptr<aos::RawSender> sender,
ServerConnection *server_connection, int client_index,
- MessageBridgeClientStatus *client_status)
+ MessageBridgeClientStatus *client_status,
+ size_t channel_index,
+ aos::Sender<logger::MessageHeader> *timestamp_logger)
: fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
send_event_loop_(send_event_loop),
@@ -28,7 +30,9 @@
server_connection_(server_connection),
client_status_(client_status),
client_index_(client_index),
- client_connection_(client_status_->GetClientConnection(client_index)) {
+ client_connection_(client_status_->GetClientConnection(client_index)),
+ channel_index_(channel_index),
+ timestamp_logger_(timestamp_logger) {
timer_ = send_event_loop_->AddTimer([this]() { Send(); });
Schedule();
@@ -101,6 +105,34 @@
client_connection_->mutate_received_packets(
client_connection_->received_packets() + 1);
+
+ if (timestamp_logger_) {
+ aos::Sender<logger::MessageHeader>::Builder builder =
+ timestamp_logger_->MakeBuilder();
+
+ logger::MessageHeader::Builder message_header_builder =
+ builder.MakeBuilder<logger::MessageHeader>();
+
+ message_header_builder.add_channel_index(channel_index_);
+
+ // Swap the remote and sent metrics. They are from the sender's
+ // perspective, not the receiver's perspective.
+ message_header_builder.add_monotonic_remote_time(
+ fetcher_->context().monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ fetcher_->context().realtime_event_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(
+ fetcher_->context().queue_index);
+
+ message_header_builder.add_monotonic_sent_time(
+ sender_->monotonic_sent_time().time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ sender_->realtime_sent_time().time_since_epoch().count());
+ message_header_builder.add_queue_index(sender_->sent_queue_index());
+
+ builder.Send(message_header_builder.Finish());
+ }
+
sent_ = true;
Schedule();
}
@@ -136,6 +168,9 @@
MessageBridgeClientStatus *client_status_ = nullptr;
int client_index_;
ClientConnection *client_connection_ = nullptr;
+
+ size_t channel_index_;
+ aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -208,6 +243,13 @@
destination_event_loop->second.client_status.FindClientIndex(
channel->source_node()->string_view());
+ const size_t destination_node_index = configuration::GetNodeIndex(
+ simulated_event_loop_factory->configuration(), destination_node);
+
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, source_event_loop->second.event_loop->node());
+
delayers->emplace_back(std::make_unique<RawMessageDelayer>(
simulated_event_loop_factory->GetNodeEventLoopFactory(node),
simulated_event_loop_factory->GetNodeEventLoopFactory(
@@ -216,7 +258,13 @@
source_event_loop->second.event_loop->MakeRawFetcher(channel),
destination_event_loop->second.event_loop->MakeRawSender(channel),
server_connection, client_index,
- &destination_event_loop->second.client_status));
+ &destination_event_loop->second.client_status,
+ configuration::ChannelIndex(
+ source_event_loop->second.event_loop->configuration(), channel),
+ delivery_time_is_logged
+ ? &source_event_loop->second
+ .timestamp_loggers[destination_node_index]
+ : nullptr));
}
const Channel *const timestamp_channel = configuration::GetChannel(
@@ -272,5 +320,46 @@
}
}
+SimulatedMessageBridge::State::State(
+ std::unique_ptr<aos::EventLoop> &&new_event_loop)
+ : event_loop(std::move(new_event_loop)),
+ server_status(event_loop.get()),
+ client_status(event_loop.get()) {
+ timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
+
+ // Find all nodes which log timestamps for our messages back to us.
+ for (const Channel *channel : *event_loop->configuration()->channels()) {
+ CHECK(channel->has_source_node());
+
+ // Sent by us.
+ if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
+ channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop->node());
+
+ // And the timestamps are then logged back by us again.
+ if (!delivery_time_is_logged) {
+ continue;
+ }
+
+ // (And only construct the sender if it hasn't been constructed yet.)
+ const Node *other_node = configuration::GetNode(
+ event_loop->configuration(), connection->name()->string_view());
+ const size_t other_node_index = configuration::GetNodeIndex(
+ event_loop->configuration(), other_node);
+
+ if (!timestamp_loggers[other_node_index]) {
+ timestamp_loggers[other_node_index] =
+ event_loop->MakeSender<logger::MessageHeader>(
+ absl::StrCat("/aos/remote_timestamps/",
+ connection->name()->string_view()));
+ }
+ }
+ }
+ }
+}
+
} // namespace message_bridge
} // namespace aos
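
Editor's note: the MessageHeader sent back on /aos/remote_timestamps/<node> deliberately swaps perspectives, as the comment in the hunk says: the sender's original event time and queue index become the remote_* fields, while the delivery on the destination node fills in the sent_* fields. Restated as a sketch with hypothetical plain structs in place of the flatbuffer types:

#include <cstdint>

struct Delivery {  // One observed send or delivery.
  int64_t monotonic_time;
  int64_t realtime_time;
  uint32_t queue_index;
};

struct RemoteTimestamp {  // What flows back to the source node.
  int64_t monotonic_sent_time;    // Delivery on the destination node.
  int64_t realtime_sent_time;
  uint32_t queue_index;
  int64_t monotonic_remote_time;  // Original send on the source node.
  int64_t realtime_remote_time;
  uint32_t remote_queue_index;
};

// The swap: "remote" describes the original send, "sent" the delivery.
RemoteTimestamp MakeRemoteTimestamp(const Delivery &original_send,
                                    const Delivery &delivery) {
  return RemoteTimestamp{delivery.monotonic_time,
                         delivery.realtime_time,
                         delivery.queue_index,
                         original_send.monotonic_time,
                         original_send.realtime_time,
                         original_send.queue_index};
}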
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5784bb3..2a4da63 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -2,6 +2,7 @@
#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_client_status.h"
#include "aos/network/message_bridge_server_status.h"
@@ -32,16 +33,15 @@
private:
struct State {
- State(std::unique_ptr<aos::EventLoop> &&new_event_loop)
- : event_loop(std::move(new_event_loop)),
- server_status(event_loop.get()),
- client_status(event_loop.get()) {}
+ State(std::unique_ptr<aos::EventLoop> &&new_event_loop);
State(const State &state) = delete;
std::unique_ptr<aos::EventLoop> event_loop;
MessageBridgeServerStatus server_status;
MessageBridgeClientStatus client_status;
+
+ std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 392a7eb..bd095b8 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -328,6 +328,14 @@
virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
+ static SizePrefixedFlatbufferDetachedBuffer<T> Empty() {
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ const auto end = fbb.EndTable(fbb.StartTable());
+ fbb.FinishSizePrefixed(flatbuffers::Offset<flatbuffers::Table>(end));
+ return SizePrefixedFlatbufferDetachedBuffer<T>(fbb.Release());
+ }
+
// Returns references to the buffer, and the data.
const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
const uint8_t *data() const override {
@@ -340,6 +348,13 @@
return buffer_.size() - sizeof(flatbuffers::uoffset_t);
}
+ absl::Span<uint8_t> full_span() {
+ return absl::Span<uint8_t>(buffer_.data(), buffer_.size());
+ }
+ absl::Span<const uint8_t> full_span() const {
+ return absl::Span<const uint8_t>(buffer_.data(), buffer_.size());
+ }
+
private:
flatbuffers::DetachedBuffer buffer_;
};
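
Editor's note: Empty() builds the smallest valid size-prefixed buffer for T (a table with every field defaulted), useful as a placeholder message, and full_span() exposes the length prefix along with the table bytes. A hedged usage sketch, where MyTable stands in for any generated flatbuffer table type:

// Sketch only; MyTable is hypothetical.
aos::SizePrefixedFlatbufferDetachedBuffer<MyTable> buffer =
    aos::SizePrefixedFlatbufferDetachedBuffer<MyTable>::Empty();

// data()/size() describe the table without the length prefix; full_span()
// covers the whole detached buffer, prefix included, which is what code
// copying the raw wire bytes wants.
absl::Span<const uint8_t> wire_bytes = buffer.full_span();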
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index 363bf4a..051a654 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -2,6 +2,9 @@
#include <signal.h>
#include <sys/types.h>
+#if __has_feature(memory_sanitizer)
+#include <sanitizer/msan_interface.h>
+#endif
#include <unistd.h>
#include <initializer_list>
@@ -9,6 +12,50 @@
namespace aos {
namespace ipc_lib {
+namespace {
+
+// Wrapper which propagates msan information.
+// TODO(Brian): Drop this once we have <https://reviews.llvm.org/D82411> to
+// intercept this function natively.
+int wrapped_sigandset(sigset_t *dest, const sigset_t *left,
+ const sigset_t *right) {
+#if __has_feature(memory_sanitizer)
+ if (left) {
+ __msan_check_mem_is_initialized(left, sizeof(*left));
+ }
+ if (right) {
+ __msan_check_mem_is_initialized(right, sizeof(*right));
+ }
+#endif
+ const int r = sigandset(dest, left, right);
+#if __has_feature(memory_sanitizer)
+ if (!r && dest) {
+ __msan_unpoison(dest, sizeof(*dest));
+ }
+#endif
+ return r;
+}
+
+// Wrapper which propagates msan information.
+// TODO(Brian): Drop this once we have
+// <https://reviews.llvm.org/rG89ae290b58e20fc5f56b7bfae4b34e7fef06e1b1> to
+// intercept this function natively.
+int wrapped_pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset) {
+#if __has_feature(memory_sanitizer)
+ if (set) {
+ __msan_check_mem_is_initialized(set, sizeof(*set));
+ }
+#endif
+ const int r = pthread_sigmask(how, set, oldset);
+#if __has_feature(memory_sanitizer)
+ if (!r && oldset) {
+ __msan_unpoison(oldset, sizeof(*oldset));
+ }
+#endif
+ return r;
+}
+
+} // namespace
SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
// Build up the mask with the provided signals.
@@ -23,7 +70,7 @@
// signalfd gets them. Record which ones we actually blocked, so we can
// unblock just those later.
sigset_t old_mask;
- CHECK_EQ(0, pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
+ CHECK_EQ(0, wrapped_pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
for (int signal : signals) {
if (sigismember(&old_mask, signal)) {
CHECK_EQ(0, sigdelset(&blocked_mask_, signal));
@@ -35,9 +82,9 @@
// Unwind the constructor. Unblock the signals and close the fd. Verify nobody
// else unblocked the signals we're supposed to unblock in the meantime.
sigset_t old_mask;
- CHECK_EQ(0, pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
+ CHECK_EQ(0, wrapped_pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
sigset_t unblocked_mask;
- CHECK_EQ(0, sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
+ CHECK_EQ(0, wrapped_sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
if (memcmp(&unblocked_mask, &blocked_mask_, sizeof(unblocked_mask)) != 0) {
LOG(FATAL) << "Some other code unblocked one or more of our signals";
}
diff --git a/aos/logging/context.cc b/aos/logging/context.cc
index 72c1970..84e4e80 100644
--- a/aos/logging/context.cc
+++ b/aos/logging/context.cc
@@ -4,6 +4,9 @@
#define _GNU_SOURCE /* See feature_test_macros(7) */
#endif
+#if __has_feature(memory_sanitizer)
+#include <sanitizer/msan_interface.h>
+#endif
#include <string.h>
#include <sys/prctl.h>
#include <sys/types.h>
@@ -16,8 +19,8 @@
extern char *program_invocation_name;
extern char *program_invocation_short_name;
-#include "aos/die.h"
#include "aos/complex_thread_local.h"
+#include "aos/die.h"
namespace aos {
namespace logging {
@@ -39,6 +42,10 @@
if (prctl(PR_GET_NAME, thread_name_array) != 0) {
PDie("prctl(PR_GET_NAME, %p) failed", thread_name_array);
}
+#if __has_feature(memory_sanitizer)
+ // msan doesn't understand PR_GET_NAME, so help it along.
+ __msan_unpoison(thread_name_array, sizeof(thread_name_array));
+#endif
thread_name_array[sizeof(thread_name_array) - 1] = '\0';
::std::string thread_name(thread_name_array);
@@ -67,8 +74,7 @@
::std::atomic<LogImplementation *> global_top_implementation(NULL);
Context::Context()
- : implementation(global_top_implementation.load()),
- sequence(0) {
+ : implementation(global_top_implementation.load()), sequence(0) {
cork_data.Reset();
}
@@ -77,8 +83,7 @@
if (my_context.created()) {
::std::string my_name = GetMyName();
if (my_name.size() + 1 > sizeof(Context::name)) {
- Die("logging: process/thread name '%s' is too long\n",
- my_name.c_str());
+ Die("logging: process/thread name '%s' is too long\n", my_name.c_str());
}
strcpy(my_context->name, my_name.c_str());
my_context->name_size = my_name.size();
@@ -98,9 +103,7 @@
return my_context.get();
}
-void Context::Delete() {
- delete_current_context = true;
-}
+void Context::Delete() { delete_current_context = true; }
} // namespace internal
} // namespace logging
diff --git a/aos/logging/log_message.fbs b/aos/logging/log_message.fbs
index 789724f..7e246cf 100644
--- a/aos/logging/log_message.fbs
+++ b/aos/logging/log_message.fbs
@@ -18,7 +18,7 @@
level:Level (id: 1);
// Pid of the process creating the log message
- source:int (id:2);
+ source_pid:int (id:2);
// Application name
name:string (id:3);
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 65aff91..98a271c 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -268,6 +268,7 @@
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
+ "//aos/events/logging:logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index b371181..00040e1 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -78,6 +78,8 @@
logger::MessageHeader::Builder message_header_builder(fbb);
message_header_builder.add_channel_index(0);
message_header_builder.add_monotonic_sent_time(0);
+ message_header_builder.add_realtime_sent_time(0);
+ message_header_builder.add_queue_index(0);
message_header_builder.add_monotonic_remote_time(0);
message_header_builder.add_realtime_remote_time(0);
message_header_builder.add_remote_queue_index(0);
@@ -250,6 +252,10 @@
message_header->channel_index());
message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
message_header->monotonic_sent_time());
+ message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
+ message_header->realtime_sent_time());
+ message_reception_reply_.mutable_message()->mutate_queue_index(
+ message_header->queue_index());
// And capture the relevant data needed to generate the forwarding
// MessageHeader.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 2832339..de836ae 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -1,5 +1,6 @@
#include "aos/network/message_bridge_server_lib.h"
+#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/logging/logger.h"
#include "aos/events/logging/logger_generated.h"
@@ -85,14 +86,55 @@
// and flushes. Whee.
}
-void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
- uint16_t /*ssn*/,
+void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
absl::Span<const uint8_t> data) {
const logger::MessageHeader *message_header =
flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+ for (Peer &peer : peers_) {
+ if (peer.sac_assoc_id == rcv_assoc_id) {
+ if (peer.timestamp_logger != nullptr) {
+ // TODO(austin): Need to implement reliable sending of the delivery
+ // timestamps. Track what made it, and retry what didn't.
+ //
+ // This needs to be munged and cleaned up to match the timestamp
+ // standard.
+
+ aos::Sender<logger::MessageHeader>::Builder builder =
+ peer.timestamp_logger->MakeBuilder();
+
+ logger::MessageHeader::Builder message_header_builder =
+ builder.MakeBuilder<logger::MessageHeader>();
+
+ message_header_builder.add_channel_index(
+ message_header->channel_index());
+
+ message_header_builder.add_queue_index(
+ message_header->remote_queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ message_header->monotonic_remote_time());
+ message_header_builder.add_realtime_sent_time(
+ message_header->realtime_remote_time());
+
+ // Swap the remote and sent metrics. They are from the sender's
+ // perspective, not the receiver's perspective.
+ message_header_builder.add_monotonic_remote_time(
+ message_header->monotonic_sent_time());
+ message_header_builder.add_realtime_remote_time(
+ message_header->realtime_sent_time());
+ message_header_builder.add_remote_queue_index(
+ message_header->queue_index());
+
+ builder.Send(message_header_builder.Finish());
+ }
+ break;
+ }
+ }
+
while (sent_messages_.size() > 0u) {
if (sent_messages_.begin()->message().monotonic_sent_time() ==
- message_header->monotonic_sent_time()) {
+ message_header->monotonic_sent_time() &&
+ sent_messages_.begin()->message().queue_index() ==
+ message_header->queue_index()) {
sent_messages_.pop_front();
continue;
}
@@ -124,11 +166,12 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(const Connection *connection, int node_index,
- ServerConnection *server_connection_statistics,
- bool logged_remotely) {
+void ChannelState::AddPeer(
+ const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics, bool logged_remotely,
+ aos::Sender<logger::MessageHeader> *timestamp_logger) {
peers_.emplace_back(connection, node_index, server_connection_statistics,
- logged_remotely);
+ logged_remotely, timestamp_logger);
}
int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
@@ -168,6 +211,7 @@
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
int32_t max_size = 0;
+ timestamp_loggers_.resize(event_loop->configuration()->nodes()->size());
// Set up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
@@ -217,13 +261,30 @@
for (const Connection *connection : *channel->destination_nodes()) {
const Node *other_node = configuration::GetNode(
event_loop_->configuration(), connection->name()->string_view());
+ const size_t other_node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), other_node);
+
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop_->node());
+
+ // Conditionally create the timestamp logger if we are supposed to log
+ // timestamps from that node.
+ if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
+ timestamp_loggers_[other_node_index] =
+ event_loop_->MakeSender<logger::MessageHeader>(
+ absl::StrCat("/aos/remote_timestamps/",
+ connection->name()->string_view()));
+ }
state->AddPeer(
connection,
configuration::GetNodeIndex(event_loop_->configuration(),
connection->name()->string_view()),
server_status_.FindServerConnection(
connection->name()->string_view()),
- configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+ configuration::ChannelMessageIsLoggedOnNode(channel, other_node),
+ delivery_time_is_logged ? &timestamp_loggers_[other_node_index]
+ : nullptr);
}
// Don't subscribe to timestamps on the timestamp channel. Those get
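
Editor's note: the sent_messages_ hunk above tightens the ack match from send time alone to send time plus queue index, presumably because two messages can share a monotonic_sent_time. A minimal sketch of the tightened predicate, with a hypothetical SentRecord standing in for the queued flatbuffer:

#include <cstdint>
#include <deque>

struct SentRecord {
  int64_t monotonic_sent_time;
  uint32_t queue_index;
};

// A delivery confirmation only retires the front of the queue when both the
// send time and the queue index agree, so acks can no longer be
// mis-attributed between messages that happen to share a timestamp.
bool AckMatchesFront(const std::deque<SentRecord> &sent, int64_t acked_time,
                     uint32_t acked_queue_index) {
  return !sent.empty() && sent.front().monotonic_sent_time == acked_time &&
         sent.front().queue_index == acked_queue_index;
}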
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index b6a4f05..ad384f4 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -32,10 +32,12 @@
struct Peer {
Peer(const Connection *new_connection, int new_node_index,
ServerConnection *new_server_connection_statistics,
- bool new_logged_remotely)
+ bool new_logged_remotely,
+ aos::Sender<logger::MessageHeader> *new_timestamp_logger)
: connection(new_connection),
node_index(new_node_index),
server_connection_statistics(new_server_connection_statistics),
+ timestamp_logger(new_timestamp_logger),
logged_remotely(new_logged_remotely) {}
// Valid if != 0.
@@ -45,6 +47,7 @@
const aos::Connection *connection;
const int node_index;
ServerConnection *server_connection_statistics;
+ aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
// If true, this message will be logged on a receiving node. We need to
// keep it around to log it locally if that fails.
@@ -60,7 +63,8 @@
// Adds a new peer.
void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
- bool logged_remotely);
+ bool logged_remotely,
+ aos::Sender<logger::MessageHeader> *timestamp_logger);
// Returns true if this channel has the same name and type as the other
// channel.
@@ -111,6 +115,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
+ std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 8d52292..aa0a034 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -3,6 +3,7 @@
#include <chrono>
#include <thread>
+#include "absl/strings/str_cat.h"
#include "aos/events/ping_generated.h"
#include "aos/events/pong_generated.h"
#include "aos/network/message_bridge_client_lib.h"
@@ -10,11 +11,22 @@
#include "aos/network/team_number.h"
namespace aos {
+void SetShmBase(const std::string_view base);
+
namespace message_bridge {
namespace testing {
namespace chrono = std::chrono;
+void DoSetShmBase(const std::string_view node) {
+ const char *tmpdir_c_str = getenv("TEST_TMPDIR");
+ if (tmpdir_c_str != nullptr) {
+ aos::SetShmBase(absl::StrCat(tmpdir_c_str, "/", node));
+ } else {
+ aos::SetShmBase(absl::StrCat("/dev/shm/", node));
+ }
+}
+
// Test that we can send a ping message over sctp and receive it.
TEST(MessageBridgeTest, PingPong) {
// This is rather annoying to set up. We need to start up a client and
@@ -44,9 +56,11 @@
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_client_config.json");
+ DoSetShmBase("pi1");
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -60,9 +74,22 @@
aos::Sender<examples::Ping> ping_sender =
ping_event_loop.MakeSender<examples::Ping>("/test");
+ aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+ aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
+ pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+ "/pi1/aos/remote_timestamps/pi2");
+
+ // Fetchers for confirming the remote timestamps made it.
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ ping_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
+ ping_event_loop.MakeFetcher<Timestamp>("/aos");
+
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
+
aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
@@ -80,11 +107,24 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+ aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
+ test_event_loop.MakeFetcher<logger::MessageHeader>(
+ "/pi2/aos/remote_timestamps/pi1");
+
+ // Event loop for fetching the data delivered to pi2 from pi1 so we can
+ // match up messages.
+ aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
+ aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
+ delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
+ EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
+ EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
// Count the pongs.
int pong_count = 0;
pong_event_loop.MakeWatcher(
- "/test2", [&pong_count](const examples::Ping &ping) {
+ "/test", [&pong_count](const examples::Ping &ping) {
++pong_count;
LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
});
@@ -191,11 +231,11 @@
ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << FlatbufferToJson(&timestamp);
+ LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
});
pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << FlatbufferToJson(&timestamp);
+ LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
});
// Run for 5 seconds to make sure we have time to estimate the offset.
@@ -206,6 +246,96 @@
quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
});
+ // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+ // channel.
+ const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+ pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
+ const size_t ping_timestamp_channel =
+ configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
+ ping_on_pi2_fetcher.channel());
+
+ for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
+ VLOG(1) << "Channel "
+ << configuration::ChannelIndex(ping_event_loop.configuration(),
+ channel)
+ << " " << configuration::CleanedChannelToString(channel);
+ }
+
+ // For each remote timestamp we get back, confirm that it is either a ping
+ // message or a timestamp we sent out. Also confirm that the timestamps are
+ // correct.
+ ping_event_loop.MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
+ }
+
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+
// Start everything up. Pong is the only thing we don't know how to wait on,
// so start it first.
std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
@@ -261,6 +391,10 @@
EXPECT_GE(pi2_server_statistics_count, 2);
EXPECT_GE(pi1_client_statistics_count, 2);
EXPECT_GE(pi2_client_statistics_count, 2);
+
+ // Confirm we got timestamps back!
+ EXPECT_TRUE(message_header_fetcher1.Fetch());
+ EXPECT_TRUE(message_header_fetcher2.Fetch());
}
// Test that the client disconnecting triggers the server offsets on both sides
@@ -290,6 +424,7 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+ DoSetShmBase("pi1");
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -305,6 +440,7 @@
// Now do it for "raspberrypi2", the client.
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
FLAGS_application_name = "pi2_message_bridge_server";
aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
@@ -496,6 +632,7 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+ DoSetShmBase("pi1");
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -513,6 +650,7 @@
// Now do it for "raspberrypi2", the client.
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
FLAGS_application_name = "pi2_message_bridge_client";
aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a8475b1..3ac2796 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -25,7 +25,9 @@
"destination_nodes": [
{
"name": "pi2",
- "priority": 1
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
@@ -38,25 +40,13 @@
"destination_nodes": [
{
"name": "pi1",
- "priority": 1
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
{
- "name": "/pi1_forwarded/aos",
- "type": "aos.message_bridge.Timestamp",
- "source_node": "pi2",
- "frequency": 10,
- "max_size": 200
- },
- {
- "name": "/pi2_forwarded/aos",
- "type": "aos.message_bridge.Timestamp",
- "source_node": "pi1",
- "frequency": 10,
- "max_size": 200
- },
- {
"name": "/pi1/aos",
"type": "aos.message_bridge.ServerStatistics",
"source_node": "pi1",
@@ -81,6 +71,18 @@
"frequency": 2
},
{
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "pi1",
+ "frequency": 10
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "pi2",
+ "frequency": 10
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -110,11 +112,6 @@
]
},
{
- "name": "/test2",
- "type": "aos.examples.Ping",
- "source_node": "pi2"
- },
- {
"name": "/test",
"type": "aos.examples.Pong",
"source_node": "pi2",
@@ -148,34 +145,6 @@
"rename": {
"name": "/pi2/aos"
}
- },
- {
- "match": {
- "name": "/test",
- "type": "aos.examples.Ping",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/test2"
- }
- },
- {
- "match": {
- "name": "/pi1/aos*",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/pi1_forwarded/aos"
- }
- },
- {
- "match": {
- "name": "/pi2/aos*",
- "source_node": "pi1"
- },
- "rename": {
- "name": "/pi2_forwarded/aos"
- }
}
]
}
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index ba8f9bd..116607c 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -49,7 +49,7 @@
VLOG(1) << " " << this << " Sample at " << monotonic_now << " is "
<< sample_ns.count() << "ns, Base is " << base_offset_.count();
CHECK_GE(monotonic_now, last_time_)
- << ": Being asked to filter backwards in time!";
+ << ": " << this << " Being asked to filter backwards in time!";
// Compute the sample offset as a double (seconds), taking into account the
// base offset.
const double sample =
@@ -380,7 +380,7 @@
const double hard_max = fwd_.offset();
const double hard_min = -rev_.offset();
const double average = (hard_max + hard_min) / 2.0;
- VLOG(1) << " Max(fwd) " << hard_max << " min(rev) " << hard_min;
+ VLOG(1) << this << " Max(fwd) " << hard_max << " min(rev) " << hard_min;
// We don't want to clip the offset to the hard min/max. We really want to
// keep it within a band around the middle. ratio of 0.3 means stay within
// +- 0.15 of the middle of the hard min and max.
@@ -390,7 +390,7 @@
// Update regardless for the first sample from both the min and max.
if (*last_time == aos::monotonic_clock::min_time) {
- VLOG(1) << " No last time " << average;
+ VLOG(1) << this << " No last time " << average;
offset_ = average;
offset_velocity_ = 0.0;
} else {
@@ -415,7 +415,7 @@
(offset_velocity_ -
(fwd_.filtered_velocity() - rev_.filtered_velocity()) / 2.0);
- VLOG(1) << " last time " << offset_;
+ VLOG(1) << this << " last time " << offset_;
}
*last_time = monotonic_now;
@@ -424,14 +424,14 @@
// reverse samples.
if (!MissingSamples()) {
*sample_pointer_ = offset_;
- VLOG(1) << "Updating sample to " << offset_;
+ VLOG(1) << this << " Updating sample to " << offset_;
} else {
- VLOG(1) << "Don't have both samples.";
+ VLOG(1) << this << " Don't have both samples.";
if (last_fwd_time_ == aos::monotonic_clock::min_time) {
- VLOG(1) << " Missing forward";
+ VLOG(1) << this << " Missing forward";
}
if (last_rev_time_ == aos::monotonic_clock::min_time) {
- VLOG(1) << " Missing reverse";
+ VLOG(1) << this << " Missing reverse";
}
}
}
@@ -657,6 +657,8 @@
void NoncausalOffsetEstimator::Sample(
const Node *node, aos::monotonic_clock::time_point node_delivered_time,
aos::monotonic_clock::time_point other_node_sent_time) {
+ VLOG(1) << "Sample delivered " << node_delivered_time << " sent "
+ << other_node_sent_time << " to " << node->name()->string_view();
if (node == node_a_) {
if (a_.Sample(node_delivered_time,
other_node_sent_time - node_delivered_time)) {
@@ -740,11 +742,16 @@
}
fit_ = AverageFits(a_.FitLine(), b_.FitLine());
if (offset_pointer_) {
+ VLOG(1) << "Setting offset to " << fit_.mpq_offset();
*offset_pointer_ = fit_.mpq_offset();
}
if (slope_pointer_) {
+ VLOG(1) << "Setting slope to " << fit_.mpq_slope();
*slope_pointer_ = -fit_.mpq_slope();
}
+ if (valid_pointer_) {
+ *valid_pointer_ = true;
+ }
if (VLOG_IS_ON(1)) {
LogFit("Refitting to");
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6994a8a..10f436e 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -408,6 +408,9 @@
void set_slope_pointer(mpq_class *slope_pointer) {
slope_pointer_ = slope_pointer;
}
+ void set_valid_pointer(bool *valid_pointer) {
+ valid_pointer_ = valid_pointer;
+ }
// Returns the data points from each filter.
const std::deque<
@@ -441,6 +444,7 @@
mpq_class *offset_pointer_ = nullptr;
mpq_class *slope_pointer_ = nullptr;
+ bool *valid_pointer_ = nullptr;
Line fit_{std::chrono::nanoseconds(0), 0.0};
diff --git a/aos/realtime.cc b/aos/realtime.cc
index ff05f32..a0af97a 100644
--- a/aos/realtime.cc
+++ b/aos/realtime.cc
@@ -74,10 +74,12 @@
WriteCoreDumps();
PCHECK(mlockall(MCL_CURRENT | MCL_FUTURE) == 0);
+#if !__has_feature(address_sanitizer)
// Don't give freed memory back to the OS.
CHECK_EQ(1, mallopt(M_TRIM_THRESHOLD, -1));
// Don't use mmap for large malloc chunks.
CHECK_EQ(1, mallopt(M_MMAP_MAX, 0));
+#endif
if (&FLAGS_tcmalloc_release_rate) {
// Tell tcmalloc not to return memory.
diff --git a/aos/seasocks/gen_embedded.bzl b/aos/seasocks/gen_embedded.bzl
index 93cdc39..5bdb80d 100644
--- a/aos/seasocks/gen_embedded.bzl
+++ b/aos/seasocks/gen_embedded.bzl
@@ -16,7 +16,7 @@
attrs = {
"srcs": attr.label_list(
mandatory = True,
- non_empty = True,
+ allow_empty = False,
allow_files = True,
),
"_gen_embedded": attr.label(
diff --git a/debian/m4.BUILD b/debian/m4.BUILD
index 0abafda..50f83cd 100644
--- a/debian/m4.BUILD
+++ b/debian/m4.BUILD
@@ -3,3 +3,9 @@
srcs = ["usr/bin/m4"],
visibility = ["//visibility:public"],
)
+
+filegroup(
+ name = "lib",
+ srcs = glob(["usr/lib/**"]),
+ visibility = ["//visibility:public"],
+)
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 3c206b3..1f362f3 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -63,10 +63,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger");
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
diff --git a/third_party/gmp/BUILD b/third_party/gmp/BUILD
index e4f7029..06cfa51 100644
--- a/third_party/gmp/BUILD
+++ b/third_party/gmp/BUILD
@@ -4,6 +4,7 @@
":mpn.bzl",
"architecture_includes",
"config_include_from_architecture",
+ "current_directory",
"file_from_architecture",
"mparam_path",
"mpn_cc_library",
@@ -66,10 +67,10 @@
"-Wno-unused-parameter",
"-Wno-missing-field-initializers",
"-DHAVE_CONFIG_H",
- "-Ithird_party/gmp",
- "-I$(GENDIR)/third_party/gmp",
- "-Ithird_party/gmp/mpn",
- "-I$(GENDIR)/third_party/gmp/mpn",
+ "-I" + current_directory(),
+ "-I$(GENDIR)/" + current_directory(),
+ "-I" + current_directory() + "/mpn",
+ "-I$(GENDIR)/" + current_directory() + "/mpn",
"-Wno-cast-align",
"-Wno-dangling-else",
"-Wno-cast-qual",
@@ -926,17 +927,20 @@
genrule(
name = "call_m4",
- srcs = native.glob([
+ srcs = glob([
"**/*.m4",
"**/*.asm",
]) + ["config.m4"],
outs = ["tests/call.s"],
- cmd = "cd third_party/gmp/tests; ../../../$(location @m4_v1.4.18//:bin) -I ../../../$(GENDIR)/third_party/gmp/tests -DHAVE_CONFIG_H -DPIC " + cpu_select({
+ cmd = "ROOT=$$(pwd); cd ./" + current_directory() + "/tests; LD_LIBRARY_PATH=$${ROOT}/external/m4_v1.4.18/usr/lib/x86_64-linux-gnu/ $${ROOT}/$(location @m4_v1.4.18//:bin) -I $${ROOT}/$(GENDIR)/" + current_directory() + "/tests -DHAVE_CONFIG_H -DPIC " + cpu_select({
"arm": "arm32call.asm",
"amd64": "amd64call.asm",
}) +
- " > ../../../$@",
- tools = ["@m4_v1.4.18//:bin"],
+ " > $${ROOT}/$@",
+ tools = [
+ "@m4_v1.4.18//:bin",
+ "@m4_v1.4.18//:lib",
+ ],
)
cc_library(
diff --git a/third_party/gmp/mpn.bzl b/third_party/gmp/mpn.bzl
index 8f50773..039725f 100644
--- a/third_party/gmp/mpn.bzl
+++ b/third_party/gmp/mpn.bzl
@@ -48,6 +48,9 @@
"sec_pi1_div_r": ["sec_pi1_div"],
}
+def current_directory():
+ return native.package_name()
+
def mpn_cc_library(
name,
srcs,
@@ -60,9 +63,9 @@
hdrs = hdrs,
copts = copts + [
"-DHAVE_CONFIG_H",
- "-Ithird_party/gmp/mpn",
- "-Ithird_party/gmp",
- "-I$(GENDIR)/third_party/gmp",
+ "-I" + current_directory() + "/mpn",
+ "-I" + current_directory(),
+ "-I$(GENDIR)/" + current_directory(),
"-D__GMP_WITHIN_GMP",
"-DOPERATION_" + name,
],
@@ -87,17 +90,19 @@
out = ctx.actions.declare_file("mpn/" + ctx.attr.operation + ".s")
+ ruledir = ctx.label.workspace_root + "/" + ctx.label.package
ctx.actions.run_shell(
inputs = [ctx.files.files[0]] + ctx.files.deps,
outputs = [out],
progress_message = "Generating " + out.short_path,
- tools = [ctx.executable._m4],
- command = "&&".join([
- "cd third_party/gmp/mpn",
- "echo '#define OPERATION_" + ctx.attr.operation + " 1' > ../../../" + out.path,
- "../../../" + ctx.executable._m4.path + " -I ../../../" + ctx.var["GENDIR"] + "/third_party/gmp/mpn/ " +
+ tools = [ctx.executable._m4] + ctx.attr._m4_lib.files.to_list(),
+ command = " && ".join([
+ "ROOT=$(pwd)",
+ "cd ./" + ruledir + "/mpn",
+ "echo '#define OPERATION_" + ctx.attr.operation + " 1' > ${ROOT}/" + out.path,
+ "LD_LIBRARY_PATH=${ROOT}/external/m4_v1.4.18/usr/lib/x86_64-linux-gnu/ ${ROOT}/" + ctx.executable._m4.path + " -I ${ROOT}/" + ctx.var["GENDIR"] + "/" + ruledir + "/mpn" +
" -DHAVE_CONFIG_H -D__GMP_WITHIN_GMP -DOPERATION_" + ctx.attr.operation +
- " -DPIC ../../../" + ctx.files.files[0].path + " >> ../../../" + out.path,
+ " -DPIC ${ROOT}/" + ctx.files.files[0].path + " >> ${ROOT}/" + out.path,
]),
)
@@ -120,6 +125,10 @@
cfg = "host",
executable = True,
),
+ "_m4_lib": attr.label(
+ default = "@m4_v1.4.18//:lib",
+ cfg = "host",
+ ),
},
implementation = _m4_mpn_function_impl,
)
@@ -140,7 +149,7 @@
def architecture_includes(architecture_paths):
result = dict()
for key in architecture_paths:
- result[key] = ["-Ithird_party/gmp/mpn/" + p for p in architecture_paths[key]]
+ result[key] = ["-I" + current_directory() + "/mpn/" + p for p in architecture_paths[key]]
return select(result)
def file_from_architecture(architecture_paths, f):
@@ -152,7 +161,7 @@
def config_include_from_architecture(architecture_paths):
result = dict()
for key in architecture_paths:
- result[key] = ["-Ithird_party/gmp/config/" + architecture_paths[key][0] + "/"]
+ result[key] = ["-I" + current_directory() + "/config/" + architecture_paths[key][0] + "/"]
return select(result)
def mpn_m4_cc_library(name, architecture_paths):
diff --git a/tools/build_rules/BUILD b/tools/build_rules/BUILD
index f96fb3a..8ad7018 100644
--- a/tools/build_rules/BUILD
+++ b/tools/build_rules/BUILD
@@ -3,3 +3,10 @@
srcs = ["quiet_success.sh"],
visibility = ["//visibility:public"],
)
+
+py_binary(
+ name = "jinja2_generator",
+ srcs = ["jinja2_generator.py"],
+ visibility = ["//visibility:public"],
+ deps = ["@python_jinja2"],
+)
diff --git a/y2020/generate_pi_config.py b/tools/build_rules/jinja2_generator.py
similarity index 96%
rename from y2020/generate_pi_config.py
rename to tools/build_rules/jinja2_generator.py
index afd1fca..34f3354 100644
--- a/y2020/generate_pi_config.py
+++ b/tools/build_rules/jinja2_generator.py
@@ -1,7 +1,6 @@
#!/usr/bin/python3
import argparse
-from pathlib import Path
import json
import sys
diff --git a/tools/build_rules/template.bzl b/tools/build_rules/template.bzl
new file mode 100644
index 0000000..d4807e8
--- /dev/null
+++ b/tools/build_rules/template.bzl
@@ -0,0 +1,33 @@
+def _jinja2_template_impl(ctx):
+ out = ctx.actions.declare_file(ctx.attr.name)
+
+ ctx.actions.run_shell(
+ inputs = ctx.files.src,
+ tools = [ctx.executable._jinja2],
+ progress_message = "Generating " + out.short_path,
+ outputs = [out],
+ command = ctx.executable._jinja2.path + " " + ctx.files.src[0].path + " '" + str(ctx.attr.parameters) + "' " + out.path,
+ )
+
+ return [DefaultInfo(files = depset([out])), OutputGroupInfo(out = depset([out]))]
+
+jinja2_template = rule(
+ attrs = {
+ "src": attr.label(
+ mandatory = True,
+ allow_single_file = True,
+ doc = """The jinja2 template file to expand.""",
+ ),
+ "parameters": attr.string_dict(
+ mandatory = True,
+ doc = """The parameters to supply to Jinja2.""",
+ ),
+ "_jinja2": attr.label(
+ default = "//tools/build_rules:jinja2_generator",
+ cfg = "host",
+ executable = True,
+ ),
+ },
+ implementation = _jinja2_template_impl,
+ doc = """Expands a jinja2 template given parameters.""",
+)
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index 3689970..f7b69e4 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -15,7 +15,7 @@
"Name of the logfile to read from.");
DEFINE_string(config, "y2019/config.json",
"Name of the config file to replay using.");
-DEFINE_string(output_file, "/tmp/replayed.bfbs",
+DEFINE_string(output_file, "/tmp/replayed",
"Name of the logfile to write replayed data to.");
DEFINE_int32(team, 971, "Team number to use for logfile replay.");
int main(int argc, char **argv) {
@@ -37,13 +37,12 @@
"frc971.control_loops.drivetrain.Output");
reader.Register();
- aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> log_writer_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_writer");
log_writer_event_loop->SkipTimingReport();
log_writer_event_loop->SkipAosLog();
CHECK(nullptr == log_writer_event_loop->node());
- aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+ aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/BUILD b/y2020/BUILD
index 2d3f0e5..38832b8 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -2,7 +2,7 @@
load("//aos:config.bzl", "aos_config")
load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library")
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
-load("//y2020:config_gen.bzl", "generate_pi_config")
+load("//tools/build_rules:template.bzl", "jinja2_template")
robot_downloader(
binaries = [
@@ -168,6 +168,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
+ "//aos/events/logging:logger_fbs",
],
visibility = ["//visibility:public"],
deps = [
@@ -181,11 +182,30 @@
"pi2",
"pi3",
"pi4",
- "laptop",
]
]
aos_config(
+ name = "config_laptop",
+ src = "y2020_laptop.json",
+ flatbuffers = [
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:timestamp_fbs",
+ "//y2020/vision/sift:sift_fbs",
+ "//y2020/vision/sift:sift_training_fbs",
+ "//y2020/vision:vision_fbs",
+ "//aos/events/logging:logger_fbs",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//aos/events:config",
+ "//aos/robot_state:config",
+ "//frc971/control_loops/drivetrain:config",
+ ],
+)
+
+aos_config(
name = "config_roborio",
src = "y2020_roborio.json",
flatbuffers = [
@@ -247,10 +267,11 @@
],
)
-py_binary(
- name = "generate_pi_config",
- srcs = ["generate_pi_config.py"],
- deps = ["@python_jinja2"],
-)
-
-[generate_pi_config(num) for num in range(1, 5)]
+[
+ jinja2_template(
+ name = "y2020_pi" + str(num) + ".json",
+ src = "y2020_pi_template.json",
+ parameters = {"NUM": str(num)},
+ )
+ for num in range(1, 5)
+]
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 5a97943..0dd5a1a 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -46,11 +46,10 @@
node = aos::configuration::GetNode(reader.configuration(), "roborio");
}
- aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> log_writer_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_writer", node);
log_writer_event_loop->SkipTimingReport();
- aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+ aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index a7360c1..ae9f887 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -135,10 +135,8 @@
set_battery_voltage(12.0);
if (!FLAGS_output_file.empty()) {
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
@@ -399,7 +397,6 @@
double turret_velocity_ = 0.0; // rad / sec
std::unique_ptr<aos::EventLoop> logger_event_loop_;
- std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
std::unique_ptr<aos::logger::Logger> logger_;
};
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 1d42d23..8a0589c 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -439,10 +439,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
}
@@ -550,7 +548,6 @@
SuperstructureSimulation superstructure_plant_;
std::unique_ptr<aos::EventLoop> logger_event_loop_;
- std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
std::unique_ptr<aos::logger::Logger> logger_;
};
diff --git a/y2020/y2020_laptop.json b/y2020/y2020_laptop.json
index 44c4705..0797110 100644
--- a/y2020/y2020_laptop.json
+++ b/y2020/y2020_laptop.json
@@ -175,6 +175,51 @@
]
},
{
+ "name": "/laptop/aos/remote_timestamps/roborio",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi3",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi4",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
"name": "/pi1/camera",
"type": "frc971.vision.CameraImage",
"source_node": "pi1",