blob: c08d6d482afe3cf5321481d0f0e2d4aa2c2066b0 [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
Austin Schuh8bd96322020-02-13 21:18:22 -08004#include <chrono>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <deque>
Austin Schuh05b70472020-01-01 17:11:17 -08006#include <string_view>
Austin Schuh6f3babe2020-01-26 20:34:50 -08007#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08008
Austin Schuh8bd96322020-02-13 21:18:22 -08009#include "Eigen/Dense"
10#include "absl/strings/str_cat.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080011#include "absl/types/span.h"
12#include "aos/events/event_loop.h"
Austin Schuha36c8902019-12-30 18:07:15 -080013#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080014#include "aos/events/logging/logger_generated.h"
Austin Schuh92547522019-12-28 14:33:43 -080015#include "aos/events/simulated_event_loop.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080016#include "aos/network/timestamp_filter.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080017#include "aos/time/time.h"
18#include "flatbuffers/flatbuffers.h"
19
20namespace aos {
21namespace logger {
22
Austin Schuh6f3babe2020-01-26 20:34:50 -080023class LogNamer {
24 public:
25 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
26 virtual ~LogNamer() {}
27
28 virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
29 const Node *node) = 0;
30 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
31
32 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
33 const std::vector<const Node *> &nodes() const { return nodes_; }
34
35 const Node *node() const { return node_; }
36
37 protected:
38 const Node *const node_;
39 std::vector<const Node *> nodes_;
40};
41
42class LocalLogNamer : public LogNamer {
43 public:
44 LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
45 : LogNamer(node), writer_(writer) {}
46
47 ~LocalLogNamer() override { writer_->Flush(); }
48
49 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
50 const Node *node) override {
51 CHECK_EQ(node, this->node());
52 writer_->WriteSizedFlatbuffer(
53 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
54 }
55
56 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
57 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
58 return writer_;
59 }
60
61 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
62 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
63 << ": Message is not delivered to this node.";
64 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
65 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
66 node_))
67 << ": Delivery times aren't logged for this channel on this node.";
68 return writer_;
69 }
70
71 private:
72 DetachedBufferWriter *writer_;
73};
74
75// TODO(austin): Split naming files from making files so we can re-use the
76// naming code to predict the log file names for a provided base name.
77class MultiNodeLogNamer : public LogNamer {
78 public:
79 MultiNodeLogNamer(std::string_view base_name,
80 const Configuration *configuration, const Node *node)
81 : LogNamer(node),
82 base_name_(base_name),
83 configuration_(configuration),
84 data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
85 base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
86
87 // Writes the header to all log files for a specific node. This function
88 // needs to be called after all the writers are created.
89 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
90 if (node == this->node()) {
91 data_writer_->WriteSizedFlatbuffer(
92 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
93 } else {
94 for (std::pair<const Channel *const,
95 std::unique_ptr<DetachedBufferWriter>> &data_writer :
96 data_writers_) {
97 if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
98 data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
99 fbb->GetBufferPointer(), fbb->GetSize()));
100 }
101 }
102 }
103 }
104
105 // Makes a data logger for a specific channel.
106 DetachedBufferWriter *MakeWriter(const Channel *channel) {
107 // See if we can read the data on this node at all.
108 const bool is_readable =
109 configuration::ChannelIsReadableOnNode(channel, this->node());
110 if (!is_readable) {
111 return nullptr;
112 }
113
114 // Then, see if we are supposed to log the data here.
115 const bool log_message =
116 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
117
118 if (!log_message) {
119 return nullptr;
120 }
121
122 // Now, sort out if this is data generated on this node, or not. It is
123 // generated if it is sendable on this node.
124 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
125 return data_writer_.get();
126 } else {
127 // Ok, we have data that is being forwarded to us that we are supposed to
128 // log. It needs to be logged with send timestamps, but be sorted enough
129 // to be able to be processed.
130 CHECK(data_writers_.find(channel) == data_writers_.end());
131
132 // Track that this node is being logged.
133 if (configuration::MultiNode(configuration_)) {
134 const Node *source_node = configuration::GetNode(
135 configuration_, channel->source_node()->string_view());
136 if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
137 nodes_.end()) {
138 nodes_.emplace_back(source_node);
139 }
140 }
141
142 return data_writers_
143 .insert(std::make_pair(
144 channel,
145 std::make_unique<DetachedBufferWriter>(absl::StrCat(
146 base_name_, "_", channel->source_node()->string_view(),
147 "_data", channel->name()->string_view(), "/",
148 channel->type()->string_view(), ".bfbs"))))
149 .first->second.get();
150 }
151 }
152
153 // Makes a timestamp (or timestamp and data) logger for a channel and
154 // forwarding connection.
155 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
156 const bool log_delivery_times =
157 (this->node() == nullptr)
158 ? false
159 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
160 channel, this->node(), this->node());
161 if (!log_delivery_times) {
162 return nullptr;
163 }
164
165 return data_writer_.get();
166 }
167
168 const std::vector<const Node *> &nodes() const { return nodes_; }
169
170 private:
171 const std::string base_name_;
172 const Configuration *const configuration_;
173
174 // File to write both delivery timestamps and local data to.
175 std::unique_ptr<DetachedBufferWriter> data_writer_;
176 // Files to write remote data to. We want one per channel.
177 std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
178 data_writers_;
179};
180
Austin Schuh8bd96322020-02-13 21:18:22 -0800181
Austin Schuhe309d2a2019-11-29 13:25:21 -0800182// Logs all channels available in the event loop to disk every 100 ms.
183// Start by logging one message per channel to capture any state and
184// configuration that is sent rately on a channel and would affect execution.
185class Logger {
186 public:
187 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
188 std::chrono::milliseconds polling_period =
189 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800190 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
191 std::chrono::milliseconds polling_period =
192 std::chrono::milliseconds(100));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800193
Austin Schuhfa895892020-01-07 20:07:41 -0800194 // Rotates the log file with the new writer. This writes out the header
195 // again, but keeps going as if nothing else happened.
196 void Rotate(DetachedBufferWriter *writer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800197 void Rotate(std::unique_ptr<LogNamer> log_namer);
Austin Schuhfa895892020-01-07 20:07:41 -0800198
Austin Schuhe309d2a2019-11-29 13:25:21 -0800199 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800200 void WriteHeader();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800201 void WriteHeader(const Node *node);
Austin Schuhfa895892020-01-07 20:07:41 -0800202
Austin Schuhe309d2a2019-11-29 13:25:21 -0800203 void DoLogData();
204
205 EventLoop *event_loop_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800206 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800207
208 // Structure to track both a fetcher, and if the data fetched has been
209 // written. We may want to delay writing data to disk so that we don't let
210 // data get too far out of order when written to disk so we can avoid making
211 // it too hard to sort when reading.
212 struct FetcherStruct {
213 std::unique_ptr<RawFetcher> fetcher;
214 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800215
Austin Schuh6f3babe2020-01-26 20:34:50 -0800216 int channel_index = -1;
217
218 LogType log_type = LogType::kLogMessage;
219
220 DetachedBufferWriter *writer = nullptr;
221 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800222 };
223
224 std::vector<FetcherStruct> fetchers_;
225 TimerHandler *timer_handler_;
226
227 // Period to poll the channels.
228 const std::chrono::milliseconds polling_period_;
229
230 // Last time that data was written for all channels to disk.
231 monotonic_clock::time_point last_synchronized_time_;
232
Austin Schuhfa895892020-01-07 20:07:41 -0800233 monotonic_clock::time_point monotonic_start_time_;
234 realtime_clock::time_point realtime_start_time_;
235
Austin Schuhe309d2a2019-11-29 13:25:21 -0800236 // Max size that the header has consumed. This much extra data will be
237 // reserved in the builder to avoid reallocating.
238 size_t max_header_size_ = 0;
239};
240
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
//   -> Replayed just on the source node.
//
// Forwarding timestamps only logged from the perspective of the destination
// node.
//   -> Matched with data on source node and logged.
//
// Forwarding timestamps with data logged as the destination node.
//   -> Replayed just as the destination
//   -> Replayed as the source (Much harder, ordering is not defined)
//
// Duplicate data logged. -> CHECK that it matches and explode otherwise.
//
// This can be boiled down to a set of constraints and tools.
//
// Constraints:
// 1) Forwarding timestamps and data need to be logged separately.
// 2) Any forwarded data logged on the destination node needs to be logged
//    separately such that it can be sorted.
//
// Tools:
// 1) Log reader needs to be able to sort a list of log files.
// 2) Log reader needs to be able to merge sorted lists of log files.
// 3) Log reader needs to be able to match timestamps with messages.
//
// We also need to be able to generate multiple views of a log file depending on
// the target.
268
Austin Schuhe309d2a2019-11-29 13:25:21 -0800269// Replays all the channels in the logfile to the event loop.
270class LogReader {
271 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800272 // If you want to supply a new configuration that will be used for replay
273 // (e.g., to change message rates, or to populate an updated schema), then
274 // pass it in here. It must provide all the channels that the original logged
275 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800276 //
277 // Log filenames are in the following format:
278 //
279 // {
280 // {log1_part0, log1_part1, ...},
281 // {log2}
282 // }
283 // The inner vector is a list of log file chunks which form up a log file.
284 // The outer vector is a list of log files with subsets of the messages, or
285 // messages from different nodes.
286 //
287 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800288 LogReader(std::string_view filename,
289 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800290 LogReader(const std::vector<std::string> &filenames,
291 const Configuration *replay_configuration = nullptr);
292 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800293 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800294 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800295
Austin Schuh6331ef92020-01-07 18:28:09 -0800296 // Registers all the callbacks to send the log file data out on an event loop
297 // created in event_loop_factory. This also updates time to be at the start
298 // of the log file by running until the log file starts.
299 // Note: the configuration used in the factory should be configuration()
300 // below, but can be anything as long as the locations needed to send
301 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800302 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800303 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
304 // and then calls Register.
305 void Register();
306 // Registers callbacks for all the events after the log file starts. This is
307 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800308 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800309
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800310 // Unregisters the senders. You only need to call this if you separately
311 // supplied an event loop or event loop factory and the lifetimes are such
312 // that they need to be explicitly destroyed before the LogReader destructor
313 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800314 void Deregister();
315
Austin Schuhe309d2a2019-11-29 13:25:21 -0800316 // Returns the configuration from the log file.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800317 const Configuration *logged_configuration() const;
318 // Returns the configuration being used for replay.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800319 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800320 const Configuration *configuration() const;
321
Austin Schuh6f3babe2020-01-26 20:34:50 -0800322 // Returns the nodes that this log file was created on. This is a list of
323 // pointers to a node in the nodes() list inside configuration(). The
324 // pointers here are invalidated whenever RemapLoggedChannel is called.
325 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800326
327 // Returns the starting timestamp for the log file.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800328 monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
329 realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800330
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800331 // Causes the logger to publish the provided channel on a different name so
332 // that replayed applications can publish on the proper channel name without
333 // interference. This operates on raw channel names, without any node or
334 // application specific mappings.
335 void RemapLoggedChannel(std::string_view name, std::string_view type,
336 std::string_view add_prefix = "/original");
337 template <typename T>
338 void RemapLoggedChannel(std::string_view name,
339 std::string_view add_prefix = "/original") {
340 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
341 }
342
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700343 template <typename T>
344 bool HasChannel(std::string_view name) {
345 return configuration::GetChannel(log_file_header()->configuration(), name,
346 T::GetFullyQualifiedName(), "",
347 nullptr) != nullptr;
348 }
349
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800350 SimulatedEventLoopFactory *event_loop_factory() {
351 return event_loop_factory_;
352 }
353
Brian Silvermande9f3ff2020-04-28 16:56:58 -0700354 const LogFileHeader *log_file_header() const {
355 return &log_file_header_.message();
356 }
357
Austin Schuhe309d2a2019-11-29 13:25:21 -0800358 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800359 const Channel *RemapChannel(const EventLoop *event_loop,
360 const Channel *channel);
361
Austin Schuhe309d2a2019-11-29 13:25:21 -0800362 // Queues at least max_out_of_order_duration_ messages into channels_.
363 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800364 // Handle constructing a configuration with all the additional remapped
365 // channels from calls to RemapLoggedChannel.
366 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800367
Austin Schuh6f3babe2020-01-26 20:34:50 -0800368 const std::vector<std::vector<std::string>> filenames_;
369
370 // This is *a* log file header used to provide the logged config. The rest of
371 // the header is likely distracting.
372 FlatbufferVector<LogFileHeader> log_file_header_;
373
Austin Schuh8bd96322020-02-13 21:18:22 -0800374 Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
375
Austin Schuh6f3babe2020-01-26 20:34:50 -0800376 // State per node.
377 struct State {
378 // Log file.
379 std::unique_ptr<ChannelMerger> channel_merger;
380 // Senders.
381 std::vector<std::unique_ptr<RawSender>> channels;
382
383 // Factory (if we are in sim) that this loop was created on.
384 NodeEventLoopFactory *node_event_loop_factory = nullptr;
385 std::unique_ptr<EventLoop> event_loop_unique_ptr;
386 // Event loop.
387 EventLoop *event_loop = nullptr;
388 // And timer used to send messages.
389 TimerHandler *timer_handler;
Austin Schuh8bd96322020-02-13 21:18:22 -0800390
391 // Updates the timestamp filter with the timestamp. Returns true if the
392 // provided timestamp was actually a forwarding timestamp and used, and
393 // false otherwise.
394 bool MaybeUpdateTimestamp(
395 const TimestampMerger::DeliveryTimestamp &channel_timestamp,
396 int channel_index);
397
398 // Filters (or nullptr if it isn't a forwarded channel) for each channel.
399 // This corresponds to the object which is shared among all the channels
400 // going between 2 nodes. The second element in the tuple indicates if this
401 // is the primary direction or not.
402 std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
403 filters;
404
405 // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
406 // channel) which correspond to the originating node.
407 std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800408 };
409
Austin Schuh8bd96322020-02-13 21:18:22 -0800410 // Node index -> State.
411 std::vector<std::unique_ptr<State>> states_;
412
413 // Creates the requested filter if it doesn't exist, regardless of whether
414 // these nodes can actually communicate directly. The second return value
415 // reports if this is the primary direction or not.
416 std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
417 const Node *node_a, const Node *node_b);
418
419 // FILE to write offsets to (if populated).
420 FILE *offset_fp_ = nullptr;
421 // Timestamp of the first piece of data used for the horizontal axis on the
422 // plot.
423 aos::realtime_clock::time_point first_time_;
424
425 // List of filters for a connection. The pointer to the first node will be
426 // less than the second node.
427 std::map<std::tuple<const Node *, const Node *>,
428 message_bridge::ClippedAverageFilter>
429 filters_;
430
431 // Returns the offset from the monotonic clock for a node to the distributed
432 // clock. distributed = monotonic + offset;
433 std::chrono::nanoseconds offset(int node_index) const {
James Kuszmaul46d82582020-05-09 19:50:09 -0700434 CHECK_LT(node_index, offset_matrix_.rows())
435 << ": Got too high of a node index.";
Austin Schuh8bd96322020-02-13 21:18:22 -0800436 return -std::chrono::duration_cast<std::chrono::nanoseconds>(
437 std::chrono::duration<double>(offset_matrix_(node_index))) -
438 base_offset_matrix_(node_index);
439 }
440
441 // Updates the offset matrix solution and sets the per-node distributed
442 // offsets in the factory.
443 void UpdateOffsets();
444
445 // sample_matrix_ = map_matrix_ * offset_matrix_
446 Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
447 Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
448 Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
449
450 // Base offsets. The actual offset is the sum of this and the offset matrix.
451 // This removes some of the dynamic range challenges from the double above.
452 Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
453 base_offset_matrix_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800454
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800455 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
456 remapped_configuration_buffer_;
457
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800458 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
459 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800460
461 // Map of channel indices to new name. The channel index will be an index into
462 // logged_configuration(), and the string key will be the name of the channel
463 // to send on instead of the logged channel name.
464 std::map<size_t, std::string> remapped_channels_;
465
Austin Schuh6f3babe2020-01-26 20:34:50 -0800466 // Number of nodes which still have data to send. This is used to figure out
467 // when to exit.
468 size_t live_nodes_ = 0;
469
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800470 const Configuration *remapped_configuration_ = nullptr;
471 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800472
473 // If true, the replay timer will ignore any missing data. This is used
474 // during startup when we are bootstrapping everything and trying to get to
475 // the start of all the log files.
476 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800477};
478
479} // namespace logger
480} // namespace aos
481
482#endif // AOS_EVENTS_LOGGER_H_