blob: a123dcc7b433ebf7ef206ffb9b9e3eee2c3b5a36 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <string_view>
Austin Schuh6f3babe2020-01-26 20:34:50 -08007#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08008
Austin Schuh8bd96322020-02-13 21:18:22 -08009#include "Eigen/Dense"
10#include "absl/strings/str_cat.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080011#include "absl/types/span.h"
12#include "aos/events/event_loop.h"
Austin Schuha36c8902019-12-30 18:07:15 -080013#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh92547522019-12-28 14:33:43 -080015#include "aos/events/simulated_event_loop.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080016#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080017#include "aos/time/time.h"
18#include "flatbuffers/flatbuffers.h"
19
20namespace aos {
21namespace logger {
22
Austin Schuh6f3babe2020-01-26 20:34:50 -080023class LogNamer {
24 public:
25 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
26 virtual ~LogNamer() {}
27
28 virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
29 const Node *node) = 0;
30 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
31
32 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
33 const std::vector<const Node *> &nodes() const { return nodes_; }
34
35 const Node *node() const { return node_; }
36
37 protected:
38 const Node *const node_;
39 std::vector<const Node *> nodes_;
40};
41
42class LocalLogNamer : public LogNamer {
43 public:
44 LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
45 : LogNamer(node), writer_(writer) {}
46
47 ~LocalLogNamer() override { writer_->Flush(); }
48
49 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
50 const Node *node) override {
51 CHECK_EQ(node, this->node());
52 writer_->WriteSizedFlatbuffer(
53 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
54 }
55
56 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
57 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
58 return writer_;
59 }
60
61 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
62 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
63 << ": Message is not delivered to this node.";
64 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
65 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
66 node_))
67 << ": Delivery times aren't logged for this channel on this node.";
68 return writer_;
69 }
70
71 private:
72 DetachedBufferWriter *writer_;
73};
74
75// TODO(austin): Split naming files from making files so we can re-use the
76// naming code to predict the log file names for a provided base name.
77class MultiNodeLogNamer : public LogNamer {
78 public:
79 MultiNodeLogNamer(std::string_view base_name,
80 const Configuration *configuration, const Node *node)
81 : LogNamer(node),
82 base_name_(base_name),
83 configuration_(configuration),
84 data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
85 base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
86
87 // Writes the header to all log files for a specific node. This function
88 // needs to be called after all the writers are created.
89 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
90 if (node == this->node()) {
91 data_writer_->WriteSizedFlatbuffer(
92 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
93 } else {
94 for (std::pair<const Channel *const,
95 std::unique_ptr<DetachedBufferWriter>> &data_writer :
96 data_writers_) {
97 if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
98 data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
99 fbb->GetBufferPointer(), fbb->GetSize()));
100 }
101 }
102 }
103 }
104
105 // Makes a data logger for a specific channel.
106 DetachedBufferWriter *MakeWriter(const Channel *channel) {
107 // See if we can read the data on this node at all.
108 const bool is_readable =
109 configuration::ChannelIsReadableOnNode(channel, this->node());
110 if (!is_readable) {
111 return nullptr;
112 }
113
114 // Then, see if we are supposed to log the data here.
115 const bool log_message =
116 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
117
118 if (!log_message) {
119 return nullptr;
120 }
121
122 // Now, sort out if this is data generated on this node, or not. It is
123 // generated if it is sendable on this node.
124 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
125 return data_writer_.get();
126 } else {
127 // Ok, we have data that is being forwarded to us that we are supposed to
128 // log. It needs to be logged with send timestamps, but be sorted enough
129 // to be able to be processed.
130 CHECK(data_writers_.find(channel) == data_writers_.end());
131
132 // Track that this node is being logged.
133 if (configuration::MultiNode(configuration_)) {
134 const Node *source_node = configuration::GetNode(
135 configuration_, channel->source_node()->string_view());
136 if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
137 nodes_.end()) {
138 nodes_.emplace_back(source_node);
139 }
140 }
141
142 return data_writers_
143 .insert(std::make_pair(
144 channel,
145 std::make_unique<DetachedBufferWriter>(absl::StrCat(
146 base_name_, "_", channel->source_node()->string_view(),
147 "_data", channel->name()->string_view(), "/",
148 channel->type()->string_view(), ".bfbs"))))
149 .first->second.get();
150 }
151 }
152
153 // Makes a timestamp (or timestamp and data) logger for a channel and
154 // forwarding connection.
155 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
156 const bool log_delivery_times =
157 (this->node() == nullptr)
158 ? false
159 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
160 channel, this->node(), this->node());
161 if (!log_delivery_times) {
162 return nullptr;
163 }
164
165 return data_writer_.get();
166 }
167
168 const std::vector<const Node *> &nodes() const { return nodes_; }
169
170 private:
171 const std::string base_name_;
172 const Configuration *const configuration_;
173
174 // File to write both delivery timestamps and local data to.
175 std::unique_ptr<DetachedBufferWriter> data_writer_;
176 // Files to write remote data to. We want one per channel.
177 std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
178 data_writers_;
179};
180
Austin Schuh8bd96322020-02-13 21:18:22 -0800181
Austin Schuhe309d2a2019-11-29 13:25:21 -0800182// Logs all channels available in the event loop to disk every 100 ms.
183// Start by logging one message per channel to capture any state and
184// configuration that is sent rately on a channel and would affect execution.
185class Logger {
186 public:
187 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
188 std::chrono::milliseconds polling_period =
189 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800190 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
191 std::chrono::milliseconds polling_period =
192 std::chrono::milliseconds(100));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800193
Austin Schuhfa895892020-01-07 20:07:41 -0800194 // Rotates the log file with the new writer. This writes out the header
195 // again, but keeps going as if nothing else happened.
196 void Rotate(DetachedBufferWriter *writer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800197 void Rotate(std::unique_ptr<LogNamer> log_namer);
Austin Schuhfa895892020-01-07 20:07:41 -0800198
Austin Schuhe309d2a2019-11-29 13:25:21 -0800199 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800200 void WriteHeader();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800201 void WriteHeader(const Node *node);
Austin Schuhfa895892020-01-07 20:07:41 -0800202
Austin Schuhe309d2a2019-11-29 13:25:21 -0800203 void DoLogData();
204
205 EventLoop *event_loop_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800206 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800207
208 // Structure to track both a fetcher, and if the data fetched has been
209 // written. We may want to delay writing data to disk so that we don't let
210 // data get too far out of order when written to disk so we can avoid making
211 // it too hard to sort when reading.
212 struct FetcherStruct {
213 std::unique_ptr<RawFetcher> fetcher;
214 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800215
Austin Schuh6f3babe2020-01-26 20:34:50 -0800216 int channel_index = -1;
217
218 LogType log_type = LogType::kLogMessage;
219
220 DetachedBufferWriter *writer = nullptr;
221 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800222 };
223
224 std::vector<FetcherStruct> fetchers_;
225 TimerHandler *timer_handler_;
226
227 // Period to poll the channels.
228 const std::chrono::milliseconds polling_period_;
229
230 // Last time that data was written for all channels to disk.
231 monotonic_clock::time_point last_synchronized_time_;
232
Austin Schuhfa895892020-01-07 20:07:41 -0800233 monotonic_clock::time_point monotonic_start_time_;
234 realtime_clock::time_point realtime_start_time_;
235
Austin Schuhe309d2a2019-11-29 13:25:21 -0800236 // Max size that the header has consumed. This much extra data will be
237 // reserved in the builder to avoid reallocating.
238 size_t max_header_size_ = 0;
239};
240
Austin Schuh6f3babe2020-01-26 20:34:50 -0800241// We end up with one of the following 3 log file types.
242//
243// Single node logged as the source node.
244// -> Replayed just on the source node.
245//
246// Forwarding timestamps only logged from the perspective of the destination
247// node.
248// -> Matched with data on source node and logged.
249//
250// Forwarding timestamps with data logged as the destination node.
251// -> Replayed just as the destination
252// -> Replayed as the source (Much harder, ordering is not defined)
253//
254// Duplicate data logged. -> CHECK that it matches and explode otherwise.
255//
256// This can be boiled down to a set of constraints and tools.
257//
258// 1) Forwarding timestamps and data need to be logged separately.
259// 2) Any forwarded data logged on the destination node needs to be logged
260// separately such that it can be sorted.
261//
262// 1) Log reader needs to be able to sort a list of log files.
263// 2) Log reader needs to be able to merge sorted lists of log files.
264// 3) Log reader needs to be able to match timestamps with messages.
265//
266// We also need to be able to generate multiple views of a log file depending on
267// the target.
268
Austin Schuhe309d2a2019-11-29 13:25:21 -0800269// Replays all the channels in the logfile to the event loop.
270class LogReader {
271 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800272 // If you want to supply a new configuration that will be used for replay
273 // (e.g., to change message rates, or to populate an updated schema), then
274 // pass it in here. It must provide all the channels that the original logged
275 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800276 //
277 // Log filenames are in the following format:
278 //
279 // {
280 // {log1_part0, log1_part1, ...},
281 // {log2}
282 // }
283 // The inner vector is a list of log file chunks which form up a log file.
284 // The outer vector is a list of log files with subsets of the messages, or
285 // messages from different nodes.
286 //
287 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800288 LogReader(std::string_view filename,
289 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800290 LogReader(const std::vector<std::string> &filenames,
291 const Configuration *replay_configuration = nullptr);
292 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800293 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800294 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800295
Austin Schuh6331ef92020-01-07 18:28:09 -0800296 // Registers all the callbacks to send the log file data out on an event loop
297 // created in event_loop_factory. This also updates time to be at the start
298 // of the log file by running until the log file starts.
299 // Note: the configuration used in the factory should be configuration()
300 // below, but can be anything as long as the locations needed to send
301 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800302 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800303 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
304 // and then calls Register.
305 void Register();
306 // Registers callbacks for all the events after the log file starts. This is
307 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800308 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800309
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800310 // Unregisters the senders. You only need to call this if you separately
311 // supplied an event loop or event loop factory and the lifetimes are such
312 // that they need to be explicitly destroyed before the LogReader destructor
313 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800314 void Deregister();
315
Austin Schuhe309d2a2019-11-29 13:25:21 -0800316 // Returns the configuration from the log file.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800317 const Configuration *logged_configuration() const;
318 // Returns the configuration being used for replay.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800319 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800320 const Configuration *configuration() const;
321
Austin Schuh6f3babe2020-01-26 20:34:50 -0800322 // Returns the nodes that this log file was created on. This is a list of
323 // pointers to a node in the nodes() list inside configuration(). The
324 // pointers here are invalidated whenever RemapLoggedChannel is called.
325 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800326
327 // Returns the starting timestamp for the log file.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800328 monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
329 realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800330
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800331 // Causes the logger to publish the provided channel on a different name so
332 // that replayed applications can publish on the proper channel name without
333 // interference. This operates on raw channel names, without any node or
334 // application specific mappings.
335 void RemapLoggedChannel(std::string_view name, std::string_view type,
336 std::string_view add_prefix = "/original");
337 template <typename T>
338 void RemapLoggedChannel(std::string_view name,
339 std::string_view add_prefix = "/original") {
340 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
341 }
342
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700343 template <typename T>
344 bool HasChannel(std::string_view name) {
345 return configuration::GetChannel(log_file_header()->configuration(), name,
346 T::GetFullyQualifiedName(), "",
347 nullptr) != nullptr;
348 }
349
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800350 SimulatedEventLoopFactory *event_loop_factory() {
351 return event_loop_factory_;
352 }
353
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700354 const LogFileHeader *log_file_header() const {
355 return &log_file_header_.message();
356 }
357
Austin Schuhe309d2a2019-11-29 13:25:21 -0800358 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800359 const Channel *RemapChannel(const EventLoop *event_loop,
360 const Channel *channel);
361
Austin Schuhe309d2a2019-11-29 13:25:21 -0800362 // Queues at least max_out_of_order_duration_ messages into channels_.
363 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800364 // Handle constructing a configuration with all the additional remapped
365 // channels from calls to RemapLoggedChannel.
366 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800367
Austin Schuh6f3babe2020-01-26 20:34:50 -0800368 const std::vector<std::vector<std::string>> filenames_;
369
370 // This is *a* log file header used to provide the logged config. The rest of
371 // the header is likely distracting.
372 FlatbufferVector<LogFileHeader> log_file_header_;
373
Austin Schuh8bd96322020-02-13 21:18:22 -0800374 Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
375
Austin Schuh6f3babe2020-01-26 20:34:50 -0800376 // State per node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700377 class State {
378 public:
379 State(std::unique_ptr<ChannelMerger> channel_merger);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800380
Austin Schuh858c9f32020-08-31 16:56:12 -0700381 // Returns the timestamps, channel_index, and message from a channel.
382 // update_time (will be) set to true when popping this message causes the
383 // filter to change the time offset estimation function.
384 std::tuple<TimestampMerger::DeliveryTimestamp, int,
385 FlatbufferVector<MessageHeader>>
386 PopOldest(bool *update_time);
387
388 // Returns the monotonic time of the oldest message.
389 monotonic_clock::time_point OldestMessageTime() const;
390
391 // Primes the queues inside State. Should be called before calling
392 // OldestMessageTime.
393 void SeedSortedMessages();
Austin Schuh8bd96322020-02-13 21:18:22 -0800394
395 // Updates the timestamp filter with the timestamp. Returns true if the
396 // provided timestamp was actually a forwarding timestamp and used, and
397 // false otherwise.
398 bool MaybeUpdateTimestamp(
399 const TimestampMerger::DeliveryTimestamp &channel_timestamp,
400 int channel_index);
401
Austin Schuh858c9f32020-08-31 16:56:12 -0700402 // Returns the starting time for this node.
403 monotonic_clock::time_point monotonic_start_time() const {
404 return channel_merger_->monotonic_start_time();
405 }
406 realtime_clock::time_point realtime_start_time() const {
407 return channel_merger_->realtime_start_time();
408 }
409
410 // Sets the node event loop factory for replaying into a
411 // SimulatedEventLoopFactory. Returns the EventLoop to use.
412 EventLoop *SetNodeEventLoopFactory(
413 NodeEventLoopFactory *node_event_loop_factory);
414
415 // Sets and gets the event loop to use.
416 void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
417 EventLoop *event_loop() { return event_loop_; }
418
419 // Returns the oldest timestamp for the provided channel. This should only
420 // be called before SeedSortedMessages();
421 TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
422 size_t channel) {
423 return channel_merger_->OldestTimestampForChannel(channel);
424 }
425
426 // Sets the current realtime offset from the monotonic clock for this node
427 // (if we are on a simulated event loop).
428 void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
429 realtime_clock::time_point realtime_time) {
430 if (node_event_loop_factory_ != nullptr) {
431 node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
432 realtime_time);
433 }
434 }
435
436 // Converts a timestamp from the monotonic clock on this node to the
437 // distributed clock.
438 distributed_clock::time_point ToDistributedClock(
439 monotonic_clock::time_point time) {
440 return node_event_loop_factory_->ToDistributedClock(time);
441 }
442
443 // Sets the offset (and slope) from the distributed clock.
444 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
445 double distributed_slope) {
446 node_event_loop_factory_->SetDistributedOffset(distributed_offset,
447 distributed_slope);
448 }
449
450 // Returns the current time on the remote node which sends messages on
451 // channel_index.
452 monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
453 return channel_target_event_loop_factory_[channel_index]->monotonic_now();
454 }
455
456 // Sets the node we will be merging as, and returns true if there is any
457 // data on it.
458 bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
459
460 // Sets the number of channels.
461 void SetChannelCount(size_t count);
462
463 // Sets the sender, filter, and target factory for a channel.
464 void SetChannel(
465 size_t channel, std::unique_ptr<RawSender> sender,
466 std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
467 NodeEventLoopFactory *channel_target_event_loop_factory);
468
469 // Returns if we have read all the messages from all the logs.
470 bool at_end() const { return channel_merger_->at_end(); }
471
472 // Unregisters everything so we can destory the event loop.
473 void Deregister();
474
475 // Sets the current TimerHandle for the replay callback.
476 void set_timer_handler(TimerHandler *timer_handler) {
477 timer_handler_ = timer_handler;
478 }
479
480 // Sets the next wakeup time on the replay callback.
481 void Setup(monotonic_clock::time_point next_time) {
482 timer_handler_->Setup(next_time);
483 }
484
485 // Sends a buffer on the provided channel index.
486 bool Send(size_t channel_index, const void *data, size_t size,
487 aos::monotonic_clock::time_point monotonic_remote_time,
488 aos::realtime_clock::time_point realtime_remote_time,
489 uint32_t remote_queue_index) {
490 return channels_[channel_index]->Send(data, size, monotonic_remote_time,
491 realtime_remote_time,
492 remote_queue_index);
493 }
494
495 // Returns a debug string for the channel merger.
496 std::string DebugString() const { return channel_merger_->DebugString(); }
497
498 private:
499 // Log file.
500 std::unique_ptr<ChannelMerger> channel_merger_;
501
502 std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
503 FlatbufferVector<MessageHeader>>>
504 sorted_messages_;
505
506 // Senders.
507 std::vector<std::unique_ptr<RawSender>> channels_;
508
509 // Factory (if we are in sim) that this loop was created on.
510 NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
511 std::unique_ptr<EventLoop> event_loop_unique_ptr_;
512 // Event loop.
513 EventLoop *event_loop_ = nullptr;
514 // And timer used to send messages.
515 TimerHandler *timer_handler_;
516
Austin Schuh8bd96322020-02-13 21:18:22 -0800517 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
518 // This corresponds to the object which is shared among all the channels
519 // going between 2 nodes. The second element in the tuple indicates if this
520 // is the primary direction or not.
521 std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
Austin Schuh858c9f32020-08-31 16:56:12 -0700522 filters_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800523
524 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
525 // channel) which correspond to the originating node.
Austin Schuh858c9f32020-08-31 16:56:12 -0700526 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800527 };
528
Austin Schuh8bd96322020-02-13 21:18:22 -0800529 // Node index -> State.
530 std::vector<std::unique_ptr<State>> states_;
531
532 // Creates the requested filter if it doesn't exist, regardless of whether
533 // these nodes can actually communicate directly. The second return value
534 // reports if this is the primary direction or not.
535 std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
536 const Node *node_a, const Node *node_b);
537
538 // FILE to write offsets to (if populated).
539 FILE *offset_fp_ = nullptr;
540 // Timestamp of the first piece of data used for the horizontal axis on the
541 // plot.
542 aos::realtime_clock::time_point first_time_;
543
544 // List of filters for a connection. The pointer to the first node will be
545 // less than the second node.
546 std::map<std::tuple<const Node *, const Node *>,
547 message_bridge::ClippedAverageFilter>
548 filters_;
549
550 // Returns the offset from the monotonic clock for a node to the distributed
551 // clock. distributed = monotonic + offset;
552 std::chrono::nanoseconds offset(int node_index) const {
James Kuszmaul46d82582020-05-09 19:50:09 -0700553 CHECK_LT(node_index, offset_matrix_.rows())
554 << ": Got too high of a node index.";
Austin Schuh8bd96322020-02-13 21:18:22 -0800555 return -std::chrono::duration_cast<std::chrono::nanoseconds>(
556 std::chrono::duration<double>(offset_matrix_(node_index))) -
557 base_offset_matrix_(node_index);
558 }
559
560 // Updates the offset matrix solution and sets the per-node distributed
561 // offsets in the factory.
562 void UpdateOffsets();
563
564 // sample_matrix_ = map_matrix_ * offset_matrix_
565 Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
566 Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
567 Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
568
569 // Base offsets. The actual offset is the sum of this and the offset matrix.
570 // This removes some of the dynamic range challenges from the double above.
571 Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
572 base_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800573
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800574 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
575 remapped_configuration_buffer_;
576
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800577 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
578 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800579
580 // Map of channel indices to new name. The channel index will be an index into
581 // logged_configuration(), and the string key will be the name of the channel
582 // to send on instead of the logged channel name.
583 std::map<size_t, std::string> remapped_channels_;
584
Austin Schuh6f3babe2020-01-26 20:34:50 -0800585 // Number of nodes which still have data to send. This is used to figure out
586 // when to exit.
587 size_t live_nodes_ = 0;
588
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800589 const Configuration *remapped_configuration_ = nullptr;
590 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800591
592 // If true, the replay timer will ignore any missing data. This is used
593 // during startup when we are bootstrapping everything and trying to get to
594 // the start of all the log files.
595 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800596};
597
598} // namespace logger
599} // namespace aos
600
601#endif // AOS_EVENTS_LOGGER_H_