Fix log sorting for good
Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.
When a message is sent from node A -> node B, add support for sending
that timestamp back from node B to node A for logging.
{
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi1",
"frequency": 10,
"max_size": 200,
"destination_nodes": [
{
"name": "pi2",
"priority": 1,
"timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
]
},
This gives us a way to log enough information on node A such that
everything is self contained. We know all the messages we sent to B,
and when they got there, so we can recreate the time offset and replay
the node.
This data is then published over
{ "name": "/aos/remote_timestamps/pi2", "type": ".aos.logger.MessageHeader"}
The logger then treats that channel specially and log the contents
directly as though the message contents were received on the remote
node.
This (among other things) exposes log sorting problems. Use our fancy
new infinite precision noncausal filter to estimate time precise enough
to actually order events. This gets us down to 2-3 ns of error due to
integer precision.
Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/configuration.h b/aos/configuration.h
index 4ada459..fbd9cff 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -57,6 +57,14 @@
const Node *node) {
return GetChannel(&config.message(), name, type, application_name, node);
}
+template <typename T>
+inline const Channel *GetChannel(const Configuration *config,
+ const std::string_view name,
+ const std::string_view application_name,
+ const Node *node) {
+ return GetChannel(config, name, T::GetFullyQualifiedName(), application_name,
+ node);
+}
// Convenience wrapper for getting a channel from a specified config if you
// already have the name/type in a Channel object--this is useful if you Channel
// object you have does not point to memory within config.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 5346647..2a032e5 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -143,6 +143,7 @@
":ping_fbs",
":pong_fbs",
"//aos/network:message_bridge_client_fbs",
+ "//aos/events/logging:logger_fbs",
"//aos/network:timestamp_fbs",
"//aos/network:message_bridge_server_fbs",
],
@@ -306,6 +307,7 @@
":aos_logging",
":event_loop",
":simple_channel",
+ "//aos/events/logging:logger_fbs",
"//aos/ipc_lib:index",
"//aos/network:message_bridge_client_status",
"//aos/network:message_bridge_server_status",
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index c922f2e..de397e2 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -527,7 +527,10 @@
CHECK(channel != nullptr)
<< ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
<< T::GetFullyQualifiedName() << "\" } not found in config for "
- << name() << ".";
+ << name()
+ << (configuration::MultiNode(configuration_)
+ ? absl::StrCat(" on node ", node()->name()->string_view())
+ : ".");
if (!configuration::ChannelIsSendableOnNode(channel, node())) {
LOG(FATAL) << "Channel { \"name\": \"" << channel_name
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 202c772..f5d6c92 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -94,9 +94,12 @@
// We get to pick our tradeoffs here. Either we assume that there are no
// backward step changes in our time function for each node, or we have to
- // let time go backwards. This backwards time jump should be small, so we
- // can check for it and bound it.
- CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ // let time go backwards. We currently only really see this happen when 2
+ // events are scheduled for "now", time changes, and there is a nanosecond
+ // or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
now_ = std::get<0>(oldest_event);
@@ -120,9 +123,12 @@
// We get to pick our tradeoffs here. Either we assume that there are no
// backward step changes in our time function for each node, or we have to
- // let time go backwards. This backwards time jump should be small, so we
- // can check for it and bound it.
- CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ // let time go backwards. We currently only really see this happen when 2
+ // events are scheduled for "now", time changes, and there is a nanosecond
+ // or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
now_ = std::get<0>(oldest_event);
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 468d904..c067048 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -196,9 +196,16 @@
inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
// Make sure we stay in sync.
if (monotonic_now_valid_) {
+ // We want time to be smooth, so confirm that it doesn't change too much
+ // while handling an event.
+ //
+ // There are 2 sources of error. There are numerical precision and interger
+ // rounding problems going from the monotonic clock to the distributed clock
+ // and back again. When we update the time function as well to transition
+ // line segments, we have a slight jump as well.
CHECK_NEAR(monotonic_now_,
FromDistributedClock(scheduler_scheduler_->distributed_now()),
- std::chrono::nanoseconds(1));
+ std::chrono::nanoseconds(2));
return monotonic_now_;
} else {
return FromDistributedClock(scheduler_scheduler_->distributed_now());
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index c3af9e2..b4a9b00 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -40,6 +40,7 @@
"logger_math.cc",
],
hdrs = [
+ "eigen_mpq.h",
"logger.h",
],
visibility = ["//visibility:public"],
@@ -48,10 +49,13 @@
":logger_fbs",
"//aos/events:event_loop",
"//aos/events:simulated_event_loop",
+ "//aos/network:message_bridge_server_fbs",
"//aos/network:team_number",
"//aos/network:timestamp_filter",
"//aos/time",
+ "//third_party/gmp",
"@com_github_google_flatbuffers//:flatbuffers",
+ "@com_google_absl//absl/strings",
"@com_google_absl//absl/types:span",
"@org_tuxfamily_eigen//:eigen",
],
@@ -129,6 +133,7 @@
name = "multinode_pingpong_config",
src = "multinode_pingpong.json",
flatbuffers = [
+ ":logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/events/logging/eigen_mpq.h b/aos/events/logging/eigen_mpq.h
new file mode 100644
index 0000000..1463648
--- /dev/null
+++ b/aos/events/logging/eigen_mpq.h
@@ -0,0 +1,35 @@
+#ifndef AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+#define AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+
+#include "Eigen/Dense"
+#include "third_party/gmp/gmpxx.h"
+
+namespace Eigen {
+
+// TypeTraits for mpq_class. This is only really enough to use inverse().
+template <>
+struct NumTraits<mpq_class>
+ : GenericNumTraits<mpq_class> {
+ typedef mpq_class Real;
+ typedef mpq_class Literal;
+ typedef mpq_class NonInteger;
+ typedef mpq_class Nested;
+
+ enum {
+ IsComplex = 0,
+ IsInteger = 0,
+ IsSigned = 1,
+ RequireInitialization = 1,
+ ReadCost = 1,
+ AddCost = 3,
+ MulCost = 9
+ };
+
+ static inline Real dummy_precision() { return mpq_class(0); }
+ static inline Real epsilon() { return mpq_class(0); }
+ static inline int digits10() { return 0; }
+};
+
+} // namespace Eigen
+
+#endif // AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index b9d53ff..4ef9237 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -78,6 +78,11 @@
LOG(FATAL) << "Expected 1 logfile as an argument.";
}
aos::logger::MessageReader reader(argv[1]);
+ std::cout << aos::FlatbufferToJson(reader.log_file_header(),
+ {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
+ << std::endl;
while (true) {
std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ade82f9..05ee4e0 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -37,6 +37,24 @@
DetachedBufferWriter::~DetachedBufferWriter() {
Flush();
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+ VLOG(1) << "Closed " << filename_;
+}
+
+DetachedBufferWriter::DetachedBufferWriter(
+ DetachedBufferWriter &&other) {
+ *this = std::move(other);
+}
+
+DetachedBufferWriter &DetachedBufferWriter::operator=(
+ DetachedBufferWriter &&other) {
+ Flush();
+ std::swap(filename_, other.filename_);
+ std::swap(fd_, other.fd_);
+ std::swap(queued_size_, other.queued_size_);
+ std::swap(written_size_, other.written_size_);
+ std::swap(queue_, other.queue_);
+ std::swap(iovec_, other.iovec_);
+ return *this;
}
void DetachedBufferWriter::QueueSizedFlatbuffer(
@@ -290,7 +308,7 @@
FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
max_out_of_order_duration_ =
- std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+ chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
VLOG(1) << "Opened " << filename << " as node "
<< FlatbufferToJson(log_file_header()->node());
@@ -323,6 +341,49 @@
// open more of them).
log_file_header_ = message_reader_->raw_log_file_header();
+ for (size_t i = 1; i < filenames_.size(); ++i) {
+ MessageReader message_reader(filenames_[i]);
+
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ message_reader.log_file_header()->realtime_start_time()));
+
+ // There are 2 types of part files. Part files from before time estimation
+ // has started, and part files after. We don't declare a log file "started"
+ // until time estimation is up. And once a log file starts, it should never
+ // stop again, and should remain constant.
+ // To compare both types of headers, we mutate our saved copy of the header
+ // to match the next chunk by updating time if we detect a stopped ->
+ // started transition.
+ if (monotonic_start_time() == monotonic_clock::min_time) {
+ CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
+ // We should only be missing the monotonic start time when logging data
+ // for remote nodes. We don't have a good way to deteremine the remote
+ // realtime offset, so it shouldn't be filled out.
+ // TODO(austin): If we have a good way, feel free to fill it out. It
+ // probably won't be better than we could do in post though with the same
+ // data.
+ CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
+ if (new_monotonic_start_time != monotonic_clock::min_time) {
+ // If we finally found our start time, update the header. Do this once
+ // because it should never change again.
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ }
+ }
+
+ // Now compare that the headers match.
+ CHECK(CompareFlatBuffer(message_reader.raw_log_file_header(),
+ log_file_header_))
+ << ": Header is different between log file chunks " << filenames_[0]
+ << " and " << filenames_[i] << ", this is not supported.";
+ }
+
// Setup per channel state.
channels_.resize(configuration()->channels()->size());
for (ChannelData &channel_data : channels_) {
@@ -545,10 +606,13 @@
timestamp = channels_[channel_index].data.front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
- channels_[channel_index].data.pop_front();
+ channels_[channel_index].data.PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
- << std::get<0>(timestamp) << " for " << channel_index;
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
+ << std::get<0>(timestamp) << " for "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -558,19 +622,21 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel, int node_index) {
+SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel].timestamps[node_index].front_timestamp();
FlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
- channels_[channel].timestamps[node_index].pop_front();
+ channels_[channel].timestamps[node_index].PopFront();
- VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
+ VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
<< std::get<0>(timestamp) << " for "
<< configuration::StrippedChannelToString(
configuration()->channels()->Get(channel))
- << " on " << node_index;
+ << " on "
+ << configuration()->nodes()->Get(node_index)->name()->string_view()
+ << " (" << node_index << ")";
QueueMessages(std::get<0>(timestamp));
@@ -607,7 +673,7 @@
return true;
}
-void SplitMessageReader::MessageHeaderQueue::pop_front() {
+void SplitMessageReader::MessageHeaderQueue::PopFront() {
data_.pop_front();
if (data_.size() != 0u) {
// Yup, new data.
@@ -616,6 +682,15 @@
} else {
timestamp_merger->Update(split_reader, front_timestamp());
}
+ } else {
+ // Poke anyways to update the heap.
+ if (timestamps) {
+ timestamp_merger->UpdateTimestamp(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ } else {
+ timestamp_merger->Update(
+ nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+ }
}
}
@@ -687,25 +762,32 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == message_heap_.end())
- << ": Pushing message when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == message_heap_.end())
+ << ": Pushing message when it is already in the heap.";
- message_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ message_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
+ std::push_heap(message_heap_.begin(), message_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ }
// If we are just a data merger, don't wait for timestamps.
if (!has_timestamps_) {
- channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!message_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -730,26 +812,33 @@
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp,
SplitMessageReader *split_message_reader) {
- DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == timestamp_heap_.end())
- << ": Pushing timestamp when it is already in the heap.";
+ if (split_message_reader != nullptr) {
+ DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+ [split_message_reader](
+ const std::tuple<monotonic_clock::time_point,
+ uint32_t, SplitMessageReader *>
+ x) {
+ return std::get<2>(x) == split_message_reader;
+ }) == timestamp_heap_.end())
+ << ": Pushing timestamp when it is already in the heap.";
- timestamp_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+ timestamp_heap_.push_back(std::make_tuple(
+ std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- SplitMessageReaderHeapCompare);
+ std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ SplitMessageReaderHeapCompare);
+ }
// If we are a timestamp merger, don't wait for data. Missing data will be
// caught at read time.
if (has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
- pushed_ = true;
+ if (!timestamp_heap_.empty()) {
+ channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
+ pushed_ = true;
+ } else {
+ // Remove ourselves if we are empty.
+ channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+ }
}
}
@@ -832,50 +921,73 @@
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->PopOldest(channel_index_, node_index_);
+ ->PopOldestTimestamp(channel_index_, node_index_);
// Confirm that the time we have recorded matches.
CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
- // TODO(austin): What if we get duplicate timestamps?
+ // Now, keep reading until we have found all duplicates.
+ while (!timestamp_heap_.empty()) {
+ // See if it is a duplicate.
+ std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+ next_oldest_timestamp_reader = timestamp_heap_.front();
- return oldest_timestamp;
-}
+ std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+ next_oldest_timestamp_time =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->oldest_message(channel_index_, node_index_);
-TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
- if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
- return TimestampMerger::DeliveryTimestamp{};
+ if (std::get<0>(next_oldest_timestamp_time) ==
+ std::get<0>(oldest_timestamp) &&
+ std::get<1>(next_oldest_timestamp_time) ==
+ std::get<1>(oldest_timestamp)) {
+ // Pop the timestamp reader pointer.
+ std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+ &SplitMessageReaderHeapCompare);
+ timestamp_heap_.pop_back();
+
+ // Pop the next oldest timestamp. This re-pushes any messages from the
+ // reader.
+ std::tuple<monotonic_clock::time_point, uint32_t,
+ FlatbufferVector<MessageHeader>>
+ next_oldest_timestamp =
+ std::get<2>(next_oldest_timestamp_reader)
+ ->PopOldestTimestamp(channel_index_, node_index_);
+
+ // And make sure the contents matches in it's entirety.
+ CHECK(std::get<2>(oldest_timestamp).span() ==
+ std::get<2>(next_oldest_timestamp).span())
+ << ": Data at the same timestamp doesn't match, "
+ << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
+ << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(oldest_timestamp).span().data()),
+ std::get<2>(oldest_timestamp).span().size()))
+ << " vs "
+ << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(
+ std::get<2>(next_oldest_timestamp).span().data()),
+ std::get<2>(next_oldest_timestamp).span().size()));
+
+ } else {
+ break;
+ }
}
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->oldest_message(channel_index_, node_index_);
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_sent_time()));
- timestamp.realtime_event_time = realtime_clock::time_point(
- chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
-
- timestamp.monotonic_remote_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->monotonic_remote_time()));
- timestamp.realtime_remote_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp)->realtime_remote_time()));
-
- timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
- return timestamp;
+ return oldest_timestamp;
}
std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
+ VLOG(1) << "Looking for matching timestamp for "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ") "
+ << " at " << std::get<0>(oldest_timestamp());
+
// Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
@@ -944,7 +1056,8 @@
<< " on channel "
<< configuration::StrippedChannelToString(
configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
+ << " (" << channel_index_ << ")"
+ << (VLOG_IS_ON(1) ? DebugString() : "");
return std::make_tuple(timestamp,
std::move(std::get<2>(oldest_timestamp)));
}
@@ -952,6 +1065,10 @@
timestamp.monotonic_remote_time = remote_monotonic_time;
}
+ VLOG(1) << "Found matching data "
+ << configuration::StrippedChannelToString(
+ configuration_->channels()->Get(channel_index_))
+ << " (" << channel_index_ << ")";
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
@@ -1062,10 +1179,27 @@
<< FlatbufferToJson(reader->node()) << " start_time "
<< monotonic_start_time();
} else {
- // And then make sure all the other files have matching headers.
- CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
- << ": " << FlatbufferToJson(log_file_header()) << " reader "
- << FlatbufferToJson(reader->log_file_header());
+ // Find the earliest start time. That way, if we get a full log file
+ // directly from the node, and a partial later, we start with the
+ // full. Update our header to match that.
+ const monotonic_clock::time_point new_monotonic_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->monotonic_start_time()));
+ const realtime_clock::time_point new_realtime_start_time(
+ chrono::nanoseconds(
+ reader->log_file_header()->realtime_start_time()));
+
+ if (monotonic_start_time() == monotonic_clock::min_time ||
+ (new_monotonic_start_time != monotonic_clock::min_time &&
+ new_monotonic_start_time < monotonic_start_time())) {
+ log_file_header_.mutable_message()->mutate_monotonic_start_time(
+ new_monotonic_start_time.time_since_epoch().count());
+ log_file_header_.mutable_message()->mutate_realtime_start_time(
+ new_realtime_start_time.time_since_epoch().count());
+ VLOG(1) << "Updated log file " << reader->filename()
+ << " with node " << FlatbufferToJson(reader->node())
+ << " start_time " << new_monotonic_start_time;
+ }
}
}
}
@@ -1105,24 +1239,6 @@
return channel_heap_.front().first;
}
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
- if (timestamp_heap_.empty()) {
- return TimestampMerger::DeliveryTimestamp{};
- }
- return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
-}
-
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
- int channel) const {
- // If we didn't find any data for this node, we won't have any mergers. Return
- // an invalid timestamp in that case.
- if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
- TimestampMerger::DeliveryTimestamp result;
- return result;
- }
- return timestamp_mergers_[channel].OldestTimestamp();
-}
-
void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index) {
// Pop and recreate the heap if it has already been pushed. And since we are
@@ -1161,6 +1277,11 @@
}
}
+ if (timestamp == monotonic_clock::min_time) {
+ timestamp_mergers_[channel_index].set_pushed(false);
+ return;
+ }
+
channel_heap_.push_back(std::make_pair(timestamp, channel_index));
// The default sort puts the newest message first. Use a custom comparator to
@@ -1175,6 +1296,32 @@
}
}
+void ChannelMerger::VerifyHeaps() {
+ {
+ std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+ channel_heap_;
+ std::make_heap(channel_heap.begin(), channel_heap.end(),
+ &ChannelHeapCompare);
+
+ for (size_t i = 0; i < channel_heap_.size(); ++i) {
+ CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
+ CHECK_EQ(std::get<0>(channel_heap[i]),
+ timestamp_mergers_[std::get<1>(channel_heap[i])]
+ .channel_merger_time());
+ }
+ }
+ {
+ std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap =
+ timestamp_heap_;
+ std::make_heap(timestamp_heap.begin(), timestamp_heap.end(),
+ &ChannelHeapCompare);
+
+ for (size_t i = 0; i < timestamp_heap_.size(); ++i) {
+ CHECK(timestamp_heap_[i] == timestamp_heap[i]) << ": Heaps diverged...";
+ }
+ }
+}
+
std::tuple<TimestampMerger::DeliveryTimestamp, int,
FlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
@@ -1210,6 +1357,16 @@
<< ": channel_heap_ was corrupted for " << channel_index << ": "
<< DebugString();
+ CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
+ << ": " << MaybeNodeName(log_file_header()->node())
+ << "Messages came off the queue out of order. " << DebugString();
+ last_popped_time_ = std::get<0>(message).monotonic_event_time;
+
+ VLOG(1) << "Popped " << last_popped_time_ << " "
+ << configuration::StrippedChannelToString(
+ configuration()->channels()->Get(channel_index))
+ << " (" << channel_index << ")";
+
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
}
@@ -1217,27 +1374,31 @@
std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
std::stringstream ss;
for (size_t i = 0; i < data_.size(); ++i) {
- if (timestamps) {
- ss << " msg: ";
- } else {
- ss << " timestamp: ";
- }
- ss << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_sent_time()))
- << " ("
- << realtime_clock::time_point(
- std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
- << ") " << data_[i].message().queue_index();
- if (timestamps) {
- ss << " <- remote "
- << monotonic_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().monotonic_remote_time()))
+ if (i < 5 || i + 5 > data_.size()) {
+ if (timestamps) {
+ ss << " msg: ";
+ } else {
+ ss << " timestamp: ";
+ }
+ ss << monotonic_clock::time_point(
+ chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
<< " ("
- << realtime_clock::time_point(std::chrono::nanoseconds(
- data_[i].message().realtime_remote_time()))
- << ")";
+ << realtime_clock::time_point(
+ chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+ << ") " << data_[i].message().queue_index();
+ if (timestamps) {
+ ss << " <- remote "
+ << monotonic_clock::time_point(chrono::nanoseconds(
+ data_[i].message().monotonic_remote_time()))
+ << " ("
+ << realtime_clock::time_point(chrono::nanoseconds(
+ data_[i].message().realtime_remote_time()))
+ << ")";
+ }
+ ss << "\n";
+ } else if (i == 5) {
+ ss << " ...\n";
}
- ss << "\n";
}
return ss.str();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 2b08b59..d1fbeb2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -41,13 +41,20 @@
class DetachedBufferWriter {
public:
DetachedBufferWriter(std::string_view filename);
+ DetachedBufferWriter(DetachedBufferWriter &&other);
+ DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+
~DetachedBufferWriter();
- DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+ DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
std::string_view filename() const { return filename_; }
+ // Rewrites a location in a file (relative to the start) to have new data in
+ // it. The main use case is updating start times after a log file starts.
+ void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+
// TODO(austin): Snappy compress the log file if it ends with .snappy!
// Queues up a finished FlatBufferBuilder to be written. Steals the detached
@@ -68,7 +75,7 @@
size_t total_size() const { return written_size_ + queued_size_; }
private:
- const std::string filename_;
+ std::string filename_;
int fd_ = -1;
@@ -236,15 +243,15 @@
void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
const Node *target_node);
- // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
- // max_time if there is nothing in the channel.
+ // Returns the (timestamp, queue_index, message_header) for the oldest message
+ // in a channel, or max_time if there is nothing in the channel.
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
oldest_message(int channel) {
return channels_[channel].data.front_timestamp();
}
- // Returns the (timestamp, queue_index) for the oldest delivery time in a
- // channel, or max_time if there is nothing in the channel.
+ // Returns the (timestamp, queue_index, message_header) for the oldest
+ // delivery time in a channel, or max_time if there is nothing in the channel.
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
oldest_message(int channel, int destination_node) {
return channels_[channel].timestamps[destination_node].front_timestamp();
@@ -260,7 +267,7 @@
// a channel delivered to a node. Requeues data as needed.
std::tuple<monotonic_clock::time_point, uint32_t,
FlatbufferVector<MessageHeader>>
- PopOldest(int channel, int node_index);
+ PopOldestTimestamp(int channel, int node_index);
// Returns the header for the log files.
const LogFileHeader *log_file_header() const {
@@ -367,7 +374,7 @@
bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
// Drops the front message. Invalidates the front() reference.
- void pop_front();
+ void PopFront();
// The size of the queue.
size_t size() { return data_.size(); }
@@ -375,15 +382,16 @@
// Returns a debug string with info about each message in the queue.
std::string DebugString() const;
- // Returns the (timestamp, queue_index) for the oldest message.
+ // Returns the (timestamp, queue_index, message_header) for the oldest
+ // message.
const std::tuple<monotonic_clock::time_point, uint32_t,
const MessageHeader *>
front_timestamp() {
- CHECK_GT(data_.size(), 0u);
+ const MessageHeader &message = front().message();
return std::make_tuple(
- monotonic_clock::time_point(std::chrono::nanoseconds(
- front().message().monotonic_sent_time())),
- front().message().queue_index(), &front().message());
+ monotonic_clock::time_point(
+ std::chrono::nanoseconds(message.monotonic_sent_time())),
+ message.queue_index(), &message);
}
// Pointer to the timestamp merger for this queue if available.
@@ -471,9 +479,6 @@
// The caller can determine what the appropriate action is to recover.
std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
- // Returns the oldest forwarding timestamp.
- DeliveryTimestamp OldestTimestamp() const;
-
// Tracks if the channel merger has pushed this onto it's heap or not.
bool pushed() { return pushed_; }
// Sets if this has been pushed to the channel merger heap. Should only be
@@ -490,6 +495,14 @@
// called by a SplitMessageReader.
void NoticeAtEnd();
+ aos::monotonic_clock::time_point channel_merger_time() {
+ if (has_timestamps_) {
+ return std::get<0>(timestamp_heap_[0]);
+ } else {
+ return std::get<0>(message_heap_[0]);
+ }
+ }
+
private:
// Pushes messages and timestamps to the corresponding heaps.
void PushMessageHeap(
@@ -576,12 +589,6 @@
FlatbufferVector<MessageHeader>>
PopOldest();
- // Returns the oldest timestamp in the timestamp heap.
- TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
- // Returns the oldest timestamp in the timestamp heap for a specific channel.
- TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
- int channel) const;
-
// Returns the config for this set of log files.
const Configuration *configuration() const {
return log_file_header()->configuration();
@@ -628,6 +635,9 @@
void PushChannelHeap(monotonic_clock::time_point timestamp,
int channel_index);
+ // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
+ void VerifyHeaps();
+
// All the message readers.
std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
@@ -641,6 +651,7 @@
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
// A heap of just the timestamp channel readers and timestamps for the oldest
// data in each.
+ // TODO(austin): I think this is no longer used and can be removed (!)
std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
// Configured node.
@@ -650,6 +661,9 @@
// Cached copy of the list of nodes.
std::vector<const Node *> nodes_;
+
+ // Last time popped. Used to detect events being returned out of order.
+ monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
};
// Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 242638c..89839ec 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -8,6 +8,7 @@
#include <vector>
#include "Eigen/Dense"
+#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
@@ -15,6 +16,7 @@
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
DEFINE_bool(skip_missing_forwarding_entries, false,
"If true, drop any forwarding entries with missing data. If "
@@ -25,14 +27,50 @@
"of CSV files in /tmp/. This should only be needed when debugging "
"time synchronization.");
+DEFINE_bool(skip_order_validation, false,
+ "If true, ignore any out of orderness in replay");
+
namespace aos {
namespace logger {
-
namespace chrono = std::chrono;
-Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+void MultiNodeLogNamer::WriteHeader(
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+ const Node *node) {
+ if (node == this->node()) {
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
+ }
+ }
+ }
+}
+
+void MultiNodeLogNamer::Rotate(
+ const Node *node,
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header) {
+ if (node == this->node()) {
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ ++data_writer.second.part_number;
+ data_writer.second.rotate(data_writer.first, &data_writer.second);
+ data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
+ }
+ }
+ }
+}
+
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
- : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+ : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
event_loop, polling_period) {}
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
@@ -40,11 +78,60 @@
: event_loop_(event_loop),
log_namer_(std::move(log_namer)),
timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
- polling_period_(polling_period) {
+ polling_period_(polling_period),
+ server_statistics_fetcher_(
+ configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+ "/aos")
+ : aos::Fetcher<message_bridge::ServerStatistics>()) {
VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
int channel_index = 0;
+
+ // Find all the nodes which are logging timestamps on our node.
+ std::set<const Node *> timestamp_logger_nodes;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
+ !channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node = configuration::GetNode(
+ event_loop_->configuration(), connection->name()->string_view());
+
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop_->node())) {
+ VLOG(1) << "Timestamps are logged from "
+ << FlatbufferToJson(other_node);
+ timestamp_logger_nodes.insert(other_node);
+ }
+ }
+ }
+
+ std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+ // Now that we have all the nodes accumulated, make remote timestamp loggers
+ // for them.
+ for (const Node *node : timestamp_logger_nodes) {
+ const Channel *channel = configuration::GetChannel(
+ event_loop_->configuration(),
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ event_loop_->node());
+
+ CHECK(channel != nullptr)
+ << ": Remote timestamps are logged on "
+ << event_loop_->node()->name()->string_view()
+ << " but can't find channel /aos/remote_timestamps/"
+ << node->name()->string_view();
+ timestamp_logger_channels.insert(std::make_pair(channel, node));
+ }
+
+ const size_t our_node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), event_loop_->node());
+
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
+ fs.node_index = our_node_index;
const bool is_local =
configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
@@ -60,7 +147,15 @@
: configuration::ConnectionDeliveryTimeIsLoggedOnNode(
channel, event_loop_->node(), event_loop_->node());
- if (log_message || log_delivery_times) {
+ // Now, detect a MessageHeader timestamp logger where we should just log the
+ // contents to a file directly.
+ const bool log_contents = timestamp_logger_channels.find(channel) !=
+ timestamp_logger_channels.end();
+ const Node *timestamp_node =
+ log_contents ? timestamp_logger_channels.find(channel)->second
+ : nullptr;
+
+ if (log_message || log_delivery_times || log_contents) {
fs.fetcher = event_loop->MakeRawFetcher(channel);
VLOG(1) << "Logging channel "
<< configuration::CleanedChannelToString(channel);
@@ -76,6 +171,14 @@
fs.log_type = LogType::kLogRemoteMessage;
}
}
+ if (log_contents) {
+ VLOG(1) << "Timestamp logger channel "
+ << configuration::CleanedChannelToString(channel);
+ fs.contents_writer =
+ log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+ fs.node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), timestamp_node);
+ }
fs.channel_index = channel_index;
fs.written = false;
fetchers_.emplace_back(std::move(fs));
@@ -83,49 +186,178 @@
++channel_index;
}
- // When things start, we want to log the header, then the most recent messages
- // available on each fetcher to capture the previous state, then start
- // polling.
- event_loop_->OnRun([this, polling_period]() {
- // Grab data from each channel right before we declare the log file started
- // so we can capture the latest message on each channel. This lets us have
- // non periodic messages with configuration that now get logged.
- for (FetcherStruct &f : fetchers_) {
- f.written = !f.fetcher->Fetch();
- }
+ node_state_.resize(configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->configuration()->nodes()->size()
+ : 1u);
- // We need to pick a point in time to declare the log file "started". This
- // starts here. It needs to be after everything is fetched so that the
- // fetchers are all pointed at the most recent message before the start
- // time.
- monotonic_start_time_ = event_loop_->monotonic_now();
- realtime_start_time_ = event_loop_->realtime_now();
- last_synchronized_time_ = monotonic_start_time_;
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
- << " start_time " << monotonic_start_time_;
+ node_state_[node_index].log_file_header = MakeHeader(node);
+ }
- WriteHeader();
-
- timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
- polling_period);
- });
+ // When things start, we want to log the header, then the most recent
+ // messages available on each fetcher to capture the previous state, then
+ // start polling.
+ event_loop_->OnRun([this]() { StartLogging(); });
}
-// TODO(austin): Set the remote start time to the first time we see a remote
-// message when we are logging those messages separate? Need to signal what to
-// do, or how to get a good timestamp.
+void Logger::StartLogging() {
+ // Grab data from each channel right before we declare the log file started
+ // so we can capture the latest message on each channel. This lets us have
+ // non periodic messages with configuration that now get logged.
+ for (FetcherStruct &f : fetchers_) {
+ f.written = !f.fetcher->Fetch();
+ }
+
+ // Clear out any old timestamps in case we are re-starting logging.
+ for (size_t i = 0; i < node_state_.size(); ++i) {
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ }
+
+ WriteHeader();
+
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+ << " start_time " << last_synchronized_time_;
+
+ timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+ polling_period_);
+}
+
void Logger::WriteHeader() {
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ server_statistics_fetcher_.Fetch();
+ }
+
+ aos::monotonic_clock::time_point monotonic_start_time =
+ event_loop_->monotonic_now();
+ aos::realtime_clock::time_point realtime_start_time =
+ event_loop_->realtime_now();
+
+ // We need to pick a point in time to declare the log file "started". This
+ // starts here. It needs to be after everything is fetched so that the
+ // fetchers are all pointed at the most recent message before the start
+ // time.
+ last_synchronized_time_ = monotonic_start_time;
+
for (const Node *node : log_namer_->nodes()) {
- WriteHeader(node);
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+ realtime_start_time);
+ log_namer_->WriteHeader(node_state_[node_index].log_file_header, node);
}
}
-void Logger::WriteHeader(const Node *node) {
+void Logger::WriteMissingTimestamps() {
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ server_statistics_fetcher_.Fetch();
+ } else {
+ return;
+ }
+
+ if (server_statistics_fetcher_.get() == nullptr) {
+ return;
+ }
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ if (MaybeUpdateTimestamp(
+ node, node_index,
+ server_statistics_fetcher_.context().monotonic_event_time,
+ server_statistics_fetcher_.context().realtime_event_time)) {
+ log_namer_->Rotate(node, node_state_[node_index].log_file_header);
+ }
+ }
+}
+
+void Logger::SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ node_state_[node_index].monotonic_start_time = monotonic_start_time;
+ node_state_[node_index].realtime_start_time = realtime_start_time;
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_start_time.time_since_epoch())
+ .count());
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_start_time.time_since_epoch())
+ .count());
+ }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ // Bail early if there the start times are already set.
+ if (node_state_[node_index].monotonic_start_time !=
+ monotonic_clock::min_time) {
+ return false;
+ }
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ if (event_loop_->node() == node) {
+ // There are no offsets to compute for ourself, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
+
+ // Found it and it is connected. Compensate and go.
+ monotonic_start_time +=
+ std::chrono::nanoseconds(connection->monotonic_offset());
+
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ }
+ } else {
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+ const Node *node) {
// Now write the header with this timestamp in it.
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
+ // TODO(austin): Compress this much more efficiently. There are a bunch of
+ // duplicated schemas.
flatbuffers::Offset<aos::Configuration> configuration_offset =
CopyFlatBuffer(event_loop_->configuration(), &fbb);
@@ -133,7 +365,8 @@
fbb.CreateString(network::GetHostname());
flatbuffers::Offset<Node> node_offset;
- if (event_loop_->node() != nullptr) {
+
+ if (configuration::MultiNode(event_loop_->configuration())) {
node_offset = CopyFlatBuffer(node, &fbb);
}
@@ -158,41 +391,133 @@
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_start_time_.time_since_epoch())
+ monotonic_clock::min_time.time_since_epoch())
.count());
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_start_time_.time_since_epoch())
- .count());
-
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- log_namer_->WriteHeader(&fbb, node);
-}
-
-void Logger::Rotate(DetachedBufferWriter *writer) {
- Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
-}
-
-void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
- // Force data up until now to be written.
- DoLogData();
-
- // Swap the writer out, and re-write the header.
- log_namer_ = std::move(log_namer);
-
- // And then update the writers.
- for (FetcherStruct &f : fetchers_) {
- const Channel *channel =
- event_loop_->configuration()->channels()->Get(f.channel_index);
- if (f.timestamp_writer != nullptr) {
- f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
- }
- if (f.writer != nullptr) {
- f.writer = log_namer_->MakeWriter(channel);
- }
+ if (node == event_loop_->node()) {
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
- WriteHeader();
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ return fbb.Release();
+}
+
+void Logger::Rotate() {
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ log_namer_->Rotate(node, node_state_[node_index].log_file_header);
+ }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+ WriteMissingTimestamps();
+
+ // Write each channel to disk, one at a time.
+ for (FetcherStruct &f : fetchers_) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
+ }
+
+ CHECK(!f.written);
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time < t) {
+ if (f.writer != nullptr) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index, f.log_type));
+
+ VLOG(2) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.writer->filename() << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ f.writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+
+ VLOG(2) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename() << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.contents_writer != nullptr) {
+ // And now handle the special message contents channel. Copy the
+ // message into a FlatBufferBuilder and save it to disk.
+ // TODO(austin): We can be more efficient here when we start to
+ // care...
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ const MessageHeader *msg =
+ flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ // Note: this must match the same order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+ message_header_builder.add_channel_index(msg->channel_index());
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(
+ msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(
+ msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(
+ msg->remote_queue_index());
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+
+ f.contents_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ } else {
+ break;
+ }
+ }
+ }
+ last_synchronized_time_ = t;
}
void Logger::DoLogData() {
@@ -205,77 +530,8 @@
do {
// Move the sync point up by at most polling_period. This forces one sync
// per iteration, even if it is small.
- last_synchronized_time_ =
- std::min(last_synchronized_time_ + polling_period_, monotonic_now);
- // Write each channel to disk, one at a time.
- for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- } else {
- f.written = false;
- }
- }
-
- CHECK(!f.written);
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
- if (f.writer != nullptr) {
- // Write!
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
-
- VLOG(2) << "Writing data as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- max_header_size_ = std::max(
- max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- f.writer->QueueSizedFlatbuffer(&fbb);
- }
-
- if (f.timestamp_writer != nullptr) {
- // And now handle timestamps.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
-
- VLOG(2) << "Writing timestamps as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
- }
-
- f.written = true;
- } else {
- break;
- }
- }
- }
+ LogUntil(
+ std::min(last_synchronized_time_ + polling_period_, monotonic_now));
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
@@ -302,7 +558,8 @@
if (replay_configuration) {
CHECK_EQ(configuration::MultiNode(configuration()),
configuration::MultiNode(replay_configuration))
- << ": Log file and replay config need to both be multi or single node.";
+ << ": Log file and replay config need to both be multi or single "
+ "node.";
}
if (!configuration::MultiNode(configuration())) {
@@ -312,12 +569,13 @@
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
replay_configuration->nodes()->size())
- << ": Log file and replay config need to have matching nodes lists.";
+ << ": Log file and replay config need to have matching nodes "
+ "lists.";
for (const Node *node : *logged_configuration()->nodes()) {
if (configuration::GetNode(replay_configuration, node) == nullptr) {
- LOG(FATAL)
- << "Found node " << FlatbufferToJson(node)
- << " in logged config that is not present in the replay config.";
+ LOG(FATAL) << "Found node " << FlatbufferToJson(node)
+ << " in logged config that is not present in the replay "
+ "config.";
}
}
}
@@ -335,8 +593,8 @@
if (offset_fp_ != nullptr) {
fclose(offset_fp_);
}
- // Zero out some buffers. It's easy to do use-after-frees on these, so make it
- // more obvious.
+ // Zero out some buffers. It's easy to do use-after-frees on these, so make
+ // it more obvious.
if (remapped_configuration_buffer_) {
remapped_configuration_buffer_->Wipe();
}
@@ -352,8 +610,8 @@
}
std::vector<const Node *> LogReader::Nodes() const {
- // Because the Node pointer will only be valid if it actually points to memory
- // owned by remapped_configuration_, we need to wait for the
+ // Because the Node pointer will only be valid if it actually points to
+ // memory owned by remapped_configuration_, we need to wait for the
// remapped_configuration_ to be populated before accessing it.
//
// Also, note, that when ever a map is changed, the nodes in here are
@@ -404,38 +662,45 @@
"you sure that the replay config matches the original config?";
}
- // We need to now seed our per-node time offsets and get everything set up to
- // run.
- const size_t num_nodes = !configuration::MultiNode(logged_configuration())
- ? 1u
- : logged_configuration()->nodes()->size();
+ // We need to now seed our per-node time offsets and get everything set up
+ // to run.
+ const size_t num_nodes = nodes_count();
// It is easiest to solve for per node offsets with a matrix rather than
// trying to solve the equations by hand. So let's get after it.
//
// Now, build up the map matrix.
//
- // sample_matrix_ = map_matrix_ * offset_matrix_
- map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
+ // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+ map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+ slope_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
- sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
- offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ offset_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ last_valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
- // And the base offset matrix, which will be a copy of the initial offset
- // matrix.
- base_offset_matrix_ =
- Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
- num_nodes);
+ time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
- // All offsets should sum to 0. Add that as the first constraint in our least
- // squares.
- map_matrix_.row(0).setOnes();
+ // All times should average out to the distributed clock.
+ for (int i = 0; i < map_matrix_.cols(); ++i) {
+ // 1/num_nodes.
+ map_matrix_(0, i) = mpq_class(1, num_nodes);
+ }
+ valid_matrix_(0) = true;
{
// Now, add the a - b -> sample elements.
size_t i = 1;
for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
const Node *const node_a = std::get<0>(filter.first);
const Node *const node_b = std::get<1>(filter.first);
@@ -444,136 +709,68 @@
const size_t node_b_index =
configuration::GetNodeIndex(configuration(), node_b);
- // +a
- map_matrix_(i, node_a_index) = 1.0;
- // -b
- map_matrix_(i, node_b_index) = -1.0;
+ // -a
+ map_matrix_(i, node_a_index) = mpq_class(-1);
+ // +b
+ map_matrix_(i, node_b_index) = mpq_class(1);
// -> sample
- filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+ std::get<0>(filter.second)
+ .set_slope_pointer(&slope_matrix_(i, node_a_index));
+ std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+ valid_matrix_(i) = false;
+ std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
++i;
}
}
- // Rank of the map matrix tells you if all the nodes are in communication with
- // each other, which tells you if the offsets are observable.
- const size_t connected_nodes =
- Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
- map_matrix_)
- .rank();
-
- // We don't need to support isolated nodes until someone has a real use case.
- CHECK_EQ(connected_nodes, num_nodes)
- << ": There is a node which isn't communicating with the rest.";
-
- // Now, iterate through all the timestamps from all the nodes and seed
- // everything.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
- // Make sure all the samples have been seeded.
- for (int i = 1; i < sample_matrix_.cols(); ++i) {
- // The seeding logic is pretty basic right now because we don't have great
- // use cases yet. It wants to see data from every node. Blow up for now,
- // and once we have a reason to do something different, update this logic.
- // Maybe read further in the log file? Or seed off the realtime time?
- CHECK_NE(sample_matrix_(i, 0), 0.0)
- << ": Sample " << i << " is not seeded.";
- }
-
- // And solve.
- offset_matrix_ = SolveOffsets();
-
- // Save off the base offsets so we can work in deltas from here out. That
- // will significantly simplify the numerical precision problems.
- for (size_t i = 0; i < num_nodes; ++i) {
- base_offset_matrix_(i, 0) =
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(offset_matrix_(i, 0)));
- }
-
- {
- // Shift everything so we never could (reasonably) require the distributed
- // clock to have a large backwards jump in time. This makes it so the boot
- // time on the node up the longest will essentially start matching the
- // distributed clock.
- const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- base_offset_matrix_(i, 0) += offset;
- }
- }
-
- {
- // Re-compute the samples and setup all the filters so that they
- // subtract this base offset.
-
- size_t i = 1;
- for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
- CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
-
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- const size_t node_a_index =
- configuration::GetNodeIndex(configuration(), node_a);
- const size_t node_b_index =
- configuration::GetNodeIndex(configuration(), node_b);
-
- filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
- base_offset_matrix_(node_b_index));
-
- ++i;
- }
- }
-
- // Now, iterate again through all the offsets now that we have set the base
- // offset to something sane. This will seed everything with an accurate
- // initial offset.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
for (std::unique_ptr<State> &state : states_) {
state->SeedSortedMessages();
}
+ // Rank of the map matrix tells you if all the nodes are in communication
+ // with each other, which tells you if the offsets are observable.
+ const size_t connected_nodes =
+ Eigen::FullPivLU<
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
+ .rank();
+
+ // We don't need to support isolated nodes until someone has a real use
+ // case.
+ CHECK_EQ(connected_nodes, num_nodes)
+ << ": There is a node which isn't communicating with the rest.";
+
+ // And solve.
UpdateOffsets();
- // We want to start the log file at the last start time of the log files from
- // all the nodes. Compute how long each node's simulation needs to run to
- // move time to this point.
+ // We want to start the log file at the last start time of the log files
+ // from all the nodes. Compute how long each node's simulation needs to run
+ // to move time to this point.
distributed_clock::time_point start_time = distributed_clock::min_time;
+ // TODO(austin): We want an "OnStart" callback for each node rather than
+ // running until the last node.
+
for (std::unique_ptr<State> &state : states_) {
- // Setup the realtime clock to have something sane in it now.
- state->SetRealtimeOffset(state->monotonic_start_time(),
- state->realtime_start_time());
- // And start computing the start time on the distributed clock now that that
- // works.
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ // And start computing the start time on the distributed clock now that
+ // that works.
start_time = std::max(
start_time, state->ToDistributedClock(state->monotonic_start_time()));
}
- CHECK_GE(start_time, distributed_clock::epoch());
+
+ CHECK_GE(start_time, distributed_clock::epoch())
+ << ": Hmm, we have a node starting before the start of time. Offset "
+ "everything.";
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
- // nodes, and also replayed on the other nodes. This may not satisfy all our
- // users, but it'll start the discussion.
+ // nodes, and also replayed on the other nodes. This may not satisfy all
+ // our users, but it'll start the discussion.
if (configuration::MultiNode(event_loop_factory_->configuration())) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel = logged_configuration()->channels()->Get(i);
@@ -598,34 +795,177 @@
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
ignore_missing_data_ = true;
- VLOG(1) << "Running until start time: " << start_time;
+ VLOG(1) << "Running until " << start_time << " in Register";
event_loop_factory_->RunFor(start_time.time_since_epoch());
VLOG(1) << "At start time";
// Now that we are running for real, missing data means that the log file is
// corrupted or went wrong.
ignore_missing_data_ = false;
-}
-void LogReader::UpdateOffsets() {
- // TODO(austin): Evaluate less accurate inverses. We might be able to
- // do some tricks to keep the accuracy up.
- offset_matrix_ = SolveOffsets();
-
- size_t node_index = 0;
for (std::unique_ptr<State> &state : states_) {
- state->SetDistributedOffset(-offset(node_index), 1.0);
- ++node_index;
+ // Make the RT clock be correct before handing it to the user.
+ if (state->realtime_start_time() != realtime_clock::min_time) {
+ state->SetRealtimeOffset(state->monotonic_start_time(),
+ state->realtime_start_time());
+ }
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ }
+
+ if (FLAGS_timestamps_to_csv) {
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ std::get<0>(filter.second)
+ .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
+ ->monotonic_now());
+ std::get<0>(filter.second)
+ .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
+ ->monotonic_now());
+ }
}
}
-std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+void LogReader::UpdateOffsets() {
+ VLOG(2) << "Samples are " << offset_matrix_;
+ VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+ VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
+ << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
+
+ size_t node_index = 0;
+ for (std::unique_ptr<State> &state : states_) {
+ state->SetDistributedOffset(offset(node_index), slope(node_index));
+ VLOG(1) << "Offset for node " << node_index << " "
+ << MaybeNodeName(state->event_loop()->node()) << "is "
+ << aos::distributed_clock::time_point(offset(node_index))
+ << " slope " << std::setprecision(9) << std::fixed
+ << slope(node_index);
+ ++node_index;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset is");
+ }
+}
+
+void LogReader::LogFit(std::string_view prefix) {
+ for (std::unique_ptr<State> &state : states_) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
+ << state->monotonic_now() << " distributed "
+ << event_loop_factory_->distributed_now();
+ }
+
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ message_bridge::NoncausalOffsetEstimator *estimator =
+ &std::get<0>(filter.second);
+
+ if (estimator->a_timestamps().size() == 0 &&
+ estimator->b_timestamps().size() == 0) {
+ continue;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ estimator->LogFit(prefix);
+ }
+
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ const double recovered_slope =
+ slope(node_b_index) / slope(node_a_index) - 1.0;
+ const int64_t recovered_offset =
+ offset(node_b_index).count() - offset(node_a_index).count() *
+ slope(node_b_index) /
+ slope(node_a_index);
+
+ VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
+ << " (error " << recovered_slope - estimator->fit().slope() << ") "
+ << " offset " << std::setprecision(20) << recovered_offset
+ << " (error "
+ << recovered_offset - estimator->fit().offset().count() << ")";
+
+ const aos::distributed_clock::time_point a0 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[0]));
+ const aos::distributed_clock::time_point a1 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[1]));
+
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a0) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[0])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((a0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a1) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[1])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((event_loop_factory_->distributed_now() <= a1)
+ ? ""
+ : " Before now, investigate");
+
+ const aos::distributed_clock::time_point b0 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[0]));
+ const aos::distributed_clock::time_point b1 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[1]));
+
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b0)
+ << ((b0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b1)
+ << ((event_loop_factory_->distributed_now() <= b1)
+ ? ""
+ : " Before now, investigate");
+ }
+}
+
+message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
const Node *node_a, const Node *node_b) {
CHECK_NE(node_a, node_b);
CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
if (node_a > node_b) {
- return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
+ return GetFilter(node_b, node_a);
}
auto tuple = std::make_tuple(node_a, node_b);
@@ -633,53 +973,27 @@
auto it = filters_.find(tuple);
if (it == filters_.end()) {
- auto &x = filters_
- .insert(std::make_pair(
- tuple, message_bridge::ClippedAverageFilter()))
- .first->second;
+ auto &x =
+ filters_
+ .insert(std::make_pair(
+ tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+ node_a, node_b))))
+ .first->second;
if (FLAGS_timestamps_to_csv) {
- std::string fwd_name =
- absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
- node_b->name()->string_view());
- x.SetFwdCsvFileName(fwd_name);
- std::string rev_name =
- absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
- node_a->name()->string_view());
- x.SetRevCsvFileName(rev_name);
+ std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+ node_b->name()->string_view()));
+ std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+ node_a->name()->string_view()));
}
- return std::make_tuple(&x, true);
+ return &std::get<0>(x);
} else {
- return std::make_tuple(&(it->second), true);
+ return &std::get<0>(it->second);
}
}
-bool LogReader::State::MaybeUpdateTimestamp(
- const TimestampMerger::DeliveryTimestamp &channel_timestamp,
- int channel_index) {
- if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
- CHECK(std::get<0>(filters_[channel_index]) == nullptr);
- return false;
- }
-
- // Got a forwarding timestamp!
- CHECK(std::get<0>(filters_[channel_index]) != nullptr);
-
- // Call the correct method depending on if we are the forward or reverse
- // direction here.
- if (std::get<1>(filters_[channel_index])) {
- std::get<0>(filters_[channel_index])
- ->FwdSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- } else {
- std::get<0>(filters_[channel_index])
- ->RevSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- }
- return true;
-}
void LogReader::Register(EventLoop *event_loop) {
State *state =
@@ -702,10 +1016,8 @@
const Channel *channel =
RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
- std::make_tuple(nullptr, false);
-
NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
@@ -730,31 +1042,48 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
+ VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
+ << "at " << state->event_loop()->context().monotonic_event_time
+ << " now " << state->monotonic_now();
if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
- VLOG(1) << "Node down!";
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (live_nodes_ == 0) {
event_loop_factory_->Exit();
}
return;
}
- bool update_offsets = false;
TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
- bool dummy_update_time = false;
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset was");
+ }
+
+ bool update_time;
std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&dummy_update_time);
+ state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
- << state->DebugString();
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+ LOG(WARNING) << "Check failed: monotonic_now == "
+ "channel_timestamp.monotonic_event_time) ("
+ << monotonic_now << " vs. "
+ << channel_timestamp.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
if (channel_timestamp.monotonic_event_time >
state->monotonic_start_time() ||
@@ -764,17 +1093,39 @@
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
- "not matched? Use --skip_missing_forwarding_entries to ignore "
+ "not matched? Use --skip_missing_forwarding_entries to "
+ "ignore "
"this.";
- if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+ if (update_time) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index));
-
- update_offsets = true;
+ if (!FLAGS_skip_order_validation) {
+ CHECK_LT(channel_timestamp.monotonic_remote_time,
+ state->monotonic_remote_now(channel_index))
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " " << state->DebugString();
+ } else if (channel_timestamp.monotonic_remote_time >=
+ state->monotonic_remote_now(channel_index)) {
+ LOG(WARNING)
+ << "Check failed: channel_timestamp.monotonic_remote_time < "
+ "state->monotonic_remote_now(channel_index) ("
+ << channel_timestamp.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(channel_index) << ") "
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " currently " << channel_timestamp.monotonic_event_time
+ << " ("
+ << state->ToDistributedClock(
+ channel_timestamp.monotonic_event_time)
+ << ") remote event time "
+ << channel_timestamp.monotonic_remote_time << " ("
+ << state->RemoteToDistributedClock(
+ channel_index, channel_timestamp.monotonic_remote_time)
+ << ") " << state->DebugString();
+ }
if (FLAGS_timestamps_to_csv) {
if (offset_fp_ == nullptr) {
@@ -789,13 +1140,14 @@
std::chrono::duration_cast<std::chrono::duration<double>>(
channel_timestamp.realtime_event_time - first_time_)
.count());
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- fprintf(
- offset_fp_, ", %.9f",
- offset_matrix_(i, 0) +
- std::chrono::duration_cast<std::chrono::duration<double>>(
- base_offset_matrix_(i, 0))
- .count());
+ for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
+ fprintf(offset_fp_, ", %.9f",
+ time_offset_matrix_(i, 0) +
+ time_slope_matrix_(i, 0) *
+ chrono::duration<double>(
+ event_loop_factory_->distributed_now()
+ .time_since_epoch())
+ .count());
}
fprintf(offset_fp_, "\n");
}
@@ -805,21 +1157,26 @@
state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
channel_timestamp.realtime_event_time);
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
+ << channel_timestamp.monotonic_event_time;
+ // TODO(austin): std::move channel_data in and make that efficient in
+ // simulation.
state->Send(channel_index, channel_data.message().data()->Data(),
channel_data.message().data()->size(),
channel_timestamp.monotonic_remote_time,
channel_timestamp.realtime_remote_time,
channel_timestamp.remote_queue_index);
- } else if (state->at_end()) {
+ } else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
- // reading the rest of the log file and call it quits. We don't want to
- // replay partial data.
+ // reading the rest of the log file and call it quits. We don't want
+ // to replay partial data.
while (state->OldestMessageTime() != monotonic_clock::max_time) {
bool update_time_dummy;
state->PopOldest(&update_time_dummy);
}
+ } else {
+ CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
}
-
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
@@ -830,22 +1187,100 @@
const monotonic_clock::time_point next_time = state->OldestMessageTime();
if (next_time != monotonic_clock::max_time) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time << "("
+ << state->ToDistributedClock(next_time)
+ << " distributed), now is " << state->monotonic_now();
state->Setup(next_time);
} else {
- // Set a timer up immediately after now to die. If we don't do this, then
- // the senders waiting on the message we just read will never get called.
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "No next message, scheduling shutdown";
+ // Set a timer up immediately after now to die. If we don't do this,
+ // then the senders waiting on the message we just read will never get
+ // called.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
}
}
- // Once we make this call, the current time changes. So do everything which
- // involves time before changing it. That especially includes sending the
- // message.
- if (update_offsets) {
+ // Once we make this call, the current time changes. So do everything
+ // which involves time before changing it. That especially includes
+ // sending the message.
+ if (update_time) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "updating offsets";
+
+ std::vector<aos::monotonic_clock::time_point> before_times;
+ before_times.resize(states_.size());
+ std::transform(states_.begin(), states_.end(), before_times.begin(),
+ [](const std::unique_ptr<State> &state) {
+ return state->monotonic_now();
+ });
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "before " << states_[i]->monotonic_now();
+ }
+
UpdateOffsets();
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
+ << state->monotonic_now();
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "after " << states_[i]->monotonic_now();
+ }
+
+ // TODO(austin): We should be perfect.
+ const std::chrono::nanoseconds kTolerance{3};
+ if (!FLAGS_skip_order_validation) {
+ CHECK_GE(next_time, state->monotonic_now())
+ << ": Time skipped the next event.";
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
+ << ": Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
+ << ": Time changed too much on node "
+ << states_[i]->event_loop()->node()->name()->string_view();
+ }
+ } else {
+ if (next_time < state->monotonic_now()) {
+ LOG(WARNING) << "Check failed: next_time >= "
+ "state->monotonic_now() ("
+ << next_time << " vs. " << state->monotonic_now()
+ << "): Time skipped the next event.";
+ }
+ for (size_t i = 0; i < states_.size(); ++i) {
+ if (states_[i]->monotonic_now() >= before_times[i] - kTolerance) {
+ LOG(WARNING) << "Check failed: "
+ "states_[i]->monotonic_now() "
+ ">= before_times[i] - kTolerance ("
+ << states_[i]->monotonic_now() << " vs. "
+ << before_times[i] - kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+ if (states_[i]->monotonic_now() <= before_times[i] + kTolerance) {
+ LOG(WARNING) << "Check failed: "
+ "states_[i]->monotonic_now() "
+ "<= before_times[i] + kTolerance ("
+ << states_[i]->monotonic_now() << " vs. "
+ << before_times[i] - kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+ }
+ }
}
+
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
+ << state->event_loop()->context().monotonic_event_time << " now "
+ << state->monotonic_now();
}));
++live_nodes_;
@@ -942,8 +1377,8 @@
new_name_builder.add_name(name_offset);
new_name_fbb.Finish(new_name_builder.Finish());
const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is actually
- // present in base_config.
+ // Retrieve the channel that we want to copy, confirming that it is
+ // actually present in base_config.
const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
@@ -1020,7 +1455,7 @@
void LogReader::State::SetChannel(
size_t channel, std::unique_ptr<RawSender> sender,
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ message_bridge::NoncausalOffsetEstimator *filter,
NodeEventLoopFactory *channel_target_event_loop_factory) {
channels_[channel] = std::move(sender);
filters_[channel] = filter;
@@ -1034,21 +1469,27 @@
CHECK_GT(sorted_messages_.size(), 0u);
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ FlatbufferVector<MessageHeader>,
+ message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- *update_time = false;
+ if (std::get<3>(result) != nullptr) {
+ *update_time = std::get<3>(result)->Pop(
+ event_loop_->node(), std::get<0>(result).monotonic_event_time);
+ } else {
+ *update_time = false;
+ }
return std::make_tuple(std::get<0>(result), std::get<1>(result),
std::move(std::get<2>(result)));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
if (sorted_messages_.size() > 0) {
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
<< std::get<0>(sorted_messages_.front()).monotonic_event_time;
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
@@ -1081,11 +1522,26 @@
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+
std::tie(channel_timestamp, channel_index, channel_data) =
channel_merger_->PopOldest();
+ // Skip any messages without forwarding information.
+ if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ // Got a forwarding timestamp!
+ filter = filters_[channel_index];
+
+ CHECK(filter != nullptr);
+
+ // Call the correct method depending on if we are the forward or
+ // reverse direction here.
+ filter->Sample(event_loop_->node(),
+ channel_timestamp.monotonic_event_time,
+ channel_timestamp.monotonic_remote_time);
+ }
sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data));
+ std::move(channel_data), filter);
}
}
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index f1af17e..00becfa 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -15,9 +15,9 @@
table LogFileHeader {
// Time this log file started on the monotonic clock in nanoseconds.
- monotonic_start_time:long;
+ monotonic_start_time:long = -9223372036854775808;
// Time this log file started on the realtime clock in nanoseconds.
- realtime_start_time:long;
+ realtime_start_time:long = -9223372036854775808;
// Messages are not written in order to disk. They will be out of order by
// at most this duration (in nanoseconds). If the log reader buffers until
@@ -65,3 +65,5 @@
// Queue index of this message on the remote node.
remote_queue_index:uint = 4294967295;
}
+
+root_type MessageHeader;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a123dcc..e599091 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -4,18 +4,22 @@
#include <chrono>
#include <deque>
#include <string_view>
+#include <tuple>
#include <vector>
#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/eigen_mpq.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
namespace aos {
namespace logger {
@@ -25,11 +29,18 @@
LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
virtual ~LogNamer() {}
- virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
- const Node *node) = 0;
+ virtual void WriteHeader(
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+ const Node *node) = 0;
virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+ virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) = 0;
+ virtual void Rotate(
+ const Node *node,
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+ &header) = 0;
const std::vector<const Node *> &nodes() const { return nodes_; }
const Node *node() const { return node_; }
@@ -41,21 +52,28 @@
class LocalLogNamer : public LogNamer {
public:
- LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
- : LogNamer(node), writer_(writer) {}
+ LocalLogNamer(std::string_view base_name, const Node *node)
+ : LogNamer(node), base_name_(base_name), data_writer_(OpenDataWriter()) {}
- ~LocalLogNamer() override { writer_->Flush(); }
-
- void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
- const Node *node) override {
+ void WriteHeader(
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+ const Node *node) override {
CHECK_EQ(node, this->node());
- writer_->WriteSizedFlatbuffer(
- absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
}
DetachedBufferWriter *MakeWriter(const Channel *channel) override {
CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
- return writer_;
+ return data_writer_.get();
+ }
+
+ void Rotate(const Node *node,
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+ &header) override {
+ CHECK(node == this->node());
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
}
DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
@@ -65,11 +83,23 @@
CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
node_))
<< ": Delivery times aren't logged for this channel on this node.";
- return writer_;
+ return data_writer_.get();
+ }
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel * /*channel*/, const Node * /*node*/) override {
+ LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
+ return nullptr;
}
private:
- DetachedBufferWriter *writer_;
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+ }
+ const std::string base_name_;
+ size_t part_number_ = 0;
+ std::unique_ptr<DetachedBufferWriter> data_writer_;
};
// TODO(austin): Split naming files from making files so we can re-use the
@@ -81,29 +111,20 @@
: LogNamer(node),
base_name_(base_name),
configuration_(configuration),
- data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
- base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
+ data_writer_(OpenDataWriter()) {}
// Writes the header to all log files for a specific node. This function
// needs to be called after all the writers are created.
- void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
- if (node == this->node()) {
- data_writer_->WriteSizedFlatbuffer(
- absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
- } else {
- for (std::pair<const Channel *const,
- std::unique_ptr<DetachedBufferWriter>> &data_writer :
- data_writers_) {
- if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
- data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
- fbb->GetBufferPointer(), fbb->GetSize()));
- }
- }
- }
- }
+ void WriteHeader(
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+ const Node *node) override;
+
+ void Rotate(const Node *node,
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+ &header) override;
// Makes a data logger for a specific channel.
- DetachedBufferWriter *MakeWriter(const Channel *channel) {
+ DetachedBufferWriter *MakeWriter(const Channel *channel) override {
// See if we can read the data on this node at all.
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, this->node());
@@ -123,36 +144,62 @@
// generated if it is sendable on this node.
if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
return data_writer_.get();
- } else {
- // Ok, we have data that is being forwarded to us that we are supposed to
- // log. It needs to be logged with send timestamps, but be sorted enough
- // to be able to be processed.
- CHECK(data_writers_.find(channel) == data_writers_.end());
-
- // Track that this node is being logged.
- if (configuration::MultiNode(configuration_)) {
- const Node *source_node = configuration::GetNode(
- configuration_, channel->source_node()->string_view());
- if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
- nodes_.end()) {
- nodes_.emplace_back(source_node);
- }
- }
-
- return data_writers_
- .insert(std::make_pair(
- channel,
- std::make_unique<DetachedBufferWriter>(absl::StrCat(
- base_name_, "_", channel->source_node()->string_view(),
- "_data", channel->name()->string_view(), "/",
- channel->type()->string_view(), ".bfbs"))))
- .first->second.get();
}
+
+ // Ok, we have data that is being forwarded to us that we are supposed to
+ // log. It needs to be logged with send timestamps, but be sorted enough
+ // to be able to be processed.
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ // Track that this node is being logged.
+ const Node *source_node = configuration::GetNode(
+ configuration_, channel->source_node()->string_view());
+
+ if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+ nodes_.emplace_back(source_node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = source_node;
+ data_writer.rotate = [this](const Channel *channel,
+ DataWriter *data_writer) {
+ OpenWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
+ }
+
+ DetachedBufferWriter *MakeForwardedTimestampWriter(
+ const Channel *channel, const Node *node) override {
+ // See if we can read the data on this node at all.
+ const bool is_readable =
+ configuration::ChannelIsReadableOnNode(channel, this->node());
+ CHECK(is_readable) << ": "
+ << configuration::CleanedChannelToString(channel);
+
+ CHECK(data_writers_.find(channel) == data_writers_.end());
+
+ if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+ nodes_.emplace_back(node);
+ }
+
+ DataWriter data_writer;
+ data_writer.node = node;
+ data_writer.rotate = [this](const Channel *channel,
+ DataWriter *data_writer) {
+ OpenForwardedTimestampWriter(channel, data_writer);
+ };
+ data_writer.rotate(channel, &data_writer);
+
+ return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+ .first->second.writer.get();
}
// Makes a timestamp (or timestamp and data) logger for a channel and
// forwarding connection.
- DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
+ DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
const bool log_delivery_times =
(this->node() == nullptr)
? false
@@ -168,40 +215,93 @@
const std::vector<const Node *> &nodes() const { return nodes_; }
private:
+ // Files to write remote data to. We want one per channel. Maps the channel
+ // to the writer, Node, and part number.
+ struct DataWriter {
+ std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+ const Node *node;
+ size_t part_number = 0;
+ std::function<void(const Channel *, DataWriter *)> rotate;
+ };
+
+ void OpenForwardedTimestampWriter(const Channel *channel,
+ DataWriter *data_writer) {
+ std::string filename =
+ absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+ "/", channel->type()->string_view(), ".part",
+ data_writer->part_number, ".bfbs");
+
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+ }
+
+ void OpenWriter(const Channel *channel, DataWriter *data_writer) {
+ const std::string filename = absl::StrCat(
+ base_name_, "_", channel->source_node()->string_view(), "_data",
+ channel->name()->string_view(), "/", channel->type()->string_view(),
+ ".part", data_writer->part_number, ".bfbs");
+ if (!data_writer->writer) {
+ data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+ } else {
+ *data_writer->writer = DetachedBufferWriter(filename);
+ }
+ }
+
+ std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+ return std::make_unique<DetachedBufferWriter>(
+ absl::StrCat(base_name_, "_", node()->name()->string_view(),
+ "_data.part", part_number_, ".bfbs"));
+ }
+
const std::string base_name_;
const Configuration *const configuration_;
+ size_t part_number_ = 0;
+
// File to write both delivery timestamps and local data to.
std::unique_ptr<DetachedBufferWriter> data_writer_;
- // Files to write remote data to. We want one per channel.
- std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
- data_writers_;
-};
+ std::map<const Channel *, DataWriter> data_writers_;
+};
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rately on a channel and would affect execution.
class Logger {
public:
- Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+ Logger(std::string_view base_name, EventLoop *event_loop,
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
std::chrono::milliseconds polling_period =
std::chrono::milliseconds(100));
- // Rotates the log file with the new writer. This writes out the header
- // again, but keeps going as if nothing else happened.
- void Rotate(DetachedBufferWriter *writer);
- void Rotate(std::unique_ptr<LogNamer> log_namer);
+ // Rotates the log file(s), triggering new part files to be written for each
+ // log file.
+ void Rotate();
private:
void WriteHeader();
- void WriteHeader(const Node *node);
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const Node *node);
+
+ bool MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
void DoLogData();
+ void WriteMissingTimestamps();
+
+ void StartLogging();
+
+ // Fetches from each channel until all the data is logged.
+ void LogUntil(monotonic_clock::time_point t);
+
EventLoop *event_loop_;
std::unique_ptr<LogNamer> log_namer_;
@@ -219,6 +319,10 @@
DetachedBufferWriter *writer = nullptr;
DetachedBufferWriter *timestamp_writer = nullptr;
+ DetachedBufferWriter *contents_writer = nullptr;
+ const Node *writer_node = nullptr;
+ const Node *timestamp_node = nullptr;
+ int node_index = 0;
};
std::vector<FetcherStruct> fetchers_;
@@ -236,6 +340,25 @@
// Max size that the header has consumed. This much extra data will be
// reserved in the builder to avoid reallocating.
size_t max_header_size_ = 0;
+
+ // Fetcher for all the statistics from all the nodes.
+ aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+ // Sets the start time for a specific node.
+ void SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time);
+
+ struct NodeState {
+ aos::monotonic_clock::time_point monotonic_start_time =
+ aos::monotonic_clock::min_time;
+ aos::realtime_clock::time_point realtime_start_time =
+ aos::realtime_clock::min_time;
+
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+ aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+ };
+ std::vector<NodeState> node_state_;
};
// We end up with one of the following 3 log file types.
@@ -365,13 +488,25 @@
// channels from calls to RemapLoggedChannel.
void MakeRemappedConfig();
+ // Returns the number of nodes.
+ size_t nodes_count() const {
+ return !configuration::MultiNode(logged_configuration())
+ ? 1u
+ : logged_configuration()->nodes()->size();
+ }
+
const std::vector<std::vector<std::string>> filenames_;
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
FlatbufferVector<LogFileHeader> log_file_header_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+ // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ SolveOffsets();
+
+ void LogFit(std::string_view prefix);
// State per node.
class State {
@@ -392,13 +527,6 @@
// OldestMessageTime.
void SeedSortedMessages();
- // Updates the timestamp filter with the timestamp. Returns true if the
- // provided timestamp was actually a forwarding timestamp and used, and
- // false otherwise.
- bool MaybeUpdateTimestamp(
- const TimestampMerger::DeliveryTimestamp &channel_timestamp,
- int channel_index);
-
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time() const {
return channel_merger_->monotonic_start_time();
@@ -416,13 +544,6 @@
void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
EventLoop *event_loop() { return event_loop_; }
- // Returns the oldest timestamp for the provided channel. This should only
- // be called before SeedSortedMessages();
- TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
- size_t channel) {
- return channel_merger_->OldestTimestampForChannel(channel);
- }
-
// Sets the current realtime offset from the monotonic clock for this node
// (if we are on a simulated event loop).
void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -440,6 +561,11 @@
return node_event_loop_factory_->ToDistributedClock(time);
}
+ monotonic_clock::time_point FromDistributedClock(
+ distributed_clock::time_point time) {
+ return node_event_loop_factory_->FromDistributedClock(time);
+ }
+
// Sets the offset (and slope) from the distributed clock.
void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
double distributed_slope) {
@@ -453,6 +579,20 @@
return channel_target_event_loop_factory_[channel_index]->monotonic_now();
}
+ distributed_clock::time_point RemoteToDistributedClock(
+ size_t channel_index, monotonic_clock::time_point time) {
+ return channel_target_event_loop_factory_[channel_index]
+ ->ToDistributedClock(time);
+ }
+
+ const Node *remote_node(size_t channel_index) {
+ return channel_target_event_loop_factory_[channel_index]->node();
+ }
+
+ monotonic_clock::time_point monotonic_now() {
+ return node_event_loop_factory_->monotonic_now();
+ }
+
// Sets the node we will be merging as, and returns true if there is any
// data on it.
bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
@@ -461,10 +601,9 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(
- size_t channel, std::unique_ptr<RawSender> sender,
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
- NodeEventLoopFactory *channel_target_event_loop_factory);
+ void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ NodeEventLoopFactory *channel_target_event_loop_factory);
// Returns if we have read all the messages from all the logs.
bool at_end() const { return channel_merger_->at_end(); }
@@ -493,14 +632,32 @@
}
// Returns a debug string for the channel merger.
- std::string DebugString() const { return channel_merger_->DebugString(); }
+ std::string DebugString() const {
+ std::stringstream messages;
+ size_t i = 0;
+ for (const auto &message : sorted_messages_) {
+ if (i < 7 || i + 7 > sorted_messages_.size()) {
+ messages << "sorted_messages[" << i
+ << "]: " << std::get<0>(message).monotonic_event_time << " "
+ << configuration::StrippedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ std::get<2>(message).message().channel_index()))
+ << "\n";
+ } else if (i == 7) {
+ messages << "...\n";
+ }
+ ++i;
+ }
+ return messages.str() + channel_merger_->DebugString();
+ }
private:
// Log file.
std::unique_ptr<ChannelMerger> channel_merger_;
std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>>
+ FlatbufferVector<MessageHeader>,
+ message_bridge::NoncausalOffsetEstimator *>>
sorted_messages_;
// Senders.
@@ -518,8 +675,7 @@
// This corresponds to the object which is shared among all the channels
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
- std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
- filters_;
+ std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
@@ -532,8 +688,8 @@
// Creates the requested filter if it doesn't exist, regardless of whether
// these nodes can actually communicate directly. The second return value
// reports if this is the primary direction or not.
- std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
- const Node *node_a, const Node *node_b);
+ message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+ const Node *node_b);
// FILE to write offsets to (if populated).
FILE *offset_fp_ = nullptr;
@@ -544,32 +700,79 @@
// List of filters for a connection. The pointer to the first node will be
// less than the second node.
std::map<std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter>
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
filters_;
// Returns the offset from the monotonic clock for a node to the distributed
- // clock. distributed = monotonic + offset;
- std::chrono::nanoseconds offset(int node_index) const {
- CHECK_LT(node_index, offset_matrix_.rows())
+ // clock. monotonic = distributed * slope() + offset();
+ double slope(int node_index) const {
+ CHECK_LT(node_index, time_slope_matrix_.rows())
<< ": Got too high of a node index.";
- return -std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(offset_matrix_(node_index))) -
- base_offset_matrix_(node_index);
+ return time_slope_matrix_(node_index);
+ }
+ std::chrono::nanoseconds offset(int node_index) const {
+ CHECK_LT(node_index, time_offset_matrix_.rows())
+ << ": Got too high of a node index.";
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(time_offset_matrix_(node_index)));
}
// Updates the offset matrix solution and sets the per-node distributed
// offsets in the factory.
void UpdateOffsets();
- // sample_matrix_ = map_matrix_ * offset_matrix_
- Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
- Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+ // We have 2 types of equations to do a least squares regression over to fully
+ // constrain our time function.
+ //
+ // One is simple. The distributed clock is the average of all the clocks.
+ // (ta + tb + tc + td) / num_nodex = t_distributed
+ //
+ // The second is a bit more complicated. Our basic time conversion function
+ // is:
+ // tb = ta + (ta * slope + offset)
+ // We can rewrite this as follows
+ // tb - (1 + slope) * ta = offset
+ //
+ // From here, we have enough equations to solve for t{a,b,c,...} We want to
+ // take as an input the offsets and slope, and solve for the per-node times as
+ // a function of the distributed clock.
+ //
+ // We need to massage our equations to make this work. If we solve for the
+ // per-node times at two set distributed clock times, we will be able to
+ // recreate the linear function (we know it is linear). We can do a similar
+ // thing by breaking our equation up into:
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ //
+ // This solves to:
+ //
+ // [ta] [ a00 a01 a02] [t_distributed]
+ // [tb] = [ a10 a11 a12] * [oab]
+ // [tc] [ a20 a21 a22] [oac]
+ //
+ // and can be split into:
+ //
+ // [ta] [ a00 ] [a01 a02]
+ // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+ // [tc] [ a20 ] [a21 a22] [oac]
+ //
+ // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+ // offset_matrix_ will be in nanoseconds.
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+ // Matrix tracking which offsets are valid.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+ // Matrix tracking the last valid matrix we used to determine connected nodes.
+ Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+ size_t cached_valid_node_count_ = 0;
- // Base offsets. The actual offset is the sum of this and the offset matrix.
- // This removes some of the dynamic range challenges from the double above.
- Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
- base_offset_matrix_;
+ // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
+ // t is in seconds.
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 2575e85..f95fa17 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -25,11 +25,8 @@
std::unique_ptr<aos::logger::DetachedBufferWriter> writer;
std::unique_ptr<aos::logger::LogNamer> log_namer;
if (event_loop.node() == nullptr) {
- writer = std::make_unique<aos::logger::DetachedBufferWriter>(
- aos::logging::GetLogName("fbs_log"));
-
- log_namer = std::make_unique<aos::logger::LocalLogNamer>(writer.get(),
- event_loop.node());
+ log_namer = std::make_unique<aos::logger::LocalLogNamer>(
+ aos::logging::GetLogName("fbs_log"), event_loop.node());
} else {
log_namer = std::make_unique<aos::logger::MultiNodeLogNamer>(
aos::logging::GetLogName("fbs_log"), event_loop.configuration(),
@@ -41,5 +38,7 @@
event_loop.Run();
+ LOG(INFO) << "Shutting down";
+
return 0;
}
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
index 313894b..c333f55 100644
--- a/aos/events/logging/logger_math.cc
+++ b/aos/events/logging/logger_math.cc
@@ -2,14 +2,187 @@
#include "Eigen/Dense"
+#include "third_party/gmp/gmpxx.h"
+
namespace aos {
namespace logger {
+namespace {
+Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> in) {
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> result =
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Zero(in.rows(),
+ in.cols());
+ for (int i = 0; i < in.rows(); ++i) {
+ for (int j = 0; j < in.cols(); ++j) {
+ result(i, j) = in(i, j).get_d();
+ }
+ }
+ return result;
+}
+
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
+ aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
+ // Least squares solve for the slopes and offsets.
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
+ (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
+ aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
+
+ VLOG(1) << "Took "
+ << std::chrono::duration<double>(end_time - start_time).count()
+ << " seconds to invert";
+
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
+ inv.block(0, 0, inv.rows(), 1);
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
+ inv.block(0, 1, inv.rows(), inv.cols() - 1) *
+ mpq_offsets.block(1, 0, inv.rows() - 1, 1);
+
+ mpq_solution_offset *= mpq_class(1, 1000000000);
+
+ return std::make_tuple(ToDouble(mpq_solution_slope),
+ ToDouble(mpq_solution_offset));
+}
+} // namespace
+
// This is slow to compile, so we put it in a separate file. More parallelism
// and less change.
-Eigen::Matrix<double, Eigen::Dynamic, 1> LogReader::SolveOffsets() {
- return map_matrix_.bdcSvd(Eigen::ComputeThinU | Eigen::ComputeThinV)
- .solve(sample_matrix_);
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+LogReader::SolveOffsets() {
+ // TODO(austin): Split this out and unit tests a bit better. When we do
+ // partial node subsets and also try to optimize this again would be a good
+ // time.
+ //
+ // TODO(austin): CHECK that the number doesn't change over time. We can freak
+ // out if that happens.
+
+ // Start by counting how many node pairs we have an offset estimated for.
+ int nonzero_offset_count = 1;
+ for (int i = 1; i < valid_matrix_.rows(); ++i) {
+ if (valid_matrix_(i) != 0) {
+ ++nonzero_offset_count;
+ }
+ }
+
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+
+ // If there are missing rows, we can't solve the original problem and instead
+ // need to filter the matrix to remove the missing rows and solve a simplified
+ // problem. What this means practically is that we might have pairs of nodes
+ // which are communicating, but we don't have timestamps between. But we can
+ // have multiple paths in our graph between 2 nodes, so we can still solve
+ // time without the missing timestamp.
+ //
+ // In the following example, we can drop any of the last 3 rows, and still
+ // solve.
+ //
+ // [1/3 1/3 1/3 ] [ta] [t_distributed]
+ // [ 1 -1-m1 0 ] [tb] = [oab]
+ // [ 1 0 -1-m2 ] [tc] [oac]
+ // [ 0 1 -1-m2 ] [obc]
+ if (nonzero_offset_count != offset_matrix_.rows()) {
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, map_matrix_.cols());
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
+
+ std::vector<bool> valid_nodes(nodes_count(), false);
+
+ size_t destination_row = 0;
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
+ }
+ mpq_offsets(0) = mpq_class(0);
+ ++destination_row;
+
+ for (int i = 1; i < offset_matrix_.rows(); ++i) {
+ // Copy over the first row, i.e. the row which says that all times average
+ // to the distributed time. And then copy over all valid rows.
+ if (valid_matrix_(i)) {
+ mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
+
+ for (int j = 0; j < map_matrix_.cols(); ++j) {
+ mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
+ if (mpq_map(destination_row, j) != 0) {
+ valid_nodes[j] = true;
+ }
+ }
+
+ ++destination_row;
+ }
+ }
+
+ VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
+ VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Compute (and cache) the current connectivity. If we have N nodes
+ // configured, but logs only from one of them, we want to assume that the
+ // rest of the nodes match the distributed clock exactly.
+ //
+ // If data shows up later for them, we will CHECK when time jumps.
+ //
+ // TODO(austin): Once we have more info on what cases are reasonable, we can
+ // open up the restrictions.
+ if (valid_matrix_ != last_valid_matrix_) {
+ Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
+ full_piv(mpq_map);
+ const size_t connected_nodes = full_piv.rank();
+
+ size_t valid_node_count = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ const bool valid_node = valid_nodes[i];
+ if (valid_node) {
+ ++valid_node_count;
+ } else {
+ LOG(WARNING)
+ << "Node "
+ << logged_configuration()->nodes()->Get(i)->name()->string_view()
+ << " has no observations, setting to distributed clock.";
+ }
+ }
+
+ // Confirm that the set of nodes we have connected matches the rank.
+ // Otherwise a<->b and c<->d would count as 4 but really is 3.
+ CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
+ connected_nodes)
+ << ": Ambiguous nodes.";
+
+ last_valid_matrix_ = valid_matrix_;
+ cached_valid_node_count_ = valid_node_count;
+ }
+
+ // There are 2 cases. Either all the nodes are connected with each other by
+ // actual data, or we have isolated nodes. We want to force the isolated
+ // nodes to match the distributed clock exactly, and to solve for the other
+ // nodes.
+ if (cached_valid_node_count_ == 0) {
+ // Cheat. If there are no valid nodes, the slopes are 1, and offset is 0,
+ // ie, just be the distributed clock.
+ return std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+ } else {
+ // TODO(austin): Solve just the nodes we know about. This is harder and
+ // there are no logs which require this yet to test on.
+ CHECK_EQ(cached_valid_node_count_, nodes_count())
+ << ": TODO(austin): Handle partial valid nodes";
+
+ return Solve(mpq_map, mpq_offsets);
+ }
+ } else {
+ const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+ map_matrix_ + slope_matrix_;
+ VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
+ VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+
+ return Solve(mpq_map, offset_matrix_);
+ }
}
} // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3b10f9d..ab5f02c 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/util/file.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -42,20 +43,20 @@
// the config.
TEST_F(LoggerTest, Starts) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove it.
unlink(logfile.c_str());
LOG(INFO) << "Logging data to " << logfile;
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -101,8 +102,9 @@
// Tests that we can read and write rotated log files.
TEST_F(LoggerTest, RotatedLogFile) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
- const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile0 = base_name + ".part0.bfbs";
+ const ::std::string logfile1 = base_name + ".part1.bfbs";
// Remove it.
unlink(logfile0.c_str());
unlink(logfile1.c_str());
@@ -110,17 +112,16 @@
LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
{
- DetachedBufferWriter writer0(logfile0);
- DetachedBufferWriter writer1(logfile1);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer0, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger logger(
+ std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
+ logger_event_loop.get(), std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- logger.Rotate(&writer1);
+ logger.Rotate();
event_loop_factory_.RunFor(chrono::milliseconds(10000));
}
@@ -166,7 +167,8 @@
// Tests that a large number of messages per second doesn't overwhelm writev.
TEST_F(LoggerTest, ManyMessages) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove the log file.
unlink(logfile.c_str());
@@ -174,7 +176,6 @@
ping_.set_quiet(true);
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
@@ -198,7 +199,7 @@
chrono::microseconds(50));
});
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(1000));
@@ -217,9 +218,35 @@
configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
tmp_dir_(getenv("TEST_TMPDIR")),
logfile_base_(tmp_dir_ + "/multi_logfile"),
- logfiles_({logfile_base_ + "_pi1_data.bfbs",
- logfile_base_ + "_pi2_data/test/aos.examples.Pong.bfbs",
- logfile_base_ + "_pi2_data.bfbs"}),
+ logfiles_(
+ {logfile_base_ + "_pi1_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+ logfile_base_ + "_pi2_data.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+ structured_logfiles_{
+ std::vector<std::string>{logfiles_[0]},
+ std::vector<std::string>{logfiles_[1], logfiles_[2]},
+ std::vector<std::string>{logfiles_[3]},
+ std::vector<std::string>{logfiles_[4], logfiles_[5]},
+ std::vector<std::string>{logfiles_[6], logfiles_[7]},
+ std::vector<std::string>{logfiles_[8], logfiles_[9]},
+ std::vector<std::string>{logfiles_[10], logfiles_[11]}},
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
ping_(ping_event_loop_.get()),
pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
@@ -259,7 +286,9 @@
std::string tmp_dir_;
std::string logfile_base_;
- std::array<std::string, 3> logfiles_;
+ std::vector<std::string> logfiles_;
+
+ std::vector<std::vector<std::string>> structured_logfiles_;
std::unique_ptr<EventLoop> ping_event_loop_;
Ping ping_;
@@ -353,40 +382,96 @@
EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfiles_[2]);
EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
+ FlatbufferVector<LogFileHeader> logheader4 = ReadHeader(logfiles_[3]);
+ EXPECT_EQ(logheader4.message().node()->name()->string_view(), "pi2");
using ::testing::UnorderedElementsAre;
// Timing reports, pings
- EXPECT_THAT(CountChannelsData(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Ping", 2001)));
// Timestamps for pong
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 2001),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)));
// Pong data.
EXPECT_THAT(CountChannelsData(logfiles_[1]),
UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ std::make_tuple("/test", "aos.examples.Pong", 101)));
+ EXPECT_THAT(CountChannelsData(logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 1900)));
// No timestamps
EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre());
// Timing reports and pongs.
- EXPECT_THAT(CountChannelsData(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Pong", 2001)));
// And ping timestamps.
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)));
+
+ // Timestamps from pi2 on pi1, and the other way.
+ EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre());
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[4]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 10),
+ std::make_tuple("/test", "aos.examples.Ping", 101)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[5]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 190),
+ std::make_tuple("/test", "aos.examples.Ping", 1900)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[6]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[7]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+
+ // And then test that the remotely logged timestamp data files only have
+ // timestamps in them.
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre());
+
+ EXPECT_THAT(CountChannelsData(logfiles_[8]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[9]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 190)));
+
+ EXPECT_THAT(CountChannelsData(logfiles_[10]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[11]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -400,6 +485,13 @@
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
@@ -417,7 +509,7 @@
// Confirm that the ping value matches.
pi1_event_loop->MakeWatcher(
"/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
<< pi1_event_loop->context().monotonic_remote_time << " -> "
<< pi1_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi1_ping_count + 1);
@@ -436,7 +528,7 @@
});
pi2_event_loop->MakeWatcher(
"/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
<< pi2_event_loop->context().monotonic_remote_time << " -> "
<< pi2_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi2_ping_count + 1);
@@ -555,11 +647,8 @@
}
)");
- EXPECT_DEATH(LogReader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}},
- &extra_nodes_config.message()),
+ EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
- ;
}
// Tests that we can read log files where they don't start at the same monotonic
@@ -580,8 +669,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -721,21 +809,27 @@
event_loop_factory_.RunFor(chrono::milliseconds(400));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
- // This sends out the fetched messages and advances time to the start of the
- // log file.
- reader.Register(&log_reader_factory);
-
const Node *pi1 =
configuration::GetNode(log_reader_factory.configuration(), "pi1");
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
LOG(INFO) << "Done registering (pi1) "
<< log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
<< " "
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index a78b42c..6ac51e7 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -60,27 +60,43 @@
{
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
- "logger": "NOT_LOGGED",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi2"],
"source_node": "pi1",
"destination_nodes": [
{
"name": "pi2",
- "timestamp_logger": "NOT_LOGGED"
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
{
"name": "/pi2/aos",
"type": "aos.message_bridge.Timestamp",
- "logger": "NOT_LOGGED",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
"source_node": "pi2",
"destination_nodes": [
{
"name": "pi1",
- "timestamp_logger": "NOT_LOGGED"
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
/* Forwarded to pi2 */
{
"name": "/test",
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index e56d331..970b7e3 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -35,7 +35,9 @@
{
"name": "pi2",
"priority": 1,
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
},
{
"name": "pi3",
@@ -55,7 +57,9 @@
{
"name": "pi1",
"priority": 1,
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
@@ -111,6 +115,18 @@
"frequency": 2
},
{
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -142,8 +158,9 @@
{
"name": "pi2",
"priority": 1,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
@@ -155,8 +172,9 @@
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 1bb0313..29f01a4 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -118,10 +118,10 @@
return config;
}
-class MMapedQueue {
+class MMappedQueue {
public:
- MMapedQueue(std::string_view shm_base, const Channel *channel,
- std::chrono::seconds channel_storage_duration)
+ MMappedQueue(std::string_view shm_base, const Channel *channel,
+ std::chrono::seconds channel_storage_duration)
: config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
std::string path = ShmPath(shm_base, channel);
@@ -172,7 +172,7 @@
ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
}
- ~MMapedQueue() {
+ ~MMappedQueue() {
PCHECK(munmap(data_, size_) == 0);
PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
}
@@ -450,7 +450,7 @@
aos::ShmEventLoop *event_loop_;
const Channel *const channel_;
- MMapedQueue lockless_queue_memory_;
+ MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueReader reader_;
// This being nullopt indicates we're not looking for wakeups right now.
std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
@@ -567,7 +567,7 @@
int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
private:
- MMapedQueue lockless_queue_memory_;
+ MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
};
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 44ba3e5..7de0540 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/logging/logger_generated.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
@@ -290,6 +291,11 @@
simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
MessageCounter<examples::Pong> pi2_pong_counter(
pi2_pong_counter_event_loop.get(), "/test");
+ aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
+ pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+ "/pi1/aos");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
@@ -298,6 +304,14 @@
simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
MessageCounter<examples::Pong> pi1_pong_counter(
pi1_pong_counter_event_loop.get(), "/test");
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
+ pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+ "/aos");
+
+ std::unique_ptr<EventLoop> pi1_remote_timestamp =
+ simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
// Count timestamps.
MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
@@ -315,6 +329,12 @@
MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
pi3_pong_counter_event_loop.get(), "/pi3/aos");
+ // Count remote timestamps
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
// Wait to let timestamp estimation start up before looking for the results.
simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
@@ -429,6 +449,98 @@
++pi3_client_statistics_count;
});
+ // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+ // channel.
+ const size_t pi1_timestamp_channel =
+ configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+ pi1_on_pi2_timestamp_fetcher.channel());
+ const size_t ping_timestamp_channel =
+ configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+ ping_on_pi2_fetcher.channel());
+
+ for (const Channel *channel :
+ *pi1_pong_counter_event_loop->configuration()->channels()) {
+ VLOG(1) << "Channel "
+ << configuration::ChannelIndex(
+ pi1_pong_counter_event_loop->configuration(), channel)
+ << " " << configuration::CleanedChannelToString(channel);
+ }
+
+ // For each remote timestamp we get back, confirm that it is either a ping
+ // message, or a timestamp we sent out. Also confirm that the timestamps are
+ // correct.
+ pi1_remote_timestamp->MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
+ }
+
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+
simulated_event_loop_factory.RunFor(chrono::seconds(10) -
chrono::milliseconds(500) +
chrono::milliseconds(5));
@@ -451,6 +563,10 @@
EXPECT_EQ(pi1_client_statistics_count, 95);
EXPECT_EQ(pi2_client_statistics_count, 95);
EXPECT_EQ(pi3_client_statistics_count, 95);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1101);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1101);
}
// Tests that an offset between nodes can be recovered and shows up in
@@ -605,6 +721,12 @@
MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
pi3_pong_counter_event_loop.get(), "/pi3/aos");
+ // Count remote timestamps
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+ pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+ MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+ pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
MessageCounter<message_bridge::ServerStatistics>
pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
"/pi1/aos");
@@ -646,6 +768,10 @@
EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
+
+ // Also confirm that remote timestamps are being forwarded correctly.
+ EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1001);
+ EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
}
} // namespace testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0f6e8bc..825c830 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -19,7 +19,9 @@
std::unique_ptr<aos::RawFetcher> fetcher,
std::unique_ptr<aos::RawSender> sender,
ServerConnection *server_connection, int client_index,
- MessageBridgeClientStatus *client_status)
+ MessageBridgeClientStatus *client_status,
+ size_t channel_index,
+ aos::Sender<logger::MessageHeader> *timestamp_logger)
: fetch_node_factory_(fetch_node_factory),
send_node_factory_(send_node_factory),
send_event_loop_(send_event_loop),
@@ -28,7 +30,9 @@
server_connection_(server_connection),
client_status_(client_status),
client_index_(client_index),
- client_connection_(client_status_->GetClientConnection(client_index)) {
+ client_connection_(client_status_->GetClientConnection(client_index)),
+ channel_index_(channel_index),
+ timestamp_logger_(timestamp_logger) {
timer_ = send_event_loop_->AddTimer([this]() { Send(); });
Schedule();
@@ -101,6 +105,34 @@
client_connection_->mutate_received_packets(
client_connection_->received_packets() + 1);
+
+ if (timestamp_logger_) {
+ aos::Sender<logger::MessageHeader>::Builder builder =
+ timestamp_logger_->MakeBuilder();
+
+ logger::MessageHeader::Builder message_header_builder =
+ builder.MakeBuilder<logger::MessageHeader>();
+
+ message_header_builder.add_channel_index(channel_index_);
+
+ // Swap the remote and sent metrics. They are from the sender's
+ // perspective, not the receiver's perspective.
+ message_header_builder.add_monotonic_remote_time(
+ fetcher_->context().monotonic_event_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ fetcher_->context().realtime_event_time.time_since_epoch().count());
+ message_header_builder.add_remote_queue_index(
+ fetcher_->context().queue_index);
+
+ message_header_builder.add_monotonic_sent_time(
+ sender_->monotonic_sent_time().time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ sender_->realtime_sent_time().time_since_epoch().count());
+ message_header_builder.add_queue_index(sender_->sent_queue_index());
+
+ builder.Send(message_header_builder.Finish());
+ }
+
sent_ = true;
Schedule();
}
@@ -136,6 +168,9 @@
MessageBridgeClientStatus *client_status_ = nullptr;
int client_index_;
ClientConnection *client_connection_ = nullptr;
+
+ size_t channel_index_;
+ aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
};
SimulatedMessageBridge::SimulatedMessageBridge(
@@ -208,6 +243,13 @@
destination_event_loop->second.client_status.FindClientIndex(
channel->source_node()->string_view());
+ const size_t destination_node_index = configuration::GetNodeIndex(
+ simulated_event_loop_factory->configuration(), destination_node);
+
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, source_event_loop->second.event_loop->node());
+
delayers->emplace_back(std::make_unique<RawMessageDelayer>(
simulated_event_loop_factory->GetNodeEventLoopFactory(node),
simulated_event_loop_factory->GetNodeEventLoopFactory(
@@ -216,7 +258,13 @@
source_event_loop->second.event_loop->MakeRawFetcher(channel),
destination_event_loop->second.event_loop->MakeRawSender(channel),
server_connection, client_index,
- &destination_event_loop->second.client_status));
+ &destination_event_loop->second.client_status,
+ configuration::ChannelIndex(
+ source_event_loop->second.event_loop->configuration(), channel),
+ delivery_time_is_logged
+ ? &source_event_loop->second
+ .timestamp_loggers[destination_node_index]
+ : nullptr));
}
const Channel *const timestamp_channel = configuration::GetChannel(
@@ -272,5 +320,46 @@
}
}
+SimulatedMessageBridge::State::State(
+ std::unique_ptr<aos::EventLoop> &&new_event_loop)
+ : event_loop(std::move(new_event_loop)),
+ server_status(event_loop.get()),
+ client_status(event_loop.get()) {
+ timestamp_loggers.resize(event_loop->configuration()->nodes()->size());
+
+ // Find all nodes which log timestamps back to us (from us).
+ for (const Channel *channel : *event_loop->configuration()->channels()) {
+ CHECK(channel->has_source_node());
+
+ // Sent by us.
+ if (configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
+ channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop->node());
+
+ // And the timestamps are then logged back by us again.
+ if (!delivery_time_is_logged) {
+ continue;
+ }
+
+ // (And only construct the sender if it hasn't been constructed)
+ const Node *other_node = configuration::GetNode(
+ event_loop->configuration(), connection->name()->string_view());
+ const size_t other_node_index = configuration::GetNodeIndex(
+ event_loop->configuration(), other_node);
+
+ if (!timestamp_loggers[other_node_index]) {
+ timestamp_loggers[other_node_index] =
+ event_loop->MakeSender<logger::MessageHeader>(
+ absl::StrCat("/aos/remote_timestamps/",
+ connection->name()->string_view()));
+ }
+ }
+ }
+ }
+}
+
} // namespace message_bridge
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5784bb3..2a4da63 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -2,6 +2,7 @@
#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_client_status.h"
#include "aos/network/message_bridge_server_status.h"
@@ -32,16 +33,15 @@
private:
struct State {
- State(std::unique_ptr<aos::EventLoop> &&new_event_loop)
- : event_loop(std::move(new_event_loop)),
- server_status(event_loop.get()),
- client_status(event_loop.get()) {}
+ State(std::unique_ptr<aos::EventLoop> &&new_event_loop);
State(const State &state) = delete;
std::unique_ptr<aos::EventLoop> event_loop;
MessageBridgeServerStatus server_status;
MessageBridgeClientStatus client_status;
+
+ std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers;
};
// Map of nodes to event loops. This is a member variable so that the
// lifetime of the event loops matches the lifetime of the bridge.
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 392a7eb..bd095b8 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -328,6 +328,14 @@
virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
+ static SizePrefixedFlatbufferDetachedBuffer<T> Empty() {
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+ const auto end = fbb.EndTable(fbb.StartTable());
+ fbb.FinishSizePrefixed(flatbuffers::Offset<flatbuffers::Table>(end));
+ return SizePrefixedFlatbufferDetachedBuffer<T>(fbb.Release());
+ }
+
// Returns references to the buffer, and the data.
const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
const uint8_t *data() const override {
@@ -340,6 +348,13 @@
return buffer_.size() - sizeof(flatbuffers::uoffset_t);
}
+ absl::Span<uint8_t> full_span() {
+ return absl::Span<uint8_t>(buffer_.data(), buffer_.size());
+ }
+ absl::Span<const uint8_t> full_span() const {
+ return absl::Span<const uint8_t>(buffer_.data(), buffer_.size());
+ }
+
private:
flatbuffers::DetachedBuffer buffer_;
};
diff --git a/aos/network/BUILD b/aos/network/BUILD
index f9fe7ea..69482e1 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -268,6 +268,7 @@
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
+ "//aos/events/logging:logger_fbs",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
"//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index b371181..00040e1 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -78,6 +78,8 @@
logger::MessageHeader::Builder message_header_builder(fbb);
message_header_builder.add_channel_index(0);
message_header_builder.add_monotonic_sent_time(0);
+ message_header_builder.add_realtime_sent_time(0);
+ message_header_builder.add_queue_index(0);
message_header_builder.add_monotonic_remote_time(0);
message_header_builder.add_realtime_remote_time(0);
message_header_builder.add_remote_queue_index(0);
@@ -250,6 +252,10 @@
message_header->channel_index());
message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
message_header->monotonic_sent_time());
+ message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
+ message_header->realtime_sent_time());
+ message_reception_reply_.mutable_message()->mutate_queue_index(
+ message_header->queue_index());
// And capture the relevant data needed to generate the forwarding
// MessageHeader.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 2832339..de836ae 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -1,5 +1,6 @@
#include "aos/network/message_bridge_server_lib.h"
+#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/logging/logger.h"
#include "aos/events/logging/logger_generated.h"
@@ -85,14 +86,55 @@
// and flushes. Whee.
}
-void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
- uint16_t /*ssn*/,
+void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
absl::Span<const uint8_t> data) {
const logger::MessageHeader *message_header =
flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+ for (Peer &peer : peers_) {
+ if (peer.sac_assoc_id == rcv_assoc_id) {
+ if (peer.timestamp_logger != nullptr) {
+ // TODO(austin): Need to implement reliable sending of the delivery
+ // timestamps. Track what made it, and retry what didn't.
+ //
+ // This needs to be munged and cleaned up to match the timestamp
+ // standard.
+
+ aos::Sender<logger::MessageHeader>::Builder builder =
+ peer.timestamp_logger->MakeBuilder();
+
+ logger::MessageHeader::Builder message_header_builder =
+ builder.MakeBuilder<logger::MessageHeader>();
+
+ message_header_builder.add_channel_index(
+ message_header->channel_index());
+
+ message_header_builder.add_queue_index(
+ message_header->remote_queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ message_header->monotonic_remote_time());
+ message_header_builder.add_realtime_sent_time(
+ message_header->realtime_remote_time());
+
+ // Swap the remote and sent metrics. They are from the sender's
+ // perspective, not the receiver's perspective.
+ message_header_builder.add_monotonic_remote_time(
+ message_header->monotonic_sent_time());
+ message_header_builder.add_realtime_remote_time(
+ message_header->realtime_sent_time());
+ message_header_builder.add_remote_queue_index(
+ message_header->queue_index());
+
+ builder.Send(message_header_builder.Finish());
+ }
+ break;
+ }
+ }
+
while (sent_messages_.size() > 0u) {
if (sent_messages_.begin()->message().monotonic_sent_time() ==
- message_header->monotonic_sent_time()) {
+ message_header->monotonic_sent_time() &&
+ sent_messages_.begin()->message().queue_index() ==
+ message_header->queue_index()) {
sent_messages_.pop_front();
continue;
}
@@ -124,11 +166,12 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(const Connection *connection, int node_index,
- ServerConnection *server_connection_statistics,
- bool logged_remotely) {
+void ChannelState::AddPeer(
+ const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics, bool logged_remotely,
+ aos::Sender<logger::MessageHeader> *timestamp_logger) {
peers_.emplace_back(connection, node_index, server_connection_statistics,
- logged_remotely);
+ logged_remotely, timestamp_logger);
}
int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
@@ -168,6 +211,7 @@
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
int32_t max_size = 0;
+ timestamp_loggers_.resize(event_loop->configuration()->nodes()->size());
// Seed up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
@@ -217,13 +261,30 @@
for (const Connection *connection : *channel->destination_nodes()) {
const Node *other_node = configuration::GetNode(
event_loop_->configuration(), connection->name()->string_view());
+ const size_t other_node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), other_node);
+
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop_->node());
+
+ // Conditionally create the timestamp logger if we are supposed to log
+ // timestamps from it.
+ if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
+ timestamp_loggers_[other_node_index] =
+ event_loop_->MakeSender<logger::MessageHeader>(
+ absl::StrCat("/aos/remote_timestamps/",
+ connection->name()->string_view()));
+ }
state->AddPeer(
connection,
configuration::GetNodeIndex(event_loop_->configuration(),
connection->name()->string_view()),
server_status_.FindServerConnection(
connection->name()->string_view()),
- configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+ configuration::ChannelMessageIsLoggedOnNode(channel, other_node),
+ delivery_time_is_logged ? ×tamp_loggers_[other_node_index]
+ : nullptr);
}
// Don't subscribe to timestamps on the timestamp channel. Those get
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index b6a4f05..ad384f4 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -32,10 +32,12 @@
struct Peer {
Peer(const Connection *new_connection, int new_node_index,
ServerConnection *new_server_connection_statistics,
- bool new_logged_remotely)
+ bool new_logged_remotely,
+ aos::Sender<logger::MessageHeader> *new_timestamp_logger)
: connection(new_connection),
node_index(new_node_index),
server_connection_statistics(new_server_connection_statistics),
+ timestamp_logger(new_timestamp_logger),
logged_remotely(new_logged_remotely) {}
// Valid if != 0.
@@ -45,6 +47,7 @@
const aos::Connection *connection;
const int node_index;
ServerConnection *server_connection_statistics;
+ aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
// If true, this message will be logged on a receiving node. We need to
// keep it around to log it locally if that fails.
@@ -60,7 +63,8 @@
// Adds a new peer.
void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
- bool logged_remotely);
+ bool logged_remotely,
+ aos::Sender<logger::MessageHeader> *timestamp_logger);
// Returns true if this channel has the same name and type as the other
// channel.
@@ -111,6 +115,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
+ std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 8d52292..aa0a034 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -3,6 +3,7 @@
#include <chrono>
#include <thread>
+#include "absl/strings/str_cat.h"
#include "aos/events/ping_generated.h"
#include "aos/events/pong_generated.h"
#include "aos/network/message_bridge_client_lib.h"
@@ -10,11 +11,22 @@
#include "aos/network/team_number.h"
namespace aos {
+void SetShmBase(const std::string_view base);
+
namespace message_bridge {
namespace testing {
namespace chrono = std::chrono;
+void DoSetShmBase(const std::string_view node) {
+ const char *tmpdir_c_str = getenv("TEST_TMPDIR");
+ if (tmpdir_c_str != nullptr) {
+ aos::SetShmBase(absl::StrCat(tmpdir_c_str, "/", node));
+ } else {
+ aos::SetShmBase(absl::StrCat("/dev/shm/", node));
+ }
+}
+
// Test that we can send a ping message over sctp and receive it.
TEST(MessageBridgeTest, PingPong) {
// This is rather annoying to set up. We need to start up a client and
@@ -44,9 +56,11 @@
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_client_config.json");
+ DoSetShmBase("pi1");
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -60,9 +74,22 @@
aos::Sender<examples::Ping> ping_sender =
ping_event_loop.MakeSender<examples::Ping>("/test");
+ aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+ aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
+ pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+ "/pi1/aos/remote_timestamps/pi2");
+
+ // Fetchers for confirming the remote timestamps made it.
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ ping_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
+ ping_event_loop.MakeFetcher<Timestamp>("/aos");
+
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
+
aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
@@ -80,11 +107,24 @@
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+ aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
+ test_event_loop.MakeFetcher<logger::MessageHeader>(
+ "/pi2/aos/remote_timestamps/pi1");
+
+ // Event loop for fetching data delivered to pi2 from pi1 to match up
+ // messages.
+ aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
+ aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
+ delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
+ EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
+ EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
// Count the pongs.
int pong_count = 0;
pong_event_loop.MakeWatcher(
- "/test2", [&pong_count](const examples::Ping &ping) {
+ "/test", [&pong_count](const examples::Ping &ping) {
++pong_count;
LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
});
@@ -191,11 +231,11 @@
ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp ×tamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << FlatbufferToJson(×tamp);
+ LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(×tamp);
});
pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp ×tamp) {
EXPECT_TRUE(timestamp.has_offsets());
- LOG(INFO) << FlatbufferToJson(×tamp);
+ LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(×tamp);
});
// Run for 5 seconds to make sure we have time to estimate the offset.
@@ -206,6 +246,96 @@
quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
});
+ // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+ // channel.
+ const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+ pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
+ const size_t ping_timestamp_channel =
+ configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
+ ping_on_pi2_fetcher.channel());
+
+ for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
+ VLOG(1) << "Channel "
+ << configuration::ChannelIndex(ping_event_loop.configuration(),
+ channel)
+ << " " << configuration::CleanedChannelToString(channel);
+ }
+
+ // For each remote timestamp we get back, confirm that it is either a ping
+ // message, or a timestamp we sent out. Also confirm that the timestamps are
+ // correct.
+ ping_event_loop.MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
+ }
+
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+
// Start everything up. Pong is the only thing we don't know how to wait on,
// so start it first.
std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
@@ -261,6 +391,10 @@
EXPECT_GE(pi2_server_statistics_count, 2);
EXPECT_GE(pi1_client_statistics_count, 2);
EXPECT_GE(pi2_client_statistics_count, 2);
+
+ // Confirm we got timestamps back!
+ EXPECT_TRUE(message_header_fetcher1.Fetch());
+ EXPECT_TRUE(message_header_fetcher2.Fetch());
}
// Test that the client disconnecting triggers the server offsets on both sides
@@ -290,6 +424,7 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+ DoSetShmBase("pi1");
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -305,6 +440,7 @@
// Now do it for "raspberrypi2", the client.
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
FLAGS_application_name = "pi2_message_bridge_server";
aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
@@ -496,6 +632,7 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
+ DoSetShmBase("pi1");
aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
@@ -513,6 +650,7 @@
// Now do it for "raspberrypi2", the client.
FLAGS_override_hostname = "raspberrypi2";
+ DoSetShmBase("pi2");
FLAGS_application_name = "pi2_message_bridge_client";
aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a8475b1..3ac2796 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -25,7 +25,9 @@
"destination_nodes": [
{
"name": "pi2",
- "priority": 1
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
}
]
},
@@ -38,25 +40,13 @@
"destination_nodes": [
{
"name": "pi1",
- "priority": 1
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
{
- "name": "/pi1_forwarded/aos",
- "type": "aos.message_bridge.Timestamp",
- "source_node": "pi2",
- "frequency": 10,
- "max_size": 200
- },
- {
- "name": "/pi2_forwarded/aos",
- "type": "aos.message_bridge.Timestamp",
- "source_node": "pi1",
- "frequency": 10,
- "max_size": 200
- },
- {
"name": "/pi1/aos",
"type": "aos.message_bridge.ServerStatistics",
"source_node": "pi1",
@@ -81,6 +71,18 @@
"frequency": 2
},
{
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "pi1",
+ "frequency": 10
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "pi2",
+ "frequency": 10
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -110,11 +112,6 @@
]
},
{
- "name": "/test2",
- "type": "aos.examples.Ping",
- "source_node": "pi2"
- },
- {
"name": "/test",
"type": "aos.examples.Pong",
"source_node": "pi2",
@@ -148,34 +145,6 @@
"rename": {
"name": "/pi2/aos"
}
- },
- {
- "match": {
- "name": "/test",
- "type": "aos.examples.Ping",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/test2"
- }
- },
- {
- "match": {
- "name": "/pi1/aos*",
- "source_node": "pi2"
- },
- "rename": {
- "name": "/pi1_forwarded/aos"
- }
- },
- {
- "match": {
- "name": "/pi2/aos*",
- "source_node": "pi1"
- },
- "rename": {
- "name": "/pi2_forwarded/aos"
- }
}
]
}
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index ba8f9bd..116607c 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -49,7 +49,7 @@
VLOG(1) << " " << this << " Sample at " << monotonic_now << " is "
<< sample_ns.count() << "ns, Base is " << base_offset_.count();
CHECK_GE(monotonic_now, last_time_)
- << ": Being asked to filter backwards in time!";
+ << ": " << this << " Being asked to filter backwards in time!";
// Compute the sample offset as a double (seconds), taking into account the
// base offset.
const double sample =
@@ -380,7 +380,7 @@
const double hard_max = fwd_.offset();
const double hard_min = -rev_.offset();
const double average = (hard_max + hard_min) / 2.0;
- VLOG(1) << " Max(fwd) " << hard_max << " min(rev) " << hard_min;
+ VLOG(1) << this << " Max(fwd) " << hard_max << " min(rev) " << hard_min;
// We don't want to clip the offset to the hard min/max. We really want to
// keep it within a band around the middle. ratio of 0.3 means stay within
// +- 0.15 of the middle of the hard min and max.
@@ -390,7 +390,7 @@
// Update regardless for the first sample from both the min and max.
if (*last_time == aos::monotonic_clock::min_time) {
- VLOG(1) << " No last time " << average;
+ VLOG(1) << this << " No last time " << average;
offset_ = average;
offset_velocity_ = 0.0;
} else {
@@ -415,7 +415,7 @@
(offset_velocity_ -
(fwd_.filtered_velocity() - rev_.filtered_velocity()) / 2.0);
- VLOG(1) << " last time " << offset_;
+ VLOG(1) << this << " last time " << offset_;
}
*last_time = monotonic_now;
@@ -424,14 +424,14 @@
// reverse samples.
if (!MissingSamples()) {
*sample_pointer_ = offset_;
- VLOG(1) << "Updating sample to " << offset_;
+ VLOG(1) << this << " Updating sample to " << offset_;
} else {
- VLOG(1) << "Don't have both samples.";
+ VLOG(1) << this << " Don't have both samples.";
if (last_fwd_time_ == aos::monotonic_clock::min_time) {
- VLOG(1) << " Missing forward";
+ VLOG(1) << this << " Missing forward";
}
if (last_rev_time_ == aos::monotonic_clock::min_time) {
- VLOG(1) << " Missing reverse";
+ VLOG(1) << this << " Missing reverse";
}
}
}
@@ -657,6 +657,8 @@
void NoncausalOffsetEstimator::Sample(
const Node *node, aos::monotonic_clock::time_point node_delivered_time,
aos::monotonic_clock::time_point other_node_sent_time) {
+ VLOG(1) << "Sample delivered " << node_delivered_time << " sent "
+ << other_node_sent_time << " to " << node->name()->string_view();
if (node == node_a_) {
if (a_.Sample(node_delivered_time,
other_node_sent_time - node_delivered_time)) {
@@ -740,11 +742,16 @@
}
fit_ = AverageFits(a_.FitLine(), b_.FitLine());
if (offset_pointer_) {
+ VLOG(1) << "Setting offset to " << fit_.mpq_offset();
*offset_pointer_ = fit_.mpq_offset();
}
if (slope_pointer_) {
+ VLOG(1) << "Setting slope to " << fit_.mpq_slope();
*slope_pointer_ = -fit_.mpq_slope();
}
+ if (valid_pointer_) {
+ *valid_pointer_ = true;
+ }
if (VLOG_IS_ON(1)) {
LogFit("Refitting to");
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6994a8a..10f436e 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -408,6 +408,9 @@
void set_slope_pointer(mpq_class *slope_pointer) {
slope_pointer_ = slope_pointer;
}
+ void set_valid_pointer(bool *valid_pointer) {
+ valid_pointer_ = valid_pointer;
+ }
// Returns the data points from each filter.
const std::deque<
@@ -441,6 +444,7 @@
mpq_class *offset_pointer_ = nullptr;
mpq_class *slope_pointer_ = nullptr;
+ bool *valid_pointer_ = nullptr;
Line fit_{std::chrono::nanoseconds(0), 0.0};
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 3c206b3..1f362f3 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -63,10 +63,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger");
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index 3689970..f7b69e4 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -15,7 +15,7 @@
"Name of the logfile to read from.");
DEFINE_string(config, "y2019/config.json",
"Name of the config file to replay using.");
-DEFINE_string(output_file, "/tmp/replayed.bfbs",
+DEFINE_string(output_file, "/tmp/replayed",
"Name of the logfile to write replayed data to.");
DEFINE_int32(team, 971, "Team number to use for logfile replay.");
int main(int argc, char **argv) {
@@ -37,13 +37,12 @@
"frc971.control_loops.drivetrain.Output");
reader.Register();
- aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> log_writer_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_writer");
log_writer_event_loop->SkipTimingReport();
log_writer_event_loop->SkipAosLog();
CHECK(nullptr == log_writer_event_loop->node());
- aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+ aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/BUILD b/y2020/BUILD
index cd977f9..918ae1d 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -168,6 +168,7 @@
"//y2020/vision/sift:sift_fbs",
"//y2020/vision/sift:sift_training_fbs",
"//y2020/vision:vision_fbs",
+ "//aos/events/logging:logger_fbs",
],
visibility = ["//visibility:public"],
deps = [
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 5a97943..0dd5a1a 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -46,11 +46,10 @@
node = aos::configuration::GetNode(reader.configuration(), "roborio");
}
- aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
std::unique_ptr<aos::EventLoop> log_writer_event_loop =
reader.event_loop_factory()->MakeEventLoop("log_writer", node);
log_writer_event_loop->SkipTimingReport();
- aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+ aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index a7360c1..ae9f887 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -135,10 +135,8 @@
set_battery_voltage(12.0);
if (!FLAGS_output_file.empty()) {
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
@@ -399,7 +397,6 @@
double turret_velocity_ = 0.0; // rad / sec
std::unique_ptr<aos::EventLoop> logger_event_loop_;
- std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
std::unique_ptr<aos::logger::Logger> logger_;
};
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 1d42d23..8a0589c 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -439,10 +439,8 @@
if (!FLAGS_output_file.empty()) {
unlink(FLAGS_output_file.c_str());
- log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
- FLAGS_output_file);
logger_event_loop_ = MakeEventLoop("logger", roborio_);
- logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+ logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
logger_event_loop_.get());
}
}
@@ -550,7 +548,6 @@
SuperstructureSimulation superstructure_plant_;
std::unique_ptr<aos::EventLoop> logger_event_loop_;
- std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
std::unique_ptr<aos::logger::Logger> logger_;
};
diff --git a/y2020/y2020_laptop.json b/y2020/y2020_laptop.json
index 44c4705..0797110 100644
--- a/y2020/y2020_laptop.json
+++ b/y2020/y2020_laptop.json
@@ -175,6 +175,51 @@
]
},
{
+ "name": "/laptop/aos/remote_timestamps/roborio",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi3",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
+ "name": "/laptop/aos/remote_timestamps/pi4",
+ "type": "aos.logger.MessageHeader",
+ "source_node": "laptop",
+ "logger": "NOT_LOGGED",
+ "frequency": 20,
+ "num_senders": 2,
+ "max_size": 300
+ },
+ {
"name": "/pi1/camera",
"type": "frc971.vision.CameraImage",
"source_node": "pi1",