blob: 7ff3bc41652e2af8a54d165ada38e1688ef71e0f [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh73340842021-07-30 22:32:06 -070020aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
21 size_t node_index, const UUID &source_node_boot_uuid,
22 const UUID &parts_uuid, int parts_index) const {
23 const Node *const source_node =
24 configuration::GetNode(configuration_, node_index);
25 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
26 flatbuffers::FlatBufferBuilder fbb;
27 fbb.ForceDefaults(true);
28
29 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
30 flatbuffers::Offset<aos::Configuration> configuration_offset;
31 if (header_.message().has_configuration()) {
32 CHECK(!header_.message().has_configuration_sha256());
33 configuration_offset =
34 CopyFlatBuffer(header_.message().configuration(), &fbb);
35 } else {
36 CHECK(!header_.message().has_configuration());
37 CHECK(header_.message().has_configuration_sha256());
38 config_sha256_offset = fbb.CreateString(
39 header_.message().configuration_sha256()->string_view());
40 }
41
42 CHECK(header_.message().has_name());
43 const flatbuffers::Offset<flatbuffers::String> name_offset =
44 fbb.CreateString(header_.message().name()->string_view());
45
46 CHECK(header_.message().has_log_event_uuid());
47 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
48 fbb.CreateString(header_.message().log_event_uuid()->string_view());
49
50 CHECK(header_.message().has_logger_instance_uuid());
51 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
52 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
53
54 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
55 if (header_.message().has_log_start_uuid()) {
56 log_start_uuid_offset =
57 fbb.CreateString(header_.message().log_start_uuid()->string_view());
58 }
59
60 CHECK(header_.message().has_logger_node_boot_uuid());
61 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
62 fbb.CreateString(
63 header_.message().logger_node_boot_uuid()->string_view());
64
65 CHECK_NE(source_node_boot_uuid, UUID::Zero());
66 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
67 source_node_boot_uuid.PackString(&fbb);
68
69 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
70 parts_uuid.PackString(&fbb);
71
72 flatbuffers::Offset<Node> node_offset;
73 flatbuffers::Offset<Node> logger_node_offset;
74
75 if (configuration::MultiNode(configuration_)) {
76 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
77 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
78 }
79
80 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
81
82 log_file_header_builder.add_name(name_offset);
83
84 // Only add the node if we are running in a multinode configuration.
85 if (!logger_node_offset.IsNull()) {
86 log_file_header_builder.add_node(node_offset);
87 log_file_header_builder.add_logger_node(logger_node_offset);
88 }
89
90 if (!configuration_offset.IsNull()) {
91 log_file_header_builder.add_configuration(configuration_offset);
92 }
93 log_file_header_builder.add_max_out_of_order_duration(
94 header_.message().max_out_of_order_duration());
95
96 log_file_header_builder.add_monotonic_start_time(
97 std::chrono::duration_cast<std::chrono::nanoseconds>(
98 node_states_[node_index].monotonic_start_time.time_since_epoch())
99 .count());
100 if (source_node == node_) {
101 log_file_header_builder.add_realtime_start_time(
102 std::chrono::duration_cast<std::chrono::nanoseconds>(
103 node_states_[node_index].realtime_start_time.time_since_epoch())
104 .count());
105 } else {
106 // Fill out the legacy start times. Since these were implemented to never
107 // change on reboot, they aren't very helpful in tracking what happened.
108 log_file_header_builder.add_logger_monotonic_start_time(
109 std::chrono::duration_cast<std::chrono::nanoseconds>(
110 node_states_[node_index]
111 .logger_monotonic_start_time.time_since_epoch())
112 .count());
113 log_file_header_builder.add_logger_realtime_start_time(
114 std::chrono::duration_cast<std::chrono::nanoseconds>(
115 node_states_[node_index]
116 .logger_realtime_start_time.time_since_epoch())
117 .count());
118 }
119
120 // TODO(austin): Add more useful times. When was this part started? What do
121 // we know about both the logger and remote then?
122
123 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
124 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
125 if (!log_start_uuid_offset.IsNull()) {
126 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
127 }
128 log_file_header_builder.add_logger_node_boot_uuid(
129 logger_node_boot_uuid_offset);
130 log_file_header_builder.add_source_node_boot_uuid(
131 source_node_boot_uuid_offset);
132
133 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
134 log_file_header_builder.add_parts_index(parts_index);
135
136 if (!config_sha256_offset.IsNull()) {
137 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
138 }
139
140 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
141 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
142 fbb.Release());
143
144 CHECK(result.Verify()) << ": Built a corrupted header.";
145
146 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700147}
148
Austin Schuh73340842021-07-30 22:32:06 -0700149void LocalLogNamer::WriteHeader(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700150 CHECK_EQ(node, this->node());
Austin Schuh73340842021-07-30 22:32:06 -0700151 const size_t node_index = configuration::GetNodeIndex(configuration_, node);
152 data_writer_.QueueHeader(
153 MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
154 data_writer_.uuid(), data_writer_.part_number));
Austin Schuhcb5601b2020-09-10 15:29:59 -0700155}
156
Austin Schuhb8bca732021-07-30 22:32:00 -0700157NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700158 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
159 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700160 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700161}
162
Austin Schuh73340842021-07-30 22:32:06 -0700163void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700164 CHECK(node == this->node());
Austin Schuh73340842021-07-30 22:32:06 -0700165 const size_t node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuhb8bca732021-07-30 22:32:00 -0700166 data_writer_.Rotate();
Austin Schuh73340842021-07-30 22:32:06 -0700167
168 data_writer_.QueueHeader(
169 MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
170 data_writer_.uuid(), data_writer_.part_number));
Austin Schuhcb5601b2020-09-10 15:29:59 -0700171}
Austin Schuh8c399962020-12-25 21:51:45 -0800172
173void LocalLogNamer::WriteConfiguration(
174 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
175 std::string_view config_sha256) {
176 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
177
178 std::unique_ptr<DetachedBufferWriter> writer =
179 std::make_unique<DetachedBufferWriter>(
180 filename, std::make_unique<aos::logger::DummyEncoder>());
181 writer->QueueSizedFlatbuffer(header->Release());
182}
183
Austin Schuh73340842021-07-30 22:32:06 -0700184void LocalLogNamer::Reboot(const Node * /*node*/
185) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800186 LOG(FATAL) << "Can't reboot a single node.";
187}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700188
Austin Schuhb8bca732021-07-30 22:32:00 -0700189NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700190 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
191 << ": Message is not delivered to this node.";
192 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
193 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
194 node_))
195 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700196 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700197}
198
Austin Schuhb8bca732021-07-30 22:32:00 -0700199NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700200 const Channel * /*channel*/, const Node * /*node*/) {
201 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
202 return nullptr;
203}
204
205MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
206 const Configuration *configuration,
Brian Silvermancb805822020-10-06 17:43:35 -0700207 const Node *node)
Austin Schuh73340842021-07-30 22:32:06 -0700208 : LogNamer(configuration, node), base_name_(base_name), old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700209
Brian Silverman48deab12020-09-30 18:39:28 -0700210MultiNodeLogNamer::~MultiNodeLogNamer() {
211 if (!ran_out_of_space_) {
212 // This handles renaming temporary files etc.
213 Close();
214 }
215}
216
Austin Schuh73340842021-07-30 22:32:06 -0700217void MultiNodeLogNamer::WriteHeader(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700218 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700219 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700220 OpenDataWriter();
221 }
Austin Schuh73340842021-07-30 22:32:06 -0700222
223 const size_t node_index = configuration::GetNodeIndex(configuration_, node);
224 data_writer_->QueueHeader(
225 MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
226 data_writer_->uuid(), data_writer_->part_number));
Austin Schuhcb5601b2020-09-10 15:29:59 -0700227 } else {
Austin Schuh73340842021-07-30 22:32:06 -0700228 const size_t node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuhb8bca732021-07-30 22:32:00 -0700229 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700230 data_writers_) {
231 if (node == data_writer.second.node) {
Austin Schuh73340842021-07-30 22:32:06 -0700232 data_writer.second.QueueHeader(MakeHeader(
233 node_index, node_states_[node_index].source_node_boot_uuid,
234 data_writer.second.uuid(), data_writer.second.part_number));
Austin Schuhcb5601b2020-09-10 15:29:59 -0700235 }
236 }
237 }
238}
239
Austin Schuh73340842021-07-30 22:32:06 -0700240void MultiNodeLogNamer::Rotate(const Node *node) { DoRotate(node, false); }
Austin Schuh315b96b2020-12-11 21:21:12 -0800241
Austin Schuh73340842021-07-30 22:32:06 -0700242void MultiNodeLogNamer::Reboot(const Node *node) { DoRotate(node, true); }
Austin Schuh315b96b2020-12-11 21:21:12 -0800243
Austin Schuh73340842021-07-30 22:32:06 -0700244void MultiNodeLogNamer::DoRotate(const Node *node, bool reboot) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700245 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700246 if (data_writer_) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800247 if (reboot) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700248 data_writer_->Reboot();
249 } else {
250 data_writer_->Rotate();
Austin Schuh315b96b2020-12-11 21:21:12 -0800251 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700252 // TODO(austin): Move this logic down once we have a better ownership
253 // model for the header.
Austin Schuh73340842021-07-30 22:32:06 -0700254
255 const size_t node_index =
256 configuration::GetNodeIndex(configuration_, node);
257 data_writer_->QueueHeader(
258 MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
259 data_writer_->uuid(), data_writer_->part_number));
Brian Silvermancb805822020-10-06 17:43:35 -0700260 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700261 } else {
Austin Schuh73340842021-07-30 22:32:06 -0700262 const size_t node_index = configuration::GetNodeIndex(configuration_, node);
Austin Schuhb8bca732021-07-30 22:32:00 -0700263 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700264 data_writers_) {
265 if (node == data_writer.second.node) {
Austin Schuh315b96b2020-12-11 21:21:12 -0800266 if (reboot) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700267 data_writer.second.Reboot();
268 } else {
269 data_writer.second.Rotate();
Austin Schuh315b96b2020-12-11 21:21:12 -0800270 }
Austin Schuh73340842021-07-30 22:32:06 -0700271 data_writer.second.QueueHeader(MakeHeader(
272 node_index, node_states_[node_index].source_node_boot_uuid,
273 data_writer.second.uuid(), data_writer.second.part_number));
Austin Schuhcb5601b2020-09-10 15:29:59 -0700274 }
275 }
276 }
277}
278
Austin Schuh8c399962020-12-25 21:51:45 -0800279void MultiNodeLogNamer::WriteConfiguration(
280 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
281 std::string_view config_sha256) {
282 if (ran_out_of_space_) {
283 return;
284 }
285
286 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
287 const std::string filename = absl::StrCat(
288 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
289
290 std::unique_ptr<DetachedBufferWriter> writer =
291 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
292
293 writer->QueueSizedFlatbuffer(header->Release());
294
295 if (!writer->ran_out_of_space()) {
296 all_filenames_.emplace_back(filename);
297 }
298 CloseWriter(&writer);
299}
300
Austin Schuhb8bca732021-07-30 22:32:00 -0700301NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700302 // See if we can read the data on this node at all.
303 const bool is_readable =
304 configuration::ChannelIsReadableOnNode(channel, this->node());
305 if (!is_readable) {
306 return nullptr;
307 }
308
309 // Then, see if we are supposed to log the data here.
310 const bool log_message =
311 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
312
313 if (!log_message) {
314 return nullptr;
315 }
316
317 // Now, sort out if this is data generated on this node, or not. It is
318 // generated if it is sendable on this node.
319 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700320 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700321 OpenDataWriter();
322 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700323 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700324 }
325
326 // Ok, we have data that is being forwarded to us that we are supposed to
327 // log. It needs to be logged with send timestamps, but be sorted enough
328 // to be able to be processed.
329 CHECK(data_writers_.find(channel) == data_writers_.end());
330
331 // Track that this node is being logged.
332 const Node *source_node = configuration::GetNode(
333 configuration_, channel->source_node()->string_view());
334
335 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
336 nodes_.emplace_back(source_node);
337 }
338
Austin Schuhb8bca732021-07-30 22:32:00 -0700339 NewDataWriter data_writer(
340 [this, channel](NewDataWriter *data_writer) {
341 OpenWriter(channel, data_writer);
342 },
343 [this](NewDataWriter *data_writer) {
344 CloseWriter(&data_writer->writer);
345 });
Austin Schuhcb5601b2020-09-10 15:29:59 -0700346 data_writer.node = source_node;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700347
Austin Schuhb8bca732021-07-30 22:32:00 -0700348 return &(
349 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700350}
351
Austin Schuhb8bca732021-07-30 22:32:00 -0700352NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700353 const Channel *channel, const Node *node) {
354 // See if we can read the data on this node at all.
355 const bool is_readable =
356 configuration::ChannelIsReadableOnNode(channel, this->node());
357 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
358
359 CHECK(data_writers_.find(channel) == data_writers_.end());
360
361 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
362 nodes_.emplace_back(node);
363 }
364
Austin Schuhb8bca732021-07-30 22:32:00 -0700365 NewDataWriter data_writer(
366 [this, channel](NewDataWriter *data_writer) {
367 OpenForwardedTimestampWriter(channel, data_writer);
368 },
369 [this](NewDataWriter *data_writer) {
370 CloseWriter(&data_writer->writer);
371 });
Austin Schuhcb5601b2020-09-10 15:29:59 -0700372 data_writer.node = node;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700373
Austin Schuhb8bca732021-07-30 22:32:00 -0700374 return &(
375 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700376}
377
Austin Schuhb8bca732021-07-30 22:32:00 -0700378NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700379 bool log_delivery_times = false;
380 if (this->node() != nullptr) {
381 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
382 channel, this->node(), this->node());
383 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700384 if (!log_delivery_times) {
385 return nullptr;
386 }
387
Austin Schuhb8bca732021-07-30 22:32:00 -0700388 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700389 OpenDataWriter();
390 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700391 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700392}
393
Brian Silverman0465fcf2020-09-24 00:29:18 -0700394void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700395 data_writers_.clear();
396 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700397}
398
399void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700400 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700401 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800402 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700403 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700404 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700405 if (data_writer_) {
406 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700407 }
408 max_write_time_ = std::chrono::nanoseconds::zero();
409 max_write_time_bytes_ = -1;
410 max_write_time_messages_ = -1;
411 total_write_time_ = std::chrono::nanoseconds::zero();
412 total_write_count_ = 0;
413 total_write_messages_ = 0;
414 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700415}
416
Austin Schuhb8bca732021-07-30 22:32:00 -0700417void MultiNodeLogNamer::OpenForwardedTimestampWriter(
418 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700419 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700420 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700421 channel->type()->string_view(), ".part",
Brian Silverman1b071eb2020-10-09 12:24:10 -0700422 data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700423 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700424}
425
426void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700427 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700428 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700429 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700430 channel->name()->string_view(), "/", channel->type()->string_view(),
Brian Silvermancb805822020-10-06 17:43:35 -0700431 ".part", data_writer->part_number, ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700432 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700433}
434
Brian Silvermana621f522020-09-30 16:52:43 -0700435void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700436 if (!data_writer_) {
437 data_writer_ = std::make_unique<NewDataWriter>(
438 [this](NewDataWriter *writer) {
439 std::string name;
440 if (node() != nullptr) {
441 name = absl::StrCat(name, node()->name()->string_view(), "_");
442 }
443 absl::StrAppend(&name, "data.part", writer->part_number, ".bfbs",
444 extension_);
445 CreateBufferWriter(name, &writer->writer);
446 },
447 [this](NewDataWriter *data_writer) {
448 CloseWriter(&data_writer->writer);
449 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700450 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700451}
452
Brian Silverman0465fcf2020-09-24 00:29:18 -0700453void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700454 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700455 if (ran_out_of_space_) {
456 // Refuse to open any new files, which might skip data. Any existing files
457 // are in the same folder, which means they're on the same filesystem, which
458 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700459 if (!destination->get()) {
460 // But avoid leaving a nullptr writer if we're out of space when
461 // attempting to open the first file.
462 *destination = std::make_unique<DetachedBufferWriter>(
463 DetachedBufferWriter::already_out_of_space_t());
464 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700465 return;
466 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700467 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
468 const std::string filename =
469 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700470 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700471 if (ran_out_of_space_) {
472 *destination = std::make_unique<DetachedBufferWriter>(
473 DetachedBufferWriter::already_out_of_space_t());
474 return;
475 }
Brian Silvermancb805822020-10-06 17:43:35 -0700476 *destination =
477 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700478 if (!destination->get()->ran_out_of_space()) {
479 all_filenames_.emplace_back(path);
480 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700481 return;
482 }
Brian Silvermancb805822020-10-06 17:43:35 -0700483
484 CloseWriter(destination);
485 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700486 *destination->get() =
487 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700488 return;
489 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700490
Brian Silvermancb805822020-10-06 17:43:35 -0700491 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700492 if (!destination->get()->ran_out_of_space()) {
493 all_filenames_.emplace_back(path);
494 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700495}
496
Brian Silverman48deab12020-09-30 18:39:28 -0700497void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
498 if (temp_suffix_.empty()) {
499 return;
500 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700501 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700502 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700503 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700504 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700505 int result = rename(current_filename.c_str(), final_filename.c_str());
506
507 // When changing the base name, we rename the log folder while there active
508 // buffer writers. Therefore, the name of that active buffer may still refer
509 // to the old file location rather than the new one. This minimized changes to
510 // existing code.
511 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
512 auto offset = current_filename.find(old_base_name_);
513 if (offset != std::string::npos) {
514 current_filename.replace(offset, old_base_name_.length(), base_name_);
515 }
516 offset = final_filename.find(old_base_name_);
517 if (offset != std::string::npos) {
518 final_filename.replace(offset, old_base_name_.length(), base_name_);
519 }
520 result = rename(current_filename.c_str(), final_filename.c_str());
521 }
522
Brian Silverman48deab12020-09-30 18:39:28 -0700523 if (result != 0) {
524 if (errno == ENOSPC) {
525 ran_out_of_space_ = true;
526 return;
527 } else {
528 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
529 << " failed";
530 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700531 } else {
532 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700533 }
534}
535
Brian Silvermancb805822020-10-06 17:43:35 -0700536void MultiNodeLogNamer::CloseWriter(
537 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
538 DetachedBufferWriter *const writer = writer_pointer->get();
539 if (!writer) {
540 return;
541 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700542 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700543 writer->Close();
544
545 if (writer->max_write_time() > max_write_time_) {
546 max_write_time_ = writer->max_write_time();
547 max_write_time_bytes_ = writer->max_write_time_bytes();
548 max_write_time_messages_ = writer->max_write_time_messages();
549 }
550 total_write_time_ += writer->total_write_time();
551 total_write_count_ += writer->total_write_count();
552 total_write_messages_ += writer->total_write_messages();
553 total_write_bytes_ += writer->total_write_bytes();
554
555 if (writer->ran_out_of_space()) {
556 ran_out_of_space_ = true;
557 writer->acknowledge_out_of_space();
558 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700559 if (was_open) {
560 RenameTempFile(writer);
561 } else {
562 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
563 << ": File should not exist: " << writer->filename();
564 }
Brian Silvermancb805822020-10-06 17:43:35 -0700565}
566
Austin Schuhcb5601b2020-09-10 15:29:59 -0700567} // namespace logger
568} // namespace aos