blob: 411e666f563688cd68cd2bd7d20180b36e7f0cb4 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -070039 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
40 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhcb5601b2020-09-10 15:29:59 -070041 return data_writer_.get();
42}
43
44void LocalLogNamer::Rotate(
45 const Node *node,
46 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
47 CHECK(node == this->node());
48 ++part_number_;
49 *data_writer_ = std::move(*OpenDataWriter());
50 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070051 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070052}
53
54DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
55 const Channel *channel) {
56 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
57 << ": Message is not delivered to this node.";
58 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
59 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
60 node_))
61 << ": Delivery times aren't logged for this channel on this node.";
62 return data_writer_.get();
63}
64
65DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
66 const Channel * /*channel*/, const Node * /*node*/) {
67 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
68 return nullptr;
69}
70
71MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
72 const Configuration *configuration,
Brian Silvermancb805822020-10-06 17:43:35 -070073 const Node *node)
74 : LogNamer(node), base_name_(base_name), configuration_(configuration) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -070075
Brian Silverman48deab12020-09-30 18:39:28 -070076MultiNodeLogNamer::~MultiNodeLogNamer() {
77 if (!ran_out_of_space_) {
78 // This handles renaming temporary files etc.
79 Close();
80 }
81}
82
Austin Schuhcb5601b2020-09-10 15:29:59 -070083void MultiNodeLogNamer::WriteHeader(
84 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
85 const Node *node) {
86 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -070087 if (!data_writer_.writer) {
88 OpenDataWriter();
89 }
90 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
91 data_writer_.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070092 } else {
93 for (std::pair<const Channel *const, DataWriter> &data_writer :
94 data_writers_) {
95 if (node == data_writer.second.node) {
96 UpdateHeader(header, data_writer.second.uuid,
97 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -070098 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070099 }
100 }
101 }
102}
103
104void MultiNodeLogNamer::Rotate(
105 const Node *node,
106 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
107 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -0700108 if (data_writer_.writer) {
109 ++data_writer_.part_number;
110 }
Brian Silvermana621f522020-09-30 16:52:43 -0700111 OpenDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700112 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
113 data_writer_.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700114 } else {
115 for (std::pair<const Channel *const, DataWriter> &data_writer :
116 data_writers_) {
117 if (node == data_writer.second.node) {
118 ++data_writer.second.part_number;
119 data_writer.second.rotate(data_writer.first, &data_writer.second);
120 UpdateHeader(header, data_writer.second.uuid,
121 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700122 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700123 }
124 }
125 }
126}
127
128DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
129 // See if we can read the data on this node at all.
130 const bool is_readable =
131 configuration::ChannelIsReadableOnNode(channel, this->node());
132 if (!is_readable) {
133 return nullptr;
134 }
135
136 // Then, see if we are supposed to log the data here.
137 const bool log_message =
138 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
139
140 if (!log_message) {
141 return nullptr;
142 }
143
144 // Now, sort out if this is data generated on this node, or not. It is
145 // generated if it is sendable on this node.
146 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Brian Silvermancb805822020-10-06 17:43:35 -0700147 if (!data_writer_.writer) {
148 OpenDataWriter();
149 }
150 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700151 }
152
153 // Ok, we have data that is being forwarded to us that we are supposed to
154 // log. It needs to be logged with send timestamps, but be sorted enough
155 // to be able to be processed.
156 CHECK(data_writers_.find(channel) == data_writers_.end());
157
158 // Track that this node is being logged.
159 const Node *source_node = configuration::GetNode(
160 configuration_, channel->source_node()->string_view());
161
162 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
163 nodes_.emplace_back(source_node);
164 }
165
166 DataWriter data_writer;
167 data_writer.node = source_node;
168 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
169 OpenWriter(channel, data_writer);
170 };
171 data_writer.rotate(channel, &data_writer);
172
173 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
174 .first->second.writer.get();
175}
176
177DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
178 const Channel *channel, const Node *node) {
179 // See if we can read the data on this node at all.
180 const bool is_readable =
181 configuration::ChannelIsReadableOnNode(channel, this->node());
182 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
183
184 CHECK(data_writers_.find(channel) == data_writers_.end());
185
186 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
187 nodes_.emplace_back(node);
188 }
189
190 DataWriter data_writer;
191 data_writer.node = node;
192 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
193 OpenForwardedTimestampWriter(channel, data_writer);
194 };
195 data_writer.rotate(channel, &data_writer);
196
197 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
198 .first->second.writer.get();
199}
200
201DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
202 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700203 bool log_delivery_times = false;
204 if (this->node() != nullptr) {
205 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
206 channel, this->node(), this->node());
207 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700208 if (!log_delivery_times) {
209 return nullptr;
210 }
211
Brian Silvermancb805822020-10-06 17:43:35 -0700212 if (!data_writer_.writer) {
213 OpenDataWriter();
214 }
215 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700216}
217
Brian Silverman0465fcf2020-09-24 00:29:18 -0700218void MultiNodeLogNamer::Close() {
219 for (std::pair<const Channel *const, DataWriter> &data_writer :
220 data_writers_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700221 CloseWriter(&data_writer.second.writer);
222 data_writer.second.writer.reset();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700223 }
Brian Silvermancb805822020-10-06 17:43:35 -0700224 CloseWriter(&data_writer_.writer);
225 data_writer_.writer.reset();
226}
227
228void MultiNodeLogNamer::ResetStatistics() {
229 for (std::pair<const Channel *const, DataWriter> &data_writer :
230 data_writers_) {
231 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700232 }
Brian Silvermancb805822020-10-06 17:43:35 -0700233 if (data_writer_.writer) {
234 data_writer_.writer->ResetStatistics();
235 }
236 max_write_time_ = std::chrono::nanoseconds::zero();
237 max_write_time_bytes_ = -1;
238 max_write_time_messages_ = -1;
239 total_write_time_ = std::chrono::nanoseconds::zero();
240 total_write_count_ = 0;
241 total_write_messages_ = 0;
242 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700243}
244
Austin Schuhcb5601b2020-09-10 15:29:59 -0700245void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
246 DataWriter *data_writer) {
247 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700248 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700249 channel->type()->string_view(), ".part",
Brian Silverman1b071eb2020-10-09 12:24:10 -0700250 data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700251 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700252}
253
254void MultiNodeLogNamer::OpenWriter(const Channel *channel,
255 DataWriter *data_writer) {
256 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700257 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700258 channel->name()->string_view(), "/", channel->type()->string_view(),
Brian Silvermancb805822020-10-06 17:43:35 -0700259 ".part", data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700260 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700261}
262
Brian Silvermana621f522020-09-30 16:52:43 -0700263void MultiNodeLogNamer::OpenDataWriter() {
264 std::string name;
Brian Silverman7af8c902020-09-29 16:14:04 -0700265 if (node() != nullptr) {
Austin Schuhe715eae2020-10-10 15:39:30 -0700266 name = absl::StrCat(name, node()->name()->string_view(), "_");
Brian Silverman7af8c902020-09-29 16:14:04 -0700267 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700268 absl::StrAppend(&name, "data.part", data_writer_.part_number, ".bfbs",
Brian Silvermancb805822020-10-06 17:43:35 -0700269 extension_);
270 CreateBufferWriter(name, &data_writer_.writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700271}
272
Brian Silverman0465fcf2020-09-24 00:29:18 -0700273void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700274 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700275 if (ran_out_of_space_) {
276 // Refuse to open any new files, which might skip data. Any existing files
277 // are in the same folder, which means they're on the same filesystem, which
278 // means they're probably going to run out of space and get stuck too.
279 return;
280 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700281 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
282 const std::string filename =
283 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700284 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700285 if (ran_out_of_space_) {
286 *destination = std::make_unique<DetachedBufferWriter>(
287 DetachedBufferWriter::already_out_of_space_t());
288 return;
289 }
Brian Silvermancb805822020-10-06 17:43:35 -0700290 *destination =
291 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700292 if (!destination->get()->ran_out_of_space()) {
293 all_filenames_.emplace_back(path);
294 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700295 return;
296 }
Brian Silvermancb805822020-10-06 17:43:35 -0700297
298 CloseWriter(destination);
299 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700300 *destination->get() =
301 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700302 return;
303 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700304
Brian Silvermancb805822020-10-06 17:43:35 -0700305 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700306 if (!destination->get()->ran_out_of_space()) {
307 all_filenames_.emplace_back(path);
308 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700309}
310
Brian Silverman48deab12020-09-30 18:39:28 -0700311void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
312 if (temp_suffix_.empty()) {
313 return;
314 }
315 const std::string current_filename = std::string(destination->filename());
316 CHECK(current_filename.size() > temp_suffix_.size());
317 const std::string final_filename =
318 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
319 const int result = rename(current_filename.c_str(), final_filename.c_str());
320 if (result != 0) {
321 if (errno == ENOSPC) {
322 ran_out_of_space_ = true;
323 return;
324 } else {
325 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
326 << " failed";
327 }
328 }
329}
330
Brian Silvermancb805822020-10-06 17:43:35 -0700331void MultiNodeLogNamer::CloseWriter(
332 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
333 DetachedBufferWriter *const writer = writer_pointer->get();
334 if (!writer) {
335 return;
336 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700337 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700338 writer->Close();
339
340 if (writer->max_write_time() > max_write_time_) {
341 max_write_time_ = writer->max_write_time();
342 max_write_time_bytes_ = writer->max_write_time_bytes();
343 max_write_time_messages_ = writer->max_write_time_messages();
344 }
345 total_write_time_ += writer->total_write_time();
346 total_write_count_ += writer->total_write_count();
347 total_write_messages_ += writer->total_write_messages();
348 total_write_bytes_ += writer->total_write_bytes();
349
350 if (writer->ran_out_of_space()) {
351 ran_out_of_space_ = true;
352 writer->acknowledge_out_of_space();
353 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700354 if (was_open) {
355 RenameTempFile(writer);
356 } else {
357 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
358 << ": File should not exist: " << writer->filename();
359 }
Brian Silvermancb805822020-10-06 17:43:35 -0700360}
361
Austin Schuhcb5601b2020-09-10 15:29:59 -0700362} // namespace logger
363} // namespace aos