blob: a4b15e843dbe077474e5de95f542db14fabd3b3d [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuhe46492f2021-07-31 19:49:41 -070028 boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
29 UUID::Zero());
30 CHECK_LT(node_index_, boot_uuids_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070031 reopen_(this);
32}
33
34NewDataWriter::~NewDataWriter() {
35 if (writer) {
36 Close();
37 }
38}
39
40void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070041 // No need to rotate if nothing has been written.
42 if (header_written_) {
43 ++parts_index_;
44 reopen_(this);
45 header_written_ = false;
46 QueueHeader(MakeHeader());
47 }
Austin Schuh572924a2021-07-30 22:32:12 -070048}
49
50void NewDataWriter::Reboot() {
51 parts_uuid_ = UUID::Random();
52 ++parts_index_;
53 reopen_(this);
54 header_written_ = false;
55}
56
Austin Schuhe46492f2021-07-31 19:49:41 -070057void NewDataWriter::UpdateRemote(size_t remote_node_index,
58 const UUID &remote_node_boot_uuid) {
59 CHECK_LT(remote_node_index, boot_uuids_.size());
60 if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
61 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
62 << remote_node_boot_uuid << " from "
63 << boot_uuids_[remote_node_index];
64 boot_uuids_[remote_node_index] = remote_node_boot_uuid;
65 Rotate();
66 }
67}
68
69void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
70 const UUID &source_node_boot_uuid,
71 aos::monotonic_clock::time_point now) {
Austin Schuh572924a2021-07-30 22:32:12 -070072 // TODO(austin): Handle remote nodes changing too, not just the source node.
Austin Schuhe46492f2021-07-31 19:49:41 -070073 if (boot_uuids_[node_index_] != source_node_boot_uuid) {
74 boot_uuids_[node_index_] = source_node_boot_uuid;
Austin Schuh572924a2021-07-30 22:32:12 -070075 if (header_written_) {
76 Reboot();
77 }
78
Austin Schuhe46492f2021-07-31 19:49:41 -070079 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -070080 }
Austin Schuhe46492f2021-07-31 19:49:41 -070081 CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -070082 CHECK(header_written_) << ": Attempting to write message before header to "
83 << writer->filename();
84 writer->QueueSizedFlatbuffer(fbb, now);
85}
86
Austin Schuhe46492f2021-07-31 19:49:41 -070087aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
88NewDataWriter::MakeHeader() {
89 const size_t logger_node_index = log_namer_->logger_node_index();
90 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
91 if (boot_uuids_[logger_node_index] == UUID::Zero()) {
92 VLOG(1) << filename() << " Logger node is " << logger_node_index
93 << " and uuid is " << logger_node_boot_uuid;
94 boot_uuids_[logger_node_index] = logger_node_boot_uuid;
95 } else {
96 CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
97 }
98 return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
99 parts_index_);
100}
101
Austin Schuh572924a2021-07-30 22:32:12 -0700102void NewDataWriter::QueueHeader(
103 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
104 CHECK(!header_written_) << ": Attempting to write duplicate header to "
105 << writer->filename();
106 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuhe46492f2021-07-31 19:49:41 -0700107 CHECK_EQ(boot_uuids_[node_index_],
108 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh572924a2021-07-30 22:32:12 -0700109 // TODO(austin): This triggers a dummy allocation that we don't need as part
110 // of releasing. Can we skip it?
111 writer->QueueSizedFlatbuffer(header.Release());
112 header_written_ = true;
113}
114
115void NewDataWriter::Close() {
116 CHECK(writer);
117 close_(this);
118 writer.reset();
119 header_written_ = false;
120}
121
Austin Schuh73340842021-07-30 22:32:06 -0700122aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuhe46492f2021-07-31 19:49:41 -0700123 size_t node_index, const std::vector<UUID> &boot_uuids,
Austin Schuh73340842021-07-30 22:32:06 -0700124 const UUID &parts_uuid, int parts_index) const {
Austin Schuhe46492f2021-07-31 19:49:41 -0700125 const UUID &source_node_boot_uuid = boot_uuids[node_index];
Austin Schuh73340842021-07-30 22:32:06 -0700126 const Node *const source_node =
127 configuration::GetNode(configuration_, node_index);
Austin Schuha499cea2021-07-31 19:49:53 -0700128 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 20u);
Austin Schuh73340842021-07-30 22:32:06 -0700129 flatbuffers::FlatBufferBuilder fbb;
130 fbb.ForceDefaults(true);
131
132 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
133 flatbuffers::Offset<aos::Configuration> configuration_offset;
134 if (header_.message().has_configuration()) {
135 CHECK(!header_.message().has_configuration_sha256());
136 configuration_offset =
137 CopyFlatBuffer(header_.message().configuration(), &fbb);
138 } else {
139 CHECK(!header_.message().has_configuration());
140 CHECK(header_.message().has_configuration_sha256());
141 config_sha256_offset = fbb.CreateString(
142 header_.message().configuration_sha256()->string_view());
143 }
144
145 CHECK(header_.message().has_name());
146 const flatbuffers::Offset<flatbuffers::String> name_offset =
147 fbb.CreateString(header_.message().name()->string_view());
148
149 CHECK(header_.message().has_log_event_uuid());
150 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
151 fbb.CreateString(header_.message().log_event_uuid()->string_view());
152
153 CHECK(header_.message().has_logger_instance_uuid());
154 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
155 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
156
157 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
158 if (header_.message().has_log_start_uuid()) {
159 log_start_uuid_offset =
160 fbb.CreateString(header_.message().log_start_uuid()->string_view());
161 }
162
163 CHECK(header_.message().has_logger_node_boot_uuid());
164 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
165 fbb.CreateString(
166 header_.message().logger_node_boot_uuid()->string_view());
167
168 CHECK_NE(source_node_boot_uuid, UUID::Zero());
169 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
170 source_node_boot_uuid.PackString(&fbb);
171
172 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
173 parts_uuid.PackString(&fbb);
174
175 flatbuffers::Offset<Node> node_offset;
176 flatbuffers::Offset<Node> logger_node_offset;
177
178 if (configuration::MultiNode(configuration_)) {
179 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
180 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
181 }
182
Austin Schuhe46492f2021-07-31 19:49:41 -0700183 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
184 boot_uuid_offsets.reserve(boot_uuids.size());
185 for (const UUID &uuid : boot_uuids) {
186 if (uuid != UUID::Zero()) {
187 boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
188 } else {
189 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
190 }
191 }
192
193 flatbuffers::Offset<
194 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
195 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
196
Austin Schuh73340842021-07-30 22:32:06 -0700197 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
198
199 log_file_header_builder.add_name(name_offset);
200
201 // Only add the node if we are running in a multinode configuration.
202 if (!logger_node_offset.IsNull()) {
203 log_file_header_builder.add_node(node_offset);
204 log_file_header_builder.add_logger_node(logger_node_offset);
205 }
206
207 if (!configuration_offset.IsNull()) {
208 log_file_header_builder.add_configuration(configuration_offset);
209 }
210 log_file_header_builder.add_max_out_of_order_duration(
211 header_.message().max_out_of_order_duration());
212
213 log_file_header_builder.add_monotonic_start_time(
214 std::chrono::duration_cast<std::chrono::nanoseconds>(
215 node_states_[node_index].monotonic_start_time.time_since_epoch())
216 .count());
217 if (source_node == node_) {
218 log_file_header_builder.add_realtime_start_time(
219 std::chrono::duration_cast<std::chrono::nanoseconds>(
220 node_states_[node_index].realtime_start_time.time_since_epoch())
221 .count());
222 } else {
223 // Fill out the legacy start times. Since these were implemented to never
224 // change on reboot, they aren't very helpful in tracking what happened.
225 log_file_header_builder.add_logger_monotonic_start_time(
226 std::chrono::duration_cast<std::chrono::nanoseconds>(
227 node_states_[node_index]
228 .logger_monotonic_start_time.time_since_epoch())
229 .count());
230 log_file_header_builder.add_logger_realtime_start_time(
231 std::chrono::duration_cast<std::chrono::nanoseconds>(
232 node_states_[node_index]
233 .logger_realtime_start_time.time_since_epoch())
234 .count());
235 }
236
237 // TODO(austin): Add more useful times. When was this part started? What do
238 // we know about both the logger and remote then?
239
240 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
241 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
242 if (!log_start_uuid_offset.IsNull()) {
243 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
244 }
245 log_file_header_builder.add_logger_node_boot_uuid(
246 logger_node_boot_uuid_offset);
247 log_file_header_builder.add_source_node_boot_uuid(
248 source_node_boot_uuid_offset);
249
250 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
251 log_file_header_builder.add_parts_index(parts_index);
252
253 if (!config_sha256_offset.IsNull()) {
254 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
255 }
256
Austin Schuhe46492f2021-07-31 19:49:41 -0700257 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700258 log_file_header_builder.add_logger_part_monotonic_start_time(
259 std::chrono::duration_cast<std::chrono::nanoseconds>(
260 event_loop_->monotonic_now().time_since_epoch())
261 .count());
262 log_file_header_builder.add_logger_part_realtime_start_time(
263 std::chrono::duration_cast<std::chrono::nanoseconds>(
264 event_loop_->realtime_now().time_since_epoch())
265 .count());
Austin Schuh73340842021-07-30 22:32:06 -0700266 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
267 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
268 fbb.Release());
269
270 CHECK(result.Verify()) << ": Built a corrupted header.";
271
272 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700273}
274
Austin Schuhb8bca732021-07-30 22:32:00 -0700275NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700276 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
277 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700278 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700279}
280
Austin Schuh73340842021-07-30 22:32:06 -0700281void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700282 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700283 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700284}
Austin Schuh8c399962020-12-25 21:51:45 -0800285
286void LocalLogNamer::WriteConfiguration(
287 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
288 std::string_view config_sha256) {
289 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
290
291 std::unique_ptr<DetachedBufferWriter> writer =
292 std::make_unique<DetachedBufferWriter>(
293 filename, std::make_unique<aos::logger::DummyEncoder>());
294 writer->QueueSizedFlatbuffer(header->Release());
295}
296
Austin Schuhb8bca732021-07-30 22:32:00 -0700297NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700298 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
299 << ": Message is not delivered to this node.";
300 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
301 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
302 node_))
303 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700304 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700305}
306
Austin Schuhb8bca732021-07-30 22:32:00 -0700307NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700308 const Channel * /*channel*/, const Node * /*node*/) {
309 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
310 return nullptr;
311}
312
313MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
Austin Schuha499cea2021-07-31 19:49:53 -0700314 EventLoop *event_loop)
315 : LogNamer(event_loop), base_name_(base_name), old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700316
Brian Silverman48deab12020-09-30 18:39:28 -0700317MultiNodeLogNamer::~MultiNodeLogNamer() {
318 if (!ran_out_of_space_) {
319 // This handles renaming temporary files etc.
320 Close();
321 }
322}
323
Austin Schuh572924a2021-07-30 22:32:12 -0700324void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700325 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700326 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700327 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700328 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700329 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700330 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700331 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700332 if (node == data_writer.second.node()) {
333 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700334 }
335 }
336 }
337}
338
Austin Schuh8c399962020-12-25 21:51:45 -0800339void MultiNodeLogNamer::WriteConfiguration(
340 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
341 std::string_view config_sha256) {
342 if (ran_out_of_space_) {
343 return;
344 }
345
346 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
347 const std::string filename = absl::StrCat(
348 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
349
350 std::unique_ptr<DetachedBufferWriter> writer =
351 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
352
353 writer->QueueSizedFlatbuffer(header->Release());
354
355 if (!writer->ran_out_of_space()) {
356 all_filenames_.emplace_back(filename);
357 }
358 CloseWriter(&writer);
359}
360
Austin Schuhb8bca732021-07-30 22:32:00 -0700361NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700362 // See if we can read the data on this node at all.
363 const bool is_readable =
364 configuration::ChannelIsReadableOnNode(channel, this->node());
365 if (!is_readable) {
366 return nullptr;
367 }
368
369 // Then, see if we are supposed to log the data here.
370 const bool log_message =
371 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
372
373 if (!log_message) {
374 return nullptr;
375 }
376
377 // Now, sort out if this is data generated on this node, or not. It is
378 // generated if it is sendable on this node.
379 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700380 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700381 OpenDataWriter();
382 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700383 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700384 }
385
386 // Ok, we have data that is being forwarded to us that we are supposed to
387 // log. It needs to be logged with send timestamps, but be sorted enough
388 // to be able to be processed.
389 CHECK(data_writers_.find(channel) == data_writers_.end());
390
391 // Track that this node is being logged.
392 const Node *source_node = configuration::GetNode(
393 configuration_, channel->source_node()->string_view());
394
395 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
396 nodes_.emplace_back(source_node);
397 }
398
Austin Schuh572924a2021-07-30 22:32:12 -0700399 NewDataWriter data_writer(this, source_node,
400 [this, channel](NewDataWriter *data_writer) {
401 OpenWriter(channel, data_writer);
402 },
403 [this](NewDataWriter *data_writer) {
404 CloseWriter(&data_writer->writer);
405 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700406 return &(
407 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700408}
409
Austin Schuhb8bca732021-07-30 22:32:00 -0700410NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700411 const Channel *channel, const Node *node) {
412 // See if we can read the data on this node at all.
413 const bool is_readable =
414 configuration::ChannelIsReadableOnNode(channel, this->node());
415 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
416
417 CHECK(data_writers_.find(channel) == data_writers_.end());
418
419 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
420 nodes_.emplace_back(node);
421 }
422
Austin Schuh572924a2021-07-30 22:32:12 -0700423 NewDataWriter data_writer(this, node,
424 [this, channel](NewDataWriter *data_writer) {
425 OpenForwardedTimestampWriter(channel,
426 data_writer);
427 },
428 [this](NewDataWriter *data_writer) {
429 CloseWriter(&data_writer->writer);
430 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700431 return &(
432 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700433}
434
Austin Schuhb8bca732021-07-30 22:32:00 -0700435NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700436 bool log_delivery_times = false;
437 if (this->node() != nullptr) {
438 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
439 channel, this->node(), this->node());
440 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700441 if (!log_delivery_times) {
442 return nullptr;
443 }
444
Austin Schuhb8bca732021-07-30 22:32:00 -0700445 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700446 OpenDataWriter();
447 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700448 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700449}
450
Brian Silverman0465fcf2020-09-24 00:29:18 -0700451void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700452 data_writers_.clear();
453 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700454}
455
456void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700457 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700458 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800459 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700460 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700461 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700462 if (data_writer_) {
463 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700464 }
465 max_write_time_ = std::chrono::nanoseconds::zero();
466 max_write_time_bytes_ = -1;
467 max_write_time_messages_ = -1;
468 total_write_time_ = std::chrono::nanoseconds::zero();
469 total_write_count_ = 0;
470 total_write_messages_ = 0;
471 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700472}
473
Austin Schuhb8bca732021-07-30 22:32:00 -0700474void MultiNodeLogNamer::OpenForwardedTimestampWriter(
475 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700476 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700477 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700478 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700479 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700480 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700481}
482
483void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700484 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700485 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700486 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700487 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700488 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700489 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700490}
491
Brian Silvermana621f522020-09-30 16:52:43 -0700492void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700493 if (!data_writer_) {
494 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700495 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700496 [this](NewDataWriter *writer) {
497 std::string name;
498 if (node() != nullptr) {
499 name = absl::StrCat(name, node()->name()->string_view(), "_");
500 }
Austin Schuh572924a2021-07-30 22:32:12 -0700501 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700502 extension_);
503 CreateBufferWriter(name, &writer->writer);
504 },
505 [this](NewDataWriter *data_writer) {
506 CloseWriter(&data_writer->writer);
507 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700508 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700509}
510
Brian Silverman0465fcf2020-09-24 00:29:18 -0700511void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700512 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700513 if (ran_out_of_space_) {
514 // Refuse to open any new files, which might skip data. Any existing files
515 // are in the same folder, which means they're on the same filesystem, which
516 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700517 if (!destination->get()) {
518 // But avoid leaving a nullptr writer if we're out of space when
519 // attempting to open the first file.
520 *destination = std::make_unique<DetachedBufferWriter>(
521 DetachedBufferWriter::already_out_of_space_t());
522 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700523 return;
524 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700525 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
526 const std::string filename =
527 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700528 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700529 if (ran_out_of_space_) {
530 *destination = std::make_unique<DetachedBufferWriter>(
531 DetachedBufferWriter::already_out_of_space_t());
532 return;
533 }
Brian Silvermancb805822020-10-06 17:43:35 -0700534 *destination =
535 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700536 if (!destination->get()->ran_out_of_space()) {
537 all_filenames_.emplace_back(path);
538 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700539 return;
540 }
Brian Silvermancb805822020-10-06 17:43:35 -0700541
542 CloseWriter(destination);
543 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700544 *destination->get() =
545 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700546 return;
547 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700548
Brian Silvermancb805822020-10-06 17:43:35 -0700549 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700550 if (!destination->get()->ran_out_of_space()) {
551 all_filenames_.emplace_back(path);
552 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700553}
554
Brian Silverman48deab12020-09-30 18:39:28 -0700555void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
556 if (temp_suffix_.empty()) {
557 return;
558 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700559 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700560 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700561 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700562 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700563 int result = rename(current_filename.c_str(), final_filename.c_str());
564
565 // When changing the base name, we rename the log folder while there active
566 // buffer writers. Therefore, the name of that active buffer may still refer
567 // to the old file location rather than the new one. This minimized changes to
568 // existing code.
569 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
570 auto offset = current_filename.find(old_base_name_);
571 if (offset != std::string::npos) {
572 current_filename.replace(offset, old_base_name_.length(), base_name_);
573 }
574 offset = final_filename.find(old_base_name_);
575 if (offset != std::string::npos) {
576 final_filename.replace(offset, old_base_name_.length(), base_name_);
577 }
578 result = rename(current_filename.c_str(), final_filename.c_str());
579 }
580
Brian Silverman48deab12020-09-30 18:39:28 -0700581 if (result != 0) {
582 if (errno == ENOSPC) {
583 ran_out_of_space_ = true;
584 return;
585 } else {
586 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
587 << " failed";
588 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700589 } else {
590 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700591 }
592}
593
Brian Silvermancb805822020-10-06 17:43:35 -0700594void MultiNodeLogNamer::CloseWriter(
595 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
596 DetachedBufferWriter *const writer = writer_pointer->get();
597 if (!writer) {
598 return;
599 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700600 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700601 writer->Close();
602
603 if (writer->max_write_time() > max_write_time_) {
604 max_write_time_ = writer->max_write_time();
605 max_write_time_bytes_ = writer->max_write_time_bytes();
606 max_write_time_messages_ = writer->max_write_time_messages();
607 }
608 total_write_time_ += writer->total_write_time();
609 total_write_count_ += writer->total_write_count();
610 total_write_messages_ += writer->total_write_messages();
611 total_write_bytes_ += writer->total_write_bytes();
612
613 if (writer->ran_out_of_space()) {
614 ran_out_of_space_ = true;
615 writer->acknowledge_out_of_space();
616 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700617 if (was_open) {
618 RenameTempFile(writer);
619 } else {
620 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
621 << ": File should not exist: " << writer->filename();
622 }
Brian Silvermancb805822020-10-06 17:43:35 -0700623}
624
Austin Schuhcb5601b2020-09-10 15:29:59 -0700625} // namespace logger
626} // namespace aos