Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear with all the adjustments, we need
to redo how events are scheduled. They can't be converted to the
distributed_clock once. They need to now be converted every time they
are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e0350bb..34dbd24 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -1,15 +1,19 @@
#ifndef AOS_EVENTS_LOGGER_H_
#define AOS_EVENTS_LOGGER_H_
+#include <chrono>
#include <deque>
#include <string_view>
#include <vector>
+#include "Eigen/Dense"
+#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
@@ -174,6 +178,7 @@
data_writers_;
};
+
// Logs all channels available in the event loop to disk every 100 ms.
// Start by logging one message per channel to capture any state and
// configuration that is sent rately on a channel and would affect execution.
@@ -359,6 +364,8 @@
// the header is likely distracting.
FlatbufferVector<LogFileHeader> log_file_header_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+
// State per node.
struct State {
// Log file.
@@ -373,10 +380,68 @@
EventLoop *event_loop = nullptr;
// And timer used to send messages.
TimerHandler *timer_handler;
+
+ // Updates the timestamp filter with the timestamp. Returns true if the
+ // provided timestamp was actually a forwarding timestamp and used, and
+ // false otherwise.
+ bool MaybeUpdateTimestamp(
+ const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+ int channel_index);
+
+ // Filters (or nullptr if it isn't a forwarded channel) for each channel.
+ // This corresponds to the object which is shared among all the channels
+ // going between 2 nodes. The second element in the tuple indicates if this
+ // is the primary direction or not.
+ std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
+ filters;
+
+ // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
+ // channel) which correspond to the originating node.
+ std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
};
- // Map of nodes to States used to hold all the state for all the nodes.
- std::map<const Node *, State> channel_mergers_;
+ // Node index -> State.
+ std::vector<std::unique_ptr<State>> states_;
+
+ // Creates the requested filter if it doesn't exist, regardless of whether
+ // these nodes can actually communicate directly. The second return value
+ // reports if this is the primary direction or not.
+ std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
+ const Node *node_a, const Node *node_b);
+
+ // FILE to write offsets to (if populated).
+ FILE *offset_fp_ = nullptr;
+ // Timestamp of the first piece of data used for the horizontal axis on the
+ // plot.
+ aos::realtime_clock::time_point first_time_;
+
+ // List of filters for a connection. The pointer to the first node will be
+ // less than the second node.
+ std::map<std::tuple<const Node *, const Node *>,
+ message_bridge::ClippedAverageFilter>
+ filters_;
+
+ // Returns the offset from the monotonic clock for a node to the distributed
+ // clock. distributed = monotonic + offset;
+ std::chrono::nanoseconds offset(int node_index) const {
+ return -std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(offset_matrix_(node_index))) -
+ base_offset_matrix_(node_index);
+ }
+
+ // Updates the offset matrix solution and sets the per-node distributed
+ // offsets in the factory.
+ void UpdateOffsets();
+
+ // sample_matrix_ = map_matrix_ * offset_matrix_
+ Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
+ Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+
+ // Base offsets. The actual offset is the sum of this and the offset matrix.
+ // This removes some of the dynamic range challenges from the double above.
+ Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
+ base_offset_matrix_;
std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
remapped_configuration_buffer_;