blob: 6d91b692218ef9df9a99b16cf594746393d5021b [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#ifndef AOS_EVENTS_LOGGING_LOG_NAMER_H_
2#define AOS_EVENTS_LOGGING_LOG_NAMER_H_
3
4#include <functional>
5#include <map>
6#include <memory>
7#include <string_view>
8#include <vector>
9
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14
15namespace aos {
16namespace logger {
17
18// Interface describing how to name, track, and add headers to log file parts.
19class LogNamer {
20 public:
21 // Constructs a LogNamer with the primary node (ie the one the logger runs on)
22 // being node.
23 LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
24 virtual ~LogNamer() {}
25
26 // Writes the header to all log files for a specific node. This function
27 // needs to be called after all the writers are created.
28 //
29 // Modifies header to contain the uuid and part number for each writer as it
30 // writes it. Since this is done unconditionally, it does not restore the
31 // previous value at the end.
32 virtual void WriteHeader(
33 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
34 const Node *node) = 0;
35
Brian Silverman87ac0402020-09-17 14:47:01 -070036 // Returns a writer for writing data from messages on this channel (on the
37 // primary node).
38 //
39 // The returned pointer will stay valid across rotations, but the object it
40 // points to will be assigned to.
Austin Schuhcb5601b2020-09-10 15:29:59 -070041 virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
42
Brian Silverman87ac0402020-09-17 14:47:01 -070043 // Returns a writer for writing timestamps from messages on this channel (on
44 // the primary node).
45 //
46 // The returned pointer will stay valid across rotations, but the object it
47 // points to will be assigned to.
Austin Schuhcb5601b2020-09-10 15:29:59 -070048 virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
49
50 // Returns a writer for writing timestamps delivered over the special
51 // /aos/remote_timestamps/* channels. node is the node that the timestamps
Brian Silverman87ac0402020-09-17 14:47:01 -070052 // are forwarded back from (to the primary node).
53 //
54 // The returned pointer will stay valid across rotations, but the object it
55 // points to will be assigned to.
Austin Schuhcb5601b2020-09-10 15:29:59 -070056 virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
57 const Channel *channel, const Node *node) = 0;
58
59 // Rotates all log files for the provided node. The provided header will be
60 // modified and written per WriteHeader above.
61 virtual void Rotate(
62 const Node *node,
63 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
64
65 // Returns all the nodes that data is being written for.
66 const std::vector<const Node *> &nodes() const { return nodes_; }
67
68 // Returns the node the logger is running on.
69 const Node *node() const { return node_; }
70
71 protected:
72 // Modifies the header to have the provided UUID and part id.
73 void UpdateHeader(
74 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
75 const UUID &uuid, int part_id) const;
76
77 const Node *const node_;
78 std::vector<const Node *> nodes_;
79};
80
81// Local log namer is a simple version which only names things
82// "base_name.part#.bfbs" and increments the part number. It doesn't support
83// any other log type.
84class LocalLogNamer : public LogNamer {
85 public:
86 LocalLogNamer(std::string_view base_name, const Node *node)
87 : LogNamer(node),
88 base_name_(base_name),
89 uuid_(UUID::Random()),
90 data_writer_(OpenDataWriter()) {}
Brian Silverman0465fcf2020-09-24 00:29:18 -070091 ~LocalLogNamer() override = default;
Austin Schuhcb5601b2020-09-10 15:29:59 -070092
93 void WriteHeader(
94 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
95 const Node *node) override;
96
97 DetachedBufferWriter *MakeWriter(const Channel *channel) override;
98
99 void Rotate(const Node *node,
100 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
101 override;
102
103 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
104
105 DetachedBufferWriter *MakeForwardedTimestampWriter(
106 const Channel * /*channel*/, const Node * /*node*/) override;
107
108 private:
109 // Creates a new data writer with the new part number.
110 std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
111 return std::make_unique<DetachedBufferWriter>(
Brian Silvermanf51499a2020-09-21 12:49:08 -0700112 absl::StrCat(base_name_, ".part", part_number_, ".bfbs"),
113 std::make_unique<aos::logger::DummyEncoder>());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700114 }
115
116 const std::string base_name_;
117 const UUID uuid_;
118 size_t part_number_ = 0;
119 std::unique_ptr<DetachedBufferWriter> data_writer_;
120};
121
122// Log namer which uses a config and a base name to name a bunch of files.
123class MultiNodeLogNamer : public LogNamer {
124 public:
Brian Silvermancb805822020-10-06 17:43:35 -0700125 MultiNodeLogNamer(std::string_view base_name,
126 const Configuration *configuration, const Node *node);
127 ~MultiNodeLogNamer() override;
128
129 std::string_view base_name() const { return base_name_; }
130
Brian Silverman48deab12020-09-30 18:39:28 -0700131 // If temp_suffix is set, then this will write files under names beginning
132 // with the specified suffix, and then rename them to the desired name after
133 // they are fully written.
134 //
135 // This is useful to enable incremental copying of the log files.
136 //
137 // Defaults to writing directly to the final filename.
Brian Silvermancb805822020-10-06 17:43:35 -0700138 void set_temp_suffix(std::string_view temp_suffix) {
139 temp_suffix_ = temp_suffix;
140 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700141
Brian Silvermancb805822020-10-06 17:43:35 -0700142 // Sets the function for creating encoders.
143 //
144 // Defaults to just creating DummyEncoders.
145 void set_encoder_factory(
146 std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory) {
147 encoder_factory_ = std::move(encoder_factory);
148 }
149
150 // Sets an additional file extension.
151 //
152 // Defaults to nothing.
153 void set_extension(std::string_view extension) { extension_ = extension; }
Brian Silverman1f345222020-09-24 21:14:48 -0700154
Brian Silvermana621f522020-09-30 16:52:43 -0700155 // A list of all the filenames we've written.
156 //
157 // This only includes the part after base_name().
158 const std::vector<std::string> &all_filenames() const {
159 return all_filenames_;
160 }
161
Austin Schuhcb5601b2020-09-10 15:29:59 -0700162 void WriteHeader(
163 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
164 const Node *node) override;
165
166 void Rotate(const Node *node,
167 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
168 override;
169
170 DetachedBufferWriter *MakeWriter(const Channel *channel) override;
171
Brian Silvermand90905f2020-09-23 14:42:56 -0700172 DetachedBufferWriter *MakeForwardedTimestampWriter(const Channel *channel,
173 const Node *node) override;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700174
175 DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
176
Brian Silverman0465fcf2020-09-24 00:29:18 -0700177 // Indicates that at least one file ran out of space. Once this happens, we
178 // stop trying to open new files, to avoid writing any files with holes from
179 // previous parts.
180 //
181 // Besides this function, this object will silently stop logging data when
182 // this occurs. If you want to ensure log files are complete, you must call
183 // this method.
184 bool ran_out_of_space() const { return ran_out_of_space_; }
185
Brian Silverman1f345222020-09-24 21:14:48 -0700186 // Returns the maximum total_bytes() value for all existing
187 // DetachedBufferWriters.
188 //
189 // Returns 0 if no files are open.
190 size_t maximum_total_bytes() const {
Brian Silvermancb805822020-10-06 17:43:35 -0700191 return accumulate_data_writers<size_t>(
192 0, [](size_t x, const DataWriter &data_writer) {
193 return std::max(x, data_writer.writer->total_bytes());
194 });
Brian Silverman1f345222020-09-24 21:14:48 -0700195 }
196
Brian Silverman0465fcf2020-09-24 00:29:18 -0700197 // Closes all existing log files. No more data may be written after this.
198 //
199 // This may set ran_out_of_space().
200 void Close();
201
Brian Silvermancb805822020-10-06 17:43:35 -0700202 // Accessors for various statistics. See the identically-named methods in
203 // DetachedBufferWriter for documentation. These are aggregated across all
204 // past and present DetachedBufferWriters.
205 std::chrono::nanoseconds max_write_time() const {
206 return accumulate_data_writers(
207 max_write_time_,
208 [](std::chrono::nanoseconds x, const DataWriter &data_writer) {
209 return std::max(x, data_writer.writer->max_write_time());
210 });
211 }
212 int max_write_time_bytes() const {
213 return std::get<0>(accumulate_data_writers(
214 std::make_tuple(max_write_time_bytes_, max_write_time_),
215 [](std::tuple<int, std::chrono::nanoseconds> x,
216 const DataWriter &data_writer) {
217 if (data_writer.writer->max_write_time() > std::get<1>(x)) {
218 return std::make_tuple(data_writer.writer->max_write_time_bytes(),
219 data_writer.writer->max_write_time());
220 }
221 return x;
222 }));
223 }
224 int max_write_time_messages() const {
225 return std::get<0>(accumulate_data_writers(
226 std::make_tuple(max_write_time_messages_, max_write_time_),
227 [](std::tuple<int, std::chrono::nanoseconds> x,
228 const DataWriter &data_writer) {
229 if (data_writer.writer->max_write_time() > std::get<1>(x)) {
230 return std::make_tuple(
231 data_writer.writer->max_write_time_messages(),
232 data_writer.writer->max_write_time());
233 }
234 return x;
235 }));
236 }
237 std::chrono::nanoseconds total_write_time() const {
238 return accumulate_data_writers(
239 total_write_time_,
240 [](std::chrono::nanoseconds x, const DataWriter &data_writer) {
241 return x + data_writer.writer->total_write_time();
242 });
243 }
244 int total_write_count() const {
245 return accumulate_data_writers(
246 total_write_count_, [](int x, const DataWriter &data_writer) {
247 return x + data_writer.writer->total_write_count();
248 });
249 }
250 int total_write_messages() const {
251 return accumulate_data_writers(
252 total_write_messages_, [](int x, const DataWriter &data_writer) {
253 return x + data_writer.writer->total_write_messages();
254 });
255 }
256 int total_write_bytes() const {
257 return accumulate_data_writers(
258 total_write_bytes_, [](int x, const DataWriter &data_writer) {
259 return x + data_writer.writer->total_write_bytes();
260 });
261 }
262
263 void ResetStatistics();
264
Austin Schuhcb5601b2020-09-10 15:29:59 -0700265 private:
266 // Files to write remote data to. We want one per channel. Maps the channel
267 // to the writer, Node, and part number.
268 struct DataWriter {
269 std::unique_ptr<DetachedBufferWriter> writer = nullptr;
270 const Node *node;
271 size_t part_number = 0;
Brian Silvermancb805822020-10-06 17:43:35 -0700272 const UUID uuid = UUID::Random();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700273 std::function<void(const Channel *, DataWriter *)> rotate;
274 };
275
276 // Opens up a writer for timestamps forwarded back.
277 void OpenForwardedTimestampWriter(const Channel *channel,
278 DataWriter *data_writer);
279
280 // Opens up a writer for remote data.
281 void OpenWriter(const Channel *channel, DataWriter *data_writer);
282
283 // Opens the main data writer file for this node responsible for data_writer_.
Brian Silvermana621f522020-09-30 16:52:43 -0700284 void OpenDataWriter();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700285
Brian Silvermana621f522020-09-30 16:52:43 -0700286 void CreateBufferWriter(std::string_view path,
Brian Silverman0465fcf2020-09-24 00:29:18 -0700287 std::unique_ptr<DetachedBufferWriter> *destination);
288
Brian Silverman48deab12020-09-30 18:39:28 -0700289 void RenameTempFile(DetachedBufferWriter *destination);
290
Brian Silvermancb805822020-10-06 17:43:35 -0700291 void CloseWriter(std::unique_ptr<DetachedBufferWriter> *writer_pointer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700292
Brian Silvermancb805822020-10-06 17:43:35 -0700293 // A version of std::accumulate which operates over all of our DataWriters.
294 template <typename T, typename BinaryOperation>
295 T accumulate_data_writers(T t, BinaryOperation op) const {
296 for (const std::pair<const Channel *const, DataWriter> &data_writer :
297 data_writers_) {
298 t = op(std::move(t), data_writer.second);
299 }
300 if (data_writer_.writer) {
301 t = op(std::move(t), data_writer_);
302 }
303 return t;
304 }
305
306 const std::string base_name_;
307 const Configuration *const configuration_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700308
Brian Silverman0465fcf2020-09-24 00:29:18 -0700309 bool ran_out_of_space_ = false;
Brian Silvermana621f522020-09-30 16:52:43 -0700310 std::vector<std::string> all_filenames_;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700311
Brian Silvermancb805822020-10-06 17:43:35 -0700312 std::string temp_suffix_;
313 std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory_ =
314 []() { return std::make_unique<DummyEncoder>(); };
315 std::string extension_;
316
317 // Storage for statistics from previously-rotated DetachedBufferWriters.
318 std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
319 int max_write_time_bytes_ = -1;
320 int max_write_time_messages_ = -1;
321 std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
322 int total_write_count_ = 0;
323 int total_write_messages_ = 0;
324 int total_write_bytes_ = 0;
325
Austin Schuhcb5601b2020-09-10 15:29:59 -0700326 // File to write both delivery timestamps and local data to.
Brian Silvermancb805822020-10-06 17:43:35 -0700327 DataWriter data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700328
329 std::map<const Channel *, DataWriter> data_writers_;
330};
331
332} // namespace logger
333} // namespace aos
334
335#endif // AOS_EVENTS_LOGGING_LOG_NAMER_H_