blob: 3756f83ba612be7d2cf1271c5dee424a064ac244 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <string_view>
Austin Schuh2f8fd752020-09-01 22:38:28 -07007#include <tuple>
Austin Schuh6f3babe2020-01-26 20:34:50 -08008#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08009
Austin Schuh8bd96322020-02-13 21:18:22 -080010#include "Eigen/Dense"
11#include "absl/strings/str_cat.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070014#include "aos/events/logging/eigen_mpq.h"
Austin Schuha36c8902019-12-30 18:07:15 -080015#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080016#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070017#include "aos/events/logging/uuid.h"
Austin Schuh92547522019-12-28 14:33:43 -080018#include "aos/events/simulated_event_loop.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070019#include "aos/network/message_bridge_server_generated.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080020#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080021#include "aos/time/time.h"
22#include "flatbuffers/flatbuffers.h"
Austin Schuh2f8fd752020-09-01 22:38:28 -070023#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080024
25namespace aos {
26namespace logger {
27
Austin Schuh6f3babe2020-01-26 20:34:50 -080028class LogNamer {
29 public:
30 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
31 virtual ~LogNamer() {}
32
Austin Schuh2f8fd752020-09-01 22:38:28 -070033 virtual void WriteHeader(
Austin Schuh64fab802020-09-09 22:47:47 -070034 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
Austin Schuh2f8fd752020-09-01 22:38:28 -070035 const Node *node) = 0;
Austin Schuh6f3babe2020-01-26 20:34:50 -080036 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
37
38 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
Austin Schuh2f8fd752020-09-01 22:38:28 -070039 virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
40 const Channel *channel, const Node *node) = 0;
41 virtual void Rotate(
42 const Node *node,
Austin Schuh64fab802020-09-09 22:47:47 -070043 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
Austin Schuh6f3babe2020-01-26 20:34:50 -080044 const std::vector<const Node *> &nodes() const { return nodes_; }
45
46 const Node *node() const { return node_; }
47
48 protected:
Austin Schuh64fab802020-09-09 22:47:47 -070049 void UpdateHeader(
50 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
51 const UUID &uuid, int part_id);
52
Austin Schuh6f3babe2020-01-26 20:34:50 -080053 const Node *const node_;
54 std::vector<const Node *> nodes_;
55};
56
57class LocalLogNamer : public LogNamer {
58 public:
Austin Schuh2f8fd752020-09-01 22:38:28 -070059 LocalLogNamer(std::string_view base_name, const Node *node)
Austin Schuh64fab802020-09-09 22:47:47 -070060 : LogNamer(node),
61 base_name_(base_name),
62 uuid_(UUID::Random()),
63 data_writer_(OpenDataWriter()) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -080064
Austin Schuh2f8fd752020-09-01 22:38:28 -070065 void WriteHeader(
Austin Schuh64fab802020-09-09 22:47:47 -070066 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
Austin Schuh2f8fd752020-09-01 22:38:28 -070067 const Node *node) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -080068 CHECK_EQ(node, this->node());
Austin Schuh64fab802020-09-09 22:47:47 -070069 UpdateHeader(header, uuid_, part_number_);
70 data_writer_->WriteSizedFlatbuffer(header->full_span());
Austin Schuh6f3babe2020-01-26 20:34:50 -080071 }
72
73 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
74 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
Austin Schuh2f8fd752020-09-01 22:38:28 -070075 return data_writer_.get();
76 }
77
78 void Rotate(const Node *node,
Austin Schuh64fab802020-09-09 22:47:47 -070079 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
80 override {
Austin Schuh2f8fd752020-09-01 22:38:28 -070081 CHECK(node == this->node());
82 ++part_number_;
83 *data_writer_ = std::move(*OpenDataWriter());
Austin Schuh64fab802020-09-09 22:47:47 -070084 UpdateHeader(header, uuid_, part_number_);
85 data_writer_->WriteSizedFlatbuffer(header->full_span());
Austin Schuh6f3babe2020-01-26 20:34:50 -080086 }
87
88 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
89 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
90 << ": Message is not delivered to this node.";
91 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
92 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
93 node_))
94 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuh2f8fd752020-09-01 22:38:28 -070095 return data_writer_.get();
96 }
97
98 DetachedBufferWriter *MakeForwardedTimestampWriter(
99 const Channel * /*channel*/, const Node * /*node*/) override {
100 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
101 return nullptr;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800102 }
103
104 private:
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105 std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
106 return std::make_unique<DetachedBufferWriter>(
107 absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
108 }
109 const std::string base_name_;
Austin Schuh64fab802020-09-09 22:47:47 -0700110 const UUID uuid_;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700111 size_t part_number_ = 0;
112 std::unique_ptr<DetachedBufferWriter> data_writer_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800113};
114
115// TODO(austin): Split naming files from making files so we can re-use the
116// naming code to predict the log file names for a provided base name.
117class MultiNodeLogNamer : public LogNamer {
118 public:
119 MultiNodeLogNamer(std::string_view base_name,
120 const Configuration *configuration, const Node *node)
121 : LogNamer(node),
122 base_name_(base_name),
123 configuration_(configuration),
Austin Schuh64fab802020-09-09 22:47:47 -0700124 uuid_(UUID::Random()),
Austin Schuh2f8fd752020-09-01 22:38:28 -0700125 data_writer_(OpenDataWriter()) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -0800126
127 // Writes the header to all log files for a specific node. This function
128 // needs to be called after all the writers are created.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700129 void WriteHeader(
Austin Schuh64fab802020-09-09 22:47:47 -0700130 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700131 const Node *node) override;
132
133 void Rotate(const Node *node,
Austin Schuh64fab802020-09-09 22:47:47 -0700134 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
135 override;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800136
137 // Makes a data logger for a specific channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700138 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800139 // See if we can read the data on this node at all.
140 const bool is_readable =
141 configuration::ChannelIsReadableOnNode(channel, this->node());
142 if (!is_readable) {
143 return nullptr;
144 }
145
146 // Then, see if we are supposed to log the data here.
147 const bool log_message =
148 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
149
150 if (!log_message) {
151 return nullptr;
152 }
153
154 // Now, sort out if this is data generated on this node, or not. It is
155 // generated if it is sendable on this node.
156 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
157 return data_writer_.get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800158 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700159
160 // Ok, we have data that is being forwarded to us that we are supposed to
161 // log. It needs to be logged with send timestamps, but be sorted enough
162 // to be able to be processed.
163 CHECK(data_writers_.find(channel) == data_writers_.end());
164
165 // Track that this node is being logged.
166 const Node *source_node = configuration::GetNode(
167 configuration_, channel->source_node()->string_view());
168
169 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
170 nodes_.emplace_back(source_node);
171 }
172
173 DataWriter data_writer;
174 data_writer.node = source_node;
175 data_writer.rotate = [this](const Channel *channel,
176 DataWriter *data_writer) {
177 OpenWriter(channel, data_writer);
178 };
179 data_writer.rotate(channel, &data_writer);
180
181 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
182 .first->second.writer.get();
183 }
184
185 DetachedBufferWriter *MakeForwardedTimestampWriter(
186 const Channel *channel, const Node *node) override {
187 // See if we can read the data on this node at all.
188 const bool is_readable =
189 configuration::ChannelIsReadableOnNode(channel, this->node());
190 CHECK(is_readable) << ": "
191 << configuration::CleanedChannelToString(channel);
192
193 CHECK(data_writers_.find(channel) == data_writers_.end());
194
195 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
196 nodes_.emplace_back(node);
197 }
198
199 DataWriter data_writer;
200 data_writer.node = node;
201 data_writer.rotate = [this](const Channel *channel,
202 DataWriter *data_writer) {
203 OpenForwardedTimestampWriter(channel, data_writer);
204 };
205 data_writer.rotate(channel, &data_writer);
206
207 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
208 .first->second.writer.get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800209 }
210
211 // Makes a timestamp (or timestamp and data) logger for a channel and
212 // forwarding connection.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700213 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800214 const bool log_delivery_times =
215 (this->node() == nullptr)
216 ? false
217 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
218 channel, this->node(), this->node());
219 if (!log_delivery_times) {
220 return nullptr;
221 }
222
223 return data_writer_.get();
224 }
225
226 const std::vector<const Node *> &nodes() const { return nodes_; }
227
228 private:
Austin Schuh2f8fd752020-09-01 22:38:28 -0700229 // Files to write remote data to. We want one per channel. Maps the channel
230 // to the writer, Node, and part number.
231 struct DataWriter {
232 std::unique_ptr<DetachedBufferWriter> writer = nullptr;
233 const Node *node;
234 size_t part_number = 0;
Austin Schuh64fab802020-09-09 22:47:47 -0700235 UUID uuid = UUID::Random();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700236 std::function<void(const Channel *, DataWriter *)> rotate;
237 };
238
239 void OpenForwardedTimestampWriter(const Channel *channel,
240 DataWriter *data_writer) {
241 std::string filename =
242 absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
243 "/", channel->type()->string_view(), ".part",
244 data_writer->part_number, ".bfbs");
245
246 if (!data_writer->writer) {
247 data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
248 } else {
249 *data_writer->writer = DetachedBufferWriter(filename);
250 }
251 }
252
253 void OpenWriter(const Channel *channel, DataWriter *data_writer) {
254 const std::string filename = absl::StrCat(
255 base_name_, "_", channel->source_node()->string_view(), "_data",
256 channel->name()->string_view(), "/", channel->type()->string_view(),
257 ".part", data_writer->part_number, ".bfbs");
258 if (!data_writer->writer) {
259 data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
260 } else {
261 *data_writer->writer = DetachedBufferWriter(filename);
262 }
263 }
264
265 std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
266 return std::make_unique<DetachedBufferWriter>(
267 absl::StrCat(base_name_, "_", node()->name()->string_view(),
268 "_data.part", part_number_, ".bfbs"));
269 }
270
Austin Schuh6f3babe2020-01-26 20:34:50 -0800271 const std::string base_name_;
272 const Configuration *const configuration_;
Austin Schuh64fab802020-09-09 22:47:47 -0700273 const UUID uuid_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800274
Austin Schuh2f8fd752020-09-01 22:38:28 -0700275 size_t part_number_ = 0;
276
Austin Schuh6f3babe2020-01-26 20:34:50 -0800277 // File to write both delivery timestamps and local data to.
278 std::unique_ptr<DetachedBufferWriter> data_writer_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800279
Austin Schuh2f8fd752020-09-01 22:38:28 -0700280 std::map<const Channel *, DataWriter> data_writers_;
281};
Austin Schuh8bd96322020-02-13 21:18:22 -0800282
// Logs all channels available in the event loop to disk, polling every
// polling_period (100 ms by default).  Logging starts with one message per
// channel to capture any state and configuration that is sent rarely on a
// channel and would affect execution.
286class Logger {
287 public:
Austin Schuh0c297012020-09-16 18:41:59 -0700288 // Constructs a logger.
289 // base_name/log_namer: Object used to write data to disk in one or more log
290 // files. If a base_name is passed in, a LocalLogNamer is wrapped
291 // around it.
292 // event_loop: The event loop used to read the messages.
293 // polling_period: The period used to poll the data.
294 // configuration: When provided, this is the configuration to log, and the
295 // configuration to use for the channel list to log. If not provided,
296 // this becomes the configuration from the event loop.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700297 Logger(std::string_view base_name, EventLoop *event_loop,
Austin Schuhe309d2a2019-11-29 13:25:21 -0800298 std::chrono::milliseconds polling_period =
299 std::chrono::milliseconds(100));
Austin Schuh0c297012020-09-16 18:41:59 -0700300 Logger(std::string_view base_name, EventLoop *event_loop,
301 const Configuration *configuration,
302 std::chrono::milliseconds polling_period =
303 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800304 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
305 std::chrono::milliseconds polling_period =
306 std::chrono::milliseconds(100));
Austin Schuh0c297012020-09-16 18:41:59 -0700307 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
308 const Configuration *configuration,
309 std::chrono::milliseconds polling_period =
310 std::chrono::milliseconds(100));
311 ~Logger();
312
313 // Overrides the name in the log file header.
314 void set_name(std::string_view name) { name_ = name; }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800315
Austin Schuh2f8fd752020-09-01 22:38:28 -0700316 // Rotates the log file(s), triggering new part files to be written for each
317 // log file.
318 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -0800319
Austin Schuhe309d2a2019-11-29 13:25:21 -0800320 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800321 void WriteHeader();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700322 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
323 const Node *node);
324
325 bool MaybeUpdateTimestamp(
326 const Node *node, int node_index,
327 aos::monotonic_clock::time_point monotonic_start_time,
328 aos::realtime_clock::time_point realtime_start_time);
Austin Schuhfa895892020-01-07 20:07:41 -0800329
Austin Schuhe309d2a2019-11-29 13:25:21 -0800330 void DoLogData();
331
Austin Schuh2f8fd752020-09-01 22:38:28 -0700332 void WriteMissingTimestamps();
333
334 void StartLogging();
335
336 // Fetches from each channel until all the data is logged.
337 void LogUntil(monotonic_clock::time_point t);
338
Austin Schuhe309d2a2019-11-29 13:25:21 -0800339 EventLoop *event_loop_;
Austin Schuh64fab802020-09-09 22:47:47 -0700340 const UUID uuid_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800341 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800342
Austin Schuh0c297012020-09-16 18:41:59 -0700343 // The configuration to place at the top of the log file.
344 const Configuration *configuration_;
345
346 // Name to save in the log file. Defaults to hostname.
347 std::string name_;
348
Austin Schuhe309d2a2019-11-29 13:25:21 -0800349 // Structure to track both a fetcher, and if the data fetched has been
350 // written. We may want to delay writing data to disk so that we don't let
351 // data get too far out of order when written to disk so we can avoid making
352 // it too hard to sort when reading.
353 struct FetcherStruct {
354 std::unique_ptr<RawFetcher> fetcher;
355 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800356
Austin Schuh6f3babe2020-01-26 20:34:50 -0800357 int channel_index = -1;
358
359 LogType log_type = LogType::kLogMessage;
360
361 DetachedBufferWriter *writer = nullptr;
362 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700363 DetachedBufferWriter *contents_writer = nullptr;
364 const Node *writer_node = nullptr;
365 const Node *timestamp_node = nullptr;
366 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800367 };
368
369 std::vector<FetcherStruct> fetchers_;
370 TimerHandler *timer_handler_;
371
372 // Period to poll the channels.
373 const std::chrono::milliseconds polling_period_;
374
375 // Last time that data was written for all channels to disk.
376 monotonic_clock::time_point last_synchronized_time_;
377
Austin Schuhfa895892020-01-07 20:07:41 -0800378 monotonic_clock::time_point monotonic_start_time_;
379 realtime_clock::time_point realtime_start_time_;
380
Austin Schuhe309d2a2019-11-29 13:25:21 -0800381 // Max size that the header has consumed. This much extra data will be
382 // reserved in the builder to avoid reallocating.
383 size_t max_header_size_ = 0;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700384
385 // Fetcher for all the statistics from all the nodes.
386 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
387
388 // Sets the start time for a specific node.
389 void SetStartTime(size_t node_index,
390 aos::monotonic_clock::time_point monotonic_start_time,
391 aos::realtime_clock::time_point realtime_start_time);
392
393 struct NodeState {
394 aos::monotonic_clock::time_point monotonic_start_time =
395 aos::monotonic_clock::min_time;
396 aos::realtime_clock::time_point realtime_start_time =
397 aos::realtime_clock::min_time;
398
399 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
400 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
401 };
402 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800403};
404
Austin Schuh6f3babe2020-01-26 20:34:50 -0800405// We end up with one of the following 3 log file types.
406//
407// Single node logged as the source node.
408// -> Replayed just on the source node.
409//
410// Forwarding timestamps only logged from the perspective of the destination
411// node.
412// -> Matched with data on source node and logged.
413//
414// Forwarding timestamps with data logged as the destination node.
415// -> Replayed just as the destination
416// -> Replayed as the source (Much harder, ordering is not defined)
417//
418// Duplicate data logged. -> CHECK that it matches and explode otherwise.
419//
420// This can be boiled down to a set of constraints and tools.
421//
422// 1) Forwarding timestamps and data need to be logged separately.
423// 2) Any forwarded data logged on the destination node needs to be logged
424// separately such that it can be sorted.
425//
426// 1) Log reader needs to be able to sort a list of log files.
427// 2) Log reader needs to be able to merge sorted lists of log files.
428// 3) Log reader needs to be able to match timestamps with messages.
429//
430// We also need to be able to generate multiple views of a log file depending on
431// the target.
432
Austin Schuhe309d2a2019-11-29 13:25:21 -0800433// Replays all the channels in the logfile to the event loop.
434class LogReader {
435 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800436 // If you want to supply a new configuration that will be used for replay
437 // (e.g., to change message rates, or to populate an updated schema), then
438 // pass it in here. It must provide all the channels that the original logged
439 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800440 //
441 // Log filenames are in the following format:
442 //
443 // {
444 // {log1_part0, log1_part1, ...},
445 // {log2}
446 // }
447 // The inner vector is a list of log file chunks which form up a log file.
448 // The outer vector is a list of log files with subsets of the messages, or
449 // messages from different nodes.
450 //
451 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800452 LogReader(std::string_view filename,
453 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800454 LogReader(const std::vector<std::string> &filenames,
455 const Configuration *replay_configuration = nullptr);
456 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800457 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800458 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800459
Austin Schuh6331ef92020-01-07 18:28:09 -0800460 // Registers all the callbacks to send the log file data out on an event loop
461 // created in event_loop_factory. This also updates time to be at the start
462 // of the log file by running until the log file starts.
463 // Note: the configuration used in the factory should be configuration()
464 // below, but can be anything as long as the locations needed to send
465 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800466 void Register(SimulatedEventLoopFactory *event_loop_factory);
  // Creates a SimulatedEventLoopFactory accessible via event_loop_factory(),
468 // and then calls Register.
469 void Register();
470 // Registers callbacks for all the events after the log file starts. This is
471 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800472 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800473
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800474 // Unregisters the senders. You only need to call this if you separately
475 // supplied an event loop or event loop factory and the lifetimes are such
476 // that they need to be explicitly destroyed before the LogReader destructor
477 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800478 void Deregister();
479
Austin Schuh0c297012020-09-16 18:41:59 -0700480 // Returns the configuration being used for replay from the log file.
481 // Note that this may be different from the configuration actually used for
482 // handling events. You should generally only use this to create a
483 // SimulatedEventLoopFactory, and then get the configuration from there for
484 // everything else.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800485 const Configuration *logged_configuration() const;
486 // Returns the configuration being used for replay.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800487 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800488 const Configuration *configuration() const;
489
Austin Schuh6f3babe2020-01-26 20:34:50 -0800490 // Returns the nodes that this log file was created on. This is a list of
491 // pointers to a node in the nodes() list inside configuration(). The
492 // pointers here are invalidated whenever RemapLoggedChannel is called.
493 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800494
495 // Returns the starting timestamp for the log file.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800496 monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
497 realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800498
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800499 // Causes the logger to publish the provided channel on a different name so
500 // that replayed applications can publish on the proper channel name without
501 // interference. This operates on raw channel names, without any node or
502 // application specific mappings.
503 void RemapLoggedChannel(std::string_view name, std::string_view type,
504 std::string_view add_prefix = "/original");
505 template <typename T>
506 void RemapLoggedChannel(std::string_view name,
507 std::string_view add_prefix = "/original") {
508 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
509 }
510
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700511 template <typename T>
512 bool HasChannel(std::string_view name) {
513 return configuration::GetChannel(log_file_header()->configuration(), name,
514 T::GetFullyQualifiedName(), "",
515 nullptr) != nullptr;
516 }
517
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800518 SimulatedEventLoopFactory *event_loop_factory() {
519 return event_loop_factory_;
520 }
521
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700522 const LogFileHeader *log_file_header() const {
523 return &log_file_header_.message();
524 }
525
Austin Schuh0c297012020-09-16 18:41:59 -0700526 std::string_view name() const {
527 return log_file_header()->name()->string_view();
528 }
529
Austin Schuhe309d2a2019-11-29 13:25:21 -0800530 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800531 const Channel *RemapChannel(const EventLoop *event_loop,
532 const Channel *channel);
533
Austin Schuhe309d2a2019-11-29 13:25:21 -0800534 // Queues at least max_out_of_order_duration_ messages into channels_.
535 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800536 // Handle constructing a configuration with all the additional remapped
537 // channels from calls to RemapLoggedChannel.
538 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800539
Austin Schuh2f8fd752020-09-01 22:38:28 -0700540 // Returns the number of nodes.
541 size_t nodes_count() const {
542 return !configuration::MultiNode(logged_configuration())
543 ? 1u
544 : logged_configuration()->nodes()->size();
545 }
546
Austin Schuh6f3babe2020-01-26 20:34:50 -0800547 const std::vector<std::vector<std::string>> filenames_;
548
549 // This is *a* log file header used to provide the logged config. The rest of
550 // the header is likely distracting.
551 FlatbufferVector<LogFileHeader> log_file_header_;
552
Austin Schuh2f8fd752020-09-01 22:38:28 -0700553 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
554 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
555 Eigen::Matrix<double, Eigen::Dynamic, 1>>
556 SolveOffsets();
557
558 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800559
Austin Schuh6f3babe2020-01-26 20:34:50 -0800560 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700561 class State {
562 public:
563 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800564
Austin Schuh858c9f32020-08-31 16:56:12 -0700565 // Returns the timestamps, channel_index, and message from a channel.
566 // update_time (will be) set to true when popping this message causes the
567 // filter to change the time offset estimation function.
568 std::tuple<TimestampMerger::DeliveryTimestamp, int,
569 FlatbufferVector<MessageHeader>>
570 PopOldest(bool *update_time);
571
572 // Returns the monotonic time of the oldest message.
573 monotonic_clock::time_point OldestMessageTime() const;
574
575 // Primes the queues inside State. Should be called before calling
576 // OldestMessageTime.
577 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800578
Austin Schuh858c9f32020-08-31 16:56:12 -0700579 // Returns the starting time for this node.
580 monotonic_clock::time_point monotonic_start_time() const {
581 return channel_merger_->monotonic_start_time();
582 }
583 realtime_clock::time_point realtime_start_time() const {
584 return channel_merger_->realtime_start_time();
585 }
586
587 // Sets the node event loop factory for replaying into a
588 // SimulatedEventLoopFactory. Returns the EventLoop to use.
589 EventLoop *SetNodeEventLoopFactory(
590 NodeEventLoopFactory *node_event_loop_factory);
591
592 // Sets and gets the event loop to use.
593 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
594 EventLoop *event_loop() { return event_loop_; }
595
Austin Schuh858c9f32020-08-31 16:56:12 -0700596 // Sets the current realtime offset from the monotonic clock for this node
597 // (if we are on a simulated event loop).
598 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
599 realtime_clock::time_point realtime_time) {
600 if (node_event_loop_factory_ != nullptr) {
601 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
602 realtime_time);
603 }
604 }
605
606 // Converts a timestamp from the monotonic clock on this node to the
607 // distributed clock.
608 distributed_clock::time_point ToDistributedClock(
609 monotonic_clock::time_point time) {
610 return node_event_loop_factory_->ToDistributedClock(time);
611 }
612
Austin Schuh2f8fd752020-09-01 22:38:28 -0700613 monotonic_clock::time_point FromDistributedClock(
614 distributed_clock::time_point time) {
615 return node_event_loop_factory_->FromDistributedClock(time);
616 }
617
Austin Schuh858c9f32020-08-31 16:56:12 -0700618 // Sets the offset (and slope) from the distributed clock.
619 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
620 double distributed_slope) {
621 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
622 distributed_slope);
623 }
624
625 // Returns the current time on the remote node which sends messages on
626 // channel_index.
627 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
628 return channel_target_event_loop_factory_[channel_index]->monotonic_now();
629 }
630
Austin Schuh2f8fd752020-09-01 22:38:28 -0700631 distributed_clock::time_point RemoteToDistributedClock(
632 size_t channel_index, monotonic_clock::time_point time) {
633 return channel_target_event_loop_factory_[channel_index]
634 ->ToDistributedClock(time);
635 }
636
637 const Node *remote_node(size_t channel_index) {
638 return channel_target_event_loop_factory_[channel_index]->node();
639 }
640
641 monotonic_clock::time_point monotonic_now() {
642 return node_event_loop_factory_->monotonic_now();
643 }
644
Austin Schuh858c9f32020-08-31 16:56:12 -0700645 // Sets the node we will be merging as, and returns true if there is any
646 // data on it.
647 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
648
649 // Sets the number of channels.
650 void SetChannelCount(size_t count);
651
652 // Sets the sender, filter, and target factory for a channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700653 void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
654 message_bridge::NoncausalOffsetEstimator *filter,
655 NodeEventLoopFactory *channel_target_event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700656
657 // Returns if we have read all the messages from all the logs.
658 bool at_end() const { return channel_merger_->at_end(); }
659
// Unregisters everything so we can destroy the event loop.
661 void Deregister();
662
663 // Sets the current TimerHandle for the replay callback.
664 void set_timer_handler(TimerHandler *timer_handler) {
665 timer_handler_ = timer_handler;
666 }
667
668 // Sets the next wakeup time on the replay callback.
669 void Setup(monotonic_clock::time_point next_time) {
670 timer_handler_->Setup(next_time);
671 }
672
673 // Sends a buffer on the provided channel index.
674 bool Send(size_t channel_index, const void *data, size_t size,
675 aos::monotonic_clock::time_point monotonic_remote_time,
676 aos::realtime_clock::time_point realtime_remote_time,
677 uint32_t remote_queue_index) {
678 return channels_[channel_index]->Send(data, size, monotonic_remote_time,
679 realtime_remote_time,
680 remote_queue_index);
681 }
682
683 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700684 std::string DebugString() const {
685 std::stringstream messages;
686 size_t i = 0;
687 for (const auto &message : sorted_messages_) {
688 if (i < 7 || i + 7 > sorted_messages_.size()) {
689 messages << "sorted_messages[" << i
690 << "]: " << std::get<0>(message).monotonic_event_time << " "
691 << configuration::StrippedChannelToString(
692 event_loop_->configuration()->channels()->Get(
693 std::get<2>(message).message().channel_index()))
694 << "\n";
695 } else if (i == 7) {
696 messages << "...\n";
697 }
698 ++i;
699 }
700 return messages.str() + channel_merger_->DebugString();
701 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700702
703 private:
704 // Log file.
705 std::unique_ptr<ChannelMerger> channel_merger_;
706
707 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700708 FlatbufferVector<MessageHeader>,
709 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700710 sorted_messages_;
711
712 // Senders.
713 std::vector<std::unique_ptr<RawSender>> channels_;
714
715 // Factory (if we are in sim) that this loop was created on.
716 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
717 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
718 // Event loop.
719 EventLoop *event_loop_ = nullptr;
720 // And timer used to send messages.
721 TimerHandler *timer_handler_;
722
Austin Schuh8bd96322020-02-13 21:18:22 -0800723 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
724 // This corresponds to the object which is shared among all the channels
725 // going between 2 nodes. The second element in the tuple indicates if this
726 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700727 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800728
729 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
730 // channel) which correspond to the originating node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700731 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800732 };
733
  // Per-node replay state. Node index -> State.
  std::vector<std::unique_ptr<State>> states_;
736
  // Returns the offset estimator for the connection between node_a and node_b.
  // Creates the requested filter if it doesn't exist, regardless of whether
  // these nodes can actually communicate directly. Defined out of line.
  message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
                                                      const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800742
  // FILE to write offsets to (if populated; nullptr otherwise).
  FILE *offset_fp_ = nullptr;
  // Timestamp of the first piece of data used for the horizontal axis on the
  // plot.
  aos::realtime_clock::time_point first_time_;

  // List of filters for a connection. The pointer to the first node will be
  // less than the second node.
  // NOTE(review): the mapped value is a single-element std::tuple; callers
  // presumably access it via std::get<0> — confirm before simplifying.
  std::map<std::tuple<const Node *, const Node *>,
           std::tuple<message_bridge::NoncausalOffsetEstimator>>
      filters_;
754
755 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700756 // clock. monotonic = distributed * slope() + offset();
757 double slope(int node_index) const {
758 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700759 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700760 return time_slope_matrix_(node_index);
761 }
762 std::chrono::nanoseconds offset(int node_index) const {
763 CHECK_LT(node_index, time_offset_matrix_.rows())
764 << ": Got too high of a node index.";
765 return std::chrono::duration_cast<std::chrono::nanoseconds>(
766 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800767 }
768
  // Updates the offset matrix solution and sets the per-node distributed
  // offsets in the factory. Defined out of line.
  void UpdateOffsets();
772
  // We have 2 types of equations to do a least squares regression over to fully
  // constrain our time function.
  //
  // One is simple. The distributed clock is the average of all the clocks.
  //   (ta + tb + tc + td) / num_nodes = t_distributed
  //
  // The second is a bit more complicated. Our basic time conversion function
  // is:
  //   tb = ta + (ta * slope + offset)
  // We can rewrite this as follows
  //   tb - (1 + slope) * ta = offset
  //
  // From here, we have enough equations to solve for t{a,b,c,...}. We want to
  // take as an input the offsets and slope, and solve for the per-node times as
  // a function of the distributed clock.
  //
  // We need to massage our equations to make this work. If we solve for the
  // per-node times at two set distributed clock times, we will be able to
  // recreate the linear function (we know it is linear). We can do a similar
  // thing by breaking our equation up into (shown for 3 nodes):
  //
  //   [1/3   1/3    1/3 ] [ta]   [t_distributed]
  //   [ 1   -1-m1    0  ] [tb] = [oab]
  //   [ 1     0    -1-m2] [tc]   [oac]
  //
  // NOTE(review): these rows put the unit coefficient on ta, i.e.
  //   ta - (1 + m1) * tb = oab,
  // which swaps the roles of ta and tb relative to the scalar equation above.
  // Confirm against UpdateOffsets() which convention is actually used.
  //
  // This solves to:
  //
  //   [ta]   [ a00 a01 a02]   [t_distributed]
  //   [tb] = [ a10 a11 a12] * [oab]
  //   [tc]   [ a20 a21 a22]   [oac]
  //
  // and can be split into:
  //
  //   [ta]   [ a00 ]                   [a01 a02]
  //   [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
  //   [tc]   [ a20 ]                   [a21 a22]   [oac]
  //
  // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
  // offset_matrix_ will be in nanoseconds.
  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
  Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
  // Matrix tracking which offsets are valid.
  Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
  // Matrix tracking the last valid matrix we used to determine connected nodes.
  Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
  // NOTE(review): presumably the connected-node count derived from
  // last_valid_matrix_, cached between updates — confirm.
  size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800820
  // Solved per-node clock functions, read through slope() and offset():
  //   [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
  // t is in seconds.
  Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
  Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800825
  // Buffer holding a remapped Configuration.
  // NOTE(review): presumably the storage behind remapped_configuration_ —
  // confirm.
  std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
      remapped_configuration_buffer_;

  // Simulated event loop factory used for replay.
  // NOTE(review): the unique_ptr presumably owns the factory only when this
  // class created it, with event_loop_factory_ pointing at whichever factory
  // is in use — confirm.
  std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
  SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800831
  // Map of channel indices to new name. The channel index (the map key) will be
  // an index into logged_configuration(), and the string value will be the name
  // of the channel to send on instead of the logged channel name.
  std::map<size_t, std::string> remapped_channels_;

  // Number of nodes which still have data to send. This is used to figure out
  // when to exit.
  size_t live_nodes_ = 0;

  // NOTE(review): presumably the configuration with channel remaps applied and
  // the configuration being replayed against, respectively — confirm.
  const Configuration *remapped_configuration_ = nullptr;
  const Configuration *replay_configuration_ = nullptr;

  // If true, the replay timer will ignore any missing data. This is used
  // during startup when we are bootstrapping everything and trying to get to
  // the start of all the log files.
  bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800848};
849
850} // namespace logger
851} // namespace aos
852
853#endif // AOS_EVENTS_LOGGER_H_