blob: e8856f300872f0dbdaf0899b338c883c0452fd8a [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
#include <chrono>
#include <cstdio>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/eigen_mpq.h"
#include "aos/events/logging/log_namer.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/logging/uuid.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080025
26namespace aos {
27namespace logger {
28
Austin Schuhe309d2a2019-11-29 13:25:21 -080029// Logs all channels available in the event loop to disk every 100 ms.
30// Start by logging one message per channel to capture any state and
31// configuration that is sent rately on a channel and would affect execution.
32class Logger {
33 public:
Austin Schuh0c297012020-09-16 18:41:59 -070034 // Constructs a logger.
Austin Schuh0c297012020-09-16 18:41:59 -070035 // event_loop: The event loop used to read the messages.
Austin Schuh0c297012020-09-16 18:41:59 -070036 // configuration: When provided, this is the configuration to log, and the
37 // configuration to use for the channel list to log. If not provided,
38 // this becomes the configuration from the event loop.
Brian Silverman1f345222020-09-24 21:14:48 -070039 // should_log: When provided, a filter for channels to log. If not provided,
40 // all available channels are logged.
41 Logger(EventLoop *event_loop)
42 : Logger(event_loop, event_loop->configuration()) {}
43 Logger(EventLoop *event_loop, const Configuration *configuration)
44 : Logger(event_loop, configuration,
45 [](const Channel *) { return true; }) {}
46 Logger(EventLoop *event_loop, const Configuration *configuration,
47 std::function<bool(const Channel *)> should_log);
Austin Schuh0c297012020-09-16 18:41:59 -070048 ~Logger();
49
50 // Overrides the name in the log file header.
51 void set_name(std::string_view name) { name_ = name; }
Austin Schuhe309d2a2019-11-29 13:25:21 -080052
Brian Silverman1f345222020-09-24 21:14:48 -070053 // Sets the callback to run after each period of data is logged. Defaults to
54 // doing nothing.
55 //
56 // This callback may safely do things like call Rotate().
57 void set_on_logged_period(std::function<void()> on_logged_period) {
58 on_logged_period_ = std::move(on_logged_period);
59 }
60
61 // Sets the period between polling the data. Defaults to 100ms.
62 //
63 // Changing this while a set of files is being written may result in
64 // unreadable files.
65 void set_polling_period(std::chrono::nanoseconds polling_period) {
66 polling_period_ = polling_period;
67 }
68
Austin Schuh2f8fd752020-09-01 22:38:28 -070069 // Rotates the log file(s), triggering new part files to be written for each
70 // log file.
71 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -080072
Brian Silverman1f345222020-09-24 21:14:48 -070073 // Starts logging to files with the given naming scheme.
74 void StartLogging(std::unique_ptr<LogNamer> log_namer);
75
76 // Stops logging. Ensures any messages through end_time make it into the log.
77 //
78 // If you want to stop ASAP, pass min_time to avoid reading any more messages.
79 //
80 // Returns the LogNamer in case the caller wants to do anything else with it
81 // before destroying it.
82 std::unique_ptr<LogNamer> StopLogging(
83 aos::monotonic_clock::time_point end_time);
84
85 // Returns whether a log is currently being written.
86 bool is_started() const { return static_cast<bool>(log_namer_); }
87
88 // Shortcut to call StartLogging with a LocalLogNamer when event processing
89 // starts.
90 void StartLoggingLocalNamerOnRun(std::string base_name) {
91 event_loop_->OnRun([this, base_name]() {
92 StartLogging(
93 std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
94 });
95 }
96
Austin Schuhe309d2a2019-11-29 13:25:21 -080097 private:
Austin Schuhe309d2a2019-11-29 13:25:21 -080098 // Structure to track both a fetcher, and if the data fetched has been
99 // written. We may want to delay writing data to disk so that we don't let
100 // data get too far out of order when written to disk so we can avoid making
101 // it too hard to sort when reading.
102 struct FetcherStruct {
103 std::unique_ptr<RawFetcher> fetcher;
104 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800105
Austin Schuh6f3babe2020-01-26 20:34:50 -0800106 int channel_index = -1;
Brian Silverman1f345222020-09-24 21:14:48 -0700107 const Channel *channel = nullptr;
108 const Node *timestamp_node = nullptr;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800109
110 LogType log_type = LogType::kLogMessage;
111
Brian Silverman1f345222020-09-24 21:14:48 -0700112 // We fill out the metadata at construction, but the actual writers have to
113 // be updated each time we start logging. To avoid duplicating the complex
114 // logic determining whether each writer should be initialized, we just
115 // stash the answer in separate member variables.
116 bool wants_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800117 DetachedBufferWriter *writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700118 bool wants_timestamp_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800119 DetachedBufferWriter *timestamp_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700120 bool wants_contents_writer = false;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700121 DetachedBufferWriter *contents_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700122
Austin Schuh2f8fd752020-09-01 22:38:28 -0700123 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800124 };
125
Austin Schuh2f8fd752020-09-01 22:38:28 -0700126 struct NodeState {
127 aos::monotonic_clock::time_point monotonic_start_time =
128 aos::monotonic_clock::min_time;
129 aos::realtime_clock::time_point realtime_start_time =
130 aos::realtime_clock::min_time;
131
132 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
133 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
134 };
Brian Silverman1f345222020-09-24 21:14:48 -0700135
136 void WriteHeader();
137 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
138 const Node *node);
139
140 bool MaybeUpdateTimestamp(
141 const Node *node, int node_index,
142 aos::monotonic_clock::time_point monotonic_start_time,
143 aos::realtime_clock::time_point realtime_start_time);
144
145 void DoLogData(const monotonic_clock::time_point end_time);
146
147 void WriteMissingTimestamps();
148
149 // Fetches from each channel until all the data is logged.
150 void LogUntil(monotonic_clock::time_point t);
151
152 // Sets the start time for a specific node.
153 void SetStartTime(size_t node_index,
154 aos::monotonic_clock::time_point monotonic_start_time,
155 aos::realtime_clock::time_point realtime_start_time);
156
157 EventLoop *event_loop_;
158 UUID uuid_ = UUID::Zero();
159 std::unique_ptr<LogNamer> log_namer_;
160
161 // The configuration to place at the top of the log file.
162 const Configuration *const configuration_;
163
164 // Name to save in the log file. Defaults to hostname.
165 std::string name_;
166
167 std::function<void()> on_logged_period_ = []() {};
168
169 std::vector<FetcherStruct> fetchers_;
170 TimerHandler *timer_handler_;
171
172 // Period to poll the channels.
173 std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
174
175 // Last time that data was written for all channels to disk.
176 monotonic_clock::time_point last_synchronized_time_;
177
178 // Max size that the header has consumed. This much extra data will be
179 // reserved in the builder to avoid reallocating.
180 size_t max_header_size_ = 0;
181
182 // Fetcher for all the statistics from all the nodes.
183 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
184
Austin Schuh2f8fd752020-09-01 22:38:28 -0700185 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800186};
187
Austin Schuh11d43732020-09-21 17:28:30 -0700188// Datastructure to hold ordered parts.
189struct LogParts {
190 // Monotonic and realtime start times for this set of log files. For log
191 // files which started out unknown and then became known, this is the known
192 // start time.
193 aos::monotonic_clock::time_point monotonic_start_time;
194 aos::realtime_clock::time_point realtime_start_time;
195
196 // UUIDs if available.
197 std::string logger_uuid;
198 std::string parts_uuid;
199
200 // The node this represents, or empty if we are in a single node world.
201 std::string node;
202
203 // Pre-sorted list of parts.
204 std::vector<std::string> parts;
205};
206
207// Datastructure to hold parts from the same run of the logger which have no
208// ordering constraints relative to each other.
209struct LogFile {
210 // The UUID tying them all together (if available)
211 std::string logger_uuid;
212
213 // All the parts, unsorted.
214 std::vector<LogParts> parts;
215};
216
217std::ostream &operator<<(std::ostream &stream, const LogFile &file);
218std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
219
Austin Schuh5212cad2020-09-09 23:12:09 -0700220// Takes a bunch of parts and sorts them based on part_uuid and part_index.
Austin Schuh11d43732020-09-21 17:28:30 -0700221std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
222
223std::vector<std::vector<std::string>> ToLogReaderVector(
224 const std::vector<LogFile> &log_files);
Austin Schuh5212cad2020-09-09 23:12:09 -0700225
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
//   -> Replayed just on the source node.
//
// Forwarding timestamps only logged from the perspective of the destination
// node.
//   -> Matched with data on source node and logged.
//
// Forwarding timestamps with data logged as the destination node.
//   -> Replayed just as the destination.
//   -> Replayed as the source (much harder, ordering is not defined).
//
// Duplicate data logged. -> CHECK that it matches and explode otherwise.
//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
//   1) Forwarding timestamps and data need to be logged separately.
//   2) Any forwarded data logged on the destination node needs to be logged
//      separately such that it can be sorted.
//
// Tools:
//   1) Log reader needs to be able to sort a list of log files.
//   2) Log reader needs to be able to merge sorted lists of log files.
//   3) Log reader needs to be able to match timestamps with messages.
//
// We also need to be able to generate multiple views of a log file depending on
// the target.
253
Austin Schuhe309d2a2019-11-29 13:25:21 -0800254// Replays all the channels in the logfile to the event loop.
255class LogReader {
256 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800257 // If you want to supply a new configuration that will be used for replay
258 // (e.g., to change message rates, or to populate an updated schema), then
259 // pass it in here. It must provide all the channels that the original logged
260 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800261 //
262 // Log filenames are in the following format:
263 //
264 // {
265 // {log1_part0, log1_part1, ...},
266 // {log2}
267 // }
268 // The inner vector is a list of log file chunks which form up a log file.
269 // The outer vector is a list of log files with subsets of the messages, or
270 // messages from different nodes.
271 //
272 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800273 LogReader(std::string_view filename,
274 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800275 LogReader(const std::vector<std::string> &filenames,
276 const Configuration *replay_configuration = nullptr);
277 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800278 const Configuration *replay_configuration = nullptr);
Austin Schuh11d43732020-09-21 17:28:30 -0700279 LogReader(const std::vector<LogFile> &log_files,
280 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800281 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800282
Austin Schuh6331ef92020-01-07 18:28:09 -0800283 // Registers all the callbacks to send the log file data out on an event loop
284 // created in event_loop_factory. This also updates time to be at the start
285 // of the log file by running until the log file starts.
286 // Note: the configuration used in the factory should be configuration()
287 // below, but can be anything as long as the locations needed to send
288 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800289 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800290 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
291 // and then calls Register.
292 void Register();
293 // Registers callbacks for all the events after the log file starts. This is
294 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800295 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800296
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800297 // Unregisters the senders. You only need to call this if you separately
298 // supplied an event loop or event loop factory and the lifetimes are such
299 // that they need to be explicitly destroyed before the LogReader destructor
300 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800301 void Deregister();
302
Austin Schuh0c297012020-09-16 18:41:59 -0700303 // Returns the configuration being used for replay from the log file.
304 // Note that this may be different from the configuration actually used for
305 // handling events. You should generally only use this to create a
306 // SimulatedEventLoopFactory, and then get the configuration from there for
307 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800308 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700309 // Returns the configuration being used for replay from the log file.
310 // Note that this may be different from the configuration actually used for
311 // handling events. You should generally only use this to create a
312 // SimulatedEventLoopFactory, and then get the configuration from there for
313 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800314 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800315 const Configuration *configuration() const;
316
Austin Schuh6f3babe2020-01-26 20:34:50 -0800317 // Returns the nodes that this log file was created on. This is a list of
318 // pointers to a node in the nodes() list inside configuration(). The
319 // pointers here are invalidated whenever RemapLoggedChannel is called.
320 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800321
322 // Returns the starting timestamp for the log file.
Austin Schuh11d43732020-09-21 17:28:30 -0700323 monotonic_clock::time_point monotonic_start_time(
324 const Node *node = nullptr) const;
325 realtime_clock::time_point realtime_start_time(
326 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800327
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800328 // Causes the logger to publish the provided channel on a different name so
329 // that replayed applications can publish on the proper channel name without
330 // interference. This operates on raw channel names, without any node or
331 // application specific mappings.
332 void RemapLoggedChannel(std::string_view name, std::string_view type,
333 std::string_view add_prefix = "/original");
334 template <typename T>
335 void RemapLoggedChannel(std::string_view name,
336 std::string_view add_prefix = "/original") {
337 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
338 }
339
Austin Schuh01b4c352020-09-21 23:09:39 -0700340 // Remaps the provided channel, though this respects node mappings, and
341 // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
342 // /original/aos -> /original/pi1/aos on the same node after renaming, just
343 // like you would hope.
344 //
345 // TODO(austin): If you have 2 nodes remapping something to the same channel,
346 // this doesn't handle that. No use cases exist yet for that, so it isn't
347 // being done yet.
348 void RemapLoggedChannel(std::string_view name, std::string_view type,
349 const Node *node,
350 std::string_view add_prefix = "/original");
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700351 template <typename T>
Austin Schuh01b4c352020-09-21 23:09:39 -0700352 void RemapLoggedChannel(std::string_view name, const Node *node,
353 std::string_view add_prefix = "/original") {
354 RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
355 }
356
357 template <typename T>
358 bool HasChannel(std::string_view name, const Node *node = nullptr) {
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700359 return configuration::GetChannel(log_file_header()->configuration(), name,
360 T::GetFullyQualifiedName(), "",
Austin Schuh01b4c352020-09-21 23:09:39 -0700361 node) != nullptr;
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700362 }
363
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800364 SimulatedEventLoopFactory *event_loop_factory() {
365 return event_loop_factory_;
366 }
367
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700368 const LogFileHeader *log_file_header() const {
369 return &log_file_header_.message();
370 }
371
Austin Schuh0c297012020-09-16 18:41:59 -0700372 std::string_view name() const {
373 return log_file_header()->name()->string_view();
374 }
375
Austin Schuhe309d2a2019-11-29 13:25:21 -0800376 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800377 const Channel *RemapChannel(const EventLoop *event_loop,
378 const Channel *channel);
379
Austin Schuhe309d2a2019-11-29 13:25:21 -0800380 // Queues at least max_out_of_order_duration_ messages into channels_.
381 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800382 // Handle constructing a configuration with all the additional remapped
383 // channels from calls to RemapLoggedChannel.
384 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800385
Austin Schuh2f8fd752020-09-01 22:38:28 -0700386 // Returns the number of nodes.
387 size_t nodes_count() const {
388 return !configuration::MultiNode(logged_configuration())
389 ? 1u
390 : logged_configuration()->nodes()->size();
391 }
392
Austin Schuh6f3babe2020-01-26 20:34:50 -0800393 const std::vector<std::vector<std::string>> filenames_;
394
395 // This is *a* log file header used to provide the logged config. The rest of
396 // the header is likely distracting.
397 FlatbufferVector<LogFileHeader> log_file_header_;
398
Austin Schuh2f8fd752020-09-01 22:38:28 -0700399 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
400 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
401 Eigen::Matrix<double, Eigen::Dynamic, 1>>
402 SolveOffsets();
403
404 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800405
Austin Schuh6f3babe2020-01-26 20:34:50 -0800406 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700407 class State {
408 public:
409 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800410
Austin Schuh858c9f32020-08-31 16:56:12 -0700411 // Returns the timestamps, channel_index, and message from a channel.
412 // update_time (will be) set to true when popping this message causes the
413 // filter to change the time offset estimation function.
414 std::tuple<TimestampMerger::DeliveryTimestamp, int,
415 FlatbufferVector<MessageHeader>>
416 PopOldest(bool *update_time);
417
418 // Returns the monotonic time of the oldest message.
419 monotonic_clock::time_point OldestMessageTime() const;
420
421 // Primes the queues inside State. Should be called before calling
422 // OldestMessageTime.
423 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800424
Austin Schuh858c9f32020-08-31 16:56:12 -0700425 // Returns the starting time for this node.
426 monotonic_clock::time_point monotonic_start_time() const {
427 return channel_merger_->monotonic_start_time();
428 }
429 realtime_clock::time_point realtime_start_time() const {
430 return channel_merger_->realtime_start_time();
431 }
432
433 // Sets the node event loop factory for replaying into a
434 // SimulatedEventLoopFactory. Returns the EventLoop to use.
435 EventLoop *SetNodeEventLoopFactory(
436 NodeEventLoopFactory *node_event_loop_factory);
437
438 // Sets and gets the event loop to use.
439 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
440 EventLoop *event_loop() { return event_loop_; }
441
Austin Schuh858c9f32020-08-31 16:56:12 -0700442 // Sets the current realtime offset from the monotonic clock for this node
443 // (if we are on a simulated event loop).
444 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
445 realtime_clock::time_point realtime_time) {
446 if (node_event_loop_factory_ != nullptr) {
447 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
448 realtime_time);
449 }
450 }
451
452 // Converts a timestamp from the monotonic clock on this node to the
453 // distributed clock.
454 distributed_clock::time_point ToDistributedClock(
455 monotonic_clock::time_point time) {
456 return node_event_loop_factory_->ToDistributedClock(time);
457 }
458
Austin Schuh2f8fd752020-09-01 22:38:28 -0700459 monotonic_clock::time_point FromDistributedClock(
460 distributed_clock::time_point time) {
461 return node_event_loop_factory_->FromDistributedClock(time);
462 }
463
Austin Schuh858c9f32020-08-31 16:56:12 -0700464 // Sets the offset (and slope) from the distributed clock.
465 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
466 double distributed_slope) {
467 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
468 distributed_slope);
469 }
470
471 // Returns the current time on the remote node which sends messages on
472 // channel_index.
473 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
474 return channel_target_event_loop_factory_[channel_index]->monotonic_now();
475 }
476
Austin Schuh2f8fd752020-09-01 22:38:28 -0700477 distributed_clock::time_point RemoteToDistributedClock(
478 size_t channel_index, monotonic_clock::time_point time) {
479 return channel_target_event_loop_factory_[channel_index]
480 ->ToDistributedClock(time);
481 }
482
483 const Node *remote_node(size_t channel_index) {
484 return channel_target_event_loop_factory_[channel_index]->node();
485 }
486
487 monotonic_clock::time_point monotonic_now() {
488 return node_event_loop_factory_->monotonic_now();
489 }
490
Austin Schuh858c9f32020-08-31 16:56:12 -0700491 // Sets the node we will be merging as, and returns true if there is any
492 // data on it.
493 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
494
495 // Sets the number of channels.
496 void SetChannelCount(size_t count);
497
498 // Sets the sender, filter, and target factory for a channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700499 void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
500 message_bridge::NoncausalOffsetEstimator *filter,
501 NodeEventLoopFactory *channel_target_event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700502
503 // Returns if we have read all the messages from all the logs.
504 bool at_end() const { return channel_merger_->at_end(); }
505
506 // Unregisters everything so we can destory the event loop.
507 void Deregister();
508
509 // Sets the current TimerHandle for the replay callback.
510 void set_timer_handler(TimerHandler *timer_handler) {
511 timer_handler_ = timer_handler;
512 }
513
514 // Sets the next wakeup time on the replay callback.
515 void Setup(monotonic_clock::time_point next_time) {
516 timer_handler_->Setup(next_time);
517 }
518
519 // Sends a buffer on the provided channel index.
520 bool Send(size_t channel_index, const void *data, size_t size,
521 aos::monotonic_clock::time_point monotonic_remote_time,
522 aos::realtime_clock::time_point realtime_remote_time,
523 uint32_t remote_queue_index) {
524 return channels_[channel_index]->Send(data, size, monotonic_remote_time,
525 realtime_remote_time,
526 remote_queue_index);
527 }
528
529 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700530 std::string DebugString() const {
531 std::stringstream messages;
532 size_t i = 0;
533 for (const auto &message : sorted_messages_) {
534 if (i < 7 || i + 7 > sorted_messages_.size()) {
535 messages << "sorted_messages[" << i
536 << "]: " << std::get<0>(message).monotonic_event_time << " "
537 << configuration::StrippedChannelToString(
538 event_loop_->configuration()->channels()->Get(
539 std::get<2>(message).message().channel_index()))
540 << "\n";
541 } else if (i == 7) {
542 messages << "...\n";
543 }
544 ++i;
545 }
546 return messages.str() + channel_merger_->DebugString();
547 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700548
549 private:
550 // Log file.
551 std::unique_ptr<ChannelMerger> channel_merger_;
552
553 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700554 FlatbufferVector<MessageHeader>,
555 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700556 sorted_messages_;
557
558 // Senders.
559 std::vector<std::unique_ptr<RawSender>> channels_;
560
561 // Factory (if we are in sim) that this loop was created on.
562 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
563 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
564 // Event loop.
565 EventLoop *event_loop_ = nullptr;
566 // And timer used to send messages.
567 TimerHandler *timer_handler_;
568
Austin Schuh8bd96322020-02-13 21:18:22 -0800569 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
570 // This corresponds to the object which is shared among all the channels
571 // going between 2 nodes. The second element in the tuple indicates if this
572 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700573 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800574
575 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
576 // channel) which correspond to the originating node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700577 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800578 };
579
Austin Schuh8bd96322020-02-13 21:18:22 -0800580 // Node index -> State.
581 std::vector<std::unique_ptr<State>> states_;
582
583 // Creates the requested filter if it doesn't exist, regardless of whether
584 // these nodes can actually communicate directly. The second return value
585 // reports if this is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700586 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
587 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800588
589 // FILE to write offsets to (if populated).
590 FILE *offset_fp_ = nullptr;
591 // Timestamp of the first piece of data used for the horizontal axis on the
592 // plot.
593 aos::realtime_clock::time_point first_time_;
594
595 // List of filters for a connection. The pointer to the first node will be
596 // less than the second node.
597 std::map<std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700598 std::tuple<message_bridge::NoncausalOffsetEstimator>>
Austin Schuh8bd96322020-02-13 21:18:22 -0800599 filters_;
600
601 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700602 // clock. monotonic = distributed * slope() + offset();
603 double slope(int node_index) const {
604 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700605 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700606 return time_slope_matrix_(node_index);
607 }
608 std::chrono::nanoseconds offset(int node_index) const {
609 CHECK_LT(node_index, time_offset_matrix_.rows())
610 << ": Got too high of a node index.";
611 return std::chrono::duration_cast<std::chrono::nanoseconds>(
612 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800613 }
614
615 // Updates the offset matrix solution and sets the per-node distributed
616 // offsets in the factory.
617 void UpdateOffsets();
618
Austin Schuh2f8fd752020-09-01 22:38:28 -0700619 // We have 2 types of equations to do a least squares regression over to fully
620 // constrain our time function.
621 //
622 // One is simple. The distributed clock is the average of all the clocks.
Brian Silverman87ac0402020-09-17 14:47:01 -0700623 // (ta + tb + tc + td) / num_nodes = t_distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700624 //
625 // The second is a bit more complicated. Our basic time conversion function
626 // is:
627 // tb = ta + (ta * slope + offset)
628 // We can rewrite this as follows
629 // tb - (1 + slope) * ta = offset
630 //
631 // From here, we have enough equations to solve for t{a,b,c,...} We want to
632 // take as an input the offsets and slope, and solve for the per-node times as
633 // a function of the distributed clock.
634 //
635 // We need to massage our equations to make this work. If we solve for the
636 // per-node times at two set distributed clock times, we will be able to
637 // recreate the linear function (we know it is linear). We can do a similar
638 // thing by breaking our equation up into:
Brian Silverman87ac0402020-09-17 14:47:01 -0700639 //
Austin Schuh2f8fd752020-09-01 22:38:28 -0700640 // [1/3 1/3 1/3 ] [ta] [t_distributed]
641 // [ 1 -1-m1 0 ] [tb] = [oab]
642 // [ 1 0 -1-m2 ] [tc] [oac]
643 //
644 // This solves to:
645 //
646 // [ta] [ a00 a01 a02] [t_distributed]
647 // [tb] = [ a10 a11 a12] * [oab]
648 // [tc] [ a20 a21 a22] [oac]
649 //
650 // and can be split into:
651 //
652 // [ta] [ a00 ] [a01 a02]
653 // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
654 // [tc] [ a20 ] [a21 a22] [oac]
655 //
656 // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
657 // offset_matrix_ will be in nanoseconds.
658 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
659 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
660 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
661 // Matrix tracking which offsets are valid.
662 Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
663 // Matrix tracking the last valid matrix we used to determine connected nodes.
664 Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
665 size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800666
Austin Schuh2f8fd752020-09-01 22:38:28 -0700667 // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
668 // t is in seconds.
669 Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
670 Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800671
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800672 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
673 remapped_configuration_buffer_;
674
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800675 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
676 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800677
678 // Map of channel indices to new name. The channel index will be an index into
679 // logged_configuration(), and the string key will be the name of the channel
680 // to send on instead of the logged channel name.
681 std::map<size_t, std::string> remapped_channels_;
Austin Schuh01b4c352020-09-21 23:09:39 -0700682 std::vector<MapT> maps_;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800683
Austin Schuh6f3babe2020-01-26 20:34:50 -0800684 // Number of nodes which still have data to send. This is used to figure out
685 // when to exit.
686 size_t live_nodes_ = 0;
687
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800688 const Configuration *remapped_configuration_ = nullptr;
689 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800690
691 // If true, the replay timer will ignore any missing data. This is used
692 // during startup when we are bootstrapping everything and trying to get to
693 // the start of all the log files.
694 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800695};
696
697} // namespace logger
698} // namespace aos
699
700#endif // AOS_EVENTS_LOGGER_H_