blob: 06243b06cb973223317b7bccb205db4676b4260a [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
12#include "aos/events/logging/uuid.h"
13#include "flatbuffers/flatbuffers.h"
14#include "glog/logging.h"
15
16namespace aos {
17namespace logger {
18
19void LogNamer::UpdateHeader(
20 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
21 const UUID &uuid, int parts_index) const {
22 header->mutable_message()->mutate_parts_index(parts_index);
23 CHECK_EQ(uuid.string_view().size(),
24 header->mutable_message()->mutable_parts_uuid()->size());
25 std::copy(uuid.string_view().begin(), uuid.string_view().end(),
26 reinterpret_cast<char *>(
27 header->mutable_message()->mutable_parts_uuid()->Data()));
28}
29
30void LocalLogNamer::WriteHeader(
31 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
32 const Node *node) {
33 CHECK_EQ(node, this->node());
34 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070035 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070036}
37
38DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
39 CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
40 return data_writer_.get();
41}
42
43void LocalLogNamer::Rotate(
44 const Node *node,
45 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
46 CHECK(node == this->node());
47 ++part_number_;
48 *data_writer_ = std::move(*OpenDataWriter());
49 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070050 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070051}
52
53DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
54 const Channel *channel) {
55 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
56 << ": Message is not delivered to this node.";
57 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
58 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
59 node_))
60 << ": Delivery times aren't logged for this channel on this node.";
61 return data_writer_.get();
62}
63
64DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
65 const Channel * /*channel*/, const Node * /*node*/) {
66 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
67 return nullptr;
68}
69
70MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
71 const Configuration *configuration,
Brian Silverman48deab12020-09-30 18:39:28 -070072 const Node *node,
73 std::string_view temp_suffix)
Austin Schuhcb5601b2020-09-10 15:29:59 -070074 : LogNamer(node),
75 base_name_(base_name),
Brian Silverman48deab12020-09-30 18:39:28 -070076 temp_suffix_(temp_suffix),
Austin Schuhcb5601b2020-09-10 15:29:59 -070077 configuration_(configuration),
Brian Silvermana621f522020-09-30 16:52:43 -070078 uuid_(UUID::Random()) {
79 OpenDataWriter();
80}
Austin Schuhcb5601b2020-09-10 15:29:59 -070081
Brian Silverman48deab12020-09-30 18:39:28 -070082MultiNodeLogNamer::~MultiNodeLogNamer() {
83 if (!ran_out_of_space_) {
84 // This handles renaming temporary files etc.
85 Close();
86 }
87}
88
Austin Schuhcb5601b2020-09-10 15:29:59 -070089void MultiNodeLogNamer::WriteHeader(
90 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
91 const Node *node) {
92 if (node == this->node()) {
93 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -070094 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -070095 } else {
96 for (std::pair<const Channel *const, DataWriter> &data_writer :
97 data_writers_) {
98 if (node == data_writer.second.node) {
99 UpdateHeader(header, data_writer.second.uuid,
100 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700101 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700102 }
103 }
104 }
105}
106
107void MultiNodeLogNamer::Rotate(
108 const Node *node,
109 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
110 if (node == this->node()) {
111 ++part_number_;
Brian Silvermana621f522020-09-30 16:52:43 -0700112 OpenDataWriter();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700113 UpdateHeader(header, uuid_, part_number_);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700114 data_writer_->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700115 } else {
116 for (std::pair<const Channel *const, DataWriter> &data_writer :
117 data_writers_) {
118 if (node == data_writer.second.node) {
119 ++data_writer.second.part_number;
120 data_writer.second.rotate(data_writer.first, &data_writer.second);
121 UpdateHeader(header, data_writer.second.uuid,
122 data_writer.second.part_number);
Brian Silvermanf51499a2020-09-21 12:49:08 -0700123 data_writer.second.writer->QueueSpan(header->full_span());
Austin Schuhcb5601b2020-09-10 15:29:59 -0700124 }
125 }
126 }
127}
128
129DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
130 // See if we can read the data on this node at all.
131 const bool is_readable =
132 configuration::ChannelIsReadableOnNode(channel, this->node());
133 if (!is_readable) {
134 return nullptr;
135 }
136
137 // Then, see if we are supposed to log the data here.
138 const bool log_message =
139 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
140
141 if (!log_message) {
142 return nullptr;
143 }
144
145 // Now, sort out if this is data generated on this node, or not. It is
146 // generated if it is sendable on this node.
147 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
148 return data_writer_.get();
149 }
150
151 // Ok, we have data that is being forwarded to us that we are supposed to
152 // log. It needs to be logged with send timestamps, but be sorted enough
153 // to be able to be processed.
154 CHECK(data_writers_.find(channel) == data_writers_.end());
155
156 // Track that this node is being logged.
157 const Node *source_node = configuration::GetNode(
158 configuration_, channel->source_node()->string_view());
159
160 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
161 nodes_.emplace_back(source_node);
162 }
163
164 DataWriter data_writer;
165 data_writer.node = source_node;
166 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
167 OpenWriter(channel, data_writer);
168 };
169 data_writer.rotate(channel, &data_writer);
170
171 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
172 .first->second.writer.get();
173}
174
175DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
176 const Channel *channel, const Node *node) {
177 // See if we can read the data on this node at all.
178 const bool is_readable =
179 configuration::ChannelIsReadableOnNode(channel, this->node());
180 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
181
182 CHECK(data_writers_.find(channel) == data_writers_.end());
183
184 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
185 nodes_.emplace_back(node);
186 }
187
188 DataWriter data_writer;
189 data_writer.node = node;
190 data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
191 OpenForwardedTimestampWriter(channel, data_writer);
192 };
193 data_writer.rotate(channel, &data_writer);
194
195 return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
196 .first->second.writer.get();
197}
198
199DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
200 const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700201 bool log_delivery_times = false;
202 if (this->node() != nullptr) {
203 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
204 channel, this->node(), this->node());
205 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700206 if (!log_delivery_times) {
207 return nullptr;
208 }
209
210 return data_writer_.get();
211}
212
Brian Silverman0465fcf2020-09-24 00:29:18 -0700213void MultiNodeLogNamer::Close() {
214 for (std::pair<const Channel *const, DataWriter> &data_writer :
215 data_writers_) {
216 if (data_writer.second.writer) {
217 data_writer.second.writer->Close();
218 if (data_writer.second.writer->ran_out_of_space()) {
219 ran_out_of_space_ = true;
220 data_writer.second.writer->acknowledge_out_of_space();
221 }
Brian Silverman48deab12020-09-30 18:39:28 -0700222 RenameTempFile(data_writer.second.writer.get());
223 data_writer.second.writer.reset();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700224 }
225 }
226 if (data_writer_) {
227 data_writer_->Close();
228 if (data_writer_->ran_out_of_space()) {
229 ran_out_of_space_ = true;
230 data_writer_->acknowledge_out_of_space();
231 }
Brian Silverman48deab12020-09-30 18:39:28 -0700232 RenameTempFile(data_writer_.get());
233 data_writer_.reset();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700234 }
235}
236
Austin Schuhcb5601b2020-09-10 15:29:59 -0700237void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
238 DataWriter *data_writer) {
239 std::string filename =
Brian Silvermana621f522020-09-30 16:52:43 -0700240 absl::StrCat("_timestamps", channel->name()->string_view(), "/",
241 channel->type()->string_view(), ".part",
Austin Schuhcb5601b2020-09-10 15:29:59 -0700242 data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700243 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700244}
245
246void MultiNodeLogNamer::OpenWriter(const Channel *channel,
247 DataWriter *data_writer) {
248 const std::string filename = absl::StrCat(
Brian Silvermana621f522020-09-30 16:52:43 -0700249 "_", CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
250 channel->name()->string_view(), "/", channel->type()->string_view(),
251 ".part", data_writer->part_number, ".bfbs");
Brian Silverman0465fcf2020-09-24 00:29:18 -0700252 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700253}
254
Brian Silvermana621f522020-09-30 16:52:43 -0700255void MultiNodeLogNamer::OpenDataWriter() {
256 std::string name;
Brian Silverman7af8c902020-09-29 16:14:04 -0700257 if (node() != nullptr) {
258 name = absl::StrCat(name, "_", node()->name()->string_view());
259 }
Brian Silvermana621f522020-09-30 16:52:43 -0700260 absl::StrAppend(&name, "_data.part", part_number_, ".bfbs");
261 CreateBufferWriter(name, &data_writer_);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700262}
263
Brian Silverman0465fcf2020-09-24 00:29:18 -0700264void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700265 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700266 if (ran_out_of_space_) {
267 // Refuse to open any new files, which might skip data. Any existing files
268 // are in the same folder, which means they're on the same filesystem, which
269 // means they're probably going to run out of space and get stuck too.
270 return;
271 }
Brian Silverman48deab12020-09-30 18:39:28 -0700272 const std::string filename = absl::StrCat(base_name_, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700273 if (!destination->get()) {
Brian Silvermana621f522020-09-30 16:52:43 -0700274 all_filenames_.emplace_back(path);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700275 *destination = std::make_unique<DetachedBufferWriter>(
276 filename, std::make_unique<DummyEncoder>());
277 return;
278 }
279 destination->get()->Close();
280 if (destination->get()->ran_out_of_space()) {
281 ran_out_of_space_ = true;
282 return;
283 }
Brian Silverman48deab12020-09-30 18:39:28 -0700284 RenameTempFile(destination->get());
Brian Silvermana621f522020-09-30 16:52:43 -0700285 all_filenames_.emplace_back(path);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700286 *destination->get() =
287 DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
288}
289
Brian Silverman48deab12020-09-30 18:39:28 -0700290void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
291 if (temp_suffix_.empty()) {
292 return;
293 }
294 const std::string current_filename = std::string(destination->filename());
295 CHECK(current_filename.size() > temp_suffix_.size());
296 const std::string final_filename =
297 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
298 const int result = rename(current_filename.c_str(), final_filename.c_str());
299 if (result != 0) {
300 if (errno == ENOSPC) {
301 ran_out_of_space_ = true;
302 return;
303 } else {
304 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
305 << " failed";
306 }
307 }
308}
309
Austin Schuhcb5601b2020-09-10 15:29:59 -0700310} // namespace logger
311} // namespace aos