blob: e0350bb6122a869048c55cbb4bd57a844e6d494d [file] [log] [blame]
Austin Schuhe309d2a2019-11-29 13:25:21 -08001#ifndef AOS_EVENTS_LOGGER_H_
2#define AOS_EVENTS_LOGGER_H_
3
4#include <deque>
Austin Schuh05b70472020-01-01 17:11:17 -08005#include <string_view>
Austin Schuh6f3babe2020-01-26 20:34:50 -08006#include <vector>
Austin Schuhe309d2a2019-11-29 13:25:21 -08007
Austin Schuhe309d2a2019-11-29 13:25:21 -08008#include "absl/types/span.h"
9#include "aos/events/event_loop.h"
Austin Schuha36c8902019-12-30 18:07:15 -080010#include "aos/events/logging/logfile_utils.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080011#include "aos/events/logging/logger_generated.h"
Austin Schuh92547522019-12-28 14:33:43 -080012#include "aos/events/simulated_event_loop.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080013#include "aos/time/time.h"
14#include "flatbuffers/flatbuffers.h"
15
16namespace aos {
17namespace logger {
18
Austin Schuh6f3babe2020-01-26 20:34:50 -080019class LogNamer {
20 public:
21 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
22 virtual ~LogNamer() {}
23
24 virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
25 const Node *node) = 0;
26 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
27
28 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
29 const std::vector<const Node *> &nodes() const { return nodes_; }
30
31 const Node *node() const { return node_; }
32
33 protected:
34 const Node *const node_;
35 std::vector<const Node *> nodes_;
36};
37
38class LocalLogNamer : public LogNamer {
39 public:
40 LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
41 : LogNamer(node), writer_(writer) {}
42
43 ~LocalLogNamer() override { writer_->Flush(); }
44
45 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
46 const Node *node) override {
47 CHECK_EQ(node, this->node());
48 writer_->WriteSizedFlatbuffer(
49 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
50 }
51
52 DetachedBufferWriter *MakeWriter(const Channel *channel) override {
53 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
54 return writer_;
55 }
56
57 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
58 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
59 << ": Message is not delivered to this node.";
60 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
61 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
62 node_))
63 << ": Delivery times aren't logged for this channel on this node.";
64 return writer_;
65 }
66
67 private:
68 DetachedBufferWriter *writer_;
69};
70
71// TODO(austin): Split naming files from making files so we can re-use the
72// naming code to predict the log file names for a provided base name.
73class MultiNodeLogNamer : public LogNamer {
74 public:
75 MultiNodeLogNamer(std::string_view base_name,
76 const Configuration *configuration, const Node *node)
77 : LogNamer(node),
78 base_name_(base_name),
79 configuration_(configuration),
80 data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
81 base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
82
83 // Writes the header to all log files for a specific node. This function
84 // needs to be called after all the writers are created.
85 void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
86 if (node == this->node()) {
87 data_writer_->WriteSizedFlatbuffer(
88 absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
89 } else {
90 for (std::pair<const Channel *const,
91 std::unique_ptr<DetachedBufferWriter>> &data_writer :
92 data_writers_) {
93 if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
94 data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
95 fbb->GetBufferPointer(), fbb->GetSize()));
96 }
97 }
98 }
99 }
100
101 // Makes a data logger for a specific channel.
102 DetachedBufferWriter *MakeWriter(const Channel *channel) {
103 // See if we can read the data on this node at all.
104 const bool is_readable =
105 configuration::ChannelIsReadableOnNode(channel, this->node());
106 if (!is_readable) {
107 return nullptr;
108 }
109
110 // Then, see if we are supposed to log the data here.
111 const bool log_message =
112 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
113
114 if (!log_message) {
115 return nullptr;
116 }
117
118 // Now, sort out if this is data generated on this node, or not. It is
119 // generated if it is sendable on this node.
120 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
121 return data_writer_.get();
122 } else {
123 // Ok, we have data that is being forwarded to us that we are supposed to
124 // log. It needs to be logged with send timestamps, but be sorted enough
125 // to be able to be processed.
126 CHECK(data_writers_.find(channel) == data_writers_.end());
127
128 // Track that this node is being logged.
129 if (configuration::MultiNode(configuration_)) {
130 const Node *source_node = configuration::GetNode(
131 configuration_, channel->source_node()->string_view());
132 if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
133 nodes_.end()) {
134 nodes_.emplace_back(source_node);
135 }
136 }
137
138 return data_writers_
139 .insert(std::make_pair(
140 channel,
141 std::make_unique<DetachedBufferWriter>(absl::StrCat(
142 base_name_, "_", channel->source_node()->string_view(),
143 "_data", channel->name()->string_view(), "/",
144 channel->type()->string_view(), ".bfbs"))))
145 .first->second.get();
146 }
147 }
148
149 // Makes a timestamp (or timestamp and data) logger for a channel and
150 // forwarding connection.
151 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
152 const bool log_delivery_times =
153 (this->node() == nullptr)
154 ? false
155 : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
156 channel, this->node(), this->node());
157 if (!log_delivery_times) {
158 return nullptr;
159 }
160
161 return data_writer_.get();
162 }
163
164 const std::vector<const Node *> &nodes() const { return nodes_; }
165
166 private:
167 const std::string base_name_;
168 const Configuration *const configuration_;
169
170 // File to write both delivery timestamps and local data to.
171 std::unique_ptr<DetachedBufferWriter> data_writer_;
172 // Files to write remote data to. We want one per channel.
173 std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
174 data_writers_;
175};
176
Austin Schuhe309d2a2019-11-29 13:25:21 -0800177// Logs all channels available in the event loop to disk every 100 ms.
178// Start by logging one message per channel to capture any state and
179// configuration that is sent rately on a channel and would affect execution.
180class Logger {
181 public:
182 Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
183 std::chrono::milliseconds polling_period =
184 std::chrono::milliseconds(100));
Austin Schuh6f3babe2020-01-26 20:34:50 -0800185 Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
186 std::chrono::milliseconds polling_period =
187 std::chrono::milliseconds(100));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800188
Austin Schuhfa895892020-01-07 20:07:41 -0800189 // Rotates the log file with the new writer. This writes out the header
190 // again, but keeps going as if nothing else happened.
191 void Rotate(DetachedBufferWriter *writer);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800192 void Rotate(std::unique_ptr<LogNamer> log_namer);
Austin Schuhfa895892020-01-07 20:07:41 -0800193
Austin Schuhe309d2a2019-11-29 13:25:21 -0800194 private:
Austin Schuhfa895892020-01-07 20:07:41 -0800195 void WriteHeader();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800196 void WriteHeader(const Node *node);
Austin Schuhfa895892020-01-07 20:07:41 -0800197
Austin Schuhe309d2a2019-11-29 13:25:21 -0800198 void DoLogData();
199
200 EventLoop *event_loop_;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800201 std::unique_ptr<LogNamer> log_namer_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800202
203 // Structure to track both a fetcher, and if the data fetched has been
204 // written. We may want to delay writing data to disk so that we don't let
205 // data get too far out of order when written to disk so we can avoid making
206 // it too hard to sort when reading.
207 struct FetcherStruct {
208 std::unique_ptr<RawFetcher> fetcher;
209 bool written = false;
Austin Schuh15649d62019-12-28 16:36:38 -0800210
Austin Schuh6f3babe2020-01-26 20:34:50 -0800211 int channel_index = -1;
212
213 LogType log_type = LogType::kLogMessage;
214
215 DetachedBufferWriter *writer = nullptr;
216 DetachedBufferWriter *timestamp_writer = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800217 };
218
219 std::vector<FetcherStruct> fetchers_;
220 TimerHandler *timer_handler_;
221
222 // Period to poll the channels.
223 const std::chrono::milliseconds polling_period_;
224
225 // Last time that data was written for all channels to disk.
226 monotonic_clock::time_point last_synchronized_time_;
227
Austin Schuhfa895892020-01-07 20:07:41 -0800228 monotonic_clock::time_point monotonic_start_time_;
229 realtime_clock::time_point realtime_start_time_;
230
Austin Schuhe309d2a2019-11-29 13:25:21 -0800231 // Max size that the header has consumed. This much extra data will be
232 // reserved in the builder to avoid reallocating.
233 size_t max_header_size_ = 0;
234};
235
Austin Schuh6f3babe2020-01-26 20:34:50 -0800236// We end up with one of the following 3 log file types.
237//
238// Single node logged as the source node.
239// -> Replayed just on the source node.
240//
241// Forwarding timestamps only logged from the perspective of the destination
242// node.
243// -> Matched with data on source node and logged.
244//
245// Forwarding timestamps with data logged as the destination node.
246// -> Replayed just as the destination
247// -> Replayed as the source (Much harder, ordering is not defined)
248//
249// Duplicate data logged. -> CHECK that it matches and explode otherwise.
250//
251// This can be boiled down to a set of constraints and tools.
252//
253// 1) Forwarding timestamps and data need to be logged separately.
254// 2) Any forwarded data logged on the destination node needs to be logged
255// separately such that it can be sorted.
256//
257// 1) Log reader needs to be able to sort a list of log files.
258// 2) Log reader needs to be able to merge sorted lists of log files.
259// 3) Log reader needs to be able to match timestamps with messages.
260//
261// We also need to be able to generate multiple views of a log file depending on
262// the target.
263
Austin Schuhe309d2a2019-11-29 13:25:21 -0800264// Replays all the channels in the logfile to the event loop.
265class LogReader {
266 public:
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800267 // If you want to supply a new configuration that will be used for replay
268 // (e.g., to change message rates, or to populate an updated schema), then
269 // pass it in here. It must provide all the channels that the original logged
270 // config did.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800271 //
272 // Log filenames are in the following format:
273 //
274 // {
275 // {log1_part0, log1_part1, ...},
276 // {log2}
277 // }
278 // The inner vector is a list of log file chunks which form up a log file.
279 // The outer vector is a list of log files with subsets of the messages, or
280 // messages from different nodes.
281 //
282 // If the outer vector isn't provided, it is assumed to be of size 1.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800283 LogReader(std::string_view filename,
284 const Configuration *replay_configuration = nullptr);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800285 LogReader(const std::vector<std::string> &filenames,
286 const Configuration *replay_configuration = nullptr);
287 LogReader(const std::vector<std::vector<std::string>> &filenames,
Austin Schuhfa895892020-01-07 20:07:41 -0800288 const Configuration *replay_configuration = nullptr);
James Kuszmaul7daef362019-12-31 18:28:17 -0800289 ~LogReader();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800290
Austin Schuh6331ef92020-01-07 18:28:09 -0800291 // Registers all the callbacks to send the log file data out on an event loop
292 // created in event_loop_factory. This also updates time to be at the start
293 // of the log file by running until the log file starts.
294 // Note: the configuration used in the factory should be configuration()
295 // below, but can be anything as long as the locations needed to send
296 // everything are available.
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800297 void Register(SimulatedEventLoopFactory *event_loop_factory);
Austin Schuh6331ef92020-01-07 18:28:09 -0800298 // Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
299 // and then calls Register.
300 void Register();
301 // Registers callbacks for all the events after the log file starts. This is
302 // only useful when replaying live.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800303 void Register(EventLoop *event_loop);
Austin Schuh6331ef92020-01-07 18:28:09 -0800304
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800305 // Unregisters the senders. You only need to call this if you separately
306 // supplied an event loop or event loop factory and the lifetimes are such
307 // that they need to be explicitly destroyed before the LogReader destructor
308 // gets called.
Austin Schuhe309d2a2019-11-29 13:25:21 -0800309 void Deregister();
310
Austin Schuhe309d2a2019-11-29 13:25:21 -0800311 // Returns the configuration from the log file.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800312 const Configuration *logged_configuration() const;
313 // Returns the configuration being used for replay.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800314 // The pointer is invalidated whenever RemapLoggedChannel is called.
Austin Schuh15649d62019-12-28 16:36:38 -0800315 const Configuration *configuration() const;
316
Austin Schuh6f3babe2020-01-26 20:34:50 -0800317 // Returns the nodes that this log file was created on. This is a list of
318 // pointers to a node in the nodes() list inside configuration(). The
319 // pointers here are invalidated whenever RemapLoggedChannel is called.
320 std::vector<const Node *> Nodes() const;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800321
322 // Returns the starting timestamp for the log file.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800323 monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
324 realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800325
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800326 // Causes the logger to publish the provided channel on a different name so
327 // that replayed applications can publish on the proper channel name without
328 // interference. This operates on raw channel names, without any node or
329 // application specific mappings.
330 void RemapLoggedChannel(std::string_view name, std::string_view type,
331 std::string_view add_prefix = "/original");
332 template <typename T>
333 void RemapLoggedChannel(std::string_view name,
334 std::string_view add_prefix = "/original") {
335 RemapLoggedChannel(name, T::GetFullyQualifiedName(), add_prefix);
336 }
337
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800338 SimulatedEventLoopFactory *event_loop_factory() {
339 return event_loop_factory_;
340 }
341
Austin Schuhe309d2a2019-11-29 13:25:21 -0800342 private:
Austin Schuh6f3babe2020-01-26 20:34:50 -0800343 const Channel *RemapChannel(const EventLoop *event_loop,
344 const Channel *channel);
345
346 const LogFileHeader *log_file_header() const {
347 return &log_file_header_.message();
348 }
349
Austin Schuhe309d2a2019-11-29 13:25:21 -0800350 // Queues at least max_out_of_order_duration_ messages into channels_.
351 void QueueMessages();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800352 // Handle constructing a configuration with all the additional remapped
353 // channels from calls to RemapLoggedChannel.
354 void MakeRemappedConfig();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800355
Austin Schuh6f3babe2020-01-26 20:34:50 -0800356 const std::vector<std::vector<std::string>> filenames_;
357
358 // This is *a* log file header used to provide the logged config. The rest of
359 // the header is likely distracting.
360 FlatbufferVector<LogFileHeader> log_file_header_;
361
362 // State per node.
363 struct State {
364 // Log file.
365 std::unique_ptr<ChannelMerger> channel_merger;
366 // Senders.
367 std::vector<std::unique_ptr<RawSender>> channels;
368
369 // Factory (if we are in sim) that this loop was created on.
370 NodeEventLoopFactory *node_event_loop_factory = nullptr;
371 std::unique_ptr<EventLoop> event_loop_unique_ptr;
372 // Event loop.
373 EventLoop *event_loop = nullptr;
374 // And timer used to send messages.
375 TimerHandler *timer_handler;
376 };
377
378 // Map of nodes to States used to hold all the state for all the nodes.
379 std::map<const Node *, State> channel_mergers_;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800380
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800381 std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
382 remapped_configuration_buffer_;
383
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800384 std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
385 SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800386
387 // Map of channel indices to new name. The channel index will be an index into
388 // logged_configuration(), and the string key will be the name of the channel
389 // to send on instead of the logged channel name.
390 std::map<size_t, std::string> remapped_channels_;
391
Austin Schuh6f3babe2020-01-26 20:34:50 -0800392 // Number of nodes which still have data to send. This is used to figure out
393 // when to exit.
394 size_t live_nodes_ = 0;
395
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800396 const Configuration *remapped_configuration_ = nullptr;
397 const Configuration *replay_configuration_ = nullptr;
Austin Schuhcde938c2020-02-02 17:30:07 -0800398
399 // If true, the replay timer will ignore any missing data. This is used
400 // during startup when we are bootstrapping everything and trying to get to
401 // the start of all the log files.
402 bool ignore_missing_data_ = false;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800403};
404
405} // namespace logger
406} // namespace aos
407
408#endif // AOS_EVENTS_LOGGER_H_