blob: e59909103e210e59d82ff557595743740d9486aa [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "Eigen/Dense"
#include "absl/strings/str_cat.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/eigen_mpq.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"
#include "third_party/gmp/gmpxx.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080023
24namespace aos {
25namespace logger {
26
Austin Schuh6f3babe2020-01-26 20:34:50 -080027class LogNamer {
28 public:
29 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
30 virtual ~LogNamer() {}
31
Austin Schuh2f8fd752020-09-01 22:38:28 -070032 virtual void WriteHeader(
33 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
34 const Node *node) = 0;
Austin Schuh6f3babe2020-01-26 20:34:50 -080035 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
36
37 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
Austin Schuh2f8fd752020-09-01 22:38:28 -070038 virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
39 const Channel *channel, const Node *node) = 0;
40 virtual void Rotate(
41 const Node *node,
42 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
43 &header) = 0;
Austin Schuh6f3babe2020-01-26 20:34:50 -080044 const std::vector<const Node *> &nodes() const { return nodes_; }
45
46 const Node *node() const { return node_; }
47
48 protected:
49 const Node *const node_;
50 std::vector<const Node *> nodes_;
51};
52
53class LocalLogNamer : public LogNamer {
54 public:
Austin Schuh2f8fd752020-09-01 22:38:28 -070055 LocalLogNamer(std::string_view base_name, const Node *node)
56 : LogNamer(node), base_name_(base_name), data_writer_(OpenDataWriter()) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -080057
Austin Schuh2f8fd752020-09-01 22:38:28 -070058 void WriteHeader(
59 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
60 const Node *node) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -080061 CHECK_EQ(node, this->node());
Austin Schuh2f8fd752020-09-01 22:38:28 -070062 data_writer_->WriteSizedFlatbuffer(header.full_span());
Austin Schuh6f3babe2020-01-26 20:34:50 -080063 }
64
65 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
66 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
Austin Schuh2f8fd752020-09-01 22:38:28 -070067 return data_writer_.get();
68 }
69
70 void Rotate(const Node *node,
71 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
72 &header) override {
73 CHECK(node == this->node());
74 ++part_number_;
75 *data_writer_ = std::move(*OpenDataWriter());
76 data_writer_->WriteSizedFlatbuffer(header.full_span());
Austin Schuh6f3babe2020-01-26 20:34:50 -080077 }
78
79 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
80 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
81 << ": Message is not delivered to this node.";
82 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
83 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
84 node_))
85 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuh2f8fd752020-09-01 22:38:28 -070086 return data_writer_.get();
87 }
88
89 DetachedBufferWriter *MakeForwardedTimestampWriter(
90 const Channel * /*channel*/, const Node * /*node*/) override {
91 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
92 return nullptr;
Austin Schuh6f3babe2020-01-26 20:34:50 -080093 }
94
95 private:
Austin Schuh2f8fd752020-09-01 22:38:28 -070096 std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
97 return std::make_unique<DetachedBufferWriter>(
98 absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
99 }
100 const std::string base_name_;
101 size_t part_number_ = 0;
102 std::unique_ptr<DetachedBufferWriter> data_writer_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800103};
104
105// TODO(austin): Split naming files from making files so we can re-use the
106// naming code to predict the log file names for a provided base name.
107class MultiNodeLogNamer : public LogNamer {
108 public:
109 MultiNodeLogNamer(std::string_view base_name,
110 const Configuration *configuration, const Node *node)
111 : LogNamer(node),
112 base_name_(base_name),
113 configuration_(configuration),
Austin Schuh2f8fd752020-09-01 22:38:28 -0700114 data_writer_(OpenDataWriter()) {}
Austin Schuh6f3babe2020-01-26 20:34:50 -0800115
116 // Writes the header to all log files for a specific node. This function
117 // needs to be called after all the writers are created.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700118 void WriteHeader(
119 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
120 const Node *node) override;
121
122 void Rotate(const Node *node,
123 const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
124 &header) override;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800125
126 // Makes a data logger for a specific channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700127 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800128 // See if we can read the data on this node at all.
129 const bool is_readable =
130 configuration::ChannelIsReadableOnNode(channel, this->node());
131 if (!is_readable) {
132 return nullptr;
133 }
134
135 // Then, see if we are supposed to log the data here.
136 const bool log_message =
137 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
138
139 if (!log_message) {
140 return nullptr;
141 }
142
143 // Now, sort out if this is data generated on this node, or not. It is
144 // generated if it is sendable on this node.
145 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
146 return data_writer_.get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800147 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700148
149 // Ok, we have data that is being forwarded to us that we are supposed to
150 // log. It needs to be logged with send timestamps, but be sorted enough
151 // to be able to be processed.
152 CHECK(data_writers_.find(channel) == data_writers_.end());
153
154 // Track that this node is being logged.
155 const Node *source_node = configuration::GetNode(
156 configuration_, channel->source_node()->string_view());
157
158 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
159 nodes_.emplace_back(source_node);
160 }
161
162 DataWriter data_writer;
163 data_writer.node = source_node;
164 data_writer.rotate = [this](const Channel *channel,
165 DataWriter *data_writer) {
166 OpenWriter(channel, data_writer);
167 };
168 data_writer.rotate(channel, &data_writer);
169
170 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
171 .first->second.writer.get();
172 }
173
174 DetachedBufferWriter *MakeForwardedTimestampWriter(
175 const Channel *channel, const Node *node) override {
176 // See if we can read the data on this node at all.
177 const bool is_readable =
178 configuration::ChannelIsReadableOnNode(channel, this->node());
179 CHECK(is_readable) << ": "
180 << configuration::CleanedChannelToString(channel);
181
182 CHECK(data_writers_.find(channel) == data_writers_.end());
183
184 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
185 nodes_.emplace_back(node);
186 }
187
188 DataWriter data_writer;
189 data_writer.node = node;
190 data_writer.rotate = [this](const Channel *channel,
191 DataWriter *data_writer) {
192 OpenForwardedTimestampWriter(channel, data_writer);
193 };
194 data_writer.rotate(channel, &data_writer);
195
196 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
197 .first->second.writer.get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800198 }
199
200 // Makes a timestamp (or timestamp and data) logger for a channel and
201 // forwarding connection.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700202 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800203 const bool log_delivery_times =
204 (this->node() == nullptr)
205 ? false
206 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
207 channel, this->node(), this->node());
208 if (!log_delivery_times) {
209 return nullptr;
210 }
211
212 return data_writer_.get();
213 }
214
215 const std::vector<const Node *> &nodes() const { return nodes_; }
216
217 private:
Austin Schuh2f8fd752020-09-01 22:38:28 -0700218 // Files to write remote data to. We want one per channel. Maps the channel
219 // to the writer, Node, and part number.
220 struct DataWriter {
221 std::unique_ptr<DetachedBufferWriter> writer = nullptr;
222 const Node *node;
223 size_t part_number = 0;
224 std::function<void(const Channel *, DataWriter *)> rotate;
225 };
226
227 void OpenForwardedTimestampWriter(const Channel *channel,
228 DataWriter *data_writer) {
229 std::string filename =
230 absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
231 "/", channel->type()->string_view(), ".part",
232 data_writer->part_number, ".bfbs");
233
234 if (!data_writer->writer) {
235 data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
236 } else {
237 *data_writer->writer = DetachedBufferWriter(filename);
238 }
239 }
240
241 void OpenWriter(const Channel *channel, DataWriter *data_writer) {
242 const std::string filename = absl::StrCat(
243 base_name_, "_", channel->source_node()->string_view(), "_data",
244 channel->name()->string_view(), "/", channel->type()->string_view(),
245 ".part", data_writer->part_number, ".bfbs");
246 if (!data_writer->writer) {
247 data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
248 } else {
249 *data_writer->writer = DetachedBufferWriter(filename);
250 }
251 }
252
253 std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
254 return std::make_unique<DetachedBufferWriter>(
255 absl::StrCat(base_name_, "_", node()->name()->string_view(),
256 "_data.part", part_number_, ".bfbs"));
257 }
258
Austin Schuh6f3babe2020-01-26 20:34:50 -0800259 const std::string base_name_;
260 const Configuration *const configuration_;
261
Austin Schuh2f8fd752020-09-01 22:38:28 -0700262 size_t part_number_ = 0;
263
Austin Schuh6f3babe2020-01-26 20:34:50 -0800264 // File to write both delivery timestamps and local data to.
265 std::unique_ptr<DetachedBufferWriter> data_writer_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800266
Austin Schuh2f8fd752020-09-01 22:38:28 -0700267 std::map<const Channel *, DataWriter> data_writers_;
268};
Austin Schuh8bd96322020-02-13 21:18:22 -0800269
Austin Schuhe309d2a2019-11-29 13:25:21 -0800270// Logs all channels available in the event loop to disk every 100 ms.
271// Start by logging one message per channel to capture any state and
272// configuration that is sent rately on a channel and would affect execution.
273class Logger {
274 public:
Austin Schuh2f8fd752020-09-01 22:38:28 -0700275 Logger(std::string_view base_name, EventLoop *event_loop,
Austin Schuhe309d2a2019-11-29 13:25:21 -0800276 std::chrono::milliseconds polling_period =
277 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800278 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
279 std::chrono::milliseconds polling_period =
280 std::chrono::milliseconds(100));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800281
Austin Schuh2f8fd752020-09-01 22:38:28 -0700282 // Rotates the log file(s), triggering new part files to be written for each
283 // log file.
284 void Rotate();
Austin Schuhfa895892020-01-07 20:07:41 -0800285
Austin Schuhe309d2a2019-11-29 13:25:21 -0800286 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800287 void WriteHeader();
Austin Schuh2f8fd752020-09-01 22:38:28 -0700288 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
289 const Node *node);
290
291 bool MaybeUpdateTimestamp(
292 const Node *node, int node_index,
293 aos::monotonic_clock::time_point monotonic_start_time,
294 aos::realtime_clock::time_point realtime_start_time);
Austin Schuhfa895892020-01-07 20:07:41 -0800295
Austin Schuhe309d2a2019-11-29 13:25:21 -0800296 void DoLogData();
297
Austin Schuh2f8fd752020-09-01 22:38:28 -0700298 void WriteMissingTimestamps();
299
300 void StartLogging();
301
302 // Fetches from each channel until all the data is logged.
303 void LogUntil(monotonic_clock::time_point t);
304
Austin Schuhe309d2a2019-11-29 13:25:21 -0800305 EventLoop *event_loop_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800306 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800307
308 // Structure to track both a fetcher, and if the data fetched has been
309 // written. We may want to delay writing data to disk so that we don't let
310 // data get too far out of order when written to disk so we can avoid making
311 // it too hard to sort when reading.
312 struct FetcherStruct {
313 std::unique_ptr<RawFetcher> fetcher;
314 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800315
Austin Schuh6f3babe2020-01-26 20:34:50 -0800316 int channel_index = -1;
317
318 LogType log_type = LogType::kLogMessage;
319
320 DetachedBufferWriter *writer = nullptr;
321 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700322 DetachedBufferWriter *contents_writer = nullptr;
323 const Node *writer_node = nullptr;
324 const Node *timestamp_node = nullptr;
325 int node_index = 0;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800326 };
327
328 std::vector<FetcherStruct> fetchers_;
329 TimerHandler *timer_handler_;
330
331 // Period to poll the channels.
332 const std::chrono::milliseconds polling_period_;
333
334 // Last time that data was written for all channels to disk.
335 monotonic_clock::time_point last_synchronized_time_;
336
Austin Schuhfa895892020-01-07 20:07:41 -0800337 monotonic_clock::time_point monotonic_start_time_;
338 realtime_clock::time_point realtime_start_time_;
339
Austin Schuhe309d2a2019-11-29 13:25:21 -0800340 // Max size that the header has consumed. This much extra data will be
341 // reserved in the builder to avoid reallocating.
342 size_t max_header_size_ = 0;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700343
344 // Fetcher for all the statistics from all the nodes.
345 aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
346
347 // Sets the start time for a specific node.
348 void SetStartTime(size_t node_index,
349 aos::monotonic_clock::time_point monotonic_start_time,
350 aos::realtime_clock::time_point realtime_start_time);
351
352 struct NodeState {
353 aos::monotonic_clock::time_point monotonic_start_time =
354 aos::monotonic_clock::min_time;
355 aos::realtime_clock::time_point realtime_start_time =
356 aos::realtime_clock::min_time;
357
358 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
359 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
360 };
361 std::vector<NodeState> node_state_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800362};
363
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
//   -> Replayed just on the source node.
//
// Forwarding timestamps only logged from the perspective of the destination
// node.
//   -> Matched with data on source node and logged.
//
// Forwarding timestamps with data logged as the destination node.
//   -> Replayed just as the destination
//   -> Replayed as the source (Much harder, ordering is not defined)
//
// Duplicate data logged. -> CHECK that it matches and explode otherwise.
//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
// 1) Forwarding timestamps and data need to be logged separately.
// 2) Any forwarded data logged on the destination node needs to be logged
//    separately such that it can be sorted.
//
// Tools:
// 1) Log reader needs to be able to sort a list of log files.
// 2) Log reader needs to be able to merge sorted lists of log files.
// 3) Log reader needs to be able to match timestamps with messages.
//
// We also need to be able to generate multiple views of a log file depending on
// the target.
391
Austin Schuhe309d2a2019-11-29 13:25:21 -0800392// Replays all the channels in the logfile to the event loop.
393class LogReader {
394 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800395 // If you want to supply a new configuration that will be used for replay
396 // (e.g., to change message rates, or to populate an updated schema), then
397 // pass it in here. It must provide all the channels that the original logged
398 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800399 //
400 // Log filenames are in the following format:
401 //
402 // {
403 // {log1_part0, log1_part1, ...},
404 // {log2}
405 // }
406 // The inner vector is a list of log file chunks which form up a log file.
407 // The outer vector is a list of log files with subsets of the messages, or
408 // messages from different nodes.
409 //
410 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800411 LogReader(std::string_view filename,
412 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800413 LogReader(const std::vector<std::string> &filenames,
414 const Configuration *replay_configuration = nullptr);
415 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800416 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800417 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800418
Austin Schuh6331ef92020-01-07 18:28:09 -0800419 // Registers all the callbacks to send the log file data out on an event loop
420 // created in event_loop_factory. This also updates time to be at the start
421 // of the log file by running until the log file starts.
422 // Note: the configuration used in the factory should be configuration()
423 // below, but can be anything as long as the locations needed to send
424 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800425 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800426 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
427 // and then calls Register.
428 void Register();
429 // Registers callbacks for all the events after the log file starts. This is
430 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800431 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800432
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800433 // Unregisters the senders. You only need to call this if you separately
434 // supplied an event loop or event loop factory and the lifetimes are such
435 // that they need to be explicitly destroyed before the LogReader destructor
436 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800437 void Deregister();
438
Austin Schuhe309d2a2019-11-29 13:25:21 -0800439 // Returns the configuration from the log file.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800440 const Configuration *logged_configuration() const;
441 // Returns the configuration being used for replay.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800442 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800443 const Configuration *configuration() const;
444
Austin Schuh6f3babe2020-01-26 20:34:50 -0800445 // Returns the nodes that this log file was created on. This is a list of
446 // pointers to a node in the nodes() list inside configuration(). The
447 // pointers here are invalidated whenever RemapLoggedChannel is called.
448 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800449
450 // Returns the starting timestamp for the log file.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800451 monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
452 realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800453
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800454 // Causes the logger to publish the provided channel on a different name so
455 // that replayed applications can publish on the proper channel name without
456 // interference. This operates on raw channel names, without any node or
457 // application specific mappings.
458 void RemapLoggedChannel(std::string_view name, std::string_view type,
459 std::string_view add_prefix = "/original");
// Typed convenience overload: forwards to RemapLoggedChannel(name, type,
// add_prefix) using T's fully qualified name as the type.
template <typename T>
void RemapLoggedChannel(std::string_view name,
                        std::string_view add_prefix = "/original") {
  RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
}
465
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700466 template <typename T>
467 bool HasChannel(std::string_view name) {
468 return configuration::GetChannel(log_file_header()->configuration(), name,
469 T::GetFullyQualifiedName(), "",
470 nullptr) != nullptr;
471 }
472
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800473 SimulatedEventLoopFactory *event_loop_factory() {
474 return event_loop_factory_;
475 }
476
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700477 const LogFileHeader *log_file_header() const {
478 return &log_file_header_.message();
479 }
480
Austin Schuhe309d2a2019-11-29 13:25:21 -0800481 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800482 const Channel *RemapChannel(const EventLoop *event_loop,
483 const Channel *channel);
484
Austin Schuhe309d2a2019-11-29 13:25:21 -0800485 // Queues at least max_out_of_order_duration_ messages into channels_.
486 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800487 // Handle constructing a configuration with all the additional remapped
488 // channels from calls to RemapLoggedChannel.
489 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800490
Austin Schuh2f8fd752020-09-01 22:38:28 -0700491 // Returns the number of nodes.
492 size_t nodes_count() const {
493 return !configuration::MultiNode(logged_configuration())
494 ? 1u
495 : logged_configuration()->nodes()->size();
496 }
497
Austin Schuh6f3babe2020-01-26 20:34:50 -0800498 const std::vector<std::vector<std::string>> filenames_;
499
500 // This is *a* log file header used to provide the logged config. The rest of
501 // the header is likely distracting.
502 FlatbufferVector<LogFileHeader> log_file_header_;
503
Austin Schuh2f8fd752020-09-01 22:38:28 -0700504 // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
505 std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
506 Eigen::Matrix<double, Eigen::Dynamic, 1>>
507 SolveOffsets();
508
509 void LogFit(std::string_view prefix);
Austin Schuh8bd96322020-02-13 21:18:22 -0800510
Austin Schuh6f3babe2020-01-26 20:34:50 -0800511 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700512 class State {
513 public:
514 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800515
Austin Schuh858c9f32020-08-31 16:56:12 -0700516 // Returns the timestamps, channel_index, and message from a channel.
517 // update_time (will be) set to true when popping this message causes the
518 // filter to change the time offset estimation function.
519 std::tuple<TimestampMerger::DeliveryTimestamp, int,
520 FlatbufferVector<MessageHeader>>
521 PopOldest(bool *update_time);
522
523 // Returns the monotonic time of the oldest message.
524 monotonic_clock::time_point OldestMessageTime() const;
525
526 // Primes the queues inside State. Should be called before calling
527 // OldestMessageTime.
528 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800529
Austin Schuh858c9f32020-08-31 16:56:12 -0700530 // Returns the starting time for this node.
531 monotonic_clock::time_point monotonic_start_time() const {
532 return channel_merger_->monotonic_start_time();
533 }
534 realtime_clock::time_point realtime_start_time() const {
535 return channel_merger_->realtime_start_time();
536 }
537
538 // Sets the node event loop factory for replaying into a
539 // SimulatedEventLoopFactory. Returns the EventLoop to use.
540 EventLoop *SetNodeEventLoopFactory(
541 NodeEventLoopFactory *node_event_loop_factory);
542
543 // Sets and gets the event loop to use.
544 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
545 EventLoop *event_loop() { return event_loop_; }
546
Austin Schuh858c9f32020-08-31 16:56:12 -0700547 // Sets the current realtime offset from the monotonic clock for this node
548 // (if we are on a simulated event loop).
549 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
550 realtime_clock::time_point realtime_time) {
551 if (node_event_loop_factory_ != nullptr) {
552 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
553 realtime_time);
554 }
555 }
556
557 // Converts a timestamp from the monotonic clock on this node to the
558 // distributed clock.
559 distributed_clock::time_point ToDistributedClock(
560 monotonic_clock::time_point time) {
561 return node_event_loop_factory_->ToDistributedClock(time);
562 }
563
Austin Schuh2f8fd752020-09-01 22:38:28 -0700564 monotonic_clock::time_point FromDistributedClock(
565 distributed_clock::time_point time) {
566 return node_event_loop_factory_->FromDistributedClock(time);
567 }
568
Austin Schuh858c9f32020-08-31 16:56:12 -0700569 // Sets the offset (and slope) from the distributed clock.
570 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
571 double distributed_slope) {
572 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
573 distributed_slope);
574 }
575
576 // Returns the current time on the remote node which sends messages on
577 // channel_index.
578 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
579 return channel_target_event_loop_factory_[channel_index]->monotonic_now();
580 }
581
Austin Schuh2f8fd752020-09-01 22:38:28 -0700582 distributed_clock::time_point RemoteToDistributedClock(
583 size_t channel_index, monotonic_clock::time_point time) {
584 return channel_target_event_loop_factory_[channel_index]
585 ->ToDistributedClock(time);
586 }
587
588 const Node *remote_node(size_t channel_index) {
589 return channel_target_event_loop_factory_[channel_index]->node();
590 }
591
592 monotonic_clock::time_point monotonic_now() {
593 return node_event_loop_factory_->monotonic_now();
594 }
595
Austin Schuh858c9f32020-08-31 16:56:12 -0700596 // Sets the node we will be merging as, and returns true if there is any
597 // data on it.
598 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
599
600 // Sets the number of channels.
601 void SetChannelCount(size_t count);
602
603 // Sets the sender, filter, and target factory for a channel.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700604 void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
605 message_bridge::NoncausalOffsetEstimator *filter,
606 NodeEventLoopFactory *channel_target_event_loop_factory);
Austin Schuh858c9f32020-08-31 16:56:12 -0700607
608 // Returns if we have read all the messages from all the logs.
609 bool at_end() const { return channel_merger_->at_end(); }
610
611 // Unregisters everything so we can destory the event loop.
612 void Deregister();
613
614 // Sets the current TimerHandle for the replay callback.
615 void set_timer_handler(TimerHandler *timer_handler) {
616 timer_handler_ = timer_handler;
617 }
618
619 // Sets the next wakeup time on the replay callback.
620 void Setup(monotonic_clock::time_point next_time) {
621 timer_handler_->Setup(next_time);
622 }
623
624 // Sends a buffer on the provided channel index.
625 bool Send(size_t channel_index, const void *data, size_t size,
626 aos::monotonic_clock::time_point monotonic_remote_time,
627 aos::realtime_clock::time_point realtime_remote_time,
628 uint32_t remote_queue_index) {
629 return channels_[channel_index]->Send(data, size, monotonic_remote_time,
630 realtime_remote_time,
631 remote_queue_index);
632 }
633
634 // Returns a debug string for the channel merger.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700635 std::string DebugString() const {
636 std::stringstream messages;
637 size_t i = 0;
638 for (const auto &message : sorted_messages_) {
639 if (i < 7 || i + 7 > sorted_messages_.size()) {
640 messages << "sorted_messages[" << i
641 << "]: " << std::get<0>(message).monotonic_event_time << " "
642 << configuration::StrippedChannelToString(
643 event_loop_->configuration()->channels()->Get(
644 std::get<2>(message).message().channel_index()))
645 << "\n";
646 } else if (i == 7) {
647 messages << "...\n";
648 }
649 ++i;
650 }
651 return messages.str() + channel_merger_->DebugString();
652 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700653
654 private:
655 // Log file.
656 std::unique_ptr<ChannelMerger> channel_merger_;
657
658 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700659 FlatbufferVector<MessageHeader>,
660 message_bridge::NoncausalOffsetEstimator *>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700661 sorted_messages_;
662
663 // Senders.
664 std::vector<std::unique_ptr<RawSender>> channels_;
665
666 // Factory (if we are in sim) that this loop was created on.
667 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
668 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
669 // Event loop.
670 EventLoop *event_loop_ = nullptr;
671 // And timer used to send messages.
672 TimerHandler *timer_handler_;
673
Austin Schuh8bd96322020-02-13 21:18:22 -0800674 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
675 // This corresponds to the object which is shared among all the channels
676 // going between 2 nodes. The second element in the tuple indicates if this
677 // is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700678 std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800679
680 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
681 // channel) which correspond to the originating node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700682 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800683 };
684
Austin Schuh8bd96322020-02-13 21:18:22 -0800685 // Node index -> State.
686 std::vector<std::unique_ptr<State>> states_;
687
688 // Creates the requested filter if it doesn't exist, regardless of whether
689 // these nodes can actually communicate directly. The second return value
690 // reports if this is the primary direction or not.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700691 message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
692 const Node *node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800693
694 // FILE to write offsets to (if populated).
695 FILE *offset_fp_ = nullptr;
696 // Timestamp of the first piece of data used for the horizontal axis on the
697 // plot.
698 aos::realtime_clock::time_point first_time_;
699
700 // List of filters for a connection. The pointer to the first node will be
701 // less than the second node.
702 std::map<std::tuple<const Node *, const Node *>,
Austin Schuh2f8fd752020-09-01 22:38:28 -0700703 std::tuple<message_bridge::NoncausalOffsetEstimator>>
Austin Schuh8bd96322020-02-13 21:18:22 -0800704 filters_;
705
706 // Returns the offset from the monotonic clock for a node to the distributed
Austin Schuh2f8fd752020-09-01 22:38:28 -0700707 // clock. monotonic = distributed * slope() + offset();
708 double slope(int node_index) const {
709 CHECK_LT(node_index, time_slope_matrix_.rows())
James Kuszmaul46d82582020-05-09 19:50:09 -0700710 << ": Got too high of a node index.";
Austin Schuh2f8fd752020-09-01 22:38:28 -0700711 return time_slope_matrix_(node_index);
712 }
713 std::chrono::nanoseconds offset(int node_index) const {
714 CHECK_LT(node_index, time_offset_matrix_.rows())
715 << ": Got too high of a node index.";
716 return std::chrono::duration_cast<std::chrono::nanoseconds>(
717 std::chrono::duration<double>(time_offset_matrix_(node_index)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800718 }
719
720 // Updates the offset matrix solution and sets the per-node distributed
721 // offsets in the factory.
722 void UpdateOffsets();
723
// We have 2 types of equations to do a least squares regression over to fully
// constrain our time function.
//
// One is simple. The distributed clock is the average of all the clocks.
//   (ta + tb + tc + td) / num_nodes = t_distributed
//
// The second is a bit more complicated. Our basic time conversion function
// is:
//   tb = ta + (ta * slope + offset)
// We can rewrite this as follows
//   tb - (1 + slope) * ta = offset
//
// From here, we have enough equations to solve for t{a,b,c,...} We want to
// take as an input the offsets and slope, and solve for the per-node times as
// a function of the distributed clock.
//
// We need to massage our equations to make this work. If we solve for the
// per-node times at two set distributed clock times, we will be able to
// recreate the linear function (we know it is linear). We can do a similar
// thing by breaking our equation up into:
//
// [1/3  1/3   1/3  ] [ta]   [t_distributed]
// [ 1  -1-m1   0   ] [tb] = [oab]
// [ 1    0   -1-m2 ] [tc]   [oac]
//
// This solves to:
//
// [ta]   [ a00 a01 a02]   [t_distributed]
// [tb] = [ a10 a11 a12] * [oab]
// [tc]   [ a20 a21 a22]   [oac]
//
// and can be split into:
//
// [ta]   [ a00 ]                   [a01 a02]
// [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
// [tc]   [ a20 ]                   [a21 a22]   [oac]
//
// (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
// offset_matrix_ will be in nanoseconds.
763 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
764 Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
765 Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
766 // Matrix tracking which offsets are valid.
767 Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
768 // Matrix tracking the last valid matrix we used to determine connected nodes.
769 Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
770 size_t cached_valid_node_count_ = 0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800771
// [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
// t is in seconds.
774 Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
775 Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800776
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800777 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
778 remapped_configuration_buffer_;
779
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800780 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
781 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800782
// Map of channel indices to new name. The channel index will be an index into
// logged_configuration(), and the string value will be the name of the
// channel to send on instead of the logged channel name.
786 std::map<size_t, std::string> remapped_channels_;
787
Austin Schuh6f3babe2020-01-26 20:34:50 -0800788 // Number of nodes which still have data to send. This is used to figure out
789 // when to exit.
790 size_t live_nodes_ = 0;
791
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800792 const Configuration *remapped_configuration_ = nullptr;
793 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800794
795 // If true, the replay timer will ignore any missing data. This is used
796 // during startup when we are bootstrapping everything and trying to get to
797 // the start of all the log files.
798 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800799};
800
801} // namespace logger
802} // namespace aos
803
804#endif // AOS_EVENTS_LOGGER_H_