Merge "Print out which queue was corrupted"
diff --git a/aos/configuration.cc b/aos/configuration.cc
index c74234e..f466ee0 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -975,5 +975,40 @@
return result;
}
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+ const Node *my_node) {
+ if (!configuration::MultiNode(config)) {
+ CHECK(my_node == nullptr);
+ return std::vector<const Node *>{};
+ }
+
+ std::set<const Node *> timestamp_logger_nodes;
+ for (const Channel *channel : *config->channels()) {
+ if (!configuration::ChannelIsSendableOnNode(channel, my_node)) {
+ continue;
+ }
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node =
+ configuration::GetNode(config, connection->name()->string_view());
+
+ if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ my_node)) {
+ VLOG(1) << "Timestamps are logged from "
+ << FlatbufferToJson(other_node);
+ timestamp_logger_nodes.insert(other_node);
+ }
+ }
+ }
+
+ std::vector<const Node *> result;
+ for (const Node *node : timestamp_logger_nodes) {
+ result.emplace_back(node);
+ }
+ return result;
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index fbd9cff..f1bcece 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -135,6 +135,10 @@
std::vector<std::string_view> SourceNodeNames(const Configuration *config,
const Node *my_node);
+// Returns all the nodes that are logging timestamps on our node.
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+ const Node *my_node);
+
// TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
} // namespace configuration
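
A minimal usage sketch for the TimestampNodes helper declared above (the JSON path and node name are hypothetical; ReadConfig, GetNode, and FlatbufferToJson are the existing aos helpers already used elsewhere in this change):

  #include "aos/configuration.h"
  #include "aos/json_to_flatbuffer.h"
  #include "glog/logging.h"

  int main() {
    // Load a multi-node config and list the nodes which log delivery
    // timestamps for messages sent from "pi1".
    aos::FlatbufferDetachedBuffer<aos::Configuration> config =
        aos::configuration::ReadConfig("multinode_config.json");
    const aos::Node *pi1 =
        aos::configuration::GetNode(&config.message(), "pi1");
    for (const aos::Node *node :
         aos::configuration::TimestampNodes(&config.message(), pi1)) {
      LOG(INFO) << "Timestamps are logged from " << aos::FlatbufferToJson(node);
    }
  }
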
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 15b4fd9..70dae61 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1153,6 +1153,8 @@
timestamp.realtime_event_time =
realtime_clock::time_point(chrono::nanoseconds(
std::get<2>(oldest_timestamp).message().realtime_sent_time()));
+ timestamp.queue_index =
+ std::get<2>(oldest_timestamp).message().queue_index();
// Consistency check.
CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
@@ -1255,6 +1257,7 @@
timestamp.realtime_event_time =
realtime_clock::time_point(chrono::nanoseconds(
std::get<2>(oldest_message).message().realtime_sent_time()));
+ timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
timestamp.remote_queue_index = 0xffffffff;
CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index a3d16f7..0b0c8f5 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -492,6 +492,7 @@
monotonic_clock::time_point monotonic_event_time =
monotonic_clock::min_time;
realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+ uint32_t queue_index = 0xffffffff;
monotonic_clock::time_point monotonic_remote_time =
monotonic_clock::min_time;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 2780bd5..19a0229 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -52,38 +52,22 @@
: aos::Fetcher<message_bridge::ServerStatistics>()) {
VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
- // Find all the nodes which are logging timestamps on our node.
- std::set<const Node *> timestamp_logger_nodes;
- for (const Channel *channel : *configuration_->channels()) {
- if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
- continue;
- }
- if (!channel->has_destination_nodes()) {
- continue;
- }
- if (!should_log(channel)) {
- continue;
- }
- for (const Connection *connection : *channel->destination_nodes()) {
- const Node *other_node = configuration::GetNode(
- configuration_, connection->name()->string_view());
-
- if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
- connection, event_loop_->node())) {
- VLOG(1) << "Timestamps are logged from "
- << FlatbufferToJson(other_node);
- timestamp_logger_nodes.insert(other_node);
- }
- }
- }
+ // Find all the nodes which are logging timestamps on our node.  This may
+ // overestimate the set of nodes when should_log is specified, since
+ // TimestampNodes does not filter on should_log.
+ std::vector<const Node *> timestamp_logger_nodes =
+ configuration::TimestampNodes(configuration_, event_loop_->node());
std::map<const Channel *, const Node *> timestamp_logger_channels;
// Now that we have all the nodes accumulated, make remote timestamp loggers
// for them.
for (const Node *node : timestamp_logger_nodes) {
+ // Note: since we are doing a find using the event loop channel, we need to
+ // make sure this channel pointer is part of the event loop configuration,
+ // not configuration_. This only matters when configuration_ !=
+ // event_loop->configuration().
const Channel *channel = configuration::GetChannel(
- configuration_,
+ event_loop->configuration(),
absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
event_loop_->node());
@@ -112,6 +96,9 @@
const Channel *channel = aos::configuration::GetChannel(
event_loop_->configuration(), config_channel->name()->string_view(),
config_channel->type()->string_view(), "", event_loop_->node());
+ CHECK(channel != nullptr)
+ << ": Failed to look up channel "
+ << aos::configuration::CleanedChannelToString(config_channel);
if (!should_log(channel)) {
continue;
}
@@ -168,6 +155,29 @@
fetchers_.emplace_back(std::move(fs));
}
}
+
+ // When we are logging remote timestamps, we need to be able to translate from
+ // the channel index that the event loop uses to the channel index in the
+ // config in the log file.
+ event_loop_to_logged_channel_index_.resize(
+ event_loop->configuration()->channels()->size(), -1);
+ for (size_t event_loop_channel_index = 0;
+ event_loop_channel_index <
+ event_loop->configuration()->channels()->size();
+ ++event_loop_channel_index) {
+ const Channel *event_loop_channel =
+ event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+ const Channel *logged_channel = aos::configuration::GetChannel(
+ configuration_, event_loop_channel->name()->string_view(),
+ event_loop_channel->type()->string_view(), "",
+ configuration::GetNode(configuration_, event_loop_->node()));
+
+ if (logged_channel != nullptr) {
+ event_loop_to_logged_channel_index_[event_loop_channel_index] =
+ configuration::ChannelIndex(configuration_, logged_channel);
+ }
+ }
}
Logger::~Logger() {
@@ -549,10 +559,18 @@
logger::MessageHeader::Builder message_header_builder(fbb);
+ // TODO(austin): This needs to check the channel_index and confirm
+ // that it should be logged before squirreling away the timestamp to
+ // disk. We don't want to log irrelevant timestamps.
+
// Note: this must match the same order as MessageBridgeServer and
// PackMessage. We want identical headers to have identical
// on-the-wire formats to make comparing them easier.
- message_header_builder.add_channel_index(msg->channel_index());
+
+ // Translate from the channel index that the event loop uses to the
+ // channel index in the log file.
+ message_header_builder.add_channel_index(
+ event_loop_to_logged_channel_index_[msg->channel_index()]);
message_header_builder.add_queue_index(msg->queue_index());
message_header_builder.add_monotonic_sent_time(
@@ -856,6 +874,22 @@
replay_configuration_(replay_configuration) {
MakeRemappedConfig();
+ // Remap all existing remote timestamp channels. They will be recreated, and
+ // the data logged isn't relevant anymore.
+ for (const Node *node : Nodes()) {
+ std::vector<const Node *> timestamp_logger_nodes =
+ configuration::TimestampNodes(logged_configuration(), node);
+ for (const Node *remote_node : timestamp_logger_nodes) {
+ const std::string channel = absl::StrCat(
+ "/aos/remote_timestamps/", remote_node->name()->string_view());
+ CHECK(HasChannel<logger::MessageHeader>(channel, node))
+ << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+ << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
+ << node->name()->string_view();
+ RemapLoggedChannel<logger::MessageHeader>(channel, node);
+ }
+ }
+
if (replay_configuration) {
CHECK_EQ(configuration::MultiNode(configuration()),
configuration::MultiNode(replay_configuration))
@@ -956,10 +990,22 @@
states_[node_index] =
std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
State *state = states_[node_index].get();
-
- Register(state->SetNodeEventLoopFactory(
+ state->set_event_loop(state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node)));
+
+ state->SetChannelCount(logged_configuration()->channels()->size());
}
+
+ // Register after making all the State objects so we can build references
+ // between them.
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+
+ Register(state->event_loop());
+ }
+
if (live_nodes_ == 0) {
LOG(FATAL)
<< "Don't have logs from any of the nodes in the replay config--are "
@@ -1313,29 +1359,46 @@
const bool has_data = state->SetNode();
- state->SetChannelCount(logged_configuration()->channels()->size());
+ for (size_t logged_channel_index = 0;
+ logged_channel_index < logged_configuration()->channels()->size();
+ ++logged_channel_index) {
+ const Channel *channel = RemapChannel(
+ event_loop,
+ logged_configuration()->channels()->Get(logged_channel_index));
- for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
- const Channel *channel =
- RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
-
- NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+ aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+
+ State *source_state = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
- const Node *target_node = configuration::GetNode(
+ // We've got a message which is being forwarded to this node.
+ const Node *source_node = configuration::GetNode(
event_loop->configuration(), channel->source_node()->string_view());
- filter = GetFilter(event_loop->node(), target_node);
+ filter = GetFilter(event_loop->node(), source_node);
- if (event_loop_factory_ != nullptr) {
- channel_target_event_loop_factory =
- event_loop_factory_->GetNodeEventLoopFactory(target_node);
+ // Delivery timestamps are supposed to be logged back on the source node.
+ // Configure remote timestamps to be sent.
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+ channel, event_loop->node(), source_node);
+
+ source_state =
+ states_[configuration::GetNodeIndex(configuration(), source_node)]
+ .get();
+
+ if (delivery_time_is_logged) {
+ remote_timestamp_sender =
+ source_state->RemoteTimestampSender(event_loop->node());
}
}
- state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
- channel_target_event_loop_factory);
+ state->SetChannel(
+ logged_channel_index,
+ configuration::ChannelIndex(event_loop->configuration(), channel),
+ event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
+ source_state);
}
// If we didn't find any log files with data in them, we won't ever get a
@@ -1464,10 +1527,7 @@
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(channel_index, channel_data.message().data()->Data(),
- channel_data.message().data()->size(),
- channel_timestamp.monotonic_remote_time,
- channel_timestamp.realtime_remote_time,
- channel_timestamp.remote_queue_index);
+ channel_data.message().data()->size(), channel_timestamp);
} else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want
@@ -1829,18 +1889,148 @@
void LogReader::State::SetChannelCount(size_t count) {
channels_.resize(count);
+ remote_timestamp_senders_.resize(count);
filters_.resize(count);
- channel_target_event_loop_factory_.resize(count);
+ channel_source_state_.resize(count);
+ factory_channel_index_.resize(count);
+ queue_index_map_.resize(count);
}
void LogReader::State::SetChannel(
- size_t channel, std::unique_ptr<RawSender> sender,
+ size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- NodeEventLoopFactory *channel_target_event_loop_factory) {
- channels_[channel] = std::move(sender);
- filters_[channel] = filter;
- channel_target_event_loop_factory_[channel] =
- channel_target_event_loop_factory;
+ aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+ channels_[logged_channel_index] = std::move(sender);
+ filters_[logged_channel_index] = filter;
+ remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+
+ if (source_state) {
+ channel_source_state_[logged_channel_index] = source_state;
+
+ if (remote_timestamp_sender != nullptr) {
+ source_state->queue_index_map_[logged_channel_index] =
+ std::make_unique<std::vector<State::SentTimestamp>>();
+ }
+ }
+
+ factory_channel_index_[logged_channel_index] = factory_channel_index;
+}
+
+bool LogReader::State::Send(
+ size_t channel_index, const void *data, size_t size,
+ const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
+ aos::RawSender *sender = channels_[channel_index].get();
+ uint32_t remote_queue_index = 0xffffffff;
+
+ if (remote_timestamp_senders_[channel_index] != nullptr) {
+ std::vector<SentTimestamp> *queue_index_map =
+ CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
+ ->queue_index_map_[channel_index]
+ .get());
+
+ SentTimestamp search;
+ search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
+ search.realtime_event_time = delivery_timestamp.realtime_remote_time;
+ search.queue_index = delivery_timestamp.remote_queue_index;
+
+ // Find the sent time if available.
+ auto element = std::lower_bound(
+ queue_index_map->begin(), queue_index_map->end(), search,
+ [](SentTimestamp a, SentTimestamp b) {
+ if (b.monotonic_event_time < a.monotonic_event_time) {
+ return false;
+ }
+ if (b.monotonic_event_time > a.monotonic_event_time) {
+ return true;
+ }
+
+ if (b.queue_index < a.queue_index) {
+ return false;
+ }
+ if (b.queue_index > a.queue_index) {
+ return true;
+ }
+
+ CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
+ return false;
+ });
+
+ // TODO(austin): Be a bit more principled here, but we will want to do that
+ // after the logger rewrite. We hit this when one node finishes, but the
+ // other node isn't done yet. So there is no send time, but there is a
+ // receive time.
+ if (element != queue_index_map->end()) {
+ CHECK_EQ(element->monotonic_event_time,
+ delivery_timestamp.monotonic_remote_time);
+ CHECK_EQ(element->realtime_event_time,
+ delivery_timestamp.realtime_remote_time);
+ CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+
+ remote_queue_index = element->actual_queue_index;
+ }
+ }
+
+ // Send! Use the replayed queue index here instead of the logged queue index
+ // for the remote queue index. This makes re-logging work.
+ const bool sent =
+ sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
+ delivery_timestamp.realtime_remote_time, remote_queue_index);
+ if (!sent) return false;
+
+ if (queue_index_map_[channel_index]) {
+ SentTimestamp timestamp;
+ timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
+ timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
+ timestamp.queue_index = delivery_timestamp.queue_index;
+ timestamp.actual_queue_index = sender->sent_queue_index();
+ queue_index_map_[channel_index]->emplace_back(timestamp);
+ } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+ aos::Sender<MessageHeader>::Builder builder =
+ remote_timestamp_senders_[channel_index]->MakeBuilder();
+
+ logger::MessageHeader::Builder message_header_builder =
+ builder.MakeBuilder<logger::MessageHeader>();
+
+ message_header_builder.add_channel_index(
+ factory_channel_index_[channel_index]);
+
+ // Swap the remote and sent metrics. They are from the sender's
+ // perspective, not the receiver's perspective.
+ message_header_builder.add_monotonic_sent_time(
+ sender->monotonic_sent_time().time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ sender->realtime_sent_time().time_since_epoch().count());
+ message_header_builder.add_queue_index(sender->sent_queue_index());
+
+ message_header_builder.add_monotonic_remote_time(
+ delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+
+ message_header_builder.add_remote_queue_index(remote_queue_index);
+
+ builder.Send(message_header_builder.Finish());
+ }
+
+ return true;
+}
+
+aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+ const Node *delivered_node) {
+ auto sender = remote_timestamp_senders_map_.find(delivered_node);
+
+ if (sender == remote_timestamp_senders_map_.end()) {
+ sender = remote_timestamp_senders_map_
+ .emplace(std::make_pair(
+ delivered_node,
+ event_loop()->MakeSender<MessageHeader>(
+ absl::StrCat("/aos/remote_timestamps/",
+ delivered_node->name()->string_view()))))
+ .first;
+ }
+
+ return &(sender->second);
}
std::tuple<TimestampMerger::DeliveryTimestamp, int,
@@ -1929,6 +2119,7 @@
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}
+ remote_timestamp_senders_map_.clear();
event_loop_unique_ptr_.reset();
event_loop_ = nullptr;
timer_handler_ = nullptr;
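
To illustrate the queue index recovery done in LogReader::State::Send above: the source node's State appends a SentTimestamp for every message it replays, and the State replaying the forwarded copy looks up the replayed ("actual") queue index by (monotonic time, queue index) with std::lower_bound. A self-contained sketch of that lookup with simplified types (the struct layout and helper name here are illustrative, not the class in this patch):

  #include <algorithm>
  #include <cstdint>
  #include <vector>

  struct SentTimestamp {
    int64_t monotonic_event_time_ns;
    uint32_t queue_index;         // Queue index recorded in the log.
    uint32_t actual_queue_index;  // Queue index assigned on replay.
  };

  // Entries are appended in send order, so the vector is sorted by
  // (monotonic time, queue index) and lower_bound can find an exact match.
  uint32_t LookupReplayedQueueIndex(const std::vector<SentTimestamp> &sent,
                                    int64_t time_ns, uint32_t queue_index) {
    const SentTimestamp search{time_ns, queue_index, 0};
    auto it = std::lower_bound(
        sent.begin(), sent.end(), search,
        [](const SentTimestamp &a, const SentTimestamp &b) {
          if (a.monotonic_event_time_ns != b.monotonic_event_time_ns) {
            return a.monotonic_event_time_ns < b.monotonic_event_time_ns;
          }
          return a.queue_index < b.queue_index;
        });
    // Mirror the logger's fallback: if the send wasn't replayed (e.g. one
    // node's log ended early), return the "unknown" sentinel.
    if (it == sent.end() || it->monotonic_event_time_ns != time_ns ||
        it->queue_index != queue_index) {
      return 0xffffffff;
    }
    return it->actual_queue_index;
  }
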
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 161300c..dd10f38 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -110,6 +110,7 @@
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
+ // Channel index to log to.
int channel_index = -1;
const Channel *channel = nullptr;
const Node *timestamp_node = nullptr;
@@ -130,6 +131,10 @@
int node_index = 0;
};
+ // Vector mapping from the event loop channel index to the logged channel
+ // index.
+ std::vector<int> event_loop_to_logged_channel_index_;
+
struct NodeState {
aos::monotonic_clock::time_point monotonic_start_time =
aos::monotonic_clock::min_time;
@@ -460,6 +465,11 @@
}
}
+ // Returns the MessageHeader sender used to log delivery timestamps for the
+ // provided remote node.
+ aos::Sender<MessageHeader> *RemoteTimestampSender(
+ const Node *delivered_node);
+
// Converts a timestamp from the monotonic clock on this node to the
// distributed clock.
distributed_clock::time_point ToDistributedClock(
@@ -482,17 +492,19 @@
// Returns the current time on the remote node which sends messages on
// channel_index.
monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
- return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->monotonic_now();
}
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
- return channel_target_event_loop_factory_[channel_index]
- ->ToDistributedClock(time);
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->ToDistributedClock(time);
}
const Node *remote_node(size_t channel_index) {
- return channel_target_event_loop_factory_[channel_index]->node();
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->node();
}
monotonic_clock::time_point monotonic_now() {
@@ -507,9 +519,11 @@
void SetChannelCount(size_t count);
- // Sets the sender, filter, and target factory for a channel.
+ // Sets the sender, filter, remote timestamp sender, and source State for a
+ // channel.
- void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+ void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- NodeEventLoopFactory *channel_target_event_loop_factory);
+ aos::Sender<MessageHeader> *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const { return channel_merger_->at_end(); }
@@ -529,13 +543,7 @@
// Sends a buffer on the provided channel index.
bool Send(size_t channel_index, const void *data, size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
- return channels_[channel_index]->Send(data, size, monotonic_remote_time,
- realtime_remote_time,
- remote_queue_index);
- }
+ const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
// Returns a debug string for the channel merger.
std::string DebugString() const {
@@ -568,6 +576,28 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
+ std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+ // The mapping from logged channel index to the factory (event loop) channel
+ // index.  Needed for sending out MessageHeaders.
+ std::vector<int> factory_channel_index_;
+
+ struct SentTimestamp {
+ monotonic_clock::time_point monotonic_event_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+ uint32_t queue_index = 0xffffffff;
+
+ // The queue index that this message *actually* was sent with.
+ uint32_t actual_queue_index = 0xffffffff;
+ };
+
+ // Stores all the timestamps that have been sent on this channel. This is
+ // only done for channels which are forwarded and on the node which
+ // initially sends the message.
+ //
+ // TODO(austin): This whole concept is a hack. We should be able to
+ // associate state with the message as it gets sorted and recover it.
+ std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
// Factory (if we are in sim) that this loop was created on.
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
@@ -585,7 +615,10 @@
- // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
- // channel) which correspond to the originating node.
+ // List of States (or nullptr if it isn't a forwarded channel) which
+ // correspond to the originating node.
- std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
+ std::vector<State *> channel_source_state_;
+
+ std::map<const Node *, aos::Sender<MessageHeader>>
+ remote_timestamp_senders_map_;
};
// Node index -> State.
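
The two index maps added in this change (event_loop_to_logged_channel_index_ in Logger and factory_channel_index_ in LogReader::State) implement the same idea in opposite directions: a dense table translating a channel index in one Configuration into the index of the matching {name, type} channel in the other, falling back to -1 when there is no match. A rough standalone sketch of that pattern with simplified types (the real code matches channels via configuration::GetChannel and ChannelIndex):

  #include <string>
  #include <vector>

  struct ChannelKey {
    std::string name;
    std::string type;
    bool operator==(const ChannelKey &other) const {
      return name == other.name && type == other.type;
    }
  };

  // Builds map[i] -> index of the matching channel in `to`, or -1 if the
  // channel only exists in `from`.
  std::vector<int> BuildChannelIndexMap(const std::vector<ChannelKey> &from,
                                        const std::vector<ChannelKey> &to) {
    std::vector<int> map(from.size(), -1);
    for (size_t i = 0; i < from.size(); ++i) {
      for (size_t j = 0; j < to.size(); ++j) {
        if (from[i] == to[j]) {
          map[i] = static_cast<int>(j);
          break;
        }
      }
    }
    return map;
  }
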
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
index c333f55..ea360cc 100644
--- a/aos/events/logging/logger_math.cc
+++ b/aos/events/logging/logger_math.cc
@@ -167,13 +167,64 @@
return std::make_tuple(
Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
- } else {
- // TODO(austin): Solve just the nodes we know about. This is harder and
- // there are no logs which require this yet to test on.
- CHECK_EQ(cached_valid_node_count_, nodes_count())
- << ": TODO(austin): Handle partial valid nodes";
-
+ } else if (cached_valid_node_count_ == nodes_count()) {
return Solve(mpq_map, mpq_offsets);
+ } else {
+ // Strip out any columns (nodes) which aren't relevant. Solve the
+ // simplified problem, then set any nodes which were missing back to slope
+ // 1, offset 0 (i.e. the distributed clock).
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>
+ valid_node_mpq_map =
+ Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+ nonzero_offset_count, cached_valid_node_count_);
+
+ {
+ // Only copy over the columns with valid nodes in them.
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ valid_node_mpq_map.col(column) = mpq_map.col(i);
+
+ ++column;
+ }
+ }
+ // The 1/n needs to be based on the number of nodes being solved.
+ // Recreate it here.
+ for (int j = 0; j < valid_node_mpq_map.cols(); ++j) {
+ valid_node_mpq_map(0, j) = mpq_class(1, cached_valid_node_count_);
+ }
+ }
+
+ VLOG(1) << "Reduced node filtered map "
+ << ToDouble(valid_node_mpq_map).format(HeavyFmt);
+ VLOG(1) << "Reduced node filtered offsets "
+ << ToDouble(mpq_offsets).format(HeavyFmt);
+
+ // Solve the simplified problem now.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ valid_result = Solve(valid_node_mpq_map, mpq_offsets);
+
+ // And expand the results back into a solution matrix.
+ std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+ Eigen::Matrix<double, Eigen::Dynamic, 1>>
+ result = std::make_tuple(
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+ Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+
+ {
+ size_t column = 0;
+ for (size_t i = 0; i < valid_nodes.size(); ++i) {
+ if (valid_nodes[i]) {
+ std::get<0>(result)(i) = std::get<0>(valid_result)(column);
+ std::get<1>(result)(i) = std::get<1>(valid_result)(column);
+
+ ++column;
+ }
+ }
+ }
+
+ return result;
}
} else {
const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
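
The partial-node branch added above follows a gather/solve/scatter pattern: drop the columns for nodes with no valid observations, solve the smaller system, then expand the solution back to the full node count with a default for the dropped nodes. A double-precision Eigen sketch of the same pattern (the exact-rational mpq_class arithmetic and the Solve() helper used in this file are not reproduced; the least-squares solve below stands in for them):

  #include <vector>

  #include <Eigen/Dense>

  // Solve map * x = offsets using only the columns of valid nodes, then
  // expand back to the full node count, leaving missing nodes at the default.
  Eigen::VectorXd SolveValidNodes(const Eigen::MatrixXd &map,
                                  const Eigen::VectorXd &offsets,
                                  const std::vector<bool> &valid_nodes) {
    int valid_count = 0;
    for (bool v : valid_nodes) valid_count += v ? 1 : 0;

    // Gather the valid columns into a reduced matrix.
    Eigen::MatrixXd reduced(map.rows(), valid_count);
    int column = 0;
    for (size_t i = 0; i < valid_nodes.size(); ++i) {
      if (valid_nodes[i]) {
        reduced.col(column) = map.col(i);
        ++column;
      }
    }

    // Least-squares solve of the reduced problem.
    const Eigen::VectorXd reduced_solution =
        reduced.colPivHouseholderQr().solve(offsets);

    // Scatter back into a full-size result; dropped nodes keep the default.
    Eigen::VectorXd result = Eigen::VectorXd::Zero(valid_nodes.size());
    column = 0;
    for (size_t i = 0; i < valid_nodes.size(); ++i) {
      if (valid_nodes[i]) {
        result(i) = reduced_solution(column);
        ++column;
      }
    }
    return result;
  }
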
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 108d4de..51994e3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_generated.h"
#include "aos/util/file.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
@@ -412,16 +413,24 @@
std::unique_ptr<Logger> logger;
};
- LoggerState MakeLogger(const Node *node) {
- return {event_loop_factory_.MakeEventLoop("logger", node), {}};
+ LoggerState MakeLogger(const Node *node,
+ SimulatedEventLoopFactory *factory = nullptr) {
+ if (factory == nullptr) {
+ factory = &event_loop_factory_;
+ }
+ return {factory->MakeEventLoop("logger", node), {}};
}
- void StartLogger(LoggerState *logger) {
+ void StartLogger(LoggerState *logger, std::string logfile_base = "") {
+ if (logfile_base.empty()) {
+ logfile_base = logfile_base_;
+ }
+
logger->logger = std::make_unique<Logger>(logger->event_loop.get());
logger->logger->set_polling_period(std::chrono::milliseconds(100));
- logger->event_loop->OnRun([this, logger]() {
+ logger->event_loop->OnRun([logger, logfile_base]() {
logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
- logfile_base_, logger->event_loop->configuration(),
+ logfile_base, logger->event_loop->configuration(),
logger->event_loop->node()));
});
}
@@ -656,7 +665,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
@@ -731,7 +740,7 @@
++pi2_ping_count;
});
- constexpr ssize_t kQueueIndexOffset = 0;
+ constexpr ssize_t kQueueIndexOffset = -9;
// Confirm that the ping and pong counts both match, and the value also
// matches.
pi1_event_loop->MakeWatcher(
@@ -771,7 +780,7 @@
<< pi2_event_loop->context().monotonic_event_time;
EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
- pi2_pong_count + kQueueIndexOffset - 9);
+ pi2_pong_count + kQueueIndexOffset);
EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
chrono::microseconds(200) +
@@ -854,7 +863,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
@@ -994,7 +1003,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
const Node *pi1 =
@@ -1216,6 +1225,213 @@
reader.Deregister();
}
+// Tests that we properly recreate forwarded timestamps when replaying a log.
+// This should be enough that we can then re-run the logger and get a valid log
+// back.
+TEST_F(MultinodeLoggerTest, MessageHeader) {
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader reader(structured_logfiles_);
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+
+ reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+
+ MessageCounter<MessageHeader> pi1_original_message_header_counter(
+ pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
+ MessageCounter<MessageHeader> pi2_original_message_header_counter(
+ pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
+
+ aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+ aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<examples::Ping>("/test");
+
+ aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+ aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+
+ aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<examples::Pong>("/test");
+ aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<examples::Pong>("/test");
+
+ const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+ pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
+ const size_t ping_timestamp_channel = configuration::ChannelIndex(
+ pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
+
+ const size_t pi2_timestamp_channel = configuration::ChannelIndex(
+ pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
+ const size_t pong_timestamp_channel = configuration::ChannelIndex(
+ pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
+
+ pi1_event_loop->MakeWatcher(
+ "/aos/remote_timestamps/pi2",
+ [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
+ &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
+ &ping_on_pi1_fetcher,
+ &ping_on_pi2_fetcher](const MessageHeader &header) {
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
+ ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
+ pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
+ pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+ << configuration::CleanedChannelToString(
+ pi1_event_loop->configuration()->channels()->Get(
+ header.channel_index()));
+ }
+
+ EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ });
+ pi2_event_loop->MakeWatcher(
+ "/aos/remote_timestamps/pi1",
+ [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
+ &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
+ &pong_on_pi2_fetcher,
+ &pong_on_pi1_fetcher](const MessageHeader &header) {
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi2_context = nullptr;
+ const Context *pi1_context = nullptr;
+
+ if (header.channel_index() == pi2_timestamp_channel) {
+ ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
+ ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
+ pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
+ pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+ } else if (header.channel_index() == pong_timestamp_channel) {
+ ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
+ ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
+ pi2_context = &pong_on_pi2_fetcher.context();
+ pi1_context = &pong_on_pi1_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+ << configuration::CleanedChannelToString(
+ pi2_event_loop->configuration()->channels()->Get(
+ header.channel_index()));
+ }
+
+ EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi1_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi1_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi1_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ EXPECT_EQ(pi2_context->realtime_event_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ });
+
+ // And confirm we can re-create a log again, while checking the contents.
+ {
+ LoggerState pi1_logger = MakeLogger(
+ configuration::GetNode(log_reader_factory.configuration(), pi1_),
+ &log_reader_factory);
+ LoggerState pi2_logger = MakeLogger(
+ configuration::GetNode(log_reader_factory.configuration(), pi2_),
+ &log_reader_factory);
+
+ StartLogger(&pi1_logger, "relogged");
+ StartLogger(&pi2_logger, "relogged");
+
+ log_reader_factory.Run();
+ }
+
+ EXPECT_EQ(pi2_original_message_header_counter.count(), 0u);
+ EXPECT_EQ(pi1_original_message_header_counter.count(), 0u);
+
+ reader.Deregister();
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 81937ca..3923297 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -518,6 +518,7 @@
// Confirm the forwarded message has matching timestamps to the
// timestamps we got back.
EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
EXPECT_EQ(pi2_context->monotonic_event_time,
header_monotonic_sent_time);
EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
@@ -527,7 +528,7 @@
header_monotonic_remote_time);
// Confirm the forwarded message also matches the source message.
- EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
EXPECT_EQ(pi1_context->monotonic_event_time,
header_monotonic_remote_time);
EXPECT_EQ(pi1_context->realtime_event_time,