blob: 2d1e1117f86f21d5100745637e7039461e22891a [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Austin Schuhadd6eb32020-11-09 21:24:26 -080035 data_writer_->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -070039 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
40 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhcb5601b2020-09-10 15:29:59 -070041 return data_writer_.get();
42}
43
44void LocalLogNamer::Rotate(
45 const Node *node,
46 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
47 CHECK(node == this->node());
48 ++part_number_;
49 *data_writer_ = std::move(*OpenDataWriter());
50 UpdateHeader(header, uuid_, part_number_);
Austin Schuhadd6eb32020-11-09 21:24:26 -080051 data_writer_->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070052}
Austin Schuh315b96b2020-12-11 21:21:12 -080053void LocalLogNamer::Reboot(
54 const Node * /*node*/,
55 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> * /*header*/) {
56 LOG(FATAL) << "Can't reboot a single node.";
57}
Austin Schuhcb5601b2020-09-10 15:29:59 -070058
59DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
60 const Channel *channel) {
61 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
62 << ": Message is not delivered to this node.";
63 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
64 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
65 node_))
66 << ": Delivery times aren't logged for this channel on this node.";
67 return data_writer_.get();
68}
69
70DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
71 const Channel * /*channel*/, const Node * /*node*/) {
72 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
73 return nullptr;
74}
75
76MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
77 const Configuration *configuration,
Brian Silvermancb805822020-10-06 17:43:35 -070078 const Node *node)
79 : LogNamer(node), base_name_(base_name), configuration_(configuration) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -070080
Brian Silverman48deab12020-09-30 18:39:28 -070081MultiNodeLogNamer::~MultiNodeLogNamer() {
82 if (!ran_out_of_space_) {
83 // This handles renaming temporary files etc.
84 Close();
85 }
86}
87
Austin Schuhcb5601b2020-09-10 15:29:59 -070088void MultiNodeLogNamer::WriteHeader(
89 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
90 const Node *node) {
91 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -070092 if (!data_writer_.writer) {
93 OpenDataWriter();
94 }
95 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
Austin Schuhadd6eb32020-11-09 21:24:26 -080096 data_writer_.writer->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070097 } else {
98 for (std::pair<const Channel *const, DataWriter> &data_writer :
99 data_writers_) {
100 if (node == data_writer.second.node) {
101 UpdateHeader(header, data_writer.second.uuid,
102 data_writer.second.part_number);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800103 data_writer.second.writer->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700104 }
105 }
106 }
107}
108
109void MultiNodeLogNamer::Rotate(
110 const Node *node,
111 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800112 DoRotate(node, header, false);
113}
114
115void MultiNodeLogNamer::Reboot(
116 const Node *node,
117 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
118 DoRotate(node, header, true);
119}
120
121void MultiNodeLogNamer::DoRotate(
122 const Node *node,
123 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header, bool reboot) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700124 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -0700125 if (data_writer_.writer) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800126 if (reboot) {
127 data_writer_.uuid = UUID::Random();
128 }
Brian Silvermancb805822020-10-06 17:43:35 -0700129 ++data_writer_.part_number;
130 }
Brian Silvermana621f522020-09-30 16:52:43 -0700131 OpenDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700132 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800133 data_writer_.writer->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700134 } else {
135 for (std::pair<const Channel *const, DataWriter> &data_writer :
136 data_writers_) {
137 if (node == data_writer.second.node) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800138 if (reboot) {
139 data_writer.second.uuid = UUID::Random();
140 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700141 ++data_writer.second.part_number;
142 data_writer.second.rotate(data_writer.first, &data_writer.second);
143 UpdateHeader(header, data_writer.second.uuid,
144 data_writer.second.part_number);
Austin Schuhadd6eb32020-11-09 21:24:26 -0800145 data_writer.second.writer->QueueSpan(header->span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700146 }
147 }
148 }
149}
150
151DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
152 // See if we can read the data on this node at all.
153 const bool is_readable =
154 configuration::ChannelIsReadableOnNode(channel, this->node());
155 if (!is_readable) {
156 return nullptr;
157 }
158
159 // Then, see if we are supposed to log the data here.
160 const bool log_message =
161 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
162
163 if (!log_message) {
164 return nullptr;
165 }
166
167 // Now, sort out if this is data generated on this node, or not. It is
168 // generated if it is sendable on this node.
169 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Brian Silvermancb805822020-10-06 17:43:35 -0700170 if (!data_writer_.writer) {
171 OpenDataWriter();
172 }
173 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700174 }
175
176 // Ok, we have data that is being forwarded to us that we are supposed to
177 // log. It needs to be logged with send timestamps, but be sorted enough
178 // to be able to be processed.
179 CHECK(data_writers_.find(channel) == data_writers_.end());
180
181 // Track that this node is being logged.
182 const Node *source_node = configuration::GetNode(
183 configuration_, channel->source_node()->string_view());
184
185 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
186 nodes_.emplace_back(source_node);
187 }
188
189 DataWriter data_writer;
190 data_writer.node = source_node;
191 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
192 OpenWriter(channel, data_writer);
193 };
194 data_writer.rotate(channel, &data_writer);
195
196 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
197 .first->second.writer.get();
198}
199
200DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
201 const Channel *channel, const Node *node) {
202 // See if we can read the data on this node at all.
203 const bool is_readable =
204 configuration::ChannelIsReadableOnNode(channel, this->node());
205 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
206
207 CHECK(data_writers_.find(channel) == data_writers_.end());
208
209 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
210 nodes_.emplace_back(node);
211 }
212
213 DataWriter data_writer;
214 data_writer.node = node;
215 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
216 OpenForwardedTimestampWriter(channel, data_writer);
217 };
218 data_writer.rotate(channel, &data_writer);
219
220 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
221 .first->second.writer.get();
222}
223
224DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
225 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700226 bool log_delivery_times = false;
227 if (this->node() != nullptr) {
228 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
229 channel, this->node(), this->node());
230 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700231 if (!log_delivery_times) {
232 return nullptr;
233 }
234
Brian Silvermancb805822020-10-06 17:43:35 -0700235 if (!data_writer_.writer) {
236 OpenDataWriter();
237 }
238 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700239}
240
Brian Silverman0465fcf2020-09-24 00:29:18 -0700241void MultiNodeLogNamer::Close() {
242 for (std::pair<const Channel *const, DataWriter> &data_writer :
243 data_writers_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700244 CloseWriter(&data_writer.second.writer);
245 data_writer.second.writer.reset();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700246 }
Brian Silvermancb805822020-10-06 17:43:35 -0700247 CloseWriter(&data_writer_.writer);
248 data_writer_.writer.reset();
249}
250
251void MultiNodeLogNamer::ResetStatistics() {
252 for (std::pair<const Channel *const, DataWriter> &data_writer :
253 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800254 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700255 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700256 }
Brian Silvermancb805822020-10-06 17:43:35 -0700257 if (data_writer_.writer) {
258 data_writer_.writer->ResetStatistics();
259 }
260 max_write_time_ = std::chrono::nanoseconds::zero();
261 max_write_time_bytes_ = -1;
262 max_write_time_messages_ = -1;
263 total_write_time_ = std::chrono::nanoseconds::zero();
264 total_write_count_ = 0;
265 total_write_messages_ = 0;
266 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700267}
268
Austin Schuhcb5601b2020-09-10 15:29:59 -0700269void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
270 DataWriter *data_writer) {
271 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700272 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700273 channel->type()->string_view(), ".part",
Brian Silverman1b071eb2020-10-09 12:24:10 -0700274 data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700275 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700276}
277
278void MultiNodeLogNamer::OpenWriter(const Channel *channel,
279 DataWriter *data_writer) {
280 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700281 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700282 channel->name()->string_view(), "/", channel->type()->string_view(),
Brian Silvermancb805822020-10-06 17:43:35 -0700283 ".part", data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700284 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700285}
286
Brian Silvermana621f522020-09-30 16:52:43 -0700287void MultiNodeLogNamer::OpenDataWriter() {
288 std::string name;
Brian Silverman7af8c902020-09-29 16:14:04 -0700289 if (node() != nullptr) {
Austin Schuhe715eae2020-10-10 15:39:30 -0700290 name = absl::StrCat(name, node()->name()->string_view(), "_");
Brian Silverman7af8c902020-09-29 16:14:04 -0700291 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700292 absl::StrAppend(&name, "data.part", data_writer_.part_number, ".bfbs",
Brian Silvermancb805822020-10-06 17:43:35 -0700293 extension_);
294 CreateBufferWriter(name, &data_writer_.writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700295}
296
Brian Silverman0465fcf2020-09-24 00:29:18 -0700297void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700298 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700299 if (ran_out_of_space_) {
300 // Refuse to open any new files, which might skip data. Any existing files
301 // are in the same folder, which means they're on the same filesystem, which
302 // means they're probably going to run out of space and get stuck too.
303 return;
304 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700305 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
306 const std::string filename =
307 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700308 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700309 if (ran_out_of_space_) {
310 *destination = std::make_unique<DetachedBufferWriter>(
311 DetachedBufferWriter::already_out_of_space_t());
312 return;
313 }
Brian Silvermancb805822020-10-06 17:43:35 -0700314 *destination =
315 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700316 if (!destination->get()->ran_out_of_space()) {
317 all_filenames_.emplace_back(path);
318 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700319 return;
320 }
Brian Silvermancb805822020-10-06 17:43:35 -0700321
322 CloseWriter(destination);
323 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700324 *destination->get() =
325 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700326 return;
327 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700328
Brian Silvermancb805822020-10-06 17:43:35 -0700329 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700330 if (!destination->get()->ran_out_of_space()) {
331 all_filenames_.emplace_back(path);
332 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700333}
334
Brian Silverman48deab12020-09-30 18:39:28 -0700335void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
336 if (temp_suffix_.empty()) {
337 return;
338 }
339 const std::string current_filename = std::string(destination->filename());
340 CHECK(current_filename.size() > temp_suffix_.size());
341 const std::string final_filename =
342 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
343 const int result = rename(current_filename.c_str(), final_filename.c_str());
344 if (result != 0) {
345 if (errno == ENOSPC) {
346 ran_out_of_space_ = true;
347 return;
348 } else {
349 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
350 << " failed";
351 }
352 }
353}
354
Brian Silvermancb805822020-10-06 17:43:35 -0700355void MultiNodeLogNamer::CloseWriter(
356 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
357 DetachedBufferWriter *const writer = writer_pointer->get();
358 if (!writer) {
359 return;
360 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700361 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700362 writer->Close();
363
364 if (writer->max_write_time() > max_write_time_) {
365 max_write_time_ = writer->max_write_time();
366 max_write_time_bytes_ = writer->max_write_time_bytes();
367 max_write_time_messages_ = writer->max_write_time_messages();
368 }
369 total_write_time_ += writer->total_write_time();
370 total_write_count_ += writer->total_write_count();
371 total_write_messages_ += writer->total_write_messages();
372 total_write_bytes_ += writer->total_write_bytes();
373
374 if (writer->ran_out_of_space()) {
375 ran_out_of_space_ = true;
376 writer->acknowledge_out_of_space();
377 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700378 if (was_open) {
379 RenameTempFile(writer);
380 } else {
381 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
382 << ": File should not exist: " << writer->filename();
383 }
Brian Silvermancb805822020-10-06 17:43:35 -0700384}
385
Austin Schuhcb5601b2020-09-10 15:29:59 -0700386} // namespace logger
387} // namespace aos