blob: 748da7c538a48fd29691531b29de014813d50b02 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
39 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
40 return data_writer_.get();
41}
42
43void LocalLogNamer::Rotate(
44 const Node *node,
45 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
46 CHECK(node == this->node());
47 ++part_number_;
48 *data_writer_ = std::move(*OpenDataWriter());
49 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070050 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070051}
52
53DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
54 const Channel *channel) {
55 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
56 << ": Message is not delivered to this node.";
57 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
58 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
59 node_))
60 << ": Delivery times aren't logged for this channel on this node.";
61 return data_writer_.get();
62}
63
64DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
65 const Channel * /*channel*/, const Node * /*node*/) {
66 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
67 return nullptr;
68}
69
70MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
71 const Configuration *configuration,
Brian Silvermancb805822020-10-06 17:43:35 -070072 const Node *node)
73 : LogNamer(node), base_name_(base_name), configuration_(configuration) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -070074
Brian Silverman48deab12020-09-30 18:39:28 -070075MultiNodeLogNamer::~MultiNodeLogNamer() {
76 if (!ran_out_of_space_) {
77 // This handles renaming temporary files etc.
78 Close();
79 }
80}
81
Austin Schuhcb5601b2020-09-10 15:29:59 -070082void MultiNodeLogNamer::WriteHeader(
83 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
84 const Node *node) {
85 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -070086 if (!data_writer_.writer) {
87 OpenDataWriter();
88 }
89 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
90 data_writer_.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070091 } else {
92 for (std::pair<const Channel *const, DataWriter> &data_writer :
93 data_writers_) {
94 if (node == data_writer.second.node) {
95 UpdateHeader(header, data_writer.second.uuid,
96 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -070097 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070098 }
99 }
100 }
101}
102
103void MultiNodeLogNamer::Rotate(
104 const Node *node,
105 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
106 if (node == this->node()) {
Brian Silvermancb805822020-10-06 17:43:35 -0700107 if (data_writer_.writer) {
108 ++data_writer_.part_number;
109 }
Brian Silvermana621f522020-09-30 16:52:43 -0700110 OpenDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700111 UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
112 data_writer_.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700113 } else {
114 for (std::pair<const Channel *const, DataWriter> &data_writer :
115 data_writers_) {
116 if (node == data_writer.second.node) {
117 ++data_writer.second.part_number;
118 data_writer.second.rotate(data_writer.first, &data_writer.second);
119 UpdateHeader(header, data_writer.second.uuid,
120 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700121 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700122 }
123 }
124 }
125}
126
127DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
128 // See if we can read the data on this node at all.
129 const bool is_readable =
130 configuration::ChannelIsReadableOnNode(channel, this->node());
131 if (!is_readable) {
132 return nullptr;
133 }
134
135 // Then, see if we are supposed to log the data here.
136 const bool log_message =
137 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
138
139 if (!log_message) {
140 return nullptr;
141 }
142
143 // Now, sort out if this is data generated on this node, or not. It is
144 // generated if it is sendable on this node.
145 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Brian Silvermancb805822020-10-06 17:43:35 -0700146 if (!data_writer_.writer) {
147 OpenDataWriter();
148 }
149 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700150 }
151
152 // Ok, we have data that is being forwarded to us that we are supposed to
153 // log. It needs to be logged with send timestamps, but be sorted enough
154 // to be able to be processed.
155 CHECK(data_writers_.find(channel) == data_writers_.end());
156
157 // Track that this node is being logged.
158 const Node *source_node = configuration::GetNode(
159 configuration_, channel->source_node()->string_view());
160
161 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
162 nodes_.emplace_back(source_node);
163 }
164
165 DataWriter data_writer;
166 data_writer.node = source_node;
167 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
168 OpenWriter(channel, data_writer);
169 };
170 data_writer.rotate(channel, &data_writer);
171
172 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
173 .first->second.writer.get();
174}
175
176DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
177 const Channel *channel, const Node *node) {
178 // See if we can read the data on this node at all.
179 const bool is_readable =
180 configuration::ChannelIsReadableOnNode(channel, this->node());
181 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
182
183 CHECK(data_writers_.find(channel) == data_writers_.end());
184
185 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
186 nodes_.emplace_back(node);
187 }
188
189 DataWriter data_writer;
190 data_writer.node = node;
191 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
192 OpenForwardedTimestampWriter(channel, data_writer);
193 };
194 data_writer.rotate(channel, &data_writer);
195
196 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
197 .first->second.writer.get();
198}
199
200DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
201 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700202 bool log_delivery_times = false;
203 if (this->node() != nullptr) {
204 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
205 channel, this->node(), this->node());
206 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700207 if (!log_delivery_times) {
208 return nullptr;
209 }
210
Brian Silvermancb805822020-10-06 17:43:35 -0700211 if (!data_writer_.writer) {
212 OpenDataWriter();
213 }
214 return data_writer_.writer.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700215}
216
Brian Silverman0465fcf2020-09-24 00:29:18 -0700217void MultiNodeLogNamer::Close() {
218 for (std::pair<const Channel *const, DataWriter> &data_writer :
219 data_writers_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700220 CloseWriter(&data_writer.second.writer);
221 data_writer.second.writer.reset();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700222 }
Brian Silvermancb805822020-10-06 17:43:35 -0700223 CloseWriter(&data_writer_.writer);
224 data_writer_.writer.reset();
225}
226
227void MultiNodeLogNamer::ResetStatistics() {
228 for (std::pair<const Channel *const, DataWriter> &data_writer :
229 data_writers_) {
230 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700231 }
Brian Silvermancb805822020-10-06 17:43:35 -0700232 if (data_writer_.writer) {
233 data_writer_.writer->ResetStatistics();
234 }
235 max_write_time_ = std::chrono::nanoseconds::zero();
236 max_write_time_bytes_ = -1;
237 max_write_time_messages_ = -1;
238 total_write_time_ = std::chrono::nanoseconds::zero();
239 total_write_count_ = 0;
240 total_write_messages_ = 0;
241 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700242}
243
Austin Schuhcb5601b2020-09-10 15:29:59 -0700244void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
245 DataWriter *data_writer) {
246 std::string filename =
Brian Silvermana621f522020-09-30 16:52:43 -0700247 absl::StrCat("_timestamps", channel->name()->string_view(), "/",
248 channel->type()->string_view(), ".part",
Austin Schuhcb5601b2020-09-10 15:29:59 -0700249 data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700250 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700251}
252
253void MultiNodeLogNamer::OpenWriter(const Channel *channel,
254 DataWriter *data_writer) {
255 const std::string filename = absl::StrCat(
Brian Silvermana621f522020-09-30 16:52:43 -0700256 "_", CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
257 channel->name()->string_view(), "/", channel->type()->string_view(),
Brian Silvermancb805822020-10-06 17:43:35 -0700258 ".part", data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700259 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700260}
261
Brian Silvermana621f522020-09-30 16:52:43 -0700262void MultiNodeLogNamer::OpenDataWriter() {
263 std::string name;
Brian Silverman7af8c902020-09-29 16:14:04 -0700264 if (node() != nullptr) {
265 name = absl::StrCat(name, "_", node()->name()->string_view());
266 }
Brian Silvermancb805822020-10-06 17:43:35 -0700267 absl::StrAppend(&name, "_data.part", data_writer_.part_number, ".bfbs",
268 extension_);
269 CreateBufferWriter(name, &data_writer_.writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700270}
271
Brian Silverman0465fcf2020-09-24 00:29:18 -0700272void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700273 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700274 if (ran_out_of_space_) {
275 // Refuse to open any new files, which might skip data. Any existing files
276 // are in the same folder, which means they're on the same filesystem, which
277 // means they're probably going to run out of space and get stuck too.
278 return;
279 }
Brian Silverman48deab12020-09-30 18:39:28 -0700280 const std::string filename = absl::StrCat(base_name_, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700281 if (!destination->get()) {
Brian Silvermana621f522020-09-30 16:52:43 -0700282 all_filenames_.emplace_back(path);
Brian Silvermancb805822020-10-06 17:43:35 -0700283 *destination =
284 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700285 return;
286 }
Brian Silvermancb805822020-10-06 17:43:35 -0700287
288 CloseWriter(destination);
289 if (ran_out_of_space_) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700290 return;
291 }
Brian Silvermana621f522020-09-30 16:52:43 -0700292 all_filenames_.emplace_back(path);
Brian Silvermancb805822020-10-06 17:43:35 -0700293 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700294}
295
Brian Silverman48deab12020-09-30 18:39:28 -0700296void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
297 if (temp_suffix_.empty()) {
298 return;
299 }
300 const std::string current_filename = std::string(destination->filename());
301 CHECK(current_filename.size() > temp_suffix_.size());
302 const std::string final_filename =
303 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
304 const int result = rename(current_filename.c_str(), final_filename.c_str());
305 if (result != 0) {
306 if (errno == ENOSPC) {
307 ran_out_of_space_ = true;
308 return;
309 } else {
310 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
311 << " failed";
312 }
313 }
314}
315
Brian Silvermancb805822020-10-06 17:43:35 -0700316void MultiNodeLogNamer::CloseWriter(
317 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
318 DetachedBufferWriter *const writer = writer_pointer->get();
319 if (!writer) {
320 return;
321 }
322 writer->Close();
323
324 if (writer->max_write_time() > max_write_time_) {
325 max_write_time_ = writer->max_write_time();
326 max_write_time_bytes_ = writer->max_write_time_bytes();
327 max_write_time_messages_ = writer->max_write_time_messages();
328 }
329 total_write_time_ += writer->total_write_time();
330 total_write_count_ += writer->total_write_count();
331 total_write_messages_ += writer->total_write_messages();
332 total_write_bytes_ += writer->total_write_bytes();
333
334 if (writer->ran_out_of_space()) {
335 ran_out_of_space_ = true;
336 writer->acknowledge_out_of_space();
337 }
338 RenameTempFile(writer);
339}
340
Austin Schuhcb5601b2020-09-10 15:29:59 -0700341} // namespace logger
342} // namespace aos