blob: 5ab4636840c95316b9082223df0932daf9eba481 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuhe46492f2021-07-31 19:49:41 -070028 boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
29 UUID::Zero());
30 CHECK_LT(node_index_, boot_uuids_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070031 reopen_(this);
32}
33
34NewDataWriter::~NewDataWriter() {
35 if (writer) {
36 Close();
37 }
38}
39
40void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070041 // No need to rotate if nothing has been written.
42 if (header_written_) {
43 ++parts_index_;
44 reopen_(this);
45 header_written_ = false;
46 QueueHeader(MakeHeader());
47 }
Austin Schuh572924a2021-07-30 22:32:12 -070048}
49
50void NewDataWriter::Reboot() {
51 parts_uuid_ = UUID::Random();
52 ++parts_index_;
53 reopen_(this);
54 header_written_ = false;
55}
56
Austin Schuhe46492f2021-07-31 19:49:41 -070057void NewDataWriter::UpdateRemote(size_t remote_node_index,
58 const UUID &remote_node_boot_uuid) {
59 CHECK_LT(remote_node_index, boot_uuids_.size());
60 if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
61 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
62 << remote_node_boot_uuid << " from "
63 << boot_uuids_[remote_node_index];
64 boot_uuids_[remote_node_index] = remote_node_boot_uuid;
65 Rotate();
66 }
67}
68
69void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
70 const UUID &source_node_boot_uuid,
71 aos::monotonic_clock::time_point now) {
Austin Schuh572924a2021-07-30 22:32:12 -070072 // TODO(austin): Handle remote nodes changing too, not just the source node.
Austin Schuhe46492f2021-07-31 19:49:41 -070073 if (boot_uuids_[node_index_] != source_node_boot_uuid) {
74 boot_uuids_[node_index_] = source_node_boot_uuid;
Austin Schuh572924a2021-07-30 22:32:12 -070075 if (header_written_) {
76 Reboot();
77 }
78
Austin Schuhe46492f2021-07-31 19:49:41 -070079 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -070080 }
Austin Schuhe46492f2021-07-31 19:49:41 -070081 CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -070082 CHECK(header_written_) << ": Attempting to write message before header to "
83 << writer->filename();
84 writer->QueueSizedFlatbuffer(fbb, now);
85}
86
Austin Schuhe46492f2021-07-31 19:49:41 -070087aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
88NewDataWriter::MakeHeader() {
89 const size_t logger_node_index = log_namer_->logger_node_index();
90 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
91 if (boot_uuids_[logger_node_index] == UUID::Zero()) {
92 VLOG(1) << filename() << " Logger node is " << logger_node_index
93 << " and uuid is " << logger_node_boot_uuid;
94 boot_uuids_[logger_node_index] = logger_node_boot_uuid;
95 } else {
96 CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
97 }
98 return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
99 parts_index_);
100}
101
Austin Schuh572924a2021-07-30 22:32:12 -0700102void NewDataWriter::QueueHeader(
103 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
104 CHECK(!header_written_) << ": Attempting to write duplicate header to "
105 << writer->filename();
106 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuhe46492f2021-07-31 19:49:41 -0700107 CHECK_EQ(boot_uuids_[node_index_],
108 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh572924a2021-07-30 22:32:12 -0700109 // TODO(austin): This triggers a dummy allocation that we don't need as part
110 // of releasing. Can we skip it?
111 writer->QueueSizedFlatbuffer(header.Release());
112 header_written_ = true;
113}
114
115void NewDataWriter::Close() {
116 CHECK(writer);
117 close_(this);
118 writer.reset();
119 header_written_ = false;
120}
121
Austin Schuh73340842021-07-30 22:32:06 -0700122aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuhe46492f2021-07-31 19:49:41 -0700123 size_t node_index, const std::vector<UUID> &boot_uuids,
Austin Schuh73340842021-07-30 22:32:06 -0700124 const UUID &parts_uuid, int parts_index) const {
Austin Schuhe46492f2021-07-31 19:49:41 -0700125 const UUID &source_node_boot_uuid = boot_uuids[node_index];
Austin Schuh73340842021-07-30 22:32:06 -0700126 const Node *const source_node =
127 configuration::GetNode(configuration_, node_index);
128 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
129 flatbuffers::FlatBufferBuilder fbb;
130 fbb.ForceDefaults(true);
131
132 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
133 flatbuffers::Offset<aos::Configuration> configuration_offset;
134 if (header_.message().has_configuration()) {
135 CHECK(!header_.message().has_configuration_sha256());
136 configuration_offset =
137 CopyFlatBuffer(header_.message().configuration(), &fbb);
138 } else {
139 CHECK(!header_.message().has_configuration());
140 CHECK(header_.message().has_configuration_sha256());
141 config_sha256_offset = fbb.CreateString(
142 header_.message().configuration_sha256()->string_view());
143 }
144
145 CHECK(header_.message().has_name());
146 const flatbuffers::Offset<flatbuffers::String> name_offset =
147 fbb.CreateString(header_.message().name()->string_view());
148
149 CHECK(header_.message().has_log_event_uuid());
150 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
151 fbb.CreateString(header_.message().log_event_uuid()->string_view());
152
153 CHECK(header_.message().has_logger_instance_uuid());
154 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
155 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
156
157 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
158 if (header_.message().has_log_start_uuid()) {
159 log_start_uuid_offset =
160 fbb.CreateString(header_.message().log_start_uuid()->string_view());
161 }
162
163 CHECK(header_.message().has_logger_node_boot_uuid());
164 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
165 fbb.CreateString(
166 header_.message().logger_node_boot_uuid()->string_view());
167
168 CHECK_NE(source_node_boot_uuid, UUID::Zero());
169 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
170 source_node_boot_uuid.PackString(&fbb);
171
172 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
173 parts_uuid.PackString(&fbb);
174
175 flatbuffers::Offset<Node> node_offset;
176 flatbuffers::Offset<Node> logger_node_offset;
177
178 if (configuration::MultiNode(configuration_)) {
179 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
180 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
181 }
182
Austin Schuhe46492f2021-07-31 19:49:41 -0700183 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
184 boot_uuid_offsets.reserve(boot_uuids.size());
185 for (const UUID &uuid : boot_uuids) {
186 if (uuid != UUID::Zero()) {
187 boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
188 } else {
189 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
190 }
191 }
192
193 flatbuffers::Offset<
194 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
195 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
196
Austin Schuh73340842021-07-30 22:32:06 -0700197 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
198
199 log_file_header_builder.add_name(name_offset);
200
201 // Only add the node if we are running in a multinode configuration.
202 if (!logger_node_offset.IsNull()) {
203 log_file_header_builder.add_node(node_offset);
204 log_file_header_builder.add_logger_node(logger_node_offset);
205 }
206
207 if (!configuration_offset.IsNull()) {
208 log_file_header_builder.add_configuration(configuration_offset);
209 }
210 log_file_header_builder.add_max_out_of_order_duration(
211 header_.message().max_out_of_order_duration());
212
213 log_file_header_builder.add_monotonic_start_time(
214 std::chrono::duration_cast<std::chrono::nanoseconds>(
215 node_states_[node_index].monotonic_start_time.time_since_epoch())
216 .count());
217 if (source_node == node_) {
218 log_file_header_builder.add_realtime_start_time(
219 std::chrono::duration_cast<std::chrono::nanoseconds>(
220 node_states_[node_index].realtime_start_time.time_since_epoch())
221 .count());
222 } else {
223 // Fill out the legacy start times. Since these were implemented to never
224 // change on reboot, they aren't very helpful in tracking what happened.
225 log_file_header_builder.add_logger_monotonic_start_time(
226 std::chrono::duration_cast<std::chrono::nanoseconds>(
227 node_states_[node_index]
228 .logger_monotonic_start_time.time_since_epoch())
229 .count());
230 log_file_header_builder.add_logger_realtime_start_time(
231 std::chrono::duration_cast<std::chrono::nanoseconds>(
232 node_states_[node_index]
233 .logger_realtime_start_time.time_since_epoch())
234 .count());
235 }
236
237 // TODO(austin): Add more useful times. When was this part started? What do
238 // we know about both the logger and remote then?
239
240 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
241 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
242 if (!log_start_uuid_offset.IsNull()) {
243 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
244 }
245 log_file_header_builder.add_logger_node_boot_uuid(
246 logger_node_boot_uuid_offset);
247 log_file_header_builder.add_source_node_boot_uuid(
248 source_node_boot_uuid_offset);
249
250 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
251 log_file_header_builder.add_parts_index(parts_index);
252
253 if (!config_sha256_offset.IsNull()) {
254 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
255 }
256
Austin Schuhe46492f2021-07-31 19:49:41 -0700257 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700258 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
259 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
260 fbb.Release());
261
262 CHECK(result.Verify()) << ": Built a corrupted header.";
263
264 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700265}
266
Austin Schuhb8bca732021-07-30 22:32:00 -0700267NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700268 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
269 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700270 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700271}
272
Austin Schuh73340842021-07-30 22:32:06 -0700273void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700274 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700275 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700276}
Austin Schuh8c399962020-12-25 21:51:45 -0800277
278void LocalLogNamer::WriteConfiguration(
279 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
280 std::string_view config_sha256) {
281 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
282
283 std::unique_ptr<DetachedBufferWriter> writer =
284 std::make_unique<DetachedBufferWriter>(
285 filename, std::make_unique<aos::logger::DummyEncoder>());
286 writer->QueueSizedFlatbuffer(header->Release());
287}
288
Austin Schuhb8bca732021-07-30 22:32:00 -0700289NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700290 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
291 << ": Message is not delivered to this node.";
292 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
293 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
294 node_))
295 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700296 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700297}
298
Austin Schuhb8bca732021-07-30 22:32:00 -0700299NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700300 const Channel * /*channel*/, const Node * /*node*/) {
301 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
302 return nullptr;
303}
304
305MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
306 const Configuration *configuration,
Brian Silvermancb805822020-10-06 17:43:35 -0700307 const Node *node)
Austin Schuh73340842021-07-30 22:32:06 -0700308 : LogNamer(configuration, node), base_name_(base_name), old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700309
Brian Silverman48deab12020-09-30 18:39:28 -0700310MultiNodeLogNamer::~MultiNodeLogNamer() {
311 if (!ran_out_of_space_) {
312 // This handles renaming temporary files etc.
313 Close();
314 }
315}
316
Austin Schuh572924a2021-07-30 22:32:12 -0700317void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700318 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700319 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700320 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700321 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700322 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700323 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700324 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700325 if (node == data_writer.second.node()) {
326 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700327 }
328 }
329 }
330}
331
Austin Schuh8c399962020-12-25 21:51:45 -0800332void MultiNodeLogNamer::WriteConfiguration(
333 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
334 std::string_view config_sha256) {
335 if (ran_out_of_space_) {
336 return;
337 }
338
339 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
340 const std::string filename = absl::StrCat(
341 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
342
343 std::unique_ptr<DetachedBufferWriter> writer =
344 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
345
346 writer->QueueSizedFlatbuffer(header->Release());
347
348 if (!writer->ran_out_of_space()) {
349 all_filenames_.emplace_back(filename);
350 }
351 CloseWriter(&writer);
352}
353
Austin Schuhb8bca732021-07-30 22:32:00 -0700354NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700355 // See if we can read the data on this node at all.
356 const bool is_readable =
357 configuration::ChannelIsReadableOnNode(channel, this->node());
358 if (!is_readable) {
359 return nullptr;
360 }
361
362 // Then, see if we are supposed to log the data here.
363 const bool log_message =
364 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
365
366 if (!log_message) {
367 return nullptr;
368 }
369
370 // Now, sort out if this is data generated on this node, or not. It is
371 // generated if it is sendable on this node.
372 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700373 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700374 OpenDataWriter();
375 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700376 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700377 }
378
379 // Ok, we have data that is being forwarded to us that we are supposed to
380 // log. It needs to be logged with send timestamps, but be sorted enough
381 // to be able to be processed.
382 CHECK(data_writers_.find(channel) == data_writers_.end());
383
384 // Track that this node is being logged.
385 const Node *source_node = configuration::GetNode(
386 configuration_, channel->source_node()->string_view());
387
388 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
389 nodes_.emplace_back(source_node);
390 }
391
Austin Schuh572924a2021-07-30 22:32:12 -0700392 NewDataWriter data_writer(this, source_node,
393 [this, channel](NewDataWriter *data_writer) {
394 OpenWriter(channel, data_writer);
395 },
396 [this](NewDataWriter *data_writer) {
397 CloseWriter(&data_writer->writer);
398 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700399 return &(
400 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700401}
402
Austin Schuhb8bca732021-07-30 22:32:00 -0700403NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700404 const Channel *channel, const Node *node) {
405 // See if we can read the data on this node at all.
406 const bool is_readable =
407 configuration::ChannelIsReadableOnNode(channel, this->node());
408 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
409
410 CHECK(data_writers_.find(channel) == data_writers_.end());
411
412 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
413 nodes_.emplace_back(node);
414 }
415
Austin Schuh572924a2021-07-30 22:32:12 -0700416 NewDataWriter data_writer(this, node,
417 [this, channel](NewDataWriter *data_writer) {
418 OpenForwardedTimestampWriter(channel,
419 data_writer);
420 },
421 [this](NewDataWriter *data_writer) {
422 CloseWriter(&data_writer->writer);
423 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700424 return &(
425 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700426}
427
Austin Schuhb8bca732021-07-30 22:32:00 -0700428NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700429 bool log_delivery_times = false;
430 if (this->node() != nullptr) {
431 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
432 channel, this->node(), this->node());
433 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700434 if (!log_delivery_times) {
435 return nullptr;
436 }
437
Austin Schuhb8bca732021-07-30 22:32:00 -0700438 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700439 OpenDataWriter();
440 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700441 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700442}
443
Brian Silverman0465fcf2020-09-24 00:29:18 -0700444void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700445 data_writers_.clear();
446 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700447}
448
449void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700450 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700451 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800452 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700453 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700454 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700455 if (data_writer_) {
456 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700457 }
458 max_write_time_ = std::chrono::nanoseconds::zero();
459 max_write_time_bytes_ = -1;
460 max_write_time_messages_ = -1;
461 total_write_time_ = std::chrono::nanoseconds::zero();
462 total_write_count_ = 0;
463 total_write_messages_ = 0;
464 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700465}
466
Austin Schuhb8bca732021-07-30 22:32:00 -0700467void MultiNodeLogNamer::OpenForwardedTimestampWriter(
468 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700469 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700470 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700471 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700472 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700473 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700474}
475
476void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700477 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700478 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700479 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700480 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700481 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700482 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700483}
484
Brian Silvermana621f522020-09-30 16:52:43 -0700485void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700486 if (!data_writer_) {
487 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700488 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700489 [this](NewDataWriter *writer) {
490 std::string name;
491 if (node() != nullptr) {
492 name = absl::StrCat(name, node()->name()->string_view(), "_");
493 }
Austin Schuh572924a2021-07-30 22:32:12 -0700494 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700495 extension_);
496 CreateBufferWriter(name, &writer->writer);
497 },
498 [this](NewDataWriter *data_writer) {
499 CloseWriter(&data_writer->writer);
500 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700501 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700502}
503
Brian Silverman0465fcf2020-09-24 00:29:18 -0700504void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700505 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700506 if (ran_out_of_space_) {
507 // Refuse to open any new files, which might skip data. Any existing files
508 // are in the same folder, which means they're on the same filesystem, which
509 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700510 if (!destination->get()) {
511 // But avoid leaving a nullptr writer if we're out of space when
512 // attempting to open the first file.
513 *destination = std::make_unique<DetachedBufferWriter>(
514 DetachedBufferWriter::already_out_of_space_t());
515 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700516 return;
517 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700518 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
519 const std::string filename =
520 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700521 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700522 if (ran_out_of_space_) {
523 *destination = std::make_unique<DetachedBufferWriter>(
524 DetachedBufferWriter::already_out_of_space_t());
525 return;
526 }
Brian Silvermancb805822020-10-06 17:43:35 -0700527 *destination =
528 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700529 if (!destination->get()->ran_out_of_space()) {
530 all_filenames_.emplace_back(path);
531 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700532 return;
533 }
Brian Silvermancb805822020-10-06 17:43:35 -0700534
535 CloseWriter(destination);
536 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700537 *destination->get() =
538 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700539 return;
540 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700541
Brian Silvermancb805822020-10-06 17:43:35 -0700542 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700543 if (!destination->get()->ran_out_of_space()) {
544 all_filenames_.emplace_back(path);
545 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700546}
547
Brian Silverman48deab12020-09-30 18:39:28 -0700548void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
549 if (temp_suffix_.empty()) {
550 return;
551 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700552 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700553 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700554 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700555 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700556 int result = rename(current_filename.c_str(), final_filename.c_str());
557
558 // When changing the base name, we rename the log folder while there active
559 // buffer writers. Therefore, the name of that active buffer may still refer
560 // to the old file location rather than the new one. This minimized changes to
561 // existing code.
562 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
563 auto offset = current_filename.find(old_base_name_);
564 if (offset != std::string::npos) {
565 current_filename.replace(offset, old_base_name_.length(), base_name_);
566 }
567 offset = final_filename.find(old_base_name_);
568 if (offset != std::string::npos) {
569 final_filename.replace(offset, old_base_name_.length(), base_name_);
570 }
571 result = rename(current_filename.c_str(), final_filename.c_str());
572 }
573
Brian Silverman48deab12020-09-30 18:39:28 -0700574 if (result != 0) {
575 if (errno == ENOSPC) {
576 ran_out_of_space_ = true;
577 return;
578 } else {
579 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
580 << " failed";
581 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700582 } else {
583 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700584 }
585}
586
Brian Silvermancb805822020-10-06 17:43:35 -0700587void MultiNodeLogNamer::CloseWriter(
588 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
589 DetachedBufferWriter *const writer = writer_pointer->get();
590 if (!writer) {
591 return;
592 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700593 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700594 writer->Close();
595
596 if (writer->max_write_time() > max_write_time_) {
597 max_write_time_ = writer->max_write_time();
598 max_write_time_bytes_ = writer->max_write_time_bytes();
599 max_write_time_messages_ = writer->max_write_time_messages();
600 }
601 total_write_time_ += writer->total_write_time();
602 total_write_count_ += writer->total_write_count();
603 total_write_messages_ += writer->total_write_messages();
604 total_write_bytes_ += writer->total_write_bytes();
605
606 if (writer->ran_out_of_space()) {
607 ran_out_of_space_ = true;
608 writer->acknowledge_out_of_space();
609 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700610 if (was_open) {
611 RenameTempFile(writer);
612 } else {
613 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
614 << ": File should not exist: " << writer->filename();
615 }
Brian Silvermancb805822020-10-06 17:43:35 -0700616}
617
Austin Schuhcb5601b2020-09-10 15:29:59 -0700618} // namespace logger
619} // namespace aos