Fix log sorting for good
Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.
When a message is sent from node A -> node B, add support for sending
the delivery timestamp (when it arrived on B) back from node B to
node A for logging.
{
  "name": "/pi1/aos",
  "type": "aos.message_bridge.Timestamp",
  "source_node": "pi1",
  "frequency": 10,
  "max_size": 200,
  "destination_nodes": [
    {
      "name": "pi2",
      "priority": 1,
      "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
      "timestamp_logger_nodes": ["pi1"]
    }
  ]
},
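In other words: log the delivery timestamps locally on pi2, and also
send them back to be logged on the nodes listed in
timestamp_logger_nodes (here pi1, the node that sent the original
message).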
This gives us a way to log enough information on node A such that
everything is self-contained. We know all the messages we sent to B,
and when they got there, so we can recreate the time offset and replay
the node.
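As a rough sketch of the model this enables (simplified and only
illustrative; the noncausal filter below is more careful): if A sends at
time ta on A's clock and B records arrival at tb on B's clock, then

  tb - ta = offset_ab + latency

Since latency is nonnegative, samples in each direction bound offset_ab
from one side, and enough samples pin it down well enough to put both
logs on a common timeline.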
This data is then published on
{ "name": "/aos/remote_timestamps/pi2", "type": ".aos.logger.MessageHeader" }
The logger then treats that channel specially and logs the contents
directly, as though the message contents were received on the remote
node.
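For this to work, the channel has to exist in the config and be
sendable on the logging node; the new logger code CHECK-fails
otherwise. A sketch of roughly what the declaration could look like
(frequency and max_size are illustrative, not prescribed by this
change):
{
  "name": "/aos/remote_timestamps/pi2",
  "type": ".aos.logger.MessageHeader",
  "source_node": "pi1",
  "frequency": 10,
  "max_size": 200
}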
This (among other things) exposes log sorting problems. Use our fancy
new infinite-precision noncausal filter to estimate time precisely
enough to actually order events. This gets us down to 2-3 ns of error
due to integer precision.
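To see why the solve wants exact arithmetic, here is a minimal
standalone sketch (illustrative, not code from this patch) using the
same GMP mpq_class the patch pulls in:

// Build with: g++ sketch.cc -lgmpxx -lgmp
#include <gmpxx.h>

#include <cstdio>
#include <iostream>

int main() {
  // A realtime timestamp, in ns since the unix epoch (mid-2020).
  const mpz_class t_ns("1600000000000000000");
  // A one-part-per-billion clock slope, held exactly as (1e9 + 1) / 1e9.
  const mpq_class slope(1000000001, 1000000000);

  // Rational math keeps every nanosecond: prints 1600000001600000000.
  const mpq_class exact = mpq_class(t_ns) * slope;
  std::cout << "exact " << exact << "\n";

  // A double has ~53 mantissa bits, so the same product rounds to a
  // multiple of 256 ns -- plenty to swap the order of nearby events.
  std::printf("lossy %.0f\n", t_ns.get_d() * slope.get_d());
}

That is why the map/slope/offset matrices below are Eigen matrices of
mpq_class rather than doubles.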
Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 242638c..89839ec 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -8,6 +8,7 @@
#include <vector>
#include "Eigen/Dense"
+#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
@@ -15,6 +16,7 @@
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
DEFINE_bool(skip_missing_forwarding_entries, false,
"If true, drop any forwarding entries with missing data. If "
@@ -25,14 +27,50 @@
"of CSV files in /tmp/. This should only be needed when debugging "
"time synchronization.");
+DEFINE_bool(skip_order_validation, false,
+ "If true, ignore any out of orderness in replay");
+
namespace aos {
namespace logger {
-
namespace chrono = std::chrono;
-Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+void MultiNodeLogNamer::WriteHeader(
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+ const Node *node) {
+ if (node == this->node()) {
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
+ }
+ }
+ }
+}
+
+void MultiNodeLogNamer::Rotate(
+ const Node *node,
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header) {
+ if (node == this->node()) {
+ ++part_number_;
+ *data_writer_ = std::move(*OpenDataWriter());
+ data_writer_->WriteSizedFlatbuffer(header.full_span());
+ } else {
+ for (std::pair<const Channel *const, DataWriter> &data_writer :
+ data_writers_) {
+ if (node == data_writer.second.node) {
+ ++data_writer.second.part_number;
+ data_writer.second.rotate(data_writer.first, &data_writer.second);
+ data_writer.second.writer->WriteSizedFlatbuffer(header.full_span());
+ }
+ }
+ }
+}
+
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
- : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+ : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
event_loop, polling_period) {}
Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
@@ -40,11 +78,60 @@
: event_loop_(event_loop),
log_namer_(std::move(log_namer)),
timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
- polling_period_(polling_period) {
+ polling_period_(polling_period),
+ server_statistics_fetcher_(
+ configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+ "/aos")
+ : aos::Fetcher<message_bridge::ServerStatistics>()) {
VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
int channel_index = 0;
+
+ // Find all the nodes which are logging timestamps on our node.
+ std::set<const Node *> timestamp_logger_nodes;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
+ !channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node = configuration::GetNode(
+ event_loop_->configuration(), connection->name()->string_view());
+
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ connection, event_loop_->node())) {
+ VLOG(1) << "Timestamps are logged from "
+ << FlatbufferToJson(other_node);
+ timestamp_logger_nodes.insert(other_node);
+ }
+ }
+ }
+
+ std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+ // Now that we have all the nodes accumulated, make remote timestamp loggers
+ // for them.
+ for (const Node *node : timestamp_logger_nodes) {
+ const Channel *channel = configuration::GetChannel(
+ event_loop_->configuration(),
+ absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+ logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+ event_loop_->node());
+
+ CHECK(channel != nullptr)
+ << ": Remote timestamps are logged on "
+ << event_loop_->node()->name()->string_view()
+ << " but can't find channel /aos/remote_timestamps/"
+ << node->name()->string_view();
+ timestamp_logger_channels.insert(std::make_pair(channel, node));
+ }
+
+ const size_t our_node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), event_loop_->node());
+
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
+ fs.node_index = our_node_index;
const bool is_local =
configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
@@ -60,7 +147,15 @@
: configuration::ConnectionDeliveryTimeIsLoggedOnNode(
channel, event_loop_->node(), event_loop_->node());
- if (log_message || log_delivery_times) {
+ // Now, detect a MessageHeader timestamp logger where we should just log the
+ // contents to a file directly.
+ const bool log_contents = timestamp_logger_channels.find(channel) !=
+ timestamp_logger_channels.end();
+ const Node *timestamp_node =
+ log_contents ? timestamp_logger_channels.find(channel)->second
+ : nullptr;
+
+ if (log_message || log_delivery_times || log_contents) {
fs.fetcher = event_loop->MakeRawFetcher(channel);
VLOG(1) << "Logging channel "
<< configuration::CleanedChannelToString(channel);
@@ -76,6 +171,14 @@
fs.log_type = LogType::kLogRemoteMessage;
}
}
+ if (log_contents) {
+ VLOG(1) << "Timestamp logger channel "
+ << configuration::CleanedChannelToString(channel);
+ fs.contents_writer =
+ log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+ fs.node_index = configuration::GetNodeIndex(
+ event_loop_->configuration(), timestamp_node);
+ }
fs.channel_index = channel_index;
fs.written = false;
fetchers_.emplace_back(std::move(fs));
@@ -83,49 +186,178 @@
++channel_index;
}
- // When things start, we want to log the header, then the most recent messages
- // available on each fetcher to capture the previous state, then start
- // polling.
- event_loop_->OnRun([this, polling_period]() {
- // Grab data from each channel right before we declare the log file started
- // so we can capture the latest message on each channel. This lets us have
- // non periodic messages with configuration that now get logged.
- for (FetcherStruct &f : fetchers_) {
- f.written = !f.fetcher->Fetch();
- }
+ node_state_.resize(configuration::MultiNode(event_loop_->configuration())
+ ? event_loop_->configuration()->nodes()->size()
+ : 1u);
- // We need to pick a point in time to declare the log file "started". This
- // starts here. It needs to be after everything is fetched so that the
- // fetchers are all pointed at the most recent message before the start
- // time.
- monotonic_start_time_ = event_loop_->monotonic_now();
- realtime_start_time_ = event_loop_->realtime_now();
- last_synchronized_time_ = monotonic_start_time_;
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
- << " start_time " << monotonic_start_time_;
+ node_state_[node_index].log_file_header = MakeHeader(node);
+ }
- WriteHeader();
-
- timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
- polling_period);
- });
+ // When things start, we want to log the header, then the most recent
+ // messages available on each fetcher to capture the previous state, then
+ // start polling.
+ event_loop_->OnRun([this]() { StartLogging(); });
}
-// TODO(austin): Set the remote start time to the first time we see a remote
-// message when we are logging those messages separate? Need to signal what to
-// do, or how to get a good timestamp.
+void Logger::StartLogging() {
+ // Grab data from each channel right before we declare the log file started
+ // so we can capture the latest message on each channel. This lets us have
+ // non periodic messages with configuration that now get logged.
+ for (FetcherStruct &f : fetchers_) {
+ f.written = !f.fetcher->Fetch();
+ }
+
+ // Clear out any old timestamps in case we are re-starting logging.
+ for (size_t i = 0; i < node_state_.size(); ++i) {
+ SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+ }
+
+ WriteHeader();
+
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+ << " start_time " << last_synchronized_time_;
+
+ timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+ polling_period_);
+}
+
void Logger::WriteHeader() {
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ server_statistics_fetcher_.Fetch();
+ }
+
+ aos::monotonic_clock::time_point monotonic_start_time =
+ event_loop_->monotonic_now();
+ aos::realtime_clock::time_point realtime_start_time =
+ event_loop_->realtime_now();
+
+ // We need to pick a point in time to declare the log file "started". This
+ // starts here. It needs to be after everything is fetched so that the
+ // fetchers are all pointed at the most recent message before the start
+ // time.
+ last_synchronized_time_ = monotonic_start_time;
+
for (const Node *node : log_namer_->nodes()) {
- WriteHeader(node);
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+ realtime_start_time);
+ log_namer_->WriteHeader(node_state_[node_index].log_file_header, node);
}
}
-void Logger::WriteHeader(const Node *node) {
+void Logger::WriteMissingTimestamps() {
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ server_statistics_fetcher_.Fetch();
+ } else {
+ return;
+ }
+
+ if (server_statistics_fetcher_.get() == nullptr) {
+ return;
+ }
+
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ if (MaybeUpdateTimestamp(
+ node, node_index,
+ server_statistics_fetcher_.context().monotonic_event_time,
+ server_statistics_fetcher_.context().realtime_event_time)) {
+ log_namer_->Rotate(node, node_state_[node_index].log_file_header);
+ }
+ }
+}
+
+void Logger::SetStartTime(size_t node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+ node_state_[node_index].monotonic_start_time = monotonic_start_time;
+ node_state_[node_index].realtime_start_time = realtime_start_time;
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_monotonic_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ monotonic_start_time.time_since_epoch())
+ .count());
+ if (node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->has_realtime_start_time()) {
+ node_state_[node_index]
+ .log_file_header.mutable_message()
+ ->mutate_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_start_time.time_since_epoch())
+ .count());
+ }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+ const Node *node, int node_index,
+ aos::monotonic_clock::time_point monotonic_start_time,
+ aos::realtime_clock::time_point realtime_start_time) {
+  // Bail early if the start times are already set.
+ if (node_state_[node_index].monotonic_start_time !=
+ monotonic_clock::min_time) {
+ return false;
+ }
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ if (event_loop_->node() == node) {
+      // There are no offsets to compute for ourselves, so always succeed.
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ } else if (server_statistics_fetcher_.get() != nullptr) {
+ // We must be a remote node now. Look for the connection and see if it is
+ // connected.
+
+ for (const message_bridge::ServerConnection *connection :
+ *server_statistics_fetcher_->connections()) {
+ if (connection->node()->name()->string_view() !=
+ node->name()->string_view()) {
+ continue;
+ }
+
+ if (connection->state() != message_bridge::State::CONNECTED) {
+ VLOG(1) << node->name()->string_view()
+ << " is not connected, can't start it yet.";
+ break;
+ }
+
+ if (!connection->has_monotonic_offset()) {
+ VLOG(1) << "Missing monotonic offset for setting start time for node "
+ << aos::FlatbufferToJson(node);
+ break;
+ }
+
+ VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
+
+ // Found it and it is connected. Compensate and go.
+ monotonic_start_time +=
+ std::chrono::nanoseconds(connection->monotonic_offset());
+
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ }
+ } else {
+ SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+ return true;
+ }
+ return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+ const Node *node) {
// Now write the header with this timestamp in it.
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
+ // TODO(austin): Compress this much more efficiently. There are a bunch of
+ // duplicated schemas.
flatbuffers::Offset<aos::Configuration> configuration_offset =
CopyFlatBuffer(event_loop_->configuration(), &fbb);
@@ -133,7 +365,8 @@
fbb.CreateString(network::GetHostname());
flatbuffers::Offset<Node> node_offset;
- if (event_loop_->node() != nullptr) {
+
+ if (configuration::MultiNode(event_loop_->configuration())) {
node_offset = CopyFlatBuffer(node, &fbb);
}
@@ -158,41 +391,133 @@
log_file_header_builder.add_monotonic_start_time(
std::chrono::duration_cast<std::chrono::nanoseconds>(
- monotonic_start_time_.time_since_epoch())
+ monotonic_clock::min_time.time_since_epoch())
.count());
- log_file_header_builder.add_realtime_start_time(
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- realtime_start_time_.time_since_epoch())
- .count());
-
- fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- log_namer_->WriteHeader(&fbb, node);
-}
-
-void Logger::Rotate(DetachedBufferWriter *writer) {
- Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
-}
-
-void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
- // Force data up until now to be written.
- DoLogData();
-
- // Swap the writer out, and re-write the header.
- log_namer_ = std::move(log_namer);
-
- // And then update the writers.
- for (FetcherStruct &f : fetchers_) {
- const Channel *channel =
- event_loop_->configuration()->channels()->Get(f.channel_index);
- if (f.timestamp_writer != nullptr) {
- f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
- }
- if (f.writer != nullptr) {
- f.writer = log_namer_->MakeWriter(channel);
- }
+ if (node == event_loop_->node()) {
+ log_file_header_builder.add_realtime_start_time(
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ realtime_clock::min_time.time_since_epoch())
+ .count());
}
- WriteHeader();
+ fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+ return fbb.Release();
+}
+
+void Logger::Rotate() {
+ for (const Node *node : log_namer_->nodes()) {
+ const int node_index =
+ configuration::GetNodeIndex(event_loop_->configuration(), node);
+ log_namer_->Rotate(node, node_state_[node_index].log_file_header);
+ }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+ WriteMissingTimestamps();
+
+ // Write each channel to disk, one at a time.
+ for (FetcherStruct &f : fetchers_) {
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
+ }
+ }
+
+ CHECK(!f.written);
+
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time < t) {
+ if (f.writer != nullptr) {
+ // Write!
+ flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+ max_header_size_);
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index, f.log_type));
+
+ VLOG(2) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.writer->filename() << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ max_header_size_ = std::max(
+ max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+ f.writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+
+ VLOG(2) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename() << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ if (f.contents_writer != nullptr) {
+ // And now handle the special message contents channel. Copy the
+ // message into a FlatBufferBuilder and save it to disk.
+ // TODO(austin): We can be more efficient here when we start to
+ // care...
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ const MessageHeader *msg =
+ flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ // Note: this must match the same order as MessageBridgeServer and
+ // PackMessage. We want identical headers to have identical
+ // on-the-wire formats to make comparing them easier.
+ message_header_builder.add_channel_index(msg->channel_index());
+
+ message_header_builder.add_queue_index(msg->queue_index());
+ message_header_builder.add_monotonic_sent_time(
+ msg->monotonic_sent_time());
+ message_header_builder.add_realtime_sent_time(
+ msg->realtime_sent_time());
+
+ message_header_builder.add_monotonic_remote_time(
+ msg->monotonic_remote_time());
+ message_header_builder.add_realtime_remote_time(
+ msg->realtime_remote_time());
+ message_header_builder.add_remote_queue_index(
+ msg->remote_queue_index());
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
+
+ f.contents_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ } else {
+ break;
+ }
+ }
+ }
+ last_synchronized_time_ = t;
}
void Logger::DoLogData() {
@@ -205,77 +530,8 @@
do {
// Move the sync point up by at most polling_period. This forces one sync
// per iteration, even if it is small.
- last_synchronized_time_ =
- std::min(last_synchronized_time_ + polling_period_, monotonic_now);
- // Write each channel to disk, one at a time.
- for (FetcherStruct &f : fetchers_) {
- while (true) {
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- } else {
- f.written = false;
- }
- }
-
- CHECK(!f.written);
-
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
- if (f.writer != nullptr) {
- // Write!
- flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
- max_header_size_);
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index, f.log_type));
-
- VLOG(2) << "Writing data as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.writer->filename() << " data "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- max_header_size_ = std::max(
- max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- f.writer->QueueSizedFlatbuffer(&fbb);
- }
-
- if (f.timestamp_writer != nullptr) {
- // And now handle timestamps.
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- f.channel_index,
- LogType::kLogDeliveryTimeOnly));
-
- VLOG(2) << "Writing timestamps as node "
- << FlatbufferToJson(event_loop_->node()) << " for channel "
- << configuration::CleanedChannelToString(
- f.fetcher->channel())
- << " to " << f.timestamp_writer->filename() << " timestamp "
- << FlatbufferToJson(
- flatbuffers::GetSizePrefixedRoot<MessageHeader>(
- fbb.GetBufferPointer()));
-
- f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
- }
-
- f.written = true;
- } else {
- break;
- }
- }
- }
+ LogUntil(
+ std::min(last_synchronized_time_ + polling_period_, monotonic_now));
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
@@ -302,7 +558,8 @@
if (replay_configuration) {
CHECK_EQ(configuration::MultiNode(configuration()),
configuration::MultiNode(replay_configuration))
- << ": Log file and replay config need to both be multi or single node.";
+ << ": Log file and replay config need to both be multi or single "
+ "node.";
}
if (!configuration::MultiNode(configuration())) {
@@ -312,12 +569,13 @@
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
replay_configuration->nodes()->size())
- << ": Log file and replay config need to have matching nodes lists.";
+ << ": Log file and replay config need to have matching nodes "
+ "lists.";
for (const Node *node : *logged_configuration()->nodes()) {
if (configuration::GetNode(replay_configuration, node) == nullptr) {
- LOG(FATAL)
- << "Found node " << FlatbufferToJson(node)
- << " in logged config that is not present in the replay config.";
+ LOG(FATAL) << "Found node " << FlatbufferToJson(node)
+ << " in logged config that is not present in the replay "
+ "config.";
}
}
}
@@ -335,8 +593,8 @@
if (offset_fp_ != nullptr) {
fclose(offset_fp_);
}
- // Zero out some buffers. It's easy to do use-after-frees on these, so make it
- // more obvious.
+ // Zero out some buffers. It's easy to do use-after-frees on these, so make
+ // it more obvious.
if (remapped_configuration_buffer_) {
remapped_configuration_buffer_->Wipe();
}
@@ -352,8 +610,8 @@
}
std::vector<const Node *> LogReader::Nodes() const {
- // Because the Node pointer will only be valid if it actually points to memory
- // owned by remapped_configuration_, we need to wait for the
+ // Because the Node pointer will only be valid if it actually points to
+ // memory owned by remapped_configuration_, we need to wait for the
// remapped_configuration_ to be populated before accessing it.
//
// Also, note, that when ever a map is changed, the nodes in here are
@@ -404,38 +662,45 @@
"you sure that the replay config matches the original config?";
}
- // We need to now seed our per-node time offsets and get everything set up to
- // run.
- const size_t num_nodes = !configuration::MultiNode(logged_configuration())
- ? 1u
- : logged_configuration()->nodes()->size();
+ // We need to now seed our per-node time offsets and get everything set up
+ // to run.
+ const size_t num_nodes = nodes_count();
// It is easiest to solve for per node offsets with a matrix rather than
// trying to solve the equations by hand. So let's get after it.
//
// Now, build up the map matrix.
//
- // sample_matrix_ = map_matrix_ * offset_matrix_
- map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
+ // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+ map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
+ slope_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ filters_.size() + 1, num_nodes);
- sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
- offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ offset_matrix_ =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+ last_valid_matrix_ =
+ Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
- // And the base offset matrix, which will be a copy of the initial offset
- // matrix.
- base_offset_matrix_ =
- Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
- num_nodes);
+ time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+ time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
- // All offsets should sum to 0. Add that as the first constraint in our least
- // squares.
- map_matrix_.row(0).setOnes();
+ // All times should average out to the distributed clock.
+ for (int i = 0; i < map_matrix_.cols(); ++i) {
+ // 1/num_nodes.
+ map_matrix_(0, i) = mpq_class(1, num_nodes);
+ }
+ valid_matrix_(0) = true;
{
// Now, add the a - b -> sample elements.
size_t i = 1;
for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
const Node *const node_a = std::get<0>(filter.first);
const Node *const node_b = std::get<1>(filter.first);
@@ -444,136 +709,68 @@
const size_t node_b_index =
configuration::GetNodeIndex(configuration(), node_b);
- // +a
- map_matrix_(i, node_a_index) = 1.0;
- // -b
- map_matrix_(i, node_b_index) = -1.0;
+ // -a
+ map_matrix_(i, node_a_index) = mpq_class(-1);
+ // +b
+ map_matrix_(i, node_b_index) = mpq_class(1);
// -> sample
- filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+ std::get<0>(filter.second)
+ .set_slope_pointer(&slope_matrix_(i, node_a_index));
+ std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+ valid_matrix_(i) = false;
+ std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
++i;
}
}
- // Rank of the map matrix tells you if all the nodes are in communication with
- // each other, which tells you if the offsets are observable.
- const size_t connected_nodes =
- Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
- map_matrix_)
- .rank();
-
- // We don't need to support isolated nodes until someone has a real use case.
- CHECK_EQ(connected_nodes, num_nodes)
- << ": There is a node which isn't communicating with the rest.";
-
- // Now, iterate through all the timestamps from all the nodes and seed
- // everything.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
- // Make sure all the samples have been seeded.
- for (int i = 1; i < sample_matrix_.cols(); ++i) {
- // The seeding logic is pretty basic right now because we don't have great
- // use cases yet. It wants to see data from every node. Blow up for now,
- // and once we have a reason to do something different, update this logic.
- // Maybe read further in the log file? Or seed off the realtime time?
- CHECK_NE(sample_matrix_(i, 0), 0.0)
- << ": Sample " << i << " is not seeded.";
- }
-
- // And solve.
- offset_matrix_ = SolveOffsets();
-
- // Save off the base offsets so we can work in deltas from here out. That
- // will significantly simplify the numerical precision problems.
- for (size_t i = 0; i < num_nodes; ++i) {
- base_offset_matrix_(i, 0) =
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::duration<double>(offset_matrix_(i, 0)));
- }
-
- {
- // Shift everything so we never could (reasonably) require the distributed
- // clock to have a large backwards jump in time. This makes it so the boot
- // time on the node up the longest will essentially start matching the
- // distributed clock.
- const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- base_offset_matrix_(i, 0) += offset;
- }
- }
-
- {
- // Re-compute the samples and setup all the filters so that they
- // subtract this base offset.
-
- size_t i = 1;
- for (std::pair<const std::tuple<const Node *, const Node *>,
- message_bridge::ClippedAverageFilter> &filter : filters_) {
- CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
-
- const Node *const node_a = std::get<0>(filter.first);
- const Node *const node_b = std::get<1>(filter.first);
-
- const size_t node_a_index =
- configuration::GetNodeIndex(configuration(), node_a);
- const size_t node_b_index =
- configuration::GetNodeIndex(configuration(), node_b);
-
- filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
- base_offset_matrix_(node_b_index));
-
- ++i;
- }
- }
-
- // Now, iterate again through all the offsets now that we have set the base
- // offset to something sane. This will seed everything with an accurate
- // initial offset.
- for (std::unique_ptr<State> &state : states_) {
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- TimestampMerger::DeliveryTimestamp timestamp =
- state->OldestTimestampForChannel(i);
- if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
- CHECK(state->MaybeUpdateTimestamp(timestamp, i));
- }
- }
- }
-
for (std::unique_ptr<State> &state : states_) {
state->SeedSortedMessages();
}
+ // Rank of the map matrix tells you if all the nodes are in communication
+ // with each other, which tells you if the offsets are observable.
+ const size_t connected_nodes =
+ Eigen::FullPivLU<
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
+ .rank();
+
+ // We don't need to support isolated nodes until someone has a real use
+ // case.
+ CHECK_EQ(connected_nodes, num_nodes)
+ << ": There is a node which isn't communicating with the rest.";
+
+ // And solve.
UpdateOffsets();
- // We want to start the log file at the last start time of the log files from
- // all the nodes. Compute how long each node's simulation needs to run to
- // move time to this point.
+ // We want to start the log file at the last start time of the log files
+ // from all the nodes. Compute how long each node's simulation needs to run
+ // to move time to this point.
distributed_clock::time_point start_time = distributed_clock::min_time;
+ // TODO(austin): We want an "OnStart" callback for each node rather than
+ // running until the last node.
+
for (std::unique_ptr<State> &state : states_) {
- // Setup the realtime clock to have something sane in it now.
- state->SetRealtimeOffset(state->monotonic_start_time(),
- state->realtime_start_time());
- // And start computing the start time on the distributed clock now that that
- // works.
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ // And start computing the start time on the distributed clock now that
+ // that works.
start_time = std::max(
start_time, state->ToDistributedClock(state->monotonic_start_time()));
}
- CHECK_GE(start_time, distributed_clock::epoch());
+
+ CHECK_GE(start_time, distributed_clock::epoch())
+ << ": Hmm, we have a node starting before the start of time. Offset "
+ "everything.";
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
- // nodes, and also replayed on the other nodes. This may not satisfy all our
- // users, but it'll start the discussion.
+ // nodes, and also replayed on the other nodes. This may not satisfy all
+ // our users, but it'll start the discussion.
if (configuration::MultiNode(event_loop_factory_->configuration())) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel = logged_configuration()->channels()->Get(i);
@@ -598,34 +795,177 @@
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
ignore_missing_data_ = true;
- VLOG(1) << "Running until start time: " << start_time;
+ VLOG(1) << "Running until " << start_time << " in Register";
event_loop_factory_->RunFor(start_time.time_since_epoch());
VLOG(1) << "At start time";
// Now that we are running for real, missing data means that the log file is
// corrupted or went wrong.
ignore_missing_data_ = false;
-}
-void LogReader::UpdateOffsets() {
- // TODO(austin): Evaluate less accurate inverses. We might be able to
- // do some tricks to keep the accuracy up.
- offset_matrix_ = SolveOffsets();
-
- size_t node_index = 0;
for (std::unique_ptr<State> &state : states_) {
- state->SetDistributedOffset(-offset(node_index), 1.0);
- ++node_index;
+ // Make the RT clock be correct before handing it to the user.
+ if (state->realtime_start_time() != realtime_clock::min_time) {
+ state->SetRealtimeOffset(state->monotonic_start_time(),
+ state->realtime_start_time());
+ }
+ VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ << MaybeNodeName(state->event_loop()->node()) << "now "
+ << state->monotonic_now();
+ }
+
+ if (FLAGS_timestamps_to_csv) {
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>>
+ &filter : filters_) {
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ std::get<0>(filter.second)
+ .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
+ ->monotonic_now());
+ std::get<0>(filter.second)
+ .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
+ ->monotonic_now());
+ }
}
}
-std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+void LogReader::UpdateOffsets() {
+ VLOG(2) << "Samples are " << offset_matrix_;
+ VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+ std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+ Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+ "]");
+ VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
+ << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
+
+ size_t node_index = 0;
+ for (std::unique_ptr<State> &state : states_) {
+ state->SetDistributedOffset(offset(node_index), slope(node_index));
+ VLOG(1) << "Offset for node " << node_index << " "
+ << MaybeNodeName(state->event_loop()->node()) << "is "
+ << aos::distributed_clock::time_point(offset(node_index))
+ << " slope " << std::setprecision(9) << std::fixed
+ << slope(node_index);
+ ++node_index;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset is");
+ }
+}
+
+void LogReader::LogFit(std::string_view prefix) {
+ for (std::unique_ptr<State> &state : states_) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
+ << state->monotonic_now() << " distributed "
+ << event_loop_factory_->distributed_now();
+ }
+
+ for (std::pair<const std::tuple<const Node *, const Node *>,
+ std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
+ filters_) {
+ message_bridge::NoncausalOffsetEstimator *estimator =
+ &std::get<0>(filter.second);
+
+ if (estimator->a_timestamps().size() == 0 &&
+ estimator->b_timestamps().size() == 0) {
+ continue;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ estimator->LogFit(prefix);
+ }
+
+ const Node *const node_a = std::get<0>(filter.first);
+ const Node *const node_b = std::get<1>(filter.first);
+
+ const size_t node_a_index =
+ configuration::GetNodeIndex(configuration(), node_a);
+ const size_t node_b_index =
+ configuration::GetNodeIndex(configuration(), node_b);
+
+ const double recovered_slope =
+ slope(node_b_index) / slope(node_a_index) - 1.0;
+ const int64_t recovered_offset =
+ offset(node_b_index).count() - offset(node_a_index).count() *
+ slope(node_b_index) /
+ slope(node_a_index);
+
+ VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
+ << " (error " << recovered_slope - estimator->fit().slope() << ") "
+ << " offset " << std::setprecision(20) << recovered_offset
+ << " (error "
+ << recovered_offset - estimator->fit().offset().count() << ")";
+
+ const aos::distributed_clock::time_point a0 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[0]));
+ const aos::distributed_clock::time_point a1 =
+ states_[node_a_index]->ToDistributedClock(
+ std::get<0>(estimator->a_timestamps()[1]));
+
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a0) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[0])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((a0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
+ << " distributed -> " << node_b->name()->string_view() << " "
+ << states_[node_b_index]->FromDistributedClock(a1) << " should be "
+ << aos::monotonic_clock::time_point(
+ std::chrono::nanoseconds(static_cast<int64_t>(
+ std::get<0>(estimator->a_timestamps()[1])
+ .time_since_epoch()
+ .count() *
+ (1.0 + estimator->fit().slope()))) +
+ estimator->fit().offset())
+ << ((event_loop_factory_->distributed_now() <= a1)
+ ? ""
+ : " Before now, investigate");
+
+ const aos::distributed_clock::time_point b0 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[0]));
+ const aos::distributed_clock::time_point b1 =
+ states_[node_b_index]->ToDistributedClock(
+ std::get<0>(estimator->b_timestamps()[1]));
+
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
+ << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b0)
+ << ((b0 <= event_loop_factory_->distributed_now())
+ ? ""
+ : " After now, investigate");
+ VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
+ << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
+ << " distributed -> " << node_a->name()->string_view() << " "
+ << states_[node_a_index]->FromDistributedClock(b1)
+ << ((event_loop_factory_->distributed_now() <= b1)
+ ? ""
+ : " Before now, investigate");
+ }
+}
+
+message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
const Node *node_a, const Node *node_b) {
CHECK_NE(node_a, node_b);
CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
if (node_a > node_b) {
- return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
+ return GetFilter(node_b, node_a);
}
auto tuple = std::make_tuple(node_a, node_b);
@@ -633,53 +973,27 @@
auto it = filters_.find(tuple);
if (it == filters_.end()) {
- auto &x = filters_
- .insert(std::make_pair(
- tuple, message_bridge::ClippedAverageFilter()))
- .first->second;
+ auto &x =
+ filters_
+ .insert(std::make_pair(
+ tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+ node_a, node_b))))
+ .first->second;
if (FLAGS_timestamps_to_csv) {
- std::string fwd_name =
- absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
- node_b->name()->string_view());
- x.SetFwdCsvFileName(fwd_name);
- std::string rev_name =
- absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
- node_a->name()->string_view());
- x.SetRevCsvFileName(rev_name);
+ std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+ node_b->name()->string_view()));
+ std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+ "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+ node_a->name()->string_view()));
}
- return std::make_tuple(&x, true);
+ return &std::get<0>(x);
} else {
- return std::make_tuple(&(it->second), true);
+ return &std::get<0>(it->second);
}
}
-bool LogReader::State::MaybeUpdateTimestamp(
- const TimestampMerger::DeliveryTimestamp &channel_timestamp,
- int channel_index) {
- if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
- CHECK(std::get<0>(filters_[channel_index]) == nullptr);
- return false;
- }
-
- // Got a forwarding timestamp!
- CHECK(std::get<0>(filters_[channel_index]) != nullptr);
-
- // Call the correct method depending on if we are the forward or reverse
- // direction here.
- if (std::get<1>(filters_[channel_index])) {
- std::get<0>(filters_[channel_index])
- ->FwdSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- } else {
- std::get<0>(filters_[channel_index])
- ->RevSample(channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_event_time -
- channel_timestamp.monotonic_remote_time);
- }
- return true;
-}
void LogReader::Register(EventLoop *event_loop) {
State *state =
@@ -702,10 +1016,8 @@
const Channel *channel =
RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
- std::make_tuple(nullptr, false);
-
NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
@@ -730,31 +1042,48 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
+ VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
+ << "at " << state->event_loop()->context().monotonic_event_time
+ << " now " << state->monotonic_now();
if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
- VLOG(1) << "Node down!";
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (live_nodes_ == 0) {
event_loop_factory_->Exit();
}
return;
}
- bool update_offsets = false;
TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
- bool dummy_update_time = false;
+ if (VLOG_IS_ON(1)) {
+ LogFit("Offset was");
+ }
+
+ bool update_time;
std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&dummy_update_time);
+ state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
- << state->DebugString();
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+      LOG(WARNING) << "Check failed: monotonic_now == "
+                      "channel_timestamp.monotonic_event_time ("
+ << monotonic_now << " vs. "
+ << channel_timestamp.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << channel_timestamp.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
if (channel_timestamp.monotonic_event_time >
state->monotonic_start_time() ||
@@ -764,17 +1093,39 @@
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
- "not matched? Use --skip_missing_forwarding_entries to ignore "
+ "not matched? Use --skip_missing_forwarding_entries to "
+ "ignore "
"this.";
- if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+ if (update_time) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index));
-
- update_offsets = true;
+ if (!FLAGS_skip_order_validation) {
+ CHECK_LT(channel_timestamp.monotonic_remote_time,
+ state->monotonic_remote_now(channel_index))
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " " << state->DebugString();
+ } else if (channel_timestamp.monotonic_remote_time >=
+ state->monotonic_remote_now(channel_index)) {
+ LOG(WARNING)
+ << "Check failed: channel_timestamp.monotonic_remote_time < "
+ "state->monotonic_remote_now(channel_index) ("
+ << channel_timestamp.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(channel_index) << ") "
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(channel_index)->name()->string_view()
+ << " currently " << channel_timestamp.monotonic_event_time
+ << " ("
+ << state->ToDistributedClock(
+ channel_timestamp.monotonic_event_time)
+ << ") remote event time "
+ << channel_timestamp.monotonic_remote_time << " ("
+ << state->RemoteToDistributedClock(
+ channel_index, channel_timestamp.monotonic_remote_time)
+ << ") " << state->DebugString();
+ }
if (FLAGS_timestamps_to_csv) {
if (offset_fp_ == nullptr) {
@@ -789,13 +1140,14 @@
std::chrono::duration_cast<std::chrono::duration<double>>(
channel_timestamp.realtime_event_time - first_time_)
.count());
- for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
- fprintf(
- offset_fp_, ", %.9f",
- offset_matrix_(i, 0) +
- std::chrono::duration_cast<std::chrono::duration<double>>(
- base_offset_matrix_(i, 0))
- .count());
+ for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
+ fprintf(offset_fp_, ", %.9f",
+ time_offset_matrix_(i, 0) +
+ time_slope_matrix_(i, 0) *
+ chrono::duration<double>(
+ event_loop_factory_->distributed_now()
+ .time_since_epoch())
+ .count());
}
fprintf(offset_fp_, "\n");
}
@@ -805,21 +1157,26 @@
state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
channel_timestamp.realtime_event_time);
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
+ << channel_timestamp.monotonic_event_time;
+ // TODO(austin): std::move channel_data in and make that efficient in
+ // simulation.
state->Send(channel_index, channel_data.message().data()->Data(),
channel_data.message().data()->size(),
channel_timestamp.monotonic_remote_time,
channel_timestamp.realtime_remote_time,
channel_timestamp.remote_queue_index);
- } else if (state->at_end()) {
+ } else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
- // reading the rest of the log file and call it quits. We don't want to
- // replay partial data.
+ // reading the rest of the log file and call it quits. We don't want
+ // to replay partial data.
while (state->OldestMessageTime() != monotonic_clock::max_time) {
bool update_time_dummy;
state->PopOldest(&update_time_dummy);
}
+ } else {
+ CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
}
-
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
@@ -830,22 +1187,100 @@
const monotonic_clock::time_point next_time = state->OldestMessageTime();
if (next_time != monotonic_clock::max_time) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time << "("
+ << state->ToDistributedClock(next_time)
+ << " distributed), now is " << state->monotonic_now();
state->Setup(next_time);
} else {
- // Set a timer up immediately after now to die. If we don't do this, then
- // the senders waiting on the message we just read will never get called.
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "No next message, scheduling shutdown";
+ // Set a timer up immediately after now to die. If we don't do this,
+ // then the senders waiting on the message we just read will never get
+ // called.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
}
}
- // Once we make this call, the current time changes. So do everything which
- // involves time before changing it. That especially includes sending the
- // message.
- if (update_offsets) {
+ // Once we make this call, the current time changes. So do everything
+ // which involves time before changing it. That especially includes
+ // sending the message.
+ if (update_time) {
+ VLOG(1) << MaybeNodeName(state->event_loop()->node())
+ << "updating offsets";
+
+ std::vector<aos::monotonic_clock::time_point> before_times;
+ before_times.resize(states_.size());
+ std::transform(states_.begin(), states_.end(), before_times.begin(),
+ [](const std::unique_ptr<State> &state) {
+ return state->monotonic_now();
+ });
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "before " << states_[i]->monotonic_now();
+ }
+
UpdateOffsets();
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
+ << state->monotonic_now();
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ VLOG(1) << MaybeNodeName(
+ states_[i]->event_loop()->node())
+ << "after " << states_[i]->monotonic_now();
+ }
+
+ // TODO(austin): We should be perfect.
+ const std::chrono::nanoseconds kTolerance{3};
+ if (!FLAGS_skip_order_validation) {
+ CHECK_GE(next_time, state->monotonic_now())
+ << ": Time skipped the next event.";
+
+ for (size_t i = 0; i < states_.size(); ++i) {
+ CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
+ << ": Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
+ << ": Time changed too much on node "
+ << states_[i]->event_loop()->node()->name()->string_view();
+ }
+ } else {
+ if (next_time < state->monotonic_now()) {
+ LOG(WARNING) << "Check failed: next_time >= "
+ "state->monotonic_now() ("
+ << next_time << " vs. " << state->monotonic_now()
+ << "): Time skipped the next event.";
+ }
+ for (size_t i = 0; i < states_.size(); ++i) {
+          if (states_[i]->monotonic_now() < before_times[i] - kTolerance) {
+ LOG(WARNING) << "Check failed: "
+ "states_[i]->monotonic_now() "
+ ">= before_times[i] - kTolerance ("
+ << states_[i]->monotonic_now() << " vs. "
+ << before_times[i] - kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+          if (states_[i]->monotonic_now() > before_times[i] + kTolerance) {
+            LOG(WARNING) << "Check failed: "
+                            "states_[i]->monotonic_now() "
+                            "<= before_times[i] + kTolerance ("
+                         << states_[i]->monotonic_now() << " vs. "
+                         << before_times[i] + kTolerance
+ << ") : Time changed too much on node "
+ << MaybeNodeName(states_[i]->event_loop()->node());
+ }
+ }
+ }
}
+
+ VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
+ << state->event_loop()->context().monotonic_event_time << " now "
+ << state->monotonic_now();
}));
++live_nodes_;
@@ -942,8 +1377,8 @@
new_name_builder.add_name(name_offset);
new_name_fbb.Finish(new_name_builder.Finish());
const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
- // Retrieve the channel that we want to copy, confirming that it is actually
- // present in base_config.
+ // Retrieve the channel that we want to copy, confirming that it is
+ // actually present in base_config.
const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
base_config, logged_configuration()->channels()->Get(pair.first), "",
nullptr));
@@ -1020,7 +1455,7 @@
void LogReader::State::SetChannel(
size_t channel, std::unique_ptr<RawSender> sender,
- std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+ message_bridge::NoncausalOffsetEstimator *filter,
NodeEventLoopFactory *channel_target_event_loop_factory) {
channels_[channel] = std::move(sender);
filters_[channel] = filter;
@@ -1034,21 +1469,27 @@
CHECK_GT(sorted_messages_.size(), 0u);
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ FlatbufferVector<MessageHeader>,
+ message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- *update_time = false;
+ if (std::get<3>(result) != nullptr) {
+ *update_time = std::get<3>(result)->Pop(
+ event_loop_->node(), std::get<0>(result).monotonic_event_time);
+ } else {
+ *update_time = false;
+ }
return std::make_tuple(std::get<0>(result), std::get<1>(result),
std::move(std::get<2>(result)));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
if (sorted_messages_.size() > 0) {
- VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
<< std::get<0>(sorted_messages_.front()).monotonic_event_time;
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
@@ -1081,11 +1522,26 @@
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
+ message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+
std::tie(channel_timestamp, channel_index, channel_data) =
channel_merger_->PopOldest();
+ // Skip any messages without forwarding information.
+ if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ // Got a forwarding timestamp!
+ filter = filters_[channel_index];
+
+ CHECK(filter != nullptr);
+
+      // Feed the sample to the estimator, which works out the forward vs.
+      // reverse direction from the node we pass in.
+ filter->Sample(event_loop_->node(),
+ channel_timestamp.monotonic_event_time,
+ channel_timestamp.monotonic_remote_time);
+ }
sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data));
+ std::move(channel_data), filter);
}
}