blob: 2f6f9312c96a13f70f9d27dde30c1010ff77b631 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <map>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>

#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/eigen_mpq.h"
#include "aos/events/logging/log_namer.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080025
26namespace aos {
27namespace logger {
28
Austin Schuhe309d2a2019-11-29 13:25:21 -080029// Logs all channels available in the event loop to disk every 100 ms.
30// Start by logging one message per channel to capture any state and
31// configuration that is sent rately on a channel and would affect execution.
32class Logger {
33 public:
Austin Schuh0c297012020-09-16 18:41:59 -070034 // Constructs a logger.
35 // base_name/log_namer: Object used to write data to disk in one or more log
36 // files. If a base_name is passed in, a LocalLogNamer is wrapped
37 // around it.
38 // event_loop: The event loop used to read the messages.
39 // polling_period: The period used to poll the data.
40 // configuration: When provided, this is the configuration to log, and the
41 // configuration to use for the channel list to log. If not provided,
42 // this becomes the configuration from the event loop.
Austin Schuh2f8fd752020-09-01 22:38:28 -070043 Logger(std::string_view base_name, EventLoop *event_loop,
Austin Schuhe309d2a2019-11-29 13:25:21 -080044 std::chrono::milliseconds polling_period =
45 std::chrono::milliseconds(100));
Austin Schuh0c297012020-09-16 18:41:59 -070046 Logger(std::string_view base_name, EventLoop *event_loop,
47 const Configuration *configuration,
48 std::chrono::milliseconds polling_period =
49 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -080050 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
51 std::chrono::milliseconds polling_period =
52 std::chrono::milliseconds(100));
Austin Schuh0c297012020-09-16 18:41:59 -070053 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
54 const Configuration *configuration,
55 std::chrono::milliseconds polling_period =
56 std::chrono::milliseconds(100));
57 ~Logger();
58
59 // Overrides the name in the log file header.
60 void set_name(std::string_view name) { name_ = name; }
Austin Schuhe309d2a2019-11-29 13:25:21 -080061
Austin Schuh2f8fd752020-09-01 22:38:28 -070062 // Rotates the log file(s), triggering new part files to be written for each
63 // log file.
64 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -080065
Austin Schuhe309d2a2019-11-29 13:25:21 -080066 private:
Austin Schuhfa895892020-01-07 20:07:41 -080067 void WriteHeader();
Austin Schuh2f8fd752020-09-01 22:38:28 -070068 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
69 const Node *node);
70
71 bool MaybeUpdateTimestamp(
72 const Node *node, int node_index,
73 aos::monotonic_clock::time_point monotonic_start_time,
74 aos::realtime_clock::time_point realtime_start_time);
Austin Schuhfa895892020-01-07 20:07:41 -080075
Austin Schuhe309d2a2019-11-29 13:25:21 -080076 void DoLogData();
77
Austin Schuh2f8fd752020-09-01 22:38:28 -070078 void WriteMissingTimestamps();
79
80 void StartLogging();
81
82 // Fetches from each channel until all the data is logged.
83 void LogUntil(monotonic_clock::time_point t);
84
Austin Schuhe309d2a2019-11-29 13:25:21 -080085 EventLoop *event_loop_;
Austin Schuh64fab802020-09-09 22:47:47 -070086 const UUID uuid_;
Austin Schuh6f3babe2020-01-26 20:34:50 -080087 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -080088
Austin Schuh0c297012020-09-16 18:41:59 -070089 // The configuration to place at the top of the log file.
90 const Configuration *configuration_;
91
92 // Name to save in the log file. Defaults to hostname.
93 std::string name_;
94
Austin Schuhe309d2a2019-11-29 13:25:21 -080095 // Structure to track both a fetcher, and if the data fetched has been
96 // written. We may want to delay writing data to disk so that we don't let
97 // data get too far out of order when written to disk so we can avoid making
98 // it too hard to sort when reading.
99 struct FetcherStruct {
100 std::unique_ptr<RawFetcher> fetcher;
101 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800102
Austin Schuh6f3babe2020-01-26 20:34:50 -0800103 int channel_index = -1;
104
105 LogType log_type = LogType::kLogMessage;
106
107 DetachedBufferWriter *writer = nullptr;
108 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700109 DetachedBufferWriter *contents_writer = nullptr;
110 const Node *writer_node = nullptr;
111 const Node *timestamp_node = nullptr;
112 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800113 };
114
115 std::vector<FetcherStruct> fetchers_;
116 TimerHandler *timer_handler_;
117
118 // Period to poll the channels.
119 const std::chrono::milliseconds polling_period_;
120
121 // Last time that data was written for all channels to disk.
122 monotonic_clock::time_point last_synchronized_time_;
123
Austin Schuhfa895892020-01-07 20:07:41 -0800124 monotonic_clock::time_point monotonic_start_time_;
125 realtime_clock::time_point realtime_start_time_;
126
Austin Schuhe309d2a2019-11-29 13:25:21 -0800127 // Max size that the header has consumed. This much extra data will be
128 // reserved in the builder to avoid reallocating.
129 size_t max_header_size_ = 0;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130
131 // Fetcher for all the statistics from all the nodes.
132 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
133
134 // Sets the start time for a specific node.
135 void SetStartTime(size_t node_index,
136 aos::monotonic_clock::time_point monotonic_start_time,
137 aos::realtime_clock::time_point realtime_start_time);
138
139 struct NodeState {
140 aos::monotonic_clock::time_point monotonic_start_time =
141 aos::monotonic_clock::min_time;
142 aos::realtime_clock::time_point realtime_start_time =
143 aos::realtime_clock::min_time;
144
145 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
146 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
147 };
148 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800149};
150
Austin Schuh11d43732020-09-21 17:28:30 -0700151// Datastructure to hold ordered parts.
152struct LogParts {
153 // Monotonic and realtime start times for this set of log files. For log
154 // files which started out unknown and then became known, this is the known
155 // start time.
156 aos::monotonic_clock::time_point monotonic_start_time;
157 aos::realtime_clock::time_point realtime_start_time;
158
159 // UUIDs if available.
160 std::string logger_uuid;
161 std::string parts_uuid;
162
163 // The node this represents, or empty if we are in a single node world.
164 std::string node;
165
166 // Pre-sorted list of parts.
167 std::vector<std::string> parts;
168};
169
170// Datastructure to hold parts from the same run of the logger which have no
171// ordering constraints relative to each other.
172struct LogFile {
173 // The UUID tying them all together (if available)
174 std::string logger_uuid;
175
176 // All the parts, unsorted.
177 std::vector<LogParts> parts;
178};
179
180std::ostream &operator<<(std::ostream &stream, const LogFile &file);
181std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
182
Austin Schuh5212cad2020-09-09 23:12:09 -0700183// Takes a bunch of parts and sorts them based on part_uuid and part_index.
Austin Schuh11d43732020-09-21 17:28:30 -0700184std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
185
186std::vector<std::vector<std::string>> ToLogReaderVector(
187 const std::vector<LogFile> &log_files);
Austin Schuh5212cad2020-09-09 23:12:09 -0700188
Austin Schuh6f3babe2020-01-26 20:34:50 -0800189// We end up with one of the following 3 log file types.
190//
191// Single node logged as the source node.
192// -> Replayed just on the source node.
193//
194// Forwarding timestamps only logged from the perspective of the destination
195// node.
196// -> Matched with data on source node and logged.
197//
198// Forwarding timestamps with data logged as the destination node.
199// -> Replayed just as the destination
200// -> Replayed as the source (Much harder, ordering is not defined)
201//
202// Duplicate data logged. -> CHECK that it matches and explode otherwise.
203//
204// This can be boiled down to a set of constraints and tools.
205//
206// 1) Forwarding timestamps and data need to be logged separately.
207// 2) Any forwarded data logged on the destination node needs to be logged
208// separately such that it can be sorted.
209//
210// 1) Log reader needs to be able to sort a list of log files.
211// 2) Log reader needs to be able to merge sorted lists of log files.
212// 3) Log reader needs to be able to match timestamps with messages.
213//
214// We also need to be able to generate multiple views of a log file depending on
215// the target.
216
Austin Schuhe309d2a2019-11-29 13:25:21 -0800217// Replays all the channels in the logfile to the event loop.
218class LogReader {
219 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800220 // If you want to supply a new configuration that will be used for replay
221 // (e.g., to change message rates, or to populate an updated schema), then
222 // pass it in here. It must provide all the channels that the original logged
223 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800224 //
225 // Log filenames are in the following format:
226 //
227 // {
228 // {log1_part0, log1_part1, ...},
229 // {log2}
230 // }
231 // The inner vector is a list of log file chunks which form up a log file.
232 // The outer vector is a list of log files with subsets of the messages, or
233 // messages from different nodes.
234 //
235 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800236 LogReader(std::string_view filename,
237 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800238 LogReader(const std::vector<std::string> &filenames,
239 const Configuration *replay_configuration = nullptr);
240 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800241 const Configuration *replay_configuration = nullptr);
Austin Schuh11d43732020-09-21 17:28:30 -0700242 LogReader(const std::vector<LogFile> &log_files,
243 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800244 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800245
Austin Schuh6331ef92020-01-07 18:28:09 -0800246 // Registers all the callbacks to send the log file data out on an event loop
247 // created in event_loop_factory. This also updates time to be at the start
248 // of the log file by running until the log file starts.
249 // Note: the configuration used in the factory should be configuration()
250 // below, but can be anything as long as the locations needed to send
251 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800252 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800253 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
254 // and then calls Register.
255 void Register();
256 // Registers callbacks for all the events after the log file starts. This is
257 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800258 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800259
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800260 // Unregisters the senders. You only need to call this if you separately
261 // supplied an event loop or event loop factory and the lifetimes are such
262 // that they need to be explicitly destroyed before the LogReader destructor
263 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800264 void Deregister();
265
Austin Schuh0c297012020-09-16 18:41:59 -0700266 // Returns the configuration being used for replay from the log file.
267 // Note that this may be different from the configuration actually used for
268 // handling events. You should generally only use this to create a
269 // SimulatedEventLoopFactory, and then get the configuration from there for
270 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800271 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700272 // Returns the configuration being used for replay from the log file.
273 // Note that this may be different from the configuration actually used for
274 // handling events. You should generally only use this to create a
275 // SimulatedEventLoopFactory, and then get the configuration from there for
276 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800277 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800278 const Configuration *configuration() const;
279
Austin Schuh6f3babe2020-01-26 20:34:50 -0800280 // Returns the nodes that this log file was created on. This is a list of
281 // pointers to a node in the nodes() list inside configuration(). The
282 // pointers here are invalidated whenever RemapLoggedChannel is called.
283 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800284
285 // Returns the starting timestamp for the log file.
Austin Schuh11d43732020-09-21 17:28:30 -0700286 monotonic_clock::time_point monotonic_start_time(
287 const Node *node = nullptr) const;
288 realtime_clock::time_point realtime_start_time(
289 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800290
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800291 // Causes the logger to publish the provided channel on a different name so
292 // that replayed applications can publish on the proper channel name without
293 // interference. This operates on raw channel names, without any node or
294 // application specific mappings.
295 void RemapLoggedChannel(std::string_view name, std::string_view type,
296 std::string_view add_prefix = "/original");
297 template <typename T>
298 void RemapLoggedChannel(std::string_view name,
299 std::string_view add_prefix = "/original") {
300 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
301 }
302
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700303 template <typename T>
304 bool HasChannel(std::string_view name) {
305 return configuration::GetChannel(log_file_header()->configuration(), name,
306 T::GetFullyQualifiedName(), "",
307 nullptr) != nullptr;
308 }
309
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800310 SimulatedEventLoopFactory *event_loop_factory() {
311 return event_loop_factory_;
312 }
313
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700314 const LogFileHeader *log_file_header() const {
315 return &log_file_header_.message();
316 }
317
Austin Schuh0c297012020-09-16 18:41:59 -0700318 std::string_view name() const {
319 return log_file_header()->name()->string_view();
320 }
321
Austin Schuhe309d2a2019-11-29 13:25:21 -0800322 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800323 const Channel *RemapChannel(const EventLoop *event_loop,
324 const Channel *channel);
325
Austin Schuhe309d2a2019-11-29 13:25:21 -0800326 // Queues at least max_out_of_order_duration_ messages into channels_.
327 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800328 // Handle constructing a configuration with all the additional remapped
329 // channels from calls to RemapLoggedChannel.
330 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800331
Austin Schuh2f8fd752020-09-01 22:38:28 -0700332 // Returns the number of nodes.
333 size_t nodes_count() const {
334 return !configuration::MultiNode(logged_configuration())
335 ? 1u
336 : logged_configuration()->nodes()->size();
337 }
338
Austin Schuh6f3babe2020-01-26 20:34:50 -0800339 const std::vector<std::vector<std::string>> filenames_;
340
341 // This is *a* log file header used to provide the logged config. The rest of
342 // the header is likely distracting.
343 FlatbufferVector<LogFileHeader> log_file_header_;
344
Austin Schuh2f8fd752020-09-01 22:38:28 -0700345 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
346 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
347 Eigen::Matrix<double, Eigen::Dynamic, 1>>
348 SolveOffsets();
349
350 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800351
Austin Schuh6f3babe2020-01-26 20:34:50 -0800352 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700353 class State {
354 public:
355 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800356
Austin Schuh858c9f32020-08-31 16:56:12 -0700357 // Returns the timestamps, channel_index, and message from a channel.
358 // update_time (will be) set to true when popping this message causes the
359 // filter to change the time offset estimation function.
360 std::tuple<TimestampMerger::DeliveryTimestamp, int,
361 FlatbufferVector<MessageHeader>>
362 PopOldest(bool *update_time);
363
364 // Returns the monotonic time of the oldest message.
365 monotonic_clock::time_point OldestMessageTime() const;
366
367 // Primes the queues inside State. Should be called before calling
368 // OldestMessageTime.
369 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800370
Austin Schuh858c9f32020-08-31 16:56:12 -0700371 // Returns the starting time for this node.
372 monotonic_clock::time_point monotonic_start_time() const {
373 return channel_merger_->monotonic_start_time();
374 }
375 realtime_clock::time_point realtime_start_time() const {
376 return channel_merger_->realtime_start_time();
377 }
378
379 // Sets the node event loop factory for replaying into a
380 // SimulatedEventLoopFactory. Returns the EventLoop to use.
381 EventLoop *SetNodeEventLoopFactory(
382 NodeEventLoopFactory *node_event_loop_factory);
383
384 // Sets and gets the event loop to use.
385 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
386 EventLoop *event_loop() { return event_loop_; }
387
Austin Schuh858c9f32020-08-31 16:56:12 -0700388 // Sets the current realtime offset from the monotonic clock for this node
389 // (if we are on a simulated event loop).
390 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
391 realtime_clock::time_point realtime_time) {
392 if (node_event_loop_factory_ != nullptr) {
393 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
394 realtime_time);
395 }
396 }
397
398 // Converts a timestamp from the monotonic clock on this node to the
399 // distributed clock.
400 distributed_clock::time_point ToDistributedClock(
401 monotonic_clock::time_point time) {
402 return node_event_loop_factory_->ToDistributedClock(time);
403 }
404
Austin Schuh2f8fd752020-09-01 22:38:28 -0700405 monotonic_clock::time_point FromDistributedClock(
406 distributed_clock::time_point time) {
407 return node_event_loop_factory_->FromDistributedClock(time);
408 }
409
Austin Schuh858c9f32020-08-31 16:56:12 -0700410 // Sets the offset (and slope) from the distributed clock.
411 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
412 double distributed_slope) {
413 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
414 distributed_slope);
415 }
416
417 // Returns the current time on the remote node which sends messages on
418 // channel_index.
419 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
420 return channel_target_event_loop_factory_[channel_index]->monotonic_now();
421 }
422
Austin Schuh2f8fd752020-09-01 22:38:28 -0700423 distributed_clock::time_point RemoteToDistributedClock(
424 size_t channel_index, monotonic_clock::time_point time) {
425 return channel_target_event_loop_factory_[channel_index]
426 ->ToDistributedClock(time);
427 }
428
429 const Node *remote_node(size_t channel_index) {
430 return channel_target_event_loop_factory_[channel_index]->node();
431 }
432
433 monotonic_clock::time_point monotonic_now() {
434 return node_event_loop_factory_->monotonic_now();
435 }
436
Austin Schuh858c9f32020-08-31 16:56:12 -0700437 // Sets the node we will be merging as, and returns true if there is any
438 // data on it.
439 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
440
441 // Sets the number of channels.
442 void SetChannelCount(size_t count);
443
444 // Sets the sender, filter, and target factory for a channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700445 void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
446 message_bridge::NoncausalOffsetEstimator *filter,
447 NodeEventLoopFactory *channel_target_event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700448
449 // Returns if we have read all the messages from all the logs.
450 bool at_end() const { return channel_merger_->at_end(); }
451
452 // Unregisters everything so we can destory the event loop.
453 void Deregister();
454
455 // Sets the current TimerHandle for the replay callback.
456 void set_timer_handler(TimerHandler *timer_handler) {
457 timer_handler_ = timer_handler;
458 }
459
460 // Sets the next wakeup time on the replay callback.
461 void Setup(monotonic_clock::time_point next_time) {
462 timer_handler_->Setup(next_time);
463 }
464
465 // Sends a buffer on the provided channel index.
466 bool Send(size_t channel_index, const void *data, size_t size,
467 aos::monotonic_clock::time_point monotonic_remote_time,
468 aos::realtime_clock::time_point realtime_remote_time,
469 uint32_t remote_queue_index) {
470 return channels_[channel_index]->Send(data, size, monotonic_remote_time,
471 realtime_remote_time,
472 remote_queue_index);
473 }
474
475 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700476 std::string DebugString() const {
477 std::stringstream messages;
478 size_t i = 0;
479 for (const auto &message : sorted_messages_) {
480 if (i < 7 || i + 7 > sorted_messages_.size()) {
481 messages << "sorted_messages[" << i
482 << "]: " << std::get<0>(message).monotonic_event_time << " "
483 << configuration::StrippedChannelToString(
484 event_loop_->configuration()->channels()->Get(
485 std::get<2>(message).message().channel_index()))
486 << "\n";
487 } else if (i == 7) {
488 messages << "...\n";
489 }
490 ++i;
491 }
492 return messages.str() + channel_merger_->DebugString();
493 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700494
495 private:
496 // Log file.
497 std::unique_ptr<ChannelMerger> channel_merger_;
498
499 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700500 FlatbufferVector<MessageHeader>,
501 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700502 sorted_messages_;
503
504 // Senders.
505 std::vector<std::unique_ptr<RawSender>> channels_;
506
507 // Factory (if we are in sim) that this loop was created on.
508 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
509 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
510 // Event loop.
511 EventLoop *event_loop_ = nullptr;
512 // And timer used to send messages.
513 TimerHandler *timer_handler_;
514
Austin Schuh8bd96322020-02-13 21:18:22 -0800515 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
516 // This corresponds to the object which is shared among all the channels
517 // going between 2 nodes. The second element in the tuple indicates if this
518 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700519 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800520
521 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
522 // channel) which correspond to the originating node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700523 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800524 };
525
Austin Schuh8bd96322020-02-13 21:18:22 -0800526 // Node index -> State.
527 std::vector<std::unique_ptr<State>> states_;
528
529 // Creates the requested filter if it doesn't exist, regardless of whether
530 // these nodes can actually communicate directly. The second return value
531 // reports if this is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700532 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
533 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800534
535 // FILE to write offsets to (if populated).
536 FILE *offset_fp_ = nullptr;
537 // Timestamp of the first piece of data used for the horizontal axis on the
538 // plot.
539 aos::realtime_clock::time_point first_time_;
540
541 // List of filters for a connection. The pointer to the first node will be
542 // less than the second node.
543 std::map<std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700544 std::tuple<message_bridge::NoncausalOffsetEstimator>>
Austin Schuh8bd96322020-02-13 21:18:22 -0800545 filters_;
546
547 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700548 // clock. monotonic = distributed * slope() + offset();
549 double slope(int node_index) const {
550 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700551 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700552 return time_slope_matrix_(node_index);
553 }
554 std::chrono::nanoseconds offset(int node_index) const {
555 CHECK_LT(node_index, time_offset_matrix_.rows())
556 << ": Got too high of a node index.";
557 return std::chrono::duration_cast<std::chrono::nanoseconds>(
558 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800559 }
560
561 // Updates the offset matrix solution and sets the per-node distributed
562 // offsets in the factory.
563 void UpdateOffsets();
564
Austin Schuh2f8fd752020-09-01 22:38:28 -0700565 // We have 2 types of equations to do a least squares regression over to fully
566 // constrain our time function.
567 //
568 // One is simple. The distributed clock is the average of all the clocks.
Brian Silverman87ac0402020-09-17 14:47:01 -0700569 // (ta + tb + tc + td) / num_nodes = t_distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700570 //
571 // The second is a bit more complicated. Our basic time conversion function
572 // is:
573 // tb = ta + (ta * slope + offset)
574 // We can rewrite this as follows
575 // tb - (1 + slope) * ta = offset
576 //
577 // From here, we have enough equations to solve for t{a,b,c,...} We want to
578 // take as an input the offsets and slope, and solve for the per-node times as
579 // a function of the distributed clock.
580 //
581 // We need to massage our equations to make this work. If we solve for the
582 // per-node times at two set distributed clock times, we will be able to
583 // recreate the linear function (we know it is linear). We can do a similar
584 // thing by breaking our equation up into:
Brian Silverman87ac0402020-09-17 14:47:01 -0700585 //
Austin Schuh2f8fd752020-09-01 22:38:28 -0700586 // [1/3 1/3 1/3 ] [ta] [t_distributed]
587 // [ 1 -1-m1 0 ] [tb] = [oab]
588 // [ 1 0 -1-m2 ] [tc] [oac]
589 //
590 // This solves to:
591 //
592 // [ta] [ a00 a01 a02] [t_distributed]
593 // [tb] = [ a10 a11 a12] * [oab]
594 // [tc] [ a20 a21 a22] [oac]
595 //
596 // and can be split into:
597 //
598 // [ta] [ a00 ] [a01 a02]
599 // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
600 // [tc] [ a20 ] [a21 a22] [oac]
601 //
602 // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
603 // offset_matrix_ will be in nanoseconds.
604 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
605 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
606 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
607 // Matrix tracking which offsets are valid.
608 Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
609 // Matrix tracking the last valid matrix we used to determine connected nodes.
610 Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
611 size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800612
Austin Schuh2f8fd752020-09-01 22:38:28 -0700613 // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
614 // t is in seconds.
615 Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
616 Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800617
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800618 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
619 remapped_configuration_buffer_;
620
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800621 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
622 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800623
624 // Map of channel indices to new name. The channel index will be an index into
625 // logged_configuration(), and the string key will be the name of the channel
626 // to send on instead of the logged channel name.
627 std::map<size_t, std::string> remapped_channels_;
628
Austin Schuh6f3babe2020-01-26 20:34:50 -0800629 // Number of nodes which still have data to send. This is used to figure out
630 // when to exit.
631 size_t live_nodes_ = 0;
632
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800633 const Configuration *remapped_configuration_ = nullptr;
634 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800635
636 // If true, the replay timer will ignore any missing data. This is used
637 // during startup when we are bootstrapping everything and trying to get to
638 // the start of all the log files.
639 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800640};
641
642} // namespace logger
643} // namespace aos
644
645#endif // AOS_EVENTS_LOGGER_H_