blob: f0f0a69611c154563fefdc9eed982b582a7e9329 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <sstream>
#include <string>
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <string_view>
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include <tuple>
#include <utility>
Austin Schuh6f3babe2020-01-26 20:34:50 -08008#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08009
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
11#include "absl/strings/str_cat.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070014#include "aos/events/logging/eigen_mpq.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070015#include "aos/events/logging/log_namer.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070016#include "aos/events/logging/logfile_sorting.h"
Austin Schuha36c8902019-12-30 18:07:15 -080017#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080018#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070019#include "aos/events/logging/uuid.h"
Austin Schuh92547522019-12-28 14:33:43 -080020#include "aos/events/simulated_event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070021#include "aos/network/message_bridge_server_generated.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080022#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080023#include "aos/time/time.h"
24#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070025#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080026
27namespace aos {
28namespace logger {
29
Austin Schuhe309d2a2019-11-29 13:25:21 -080030// Logs all channels available in the event loop to disk every 100 ms.
31// Start by logging one message per channel to capture any state and
32// configuration that is sent rately on a channel and would affect execution.
33class Logger {
34 public:
Austin Schuh0c297012020-09-16 18:41:59 -070035 // Constructs a logger.
Austin Schuh0c297012020-09-16 18:41:59 -070036 // event_loop: The event loop used to read the messages.
Austin Schuh0c297012020-09-16 18:41:59 -070037 // configuration: When provided, this is the configuration to log, and the
38 // configuration to use for the channel list to log. If not provided,
39 // this becomes the configuration from the event loop.
Brian Silverman1f345222020-09-24 21:14:48 -070040 // should_log: When provided, a filter for channels to log. If not provided,
41 // all available channels are logged.
42 Logger(EventLoop *event_loop)
43 : Logger(event_loop, event_loop->configuration()) {}
44 Logger(EventLoop *event_loop, const Configuration *configuration)
45 : Logger(event_loop, configuration,
46 [](const Channel *) { return true; }) {}
47 Logger(EventLoop *event_loop, const Configuration *configuration,
48 std::function<bool(const Channel *)> should_log);
Austin Schuh0c297012020-09-16 18:41:59 -070049 ~Logger();
50
51 // Overrides the name in the log file header.
52 void set_name(std::string_view name) { name_ = name; }
Austin Schuhe309d2a2019-11-29 13:25:21 -080053
Brian Silverman1f345222020-09-24 21:14:48 -070054 // Sets the callback to run after each period of data is logged. Defaults to
55 // doing nothing.
56 //
57 // This callback may safely do things like call Rotate().
58 void set_on_logged_period(std::function<void()> on_logged_period) {
59 on_logged_period_ = std::move(on_logged_period);
60 }
61
62 // Sets the period between polling the data. Defaults to 100ms.
63 //
64 // Changing this while a set of files is being written may result in
65 // unreadable files.
66 void set_polling_period(std::chrono::nanoseconds polling_period) {
67 polling_period_ = polling_period;
68 }
69
Brian Silvermanae7c0332020-09-30 16:58:23 -070070 std::string_view log_start_uuid() const { return log_start_uuid_; }
Brian Silverman035e4182020-10-06 17:13:00 -070071 UUID logger_instance_uuid() const { return logger_instance_uuid_; }
Brian Silvermanae7c0332020-09-30 16:58:23 -070072
Brian Silvermancb805822020-10-06 17:43:35 -070073 // The maximum time for a single fetch which returned a message, or 0 if none
74 // of those have happened.
75 std::chrono::nanoseconds max_message_fetch_time() const {
76 return max_message_fetch_time_;
77 }
78 // The channel for that longest fetch which returned a message, or -1 if none
79 // of those have happened.
80 int max_message_fetch_time_channel() const {
81 return max_message_fetch_time_channel_;
82 }
83 // The size of the message returned by that longest fetch, or -1 if none of
84 // those have happened.
85 int max_message_fetch_time_size() const {
86 return max_message_fetch_time_size_;
87 }
88 // The total time spent fetching messages.
89 std::chrono::nanoseconds total_message_fetch_time() const {
90 return total_message_fetch_time_;
91 }
92 // The total number of fetch calls which returned messages.
93 int total_message_fetch_count() const { return total_message_fetch_count_; }
94 // The total number of bytes fetched.
95 int64_t total_message_fetch_bytes() const {
96 return total_message_fetch_bytes_;
97 }
98
99 // The total time spent in fetches which did not return a message.
100 std::chrono::nanoseconds total_nop_fetch_time() const {
101 return total_nop_fetch_time_;
102 }
103 // The total number of fetches which did not return a message.
104 int total_nop_fetch_count() const { return total_nop_fetch_count_; }
105
106 // The maximum time for a single copy, or 0 if none of those have happened.
107 std::chrono::nanoseconds max_copy_time() const { return max_copy_time_; }
108 // The channel for that longest copy, or -1 if none of those have happened.
109 int max_copy_time_channel() const { return max_copy_time_channel_; }
110 // The size of the message for that longest copy, or -1 if none of those have
111 // happened.
112 int max_copy_time_size() const { return max_copy_time_size_; }
113 // The total time spent copying messages.
114 std::chrono::nanoseconds total_copy_time() const { return total_copy_time_; }
115 // The total number of messages copied.
116 int total_copy_count() const { return total_copy_count_; }
117 // The total number of bytes copied.
118 int64_t total_copy_bytes() const { return total_copy_bytes_; }
119
120 void ResetStatisics();
121
Austin Schuh2f8fd752020-09-01 22:38:28 -0700122 // Rotates the log file(s), triggering new part files to be written for each
123 // log file.
124 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -0800125
Brian Silverman1f345222020-09-24 21:14:48 -0700126 // Starts logging to files with the given naming scheme.
Brian Silvermanae7c0332020-09-30 16:58:23 -0700127 //
128 // log_start_uuid may be used to tie this log event to other log events across
129 // multiple nodes. The default (empty string) indicates there isn't one
130 // available.
131 void StartLogging(std::unique_ptr<LogNamer> log_namer,
132 std::string_view log_start_uuid = "");
Brian Silverman1f345222020-09-24 21:14:48 -0700133
134 // Stops logging. Ensures any messages through end_time make it into the log.
135 //
136 // If you want to stop ASAP, pass min_time to avoid reading any more messages.
137 //
138 // Returns the LogNamer in case the caller wants to do anything else with it
139 // before destroying it.
140 std::unique_ptr<LogNamer> StopLogging(
141 aos::monotonic_clock::time_point end_time);
142
143 // Returns whether a log is currently being written.
144 bool is_started() const { return static_cast<bool>(log_namer_); }
145
146 // Shortcut to call StartLogging with a LocalLogNamer when event processing
147 // starts.
148 void StartLoggingLocalNamerOnRun(std::string base_name) {
149 event_loop_->OnRun([this, base_name]() {
150 StartLogging(
151 std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
152 });
153 }
154
Austin Schuhe309d2a2019-11-29 13:25:21 -0800155 private:
Austin Schuhe309d2a2019-11-29 13:25:21 -0800156 // Structure to track both a fetcher, and if the data fetched has been
157 // written. We may want to delay writing data to disk so that we don't let
158 // data get too far out of order when written to disk so we can avoid making
159 // it too hard to sort when reading.
160 struct FetcherStruct {
161 std::unique_ptr<RawFetcher> fetcher;
162 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800163
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700164 // Channel index to log to.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800165 int channel_index = -1;
Brian Silverman1f345222020-09-24 21:14:48 -0700166 const Channel *channel = nullptr;
167 const Node *timestamp_node = nullptr;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800168
169 LogType log_type = LogType::kLogMessage;
170
Brian Silverman1f345222020-09-24 21:14:48 -0700171 // We fill out the metadata at construction, but the actual writers have to
172 // be updated each time we start logging. To avoid duplicating the complex
173 // logic determining whether each writer should be initialized, we just
174 // stash the answer in separate member variables.
175 bool wants_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800176 DetachedBufferWriter *writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700177 bool wants_timestamp_writer = false;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800178 DetachedBufferWriter *timestamp_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700179 bool wants_contents_writer = false;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700180 DetachedBufferWriter *contents_writer = nullptr;
Brian Silverman1f345222020-09-24 21:14:48 -0700181
Austin Schuh2f8fd752020-09-01 22:38:28 -0700182 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800183 };
184
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700185 // Vector mapping from the channel index from the event loop to the logged
186 // channel index.
187 std::vector<int> event_loop_to_logged_channel_index_;
188
Austin Schuh2f8fd752020-09-01 22:38:28 -0700189 struct NodeState {
190 aos::monotonic_clock::time_point monotonic_start_time =
191 aos::monotonic_clock::min_time;
192 aos::realtime_clock::time_point realtime_start_time =
193 aos::realtime_clock::min_time;
194
195 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
196 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
197 };
Brian Silverman1f345222020-09-24 21:14:48 -0700198
199 void WriteHeader();
200 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
201 const Node *node);
202
203 bool MaybeUpdateTimestamp(
204 const Node *node, int node_index,
205 aos::monotonic_clock::time_point monotonic_start_time,
206 aos::realtime_clock::time_point realtime_start_time);
207
208 void DoLogData(const monotonic_clock::time_point end_time);
209
210 void WriteMissingTimestamps();
211
212 // Fetches from each channel until all the data is logged.
213 void LogUntil(monotonic_clock::time_point t);
214
Brian Silvermancb805822020-10-06 17:43:35 -0700215 void RecordFetchResult(aos::monotonic_clock::time_point start,
216 aos::monotonic_clock::time_point end, bool got_new,
217 FetcherStruct *fetcher);
218
219 void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
220 aos::monotonic_clock::time_point end,
221 FetcherStruct *fetcher);
222
Brian Silverman1f345222020-09-24 21:14:48 -0700223 // Sets the start time for a specific node.
224 void SetStartTime(size_t node_index,
225 aos::monotonic_clock::time_point monotonic_start_time,
226 aos::realtime_clock::time_point realtime_start_time);
227
Brian Silvermanae7c0332020-09-30 16:58:23 -0700228 EventLoop *const event_loop_;
Brian Silverman1f345222020-09-24 21:14:48 -0700229 // The configuration to place at the top of the log file.
230 const Configuration *const configuration_;
231
Brian Silvermanae7c0332020-09-30 16:58:23 -0700232 UUID log_event_uuid_ = UUID::Zero();
233 const UUID logger_instance_uuid_ = UUID::Random();
234 std::unique_ptr<LogNamer> log_namer_;
235 // Empty indicates there isn't one.
236 std::string log_start_uuid_;
237 const std::string boot_uuid_;
238
Brian Silverman1f345222020-09-24 21:14:48 -0700239 // Name to save in the log file. Defaults to hostname.
240 std::string name_;
241
242 std::function<void()> on_logged_period_ = []() {};
243
Brian Silvermancb805822020-10-06 17:43:35 -0700244 std::chrono::nanoseconds max_message_fetch_time_ =
245 std::chrono::nanoseconds::zero();
246 int max_message_fetch_time_channel_ = -1;
247 int max_message_fetch_time_size_ = -1;
248 std::chrono::nanoseconds total_message_fetch_time_ =
249 std::chrono::nanoseconds::zero();
250 int total_message_fetch_count_ = 0;
251 int64_t total_message_fetch_bytes_ = 0;
252
253 std::chrono::nanoseconds total_nop_fetch_time_ =
254 std::chrono::nanoseconds::zero();
255 int total_nop_fetch_count_ = 0;
256
257 std::chrono::nanoseconds max_copy_time_ = std::chrono::nanoseconds::zero();
258 int max_copy_time_channel_ = -1;
259 int max_copy_time_size_ = -1;
260 std::chrono::nanoseconds total_copy_time_ = std::chrono::nanoseconds::zero();
261 int total_copy_count_ = 0;
262 int64_t total_copy_bytes_ = 0;
263
Brian Silverman1f345222020-09-24 21:14:48 -0700264 std::vector<FetcherStruct> fetchers_;
265 TimerHandler *timer_handler_;
266
267 // Period to poll the channels.
268 std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
269
270 // Last time that data was written for all channels to disk.
271 monotonic_clock::time_point last_synchronized_time_;
272
273 // Max size that the header has consumed. This much extra data will be
274 // reserved in the builder to avoid reallocating.
275 size_t max_header_size_ = 0;
276
277 // Fetcher for all the statistics from all the nodes.
278 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
279
Austin Schuh2f8fd752020-09-01 22:38:28 -0700280 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800281};
282
Austin Schuh11d43732020-09-21 17:28:30 -0700283std::vector<std::vector<std::string>> ToLogReaderVector(
284 const std::vector<LogFile> &log_files);
Austin Schuh5212cad2020-09-09 23:12:09 -0700285
Austin Schuh6f3babe2020-01-26 20:34:50 -0800286// We end up with one of the following 3 log file types.
287//
288// Single node logged as the source node.
289// -> Replayed just on the source node.
290//
291// Forwarding timestamps only logged from the perspective of the destination
292// node.
293// -> Matched with data on source node and logged.
294//
295// Forwarding timestamps with data logged as the destination node.
296// -> Replayed just as the destination
297// -> Replayed as the source (Much harder, ordering is not defined)
298//
299// Duplicate data logged. -> CHECK that it matches and explode otherwise.
300//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
// 1) Forwarding timestamps and data need to be logged separately.
// 2) Any forwarded data logged on the destination node needs to be logged
//    separately such that it can be sorted.
//
// Tools:
// 1) Log reader needs to be able to sort a list of log files.
// 2) Log reader needs to be able to merge sorted lists of log files.
// 3) Log reader needs to be able to match timestamps with messages.
310//
311// We also need to be able to generate multiple views of a log file depending on
312// the target.
313
Austin Schuhe309d2a2019-11-29 13:25:21 -0800314// Replays all the channels in the logfile to the event loop.
315class LogReader {
316 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800317 // If you want to supply a new configuration that will be used for replay
318 // (e.g., to change message rates, or to populate an updated schema), then
319 // pass it in here. It must provide all the channels that the original logged
320 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800321 //
Austin Schuh287d43d2020-12-04 20:19:33 -0800322 // The single file constructor calls SortParts internally.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800323 LogReader(std::string_view filename,
324 const Configuration *replay_configuration = nullptr);
Austin Schuh287d43d2020-12-04 20:19:33 -0800325 LogReader(std::vector<LogFile> log_files,
Austin Schuh11d43732020-09-21 17:28:30 -0700326 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800327 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800328
Austin Schuh6331ef92020-01-07 18:28:09 -0800329 // Registers all the callbacks to send the log file data out on an event loop
330 // created in event_loop_factory. This also updates time to be at the start
331 // of the log file by running until the log file starts.
332 // Note: the configuration used in the factory should be configuration()
333 // below, but can be anything as long as the locations needed to send
334 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800335 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800336 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
337 // and then calls Register.
338 void Register();
339 // Registers callbacks for all the events after the log file starts. This is
340 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800341 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800342
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800343 // Unregisters the senders. You only need to call this if you separately
344 // supplied an event loop or event loop factory and the lifetimes are such
345 // that they need to be explicitly destroyed before the LogReader destructor
346 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800347 void Deregister();
348
Austin Schuh0c297012020-09-16 18:41:59 -0700349 // Returns the configuration being used for replay from the log file.
350 // Note that this may be different from the configuration actually used for
351 // handling events. You should generally only use this to create a
352 // SimulatedEventLoopFactory, and then get the configuration from there for
353 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800354 const Configuration *logged_configuration() const;
Austin Schuh11d43732020-09-21 17:28:30 -0700355 // Returns the configuration being used for replay from the log file.
356 // Note that this may be different from the configuration actually used for
357 // handling events. You should generally only use this to create a
358 // SimulatedEventLoopFactory, and then get the configuration from there for
359 // everything else.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800360 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800361 const Configuration *configuration() const;
362
Austin Schuh6f3babe2020-01-26 20:34:50 -0800363 // Returns the nodes that this log file was created on. This is a list of
364 // pointers to a node in the nodes() list inside configuration(). The
365 // pointers here are invalidated whenever RemapLoggedChannel is called.
366 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800367
368 // Returns the starting timestamp for the log file.
Austin Schuh11d43732020-09-21 17:28:30 -0700369 monotonic_clock::time_point monotonic_start_time(
370 const Node *node = nullptr) const;
371 realtime_clock::time_point realtime_start_time(
372 const Node *node = nullptr) const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800373
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800374 // Causes the logger to publish the provided channel on a different name so
375 // that replayed applications can publish on the proper channel name without
376 // interference. This operates on raw channel names, without any node or
377 // application specific mappings.
378 void RemapLoggedChannel(std::string_view name, std::string_view type,
379 std::string_view add_prefix = "/original");
// Typed convenience overload: remaps the channel `name` whose type is the
// flatbuffer type T, using T's fully qualified name as the type string.
template <typename T>
void RemapLoggedChannel(std::string_view name,
                        std::string_view add_prefix = "/original") {
  const auto type_name = T::GetFullyQualifiedName();
  RemapLoggedChannel(name, type_name, add_prefix);
}
385
Austin Schuh01b4c352020-09-21 23:09:39 -0700386 // Remaps the provided channel, though this respects node mappings, and
387 // preserves them too. This makes it so if /aos -> /pi1/aos on one node,
388 // /original/aos -> /original/pi1/aos on the same node after renaming, just
389 // like you would hope.
390 //
391 // TODO(austin): If you have 2 nodes remapping something to the same channel,
392 // this doesn't handle that. No use cases exist yet for that, so it isn't
393 // being done yet.
394 void RemapLoggedChannel(std::string_view name, std::string_view type,
395 const Node *node,
396 std::string_view add_prefix = "/original");
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700397 template <typename T>
Austin Schuh01b4c352020-09-21 23:09:39 -0700398 void RemapLoggedChannel(std::string_view name, const Node *node,
399 std::string_view add_prefix = "/original") {
400 RemapLoggedChannel(name, T::GetFullyQualifiedName(), node, add_prefix);
401 }
402
403 template <typename T>
404 bool HasChannel(std::string_view name, const Node *node = nullptr) {
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700405 return configuration::GetChannel(log_file_header()->configuration(), name,
406 T::GetFullyQualifiedName(), "",
Austin Schuh01b4c352020-09-21 23:09:39 -0700407 node) != nullptr;
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700408 }
409
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800410 SimulatedEventLoopFactory *event_loop_factory() {
411 return event_loop_factory_;
412 }
413
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700414 const LogFileHeader *log_file_header() const {
415 return &log_file_header_.message();
416 }
417
Austin Schuh0c297012020-09-16 18:41:59 -0700418 std::string_view name() const {
419 return log_file_header()->name()->string_view();
420 }
421
Austin Schuhe309d2a2019-11-29 13:25:21 -0800422 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800423 const Channel *RemapChannel(const EventLoop *event_loop,
424 const Channel *channel);
425
Austin Schuhe309d2a2019-11-29 13:25:21 -0800426 // Queues at least max_out_of_order_duration_ messages into channels_.
427 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800428 // Handle constructing a configuration with all the additional remapped
429 // channels from calls to RemapLoggedChannel.
430 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800431
Austin Schuh2f8fd752020-09-01 22:38:28 -0700432 // Returns the number of nodes.
433 size_t nodes_count() const {
434 return !configuration::MultiNode(logged_configuration())
435 ? 1u
436 : logged_configuration()->nodes()->size();
437 }
438
Austin Schuh287d43d2020-12-04 20:19:33 -0800439 const std::vector<LogFile> log_files_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800440
441 // This is *a* log file header used to provide the logged config. The rest of
442 // the header is likely distracting.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800443 SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800444
Austin Schuh2f8fd752020-09-01 22:38:28 -0700445 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
446 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
447 Eigen::Matrix<double, Eigen::Dynamic, 1>>
448 SolveOffsets();
449
450 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800451
Austin Schuh6f3babe2020-01-26 20:34:50 -0800452 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700453 class State {
454 public:
Austin Schuh287d43d2020-12-04 20:19:33 -0800455 State(std::unique_ptr<TimestampMapper> timestamp_mapper);
456
457 // Connects up the timestamp mappers.
458 void AddPeer(State *peer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800459
Austin Schuh858c9f32020-08-31 16:56:12 -0700460 // Returns the timestamps, channel_index, and message from a channel.
461 // update_time (will be) set to true when popping this message causes the
462 // filter to change the time offset estimation function.
Austin Schuh287d43d2020-12-04 20:19:33 -0800463 TimestampedMessage PopOldest(bool *update_time);
Austin Schuh858c9f32020-08-31 16:56:12 -0700464
465 // Returns the monotonic time of the oldest message.
466 monotonic_clock::time_point OldestMessageTime() const;
467
468 // Primes the queues inside State. Should be called before calling
469 // OldestMessageTime.
470 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800471
Austin Schuh858c9f32020-08-31 16:56:12 -0700472 // Returns the starting time for this node.
473 monotonic_clock::time_point monotonic_start_time() const {
Austin Schuh287d43d2020-12-04 20:19:33 -0800474 return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
475 : monotonic_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700476 }
477 realtime_clock::time_point realtime_start_time() const {
Austin Schuh287d43d2020-12-04 20:19:33 -0800478 return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
479 : realtime_clock::min_time;
Austin Schuh858c9f32020-08-31 16:56:12 -0700480 }
481
482 // Sets the node event loop factory for replaying into a
483 // SimulatedEventLoopFactory. Returns the EventLoop to use.
484 EventLoop *SetNodeEventLoopFactory(
485 NodeEventLoopFactory *node_event_loop_factory);
486
487 // Sets and gets the event loop to use.
488 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
489 EventLoop *event_loop() { return event_loop_; }
490
Austin Schuh858c9f32020-08-31 16:56:12 -0700491 // Sets the current realtime offset from the monotonic clock for this node
492 // (if we are on a simulated event loop).
493 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
494 realtime_clock::time_point realtime_time) {
495 if (node_event_loop_factory_ != nullptr) {
496 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
497 realtime_time);
498 }
499 }
500
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700501 // Returns the MessageHeader sender to log delivery timestamps to for the
502 // provided remote node.
503 aos::Sender<MessageHeader> *RemoteTimestampSender(
504 const Node *delivered_node);
505
Austin Schuh858c9f32020-08-31 16:56:12 -0700506 // Converts a timestamp from the monotonic clock on this node to the
507 // distributed clock.
508 distributed_clock::time_point ToDistributedClock(
509 monotonic_clock::time_point time) {
510 return node_event_loop_factory_->ToDistributedClock(time);
511 }
512
Austin Schuh2f8fd752020-09-01 22:38:28 -0700513 monotonic_clock::time_point FromDistributedClock(
514 distributed_clock::time_point time) {
515 return node_event_loop_factory_->FromDistributedClock(time);
516 }
517
Austin Schuh858c9f32020-08-31 16:56:12 -0700518 // Sets the offset (and slope) from the distributed clock.
519 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
520 double distributed_slope) {
521 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
522 distributed_slope);
523 }
524
525 // Returns the current time on the remote node which sends messages on
526 // channel_index.
527 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700528 return channel_source_state_[channel_index]
529 ->node_event_loop_factory_->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -0700530 }
531
Austin Schuh2f8fd752020-09-01 22:38:28 -0700532 distributed_clock::time_point RemoteToDistributedClock(
533 size_t channel_index, monotonic_clock::time_point time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700534 return channel_source_state_[channel_index]
535 ->node_event_loop_factory_->ToDistributedClock(time);
Austin Schuh2f8fd752020-09-01 22:38:28 -0700536 }
537
538 const Node *remote_node(size_t channel_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700539 return channel_source_state_[channel_index]
540 ->node_event_loop_factory_->node();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700541 }
542
543 monotonic_clock::time_point monotonic_now() {
544 return node_event_loop_factory_->monotonic_now();
545 }
546
Austin Schuh858c9f32020-08-31 16:56:12 -0700547 // Sets the number of channels.
548 void SetChannelCount(size_t count);
549
550 // Sets the sender, filter, and target factory for a channel.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700551 void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
552 std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700553 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700554 aos::Sender<MessageHeader> *remote_timestamp_sender,
555 State *source_state);
Austin Schuh858c9f32020-08-31 16:56:12 -0700556
557 // Returns if we have read all the messages from all the logs.
Austin Schuh287d43d2020-12-04 20:19:33 -0800558 bool at_end() const {
559 return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
560 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700561
562 // Unregisters everything so we can destory the event loop.
563 void Deregister();
564
565 // Sets the current TimerHandle for the replay callback.
566 void set_timer_handler(TimerHandler *timer_handler) {
567 timer_handler_ = timer_handler;
568 }
569
570 // Sets the next wakeup time on the replay callback.
571 void Setup(monotonic_clock::time_point next_time) {
572 timer_handler_->Setup(next_time);
573 }
574
575 // Sends a buffer on the provided channel index.
Austin Schuh287d43d2020-12-04 20:19:33 -0800576 bool Send(const TimestampedMessage &timestamped_message);
Austin Schuh858c9f32020-08-31 16:56:12 -0700577
578 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700579 std::string DebugString() const {
580 std::stringstream messages;
581 size_t i = 0;
582 for (const auto &message : sorted_messages_) {
583 if (i < 7 || i + 7 > sorted_messages_.size()) {
584 messages << "sorted_messages[" << i
585 << "]: " << std::get<0>(message).monotonic_event_time << " "
586 << configuration::StrippedChannelToString(
587 event_loop_->configuration()->channels()->Get(
Austin Schuh287d43d2020-12-04 20:19:33 -0800588 std::get<0>(message).channel_index))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700589 << "\n";
590 } else if (i == 7) {
591 messages << "...\n";
592 }
593 ++i;
594 }
Austin Schuh287d43d2020-12-04 20:19:33 -0800595 if (!timestamp_mapper_) {
596 return messages.str();
597 }
598 return messages.str() + timestamp_mapper_->DebugString();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700599 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700600
601 private:
602 // Log file.
Austin Schuh287d43d2020-12-04 20:19:33 -0800603 std::unique_ptr<TimestampMapper> timestamp_mapper_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700604
Austin Schuh287d43d2020-12-04 20:19:33 -0800605 std::deque<std::tuple<TimestampedMessage,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700606 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700607 sorted_messages_;
608
609 // Senders.
610 std::vector<std::unique_ptr<RawSender>> channels_;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700611 std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
612 // The mapping from logged channel index to sent channel index. Needed for
613 // sending out MessageHeaders.
614 std::vector<int> factory_channel_index_;
615
616 struct SentTimestamp {
617 monotonic_clock::time_point monotonic_event_time =
618 monotonic_clock::min_time;
619 realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
620 uint32_t queue_index = 0xffffffff;
621
622 // The queue index that this message *actually* was sent with.
623 uint32_t actual_queue_index = 0xffffffff;
624 };
625
626 // Stores all the timestamps that have been sent on this channel. This is
627 // only done for channels which are forwarded and on the node which
628 // initially sends the message.
629 //
630 // TODO(austin): This whole concept is a hack. We should be able to
631 // associate state with the message as it gets sorted and recover it.
632 std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
Austin Schuh858c9f32020-08-31 16:56:12 -0700633
634 // Factory (if we are in sim) that this loop was created on.
635 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
636 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
637 // Event loop.
638 EventLoop *event_loop_ = nullptr;
639 // And timer used to send messages.
640 TimerHandler *timer_handler_;
641
Austin Schuh8bd96322020-02-13 21:18:22 -0800642 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
643 // This corresponds to the object which is shared among all the channels
644 // going between 2 nodes. The second element in the tuple indicates if this
645 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700646 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800647
648 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
649 // channel) which correspond to the originating node.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700650 std::vector<State *> channel_source_state_;
651
652 std::map<const Node *, aos::Sender<MessageHeader>>
653 remote_timestamp_senders_map_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800654 };
655
Austin Schuh8bd96322020-02-13 21:18:22 -0800656 // Node index -> State.
657 std::vector<std::unique_ptr<State>> states_;
658
659 // Creates the requested filter if it doesn't exist, regardless of whether
660 // these nodes can actually communicate directly. The second return value
661 // reports if this is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700662 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
663 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800664
665 // FILE to write offsets to (if populated).
666 FILE *offset_fp_ = nullptr;
667 // Timestamp of the first piece of data used for the horizontal axis on the
668 // plot.
669 aos::realtime_clock::time_point first_time_;
670
671 // List of filters for a connection. The pointer to the first node will be
672 // less than the second node.
673 std::map<std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700674 std::tuple<message_bridge::NoncausalOffsetEstimator>>
Austin Schuh8bd96322020-02-13 21:18:22 -0800675 filters_;
676
677 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700678 // clock. monotonic = distributed * slope() + offset();
679 double slope(int node_index) const {
680 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700681 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700682 return time_slope_matrix_(node_index);
683 }
684 std::chrono::nanoseconds offset(int node_index) const {
685 CHECK_LT(node_index, time_offset_matrix_.rows())
686 << ": Got too high of a node index.";
687 return std::chrono::duration_cast<std::chrono::nanoseconds>(
688 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800689 }
690
691 // Updates the offset matrix solution and sets the per-node distributed
692 // offsets in the factory.
693 void UpdateOffsets();
694
Austin Schuh2f8fd752020-09-01 22:38:28 -0700695 // We have 2 types of equations to do a least squares regression over to fully
696 // constrain our time function.
697 //
698 // One is simple. The distributed clock is the average of all the clocks.
Brian Silverman87ac0402020-09-17 14:47:01 -0700699 // (ta + tb + tc + td) / num_nodes = t_distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700700 //
701 // The second is a bit more complicated. Our basic time conversion function
702 // is:
703 // tb = ta + (ta * slope + offset)
704 // We can rewrite this as follows
705 // tb - (1 + slope) * ta = offset
706 //
707 // From here, we have enough equations to solve for t{a,b,c,...} We want to
708 // take as an input the offsets and slope, and solve for the per-node times as
709 // a function of the distributed clock.
710 //
711 // We need to massage our equations to make this work. If we solve for the
712 // per-node times at two set distributed clock times, we will be able to
713 // recreate the linear function (we know it is linear). We can do a similar
714 // thing by breaking our equation up into:
Brian Silverman87ac0402020-09-17 14:47:01 -0700715 //
Austin Schuh2f8fd752020-09-01 22:38:28 -0700716 // [1/3 1/3 1/3 ] [ta] [t_distributed]
717 // [ 1 -1-m1 0 ] [tb] = [oab]
718 // [ 1 0 -1-m2 ] [tc] [oac]
719 //
720 // This solves to:
721 //
722 // [ta] [ a00 a01 a02] [t_distributed]
723 // [tb] = [ a10 a11 a12] * [oab]
724 // [tc] [ a20 a21 a22] [oac]
725 //
726 // and can be split into:
727 //
728 // [ta] [ a00 ] [a01 a02]
729 // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
730 // [tc] [ a20 ] [a21 a22] [oac]
731 //
732 // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
733 // offset_matrix_ will be in nanoseconds.
734 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
735 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
736 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
737 // Matrix tracking which offsets are valid.
738 Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
739 // Matrix tracking the last valid matrix we used to determine connected nodes.
740 Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
741 size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800742
Austin Schuh2f8fd752020-09-01 22:38:28 -0700743 // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix;
744 // t is in seconds.
745 Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
746 Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800747
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800748 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
749 remapped_configuration_buffer_;
750
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800751 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
752 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800753
754 // Map of channel indices to new name. The channel index will be an index into
755 // logged_configuration(), and the string key will be the name of the channel
756 // to send on instead of the logged channel name.
757 std::map<size_t, std::string> remapped_channels_;
Austin Schuh01b4c352020-09-21 23:09:39 -0700758 std::vector<MapT> maps_;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800759
Austin Schuh6f3babe2020-01-26 20:34:50 -0800760 // Number of nodes which still have data to send. This is used to figure out
761 // when to exit.
762 size_t live_nodes_ = 0;
763
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800764 const Configuration *remapped_configuration_ = nullptr;
765 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800766
767 // If true, the replay timer will ignore any missing data. This is used
768 // during startup when we are bootstrapping everything and trying to get to
769 // the start of all the log files.
770 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800771};
772
773} // namespace logger
774} // namespace aos
775
776#endif // AOS_EVENTS_LOGGER_H_