blob: 230f773b06619531966d5f80916a2d865f318a9b [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <string_view>
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include <tuple>
Austin Schuh6f3babe2020-01-26 20:34:50 -08008#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08009
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
11#include "absl/strings/str_cat.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070014#include "aos/events/logging/eigen_mpq.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070015#include "aos/events/logging/log_namer.h"
Austin Schuha36c8902019-12-30 18:07:15 -080016#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080017#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070018#include "aos/events/logging/uuid.h"
Austin Schuh92547522019-12-28 14:33:43 -080019#include "aos/events/simulated_event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070020#include "aos/network/message_bridge_server_generated.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080021#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080022#include "aos/time/time.h"
23#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070024#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080025
26namespace aos {
27namespace logger {
28
Austin Schuhe309d2a2019-11-29 13:25:21 -080029// Logs all channels available in the event loop to disk every 100 ms.
30// Start by logging one message per channel to capture any state and
31// configuration that is sent rately on a channel and would affect execution.
32class Logger {
33 public:
Austin Schuh0c297012020-09-16 18:41:59 -070034 // Constructs a logger.
Austin Schuh0c297012020-09-16 18:41:59 -070035 // event_loop: The event loop used to read the messages.
Austin Schuh0c297012020-09-16 18:41:59 -070036 // configuration: When provided, this is the configuration to log, and the
37 // configuration to use for the channel list to log. If not provided,
38 // this becomes the configuration from the event loop.
Brian Silverman1f345222020-09-24 21:14:48 -070039 // should_log: When provided, a filter for channels to log. If not provided,
40 // all available channels are logged.
41 Logger(EventLoop *event_loop)
42 : Logger(event_loop, event_loop->configuration()) {}
43 Logger(EventLoop *event_loop, const Configuration *configuration)
44 : Logger(event_loop, configuration,
45 [](const Channel *) { return true; }) {}
46 Logger(EventLoop *event_loop, const Configuration *configuration,
47 std::function<bool(const Channel *)> should_log);
Austin Schuh0c297012020-09-16 18:41:59 -070048 ~Logger();
49
50 // Overrides the name in the log file header.
51 void set_name(std::string_view name) { name_ = name; }
Austin Schuhe309d2a2019-11-29 13:25:21 -080052
Brian Silverman1f345222020-09-24 21:14:48 -070053 // Sets the callback to run after each period of data is logged. Defaults to
54 // doing nothing.
55 //
56 // This callback may safely do things like call Rotate().
57 void set_on_logged_period(std::function<void()> on_logged_period) {
58 on_logged_period_ = std::move(on_logged_period);
59 }
60
61 // Sets the period between polling the data. Defaults to 100ms.
62 //
63 // Changing this while a set of files is being written may result in
64 // unreadable files.
65 void set_polling_period(std::chrono::nanoseconds polling_period) {
66 polling_period_ = polling_period;
67 }
68
Brian Silvermanae7c0332020-09-30 16:58:23 -070069 std::string_view log_start_uuid() const { return log_start_uuid_; }
Brian Silverman035e4182020-10-06 17:13:00 -070070 UUID logger_instance_uuid() const { return logger_instance_uuid_; }
Brian Silvermanae7c0332020-09-30 16:58:23 -070071
Austin Schuh2f8fd752020-09-01 22:38:28 -070072 // Rotates the log file(s), triggering new part files to be written for each
73 // log file.
74 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -080075
Brian Silverman1f345222020-09-24 21:14:48 -070076 // Starts logging to files with the given naming scheme.
Brian Silvermanae7c0332020-09-30 16:58:23 -070077 //
78 // log_start_uuid may be used to tie this log event to other log events across
79 // multiple nodes. The default (empty string) indicates there isn't one
80 // available.
81 void StartLogging(std::unique_ptr<LogNamer> log_namer,
82 std::string_view log_start_uuid = "");
Brian Silverman1f345222020-09-24 21:14:48 -070083
84 // Stops logging. Ensures any messages through end_time make it into the log.
85 //
86 // If you want to stop ASAP, pass min_time to avoid reading any more messages.
87 //
88 // Returns the LogNamer in case the caller wants to do anything else with it
89 // before destroying it.
90 std::unique_ptr<LogNamer> StopLogging(
91 aos::monotonic_clock::time_point end_time);
92
93 // Returns whether a log is currently being written.
94 bool is_started() const { return static_cast<bool>(log_namer_); }
95
96 // Shortcut to call StartLogging with a LocalLogNamer when event processing
97 // starts.
98 void StartLoggingLocalNamerOnRun(std::string base_name) {
99 event_loop_->OnRun([this, base_name]() {
100 StartLogging(
101 std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
102 });
103 }
104
Austin Schuhe309d2a2019-11-29 13:25:21 -0800105 private:
Austin Schuhe309d2a2019-11-29 13:25:21 -0800106 // Structure to track both a fetcher, and if the data fetched has been
107 // written. We may want to delay writing data to disk so that we don't let
108 // data get too far out of order when written to disk so we can avoid making
109 // it too hard to sort when reading.
110 struct FetcherStruct {
111 std::unique_ptr<RawFetcher> fetcher;
112 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800113
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700114 // Channel index to log to.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800115 int channel_index = -1;
Brian Silverman1f345222020-09-24 21:14:48 -0700116 const Channel *channel = nullptr;
117 const Node *timestamp_node = nullptr;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800118
119 LogType log_type = LogType::kLogMessage;
120
Brian Silverman1f345222020-09-24 21:14:48 -0700121 // We fill out the metadata at construction, but the actual writers have to
122 // be updated each time we start logging. To avoid duplicating the complex
123 // logic determining whether each writer should be initialized, we just
124 // stash the answer in separate member variables.
125 bool wants_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800126 DetachedBufferWriter *writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700127 bool wants_timestamp_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800128 DetachedBufferWriter *timestamp_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700129 bool wants_contents_writer = false;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700130 DetachedBufferWriter *contents_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700131
Austin Schuh2f8fd752020-09-01 22:38:28 -0700132 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800133 };
134
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700135 // Vector mapping from the channel index from the event loop to the logged
136 // channel index.
137 std::vector<int> event_loop_to_logged_channel_index_;
138
Austin Schuh2f8fd752020-09-01 22:38:28 -0700139 struct NodeState {
140 aos::monotonic_clock::time_point monotonic_start_time =
141 aos::monotonic_clock::min_time;
142 aos::realtime_clock::time_point realtime_start_time =
143 aos::realtime_clock::min_time;
144
145 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
146 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
147 };
Brian Silverman1f345222020-09-24 21:14:48 -0700148
149 void WriteHeader();
150 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
151 const Node *node);
152
153 bool MaybeUpdateTimestamp(
154 const Node *node, int node_index,
155 aos::monotonic_clock::time_point monotonic_start_time,
156 aos::realtime_clock::time_point realtime_start_time);
157
158 void DoLogData(const monotonic_clock::time_point end_time);
159
160 void WriteMissingTimestamps();
161
162 // Fetches from each channel until all the data is logged.
163 void LogUntil(monotonic_clock::time_point t);
164
165 // Sets the start time for a specific node.
166 void SetStartTime(size_t node_index,
167 aos::monotonic_clock::time_point monotonic_start_time,
168 aos::realtime_clock::time_point realtime_start_time);
169
Brian Silvermanae7c0332020-09-30 16:58:23 -0700170 EventLoop *const event_loop_;
Brian Silverman1f345222020-09-24 21:14:48 -0700171 // The configuration to place at the top of the log file.
172 const Configuration *const configuration_;
173
Brian Silvermanae7c0332020-09-30 16:58:23 -0700174 UUID log_event_uuid_ = UUID::Zero();
175 const UUID logger_instance_uuid_ = UUID::Random();
176 std::unique_ptr<LogNamer> log_namer_;
177 // Empty indicates there isn't one.
178 std::string log_start_uuid_;
179 const std::string boot_uuid_;
180
Brian Silverman1f345222020-09-24 21:14:48 -0700181 // Name to save in the log file. Defaults to hostname.
182 std::string name_;
183
184 std::function<void()> on_logged_period_ = []() {};
185
186 std::vector<FetcherStruct> fetchers_;
187 TimerHandler *timer_handler_;
188
189 // Period to poll the channels.
190 std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
191
192 // Last time that data was written for all channels to disk.
193 monotonic_clock::time_point last_synchronized_time_;
194
195 // Max size that the header has consumed. This much extra data will be
196 // reserved in the builder to avoid reallocating.
197 size_t max_header_size_ = 0;
198
199 // Fetcher for all the statistics from all the nodes.
200 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
201
Austin Schuh2f8fd752020-09-01 22:38:28 -0700202 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800203};
204
Austin Schuh11d43732020-09-21 17:28:30 -0700205// Datastructure to hold ordered parts.
206struct LogParts {
207 // Monotonic and realtime start times for this set of log files. For log
208 // files which started out unknown and then became known, this is the known
209 // start time.
210 aos::monotonic_clock::time_point monotonic_start_time;
211 aos::realtime_clock::time_point realtime_start_time;
212
213 // UUIDs if available.
Brian Silvermanae7c0332020-09-30 16:58:23 -0700214 std::string log_event_uuid;
Austin Schuh11d43732020-09-21 17:28:30 -0700215 std::string parts_uuid;
216
217 // The node this represents, or empty if we are in a single node world.
218 std::string node;
219
220 // Pre-sorted list of parts.
221 std::vector<std::string> parts;
222};
223
224// Datastructure to hold parts from the same run of the logger which have no
225// ordering constraints relative to each other.
226struct LogFile {
227 // The UUID tying them all together (if available)
Brian Silvermanae7c0332020-09-30 16:58:23 -0700228 std::string log_event_uuid;
Austin Schuh11d43732020-09-21 17:28:30 -0700229
230 // All the parts, unsorted.
231 std::vector<LogParts> parts;
232};
233
234std::ostream &operator<<(std::ostream &stream, const LogFile &file);
235std::ostream &operator<<(std::ostream &stream, const LogParts &parts);
236
Austin Schuh5212cad2020-09-09 23:12:09 -0700237// Takes a bunch of parts and sorts them based on part_uuid and part_index.
Austin Schuh11d43732020-09-21 17:28:30 -0700238std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
239
240std::vector<std::vector<std::string>> ToLogReaderVector(
241 const std::vector<LogFile> &log_files);
Austin Schuh5212cad2020-09-09 23:12:09 -0700242
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
//   -> Replayed just on the source node.
//
// Forwarding timestamps only logged from the perspective of the destination
// node.
//   -> Matched with data on source node and logged.
//
// Forwarding timestamps with data logged as the destination node.
//   -> Replayed just as the destination
//   -> Replayed as the source (Much harder, ordering is not defined)
//
// Duplicate data logged. -> CHECK that it matches and explode otherwise.
//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
// 1) Forwarding timestamps and data need to be logged separately.
// 2) Any forwarded data logged on the destination node needs to be logged
//    separately such that it can be sorted.
//
// Tools:
// 1) Log reader needs to be able to sort a list of log files.
// 2) Log reader needs to be able to merge sorted lists of log files.
// 3) Log reader needs to be able to match timestamps with messages.
//
// We also need to be able to generate multiple views of a log file depending on
// the target.
270
Austin Schuhe309d2a2019-11-29 13:25:21 -0800271// Replays all the channels in the logfile to the event loop.
272class LogReader {
273 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800274 // If you want to supply a new configuration that will be used for replay
275 // (e.g., to change message rates, or to populate an updated schema), then
276 // pass it in here. It must provide all the channels that the original logged
277 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800278 //
279 // Log filenames are in the following format:
280 //
281 // {
282 // {log1_part0, log1_part1, ...},
283 // {log2}
284 // }
285 // The inner vector is a list of log file chunks which form up a log file.
286 // The outer vector is a list of log files with subsets of the messages, or
287 // messages from different nodes.
288 //
289 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800290 LogReader(std::string_view filename,
291 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800292 LogReader(const std::vector<std::string> &filenames,
293 const Configuration *replay_configuration = nullptr);
294 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800295 const Configuration *replay_configuration = nullptr);
Austin Schuh11d43732020-09-21 17:28:30 -0700296 LogReader(const std::vector<LogFile> &log_files,
297 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800298 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800299
Austin Schuh6331ef92020-01-07 18:28:09 -0800300 // Registers all the callbacks to send the log file data out on an event loop
301 // created in event_loop_factory. This also updates time to be at the start
302 // of the log file by running until the log file starts.
303 // Note: the configuration used in the factory should be configuration()
304 // below, but can be anything as long as the locations needed to send
305 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800306 void Register(SimulatedEventLoopFactory *event_loop_factory);
  // Creates a SimulatedEventLoopFactory accessible via event_loop_factory(),
308 // and then calls Register.
309 void Register();
310 // Registers callbacks for all the events after the log file starts. This is
311 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800312 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800313
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800314 // Unregisters the senders. You only need to call this if you separately
315 // supplied an event loop or event loop factory and the lifetimes are such
316 // that they need to be explicitly destroyed before the LogReader destructor
317 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800318 void Deregister();
319
Austin Schuh0c297012020-09-16 18:41:59 -0700320 // Returns the configuration being used for replay from the log file.
321 // Note that this may be different from the configuration actually used for
322 // handling events. You should generally only use this to create a
323 // SimulatedEventLoopFactory, and then get the configuration from there for
324 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800325 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700326 // Returns the configuration being used for replay from the log file.
327 // Note that this may be different from the configuration actually used for
328 // handling events. You should generally only use this to create a
329 // SimulatedEventLoopFactory, and then get the configuration from there for
330 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800331 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800332 const Configuration *configuration() const;
333
Austin Schuh6f3babe2020-01-26 20:34:50 -0800334 // Returns the nodes that this log file was created on. This is a list of
335 // pointers to a node in the nodes() list inside configuration(). The
336 // pointers here are invalidated whenever RemapLoggedChannel is called.
337 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800338
339 // Returns the starting timestamp for the log file.
Austin Schuh11d43732020-09-21 17:28:30 -0700340 monotonic_clock::time_point monotonic_start_time(
341 const Node *node = nullptr) const;
342 realtime_clock::time_point realtime_start_time(
343 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800344
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800345 // Causes the logger to publish the provided channel on a different name so
346 // that replayed applications can publish on the proper channel name without
347 // interference. This operates on raw channel names, without any node or
348 // application specific mappings.
349 void RemapLoggedChannel(std::string_view name, std::string_view type,
350 std::string_view add_prefix = "/original");
351 template <typename T>
352 void RemapLoggedChannel(std::string_view name,
353 std::string_view add_prefix = "/original") {
354 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
355 }
356
Austin Schuh01b4c352020-09-21 23:09:39 -0700357 // Remaps the provided channel, though this respects node mappings, and
358 // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
359 // /original/aos -> /original/pi1/aos on the same node after renaming, just
360 // like you would hope.
361 //
362 // TODO(austin): If you have 2 nodes remapping something to the same channel,
363 // this doesn't handle that. No use cases exist yet for that, so it isn't
364 // being done yet.
365 void RemapLoggedChannel(std::string_view name, std::string_view type,
366 const Node *node,
367 std::string_view add_prefix = "/original");
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700368 template <typename T>
Austin Schuh01b4c352020-09-21 23:09:39 -0700369 void RemapLoggedChannel(std::string_view name, const Node *node,
370 std::string_view add_prefix = "/original") {
371 RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
372 }
373
374 template <typename T>
375 bool HasChannel(std::string_view name, const Node *node = nullptr) {
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700376 return configuration::GetChannel(log_file_header()->configuration(), name,
377 T::GetFullyQualifiedName(), "",
Austin Schuh01b4c352020-09-21 23:09:39 -0700378 node) != nullptr;
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700379 }
380
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800381 SimulatedEventLoopFactory *event_loop_factory() {
382 return event_loop_factory_;
383 }
384
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700385 const LogFileHeader *log_file_header() const {
386 return &log_file_header_.message();
387 }
388
Austin Schuh0c297012020-09-16 18:41:59 -0700389 std::string_view name() const {
390 return log_file_header()->name()->string_view();
391 }
392
Austin Schuhe309d2a2019-11-29 13:25:21 -0800393 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800394 const Channel *RemapChannel(const EventLoop *event_loop,
395 const Channel *channel);
396
Austin Schuhe309d2a2019-11-29 13:25:21 -0800397 // Queues at least max_out_of_order_duration_ messages into channels_.
398 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800399 // Handle constructing a configuration with all the additional remapped
400 // channels from calls to RemapLoggedChannel.
401 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800402
Austin Schuh2f8fd752020-09-01 22:38:28 -0700403 // Returns the number of nodes.
404 size_t nodes_count() const {
405 return !configuration::MultiNode(logged_configuration())
406 ? 1u
407 : logged_configuration()->nodes()->size();
408 }
409
Austin Schuh6f3babe2020-01-26 20:34:50 -0800410 const std::vector<std::vector<std::string>> filenames_;
411
412 // This is *a* log file header used to provide the logged config. The rest of
413 // the header is likely distracting.
414 FlatbufferVector<LogFileHeader> log_file_header_;
415
Austin Schuh2f8fd752020-09-01 22:38:28 -0700416 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
417 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
418 Eigen::Matrix<double, Eigen::Dynamic, 1>>
419 SolveOffsets();
420
421 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800422
Austin Schuh6f3babe2020-01-26 20:34:50 -0800423 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700424 class State {
425 public:
426 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800427
Austin Schuh858c9f32020-08-31 16:56:12 -0700428 // Returns the timestamps, channel_index, and message from a channel.
429 // update_time (will be) set to true when popping this message causes the
430 // filter to change the time offset estimation function.
431 std::tuple<TimestampMerger::DeliveryTimestamp, int,
432 FlatbufferVector<MessageHeader>>
433 PopOldest(bool *update_time);
434
435 // Returns the monotonic time of the oldest message.
436 monotonic_clock::time_point OldestMessageTime() const;
437
438 // Primes the queues inside State. Should be called before calling
439 // OldestMessageTime.
440 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800441
Austin Schuh858c9f32020-08-31 16:56:12 -0700442 // Returns the starting time for this node.
443 monotonic_clock::time_point monotonic_start_time() const {
444 return channel_merger_->monotonic_start_time();
445 }
446 realtime_clock::time_point realtime_start_time() const {
447 return channel_merger_->realtime_start_time();
448 }
449
450 // Sets the node event loop factory for replaying into a
451 // SimulatedEventLoopFactory. Returns the EventLoop to use.
452 EventLoop *SetNodeEventLoopFactory(
453 NodeEventLoopFactory *node_event_loop_factory);
454
455 // Sets and gets the event loop to use.
456 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
457 EventLoop *event_loop() { return event_loop_; }
458
Austin Schuh858c9f32020-08-31 16:56:12 -0700459 // Sets the current realtime offset from the monotonic clock for this node
460 // (if we are on a simulated event loop).
461 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
462 realtime_clock::time_point realtime_time) {
463 if (node_event_loop_factory_ != nullptr) {
464 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
465 realtime_time);
466 }
467 }
468
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700469 // Returns the MessageHeader sender to log delivery timestamps to for the
470 // provided remote node.
471 aos::Sender<MessageHeader> *RemoteTimestampSender(
472 const Node *delivered_node);
473
Austin Schuh858c9f32020-08-31 16:56:12 -0700474 // Converts a timestamp from the monotonic clock on this node to the
475 // distributed clock.
476 distributed_clock::time_point ToDistributedClock(
477 monotonic_clock::time_point time) {
478 return node_event_loop_factory_->ToDistributedClock(time);
479 }
480
Austin Schuh2f8fd752020-09-01 22:38:28 -0700481 monotonic_clock::time_point FromDistributedClock(
482 distributed_clock::time_point time) {
483 return node_event_loop_factory_->FromDistributedClock(time);
484 }
485
Austin Schuh858c9f32020-08-31 16:56:12 -0700486 // Sets the offset (and slope) from the distributed clock.
487 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
488 double distributed_slope) {
489 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
490 distributed_slope);
491 }
492
493 // Returns the current time on the remote node which sends messages on
494 // channel_index.
495 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700496 return channel_source_state_[channel_index]
497 ->node_event_loop_factory_->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -0700498 }
499
Austin Schuh2f8fd752020-09-01 22:38:28 -0700500 distributed_clock::time_point RemoteToDistributedClock(
501 size_t channel_index, monotonic_clock::time_point time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700502 return channel_source_state_[channel_index]
503 ->node_event_loop_factory_->ToDistributedClock(time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700504 }
505
506 const Node *remote_node(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700507 return channel_source_state_[channel_index]
508 ->node_event_loop_factory_->node();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700509 }
510
511 monotonic_clock::time_point monotonic_now() {
512 return node_event_loop_factory_->monotonic_now();
513 }
514
Austin Schuh858c9f32020-08-31 16:56:12 -0700515 // Sets the node we will be merging as, and returns true if there is any
516 // data on it.
517 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
518
519 // Sets the number of channels.
520 void SetChannelCount(size_t count);
521
522 // Sets the sender, filter, and target factory for a channel.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700523 void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
524 std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700525 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700526 aos::Sender<MessageHeader> *remote_timestamp_sender,
527 State *source_state);
Austin Schuh858c9f32020-08-31 16:56:12 -0700528
529 // Returns if we have read all the messages from all the logs.
530 bool at_end() const { return channel_merger_->at_end(); }
531
    // Unregisters everything so we can destroy the event loop.
533 void Deregister();
534
535 // Sets the current TimerHandle for the replay callback.
536 void set_timer_handler(TimerHandler *timer_handler) {
537 timer_handler_ = timer_handler;
538 }
539
540 // Sets the next wakeup time on the replay callback.
541 void Setup(monotonic_clock::time_point next_time) {
542 timer_handler_->Setup(next_time);
543 }
544
545 // Sends a buffer on the provided channel index.
546 bool Send(size_t channel_index, const void *data, size_t size,
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700547 const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
Austin Schuh858c9f32020-08-31 16:56:12 -0700548
549 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700550 std::string DebugString() const {
551 std::stringstream messages;
552 size_t i = 0;
553 for (const auto &message : sorted_messages_) {
554 if (i < 7 || i + 7 > sorted_messages_.size()) {
555 messages << "sorted_messages[" << i
556 << "]: " << std::get<0>(message).monotonic_event_time << " "
557 << configuration::StrippedChannelToString(
558 event_loop_->configuration()->channels()->Get(
559 std::get<2>(message).message().channel_index()))
560 << "\n";
561 } else if (i == 7) {
562 messages << "...\n";
563 }
564 ++i;
565 }
566 return messages.str() + channel_merger_->DebugString();
567 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700568
569 private:
570 // Log file.
571 std::unique_ptr<ChannelMerger> channel_merger_;
572
573 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700574 FlatbufferVector<MessageHeader>,
575 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700576 sorted_messages_;
577
578 // Senders.
579 std::vector<std::unique_ptr<RawSender>> channels_;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700580 std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
581 // The mapping from logged channel index to sent channel index. Needed for
582 // sending out MessageHeaders.
583 std::vector<int> factory_channel_index_;
584
585 struct SentTimestamp {
586 monotonic_clock::time_point monotonic_event_time =
587 monotonic_clock::min_time;
588 realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
589 uint32_t queue_index = 0xffffffff;
590
591 // The queue index that this message *actually* was sent with.
592 uint32_t actual_queue_index = 0xffffffff;
593 };
594
595 // Stores all the timestamps that have been sent on this channel. This is
596 // only done for channels which are forwarded and on the node which
597 // initially sends the message.
598 //
599 // TODO(austin): This whole concept is a hack. We should be able to
600 // associate state with the message as it gets sorted and recover it.
601 std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700602
603 // Factory (if we are in sim) that this loop was created on.
604 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
605 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
606 // Event loop.
607 EventLoop *event_loop_ = nullptr;
608 // And timer used to send messages.
609 TimerHandler *timer_handler_;
610
Austin Schuh8bd96322020-02-13 21:18:22 -0800611 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
612 // This corresponds to the object which is shared among all the channels
613 // going between 2 nodes. The second element in the tuple indicates if this
614 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700615 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800616
    // List of States (or nullptr if it isn't a forwarded channel) which
    // correspond to the originating node.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700619 std::vector<State *> channel_source_state_;
620
621 std::map<const Node *, aos::Sender<MessageHeader>>
622 remote_timestamp_senders_map_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800623 };
624
Austin Schuh8bd96322020-02-13 21:18:22 -0800625 // Node index -> State.
626 std::vector<std::unique_ptr<State>> states_;
627
628 // Creates the requested filter if it doesn't exist, regardless of whether
629 // these nodes can actually communicate directly. The second return value
630 // reports if this is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700631 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
632 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800633
634 // FILE to write offsets to (if populated).
635 FILE *offset_fp_ = nullptr;
636 // Timestamp of the first piece of data used for the horizontal axis on the
637 // plot.
638 aos::realtime_clock::time_point first_time_;
639
// Filters for each connection, keyed by the pair of nodes. The pointer to the
// first node in the key will be less than the pointer to the second node.
642 std::map<std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700643 std::tuple<message_bridge::NoncausalOffsetEstimator>>
Austin Schuh8bd96322020-02-13 21:18:22 -0800644 filters_;
645
646 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700647 // clock. monotonic = distributed * slope() + offset();
648 double slope(int node_index) const {
649 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700650 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700651 return time_slope_matrix_(node_index);
652 }
653 std::chrono::nanoseconds offset(int node_index) const {
654 CHECK_LT(node_index, time_offset_matrix_.rows())
655 << ": Got too high of a node index.";
656 return std::chrono::duration_cast<std::chrono::nanoseconds>(
657 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800658 }
659
660 // Updates the offset matrix solution and sets the per-node distributed
661 // offsets in the factory.
662 void UpdateOffsets();
663
Austin Schuh2f8fd752020-09-01 22:38:28 -0700664 // We have 2 types of equations to do a least squares regression over to fully
665 // constrain our time function.
666 //
667 // One is simple. The distributed clock is the average of all the clocks.
Brian Silverman87ac0402020-09-17 14:47:01 -0700668 // (ta + tb + tc + td) / num_nodes = t_distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700669 //
670 // The second is a bit more complicated. Our basic time conversion function
671 // is:
672 // tb = ta + (ta * slope + offset)
673 // We can rewrite this as follows
674 // tb - (1 + slope) * ta = offset
675 //
676 // From here, we have enough equations to solve for t{a,b,c,...} We want to
677 // take as an input the offsets and slope, and solve for the per-node times as
678 // a function of the distributed clock.
679 //
680 // We need to massage our equations to make this work. If we solve for the
681 // per-node times at two set distributed clock times, we will be able to
682 // recreate the linear function (we know it is linear). We can do a similar
683 // thing by breaking our equation up into:
Brian Silverman87ac0402020-09-17 14:47:01 -0700684 //
//   [1/3   1/3    1/3  ]   [ta]   [t_distributed]
//   [ 1   -1-m1    0   ] * [tb] = [oab]
//   [ 1     0    -1-m2 ]   [tc]   [oac]
//
// This solves to:
//
//   [ta]   [ a00 a01 a02 ]   [t_distributed]
//   [tb] = [ a10 a11 a12 ] * [oab]
//   [tc]   [ a20 a21 a22 ]   [oac]
//
// and can be split into:
//
//   [ta]   [ a00 ]                   [ a01 a02 ]
//   [tb] = [ a10 ] * t_distributed + [ a11 a12 ] * [oab]
//   [tc]   [ a20 ]                   [ a21 a22 ]   [oac]
700 //
701 // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
702 // offset_matrix_ will be in nanoseconds.
703 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
704 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
705 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
706 // Matrix tracking which offsets are valid.
707 Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
708 // Matrix tracking the last valid matrix we used to determine connected nodes.
709 Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
710 size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800711
// [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
// t is in seconds.
714 Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
715 Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800716
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800717 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
718 remapped_configuration_buffer_;
719
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800720 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
721 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800722
// Map of channel indices to new name. The channel index will be an index into
// logged_configuration(), and the string value will be the name of the
// channel to send on instead of the logged channel name.
726 std::map<size_t, std::string> remapped_channels_;
Austin Schuh01b4c352020-09-21 23:09:39 -0700727 std::vector<MapT> maps_;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800728
Austin Schuh6f3babe2020-01-26 20:34:50 -0800729 // Number of nodes which still have data to send. This is used to figure out
730 // when to exit.
731 size_t live_nodes_ = 0;
732
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800733 const Configuration *remapped_configuration_ = nullptr;
734 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800735
736 // If true, the replay timer will ignore any missing data. This is used
737 // during startup when we are bootstrapping everything and trying to get to
738 // the start of all the log files.
739 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800740};
741
742} // namespace logger
743} // namespace aos
744
745#endif // AOS_EVENTS_LOGGER_H_