blob: 93dd02de4e981bc63df9b088e5f83c5ac1098348 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070028 state_.resize(configuration::NodesCount(log_namer->configuration_));
29 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070030 reopen_(this);
31}
32
33NewDataWriter::~NewDataWriter() {
34 if (writer) {
35 Close();
36 }
37}
38
39void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070040 // No need to rotate if nothing has been written.
41 if (header_written_) {
42 ++parts_index_;
43 reopen_(this);
44 header_written_ = false;
45 QueueHeader(MakeHeader());
46 }
Austin Schuh572924a2021-07-30 22:32:12 -070047}
48
49void NewDataWriter::Reboot() {
50 parts_uuid_ = UUID::Random();
51 ++parts_index_;
52 reopen_(this);
53 header_written_ = false;
54}
55
Austin Schuh72211ae2021-08-05 14:02:30 -070056void NewDataWriter::UpdateRemote(
57 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
58 const monotonic_clock::time_point monotonic_remote_time,
59 const monotonic_clock::time_point monotonic_event_time,
60 const bool reliable) {
61 bool rotate = false;
62 CHECK_LT(remote_node_index, state_.size());
63 State &state = state_[remote_node_index];
64 if (state.boot_uuid != remote_node_boot_uuid) {
Austin Schuhe46492f2021-07-31 19:49:41 -070065 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -070066 << remote_node_boot_uuid << " from " << state.boot_uuid;
67 state.boot_uuid = remote_node_boot_uuid;
68 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
69 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
70 state.oldest_remote_unreliable_monotonic_timestamp =
71 monotonic_clock::max_time;
72 state.oldest_local_unreliable_monotonic_timestamp =
73 monotonic_clock::max_time;
74 rotate = true;
75 }
76
77 if (!reliable) {
78 if (state.oldest_remote_unreliable_monotonic_timestamp >
79 monotonic_remote_time) {
80 state.oldest_remote_unreliable_monotonic_timestamp =
81 monotonic_remote_time;
82 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
83 rotate = true;
84 }
85 }
86
87 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
88 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
89 state.oldest_local_monotonic_timestamp = monotonic_event_time;
90 rotate = true;
91 }
92
93 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -070094 Rotate();
95 }
96}
97
98void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
99 const UUID &source_node_boot_uuid,
100 aos::monotonic_clock::time_point now) {
Austin Schuh572924a2021-07-30 22:32:12 -0700101 // TODO(austin): Handle remote nodes changing too, not just the source node.
Austin Schuh72211ae2021-08-05 14:02:30 -0700102 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
103 state_[node_index_].boot_uuid = source_node_boot_uuid;
Austin Schuh572924a2021-07-30 22:32:12 -0700104 if (header_written_) {
105 Reboot();
106 }
107
Austin Schuhe46492f2021-07-31 19:49:41 -0700108 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700109 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700110 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700111 CHECK(header_written_) << ": Attempting to write message before header to "
112 << writer->filename();
113 writer->QueueSizedFlatbuffer(fbb, now);
114}
115
Austin Schuhe46492f2021-07-31 19:49:41 -0700116aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
117NewDataWriter::MakeHeader() {
118 const size_t logger_node_index = log_namer_->logger_node_index();
119 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700120 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700121 VLOG(1) << filename() << " Logger node is " << logger_node_index
122 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700123 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700124 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700125 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700126 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700127 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
Austin Schuhe46492f2021-07-31 19:49:41 -0700128 parts_index_);
129}
130
Austin Schuh572924a2021-07-30 22:32:12 -0700131void NewDataWriter::QueueHeader(
132 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
133 CHECK(!header_written_) << ": Attempting to write duplicate header to "
134 << writer->filename();
135 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700136 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700137 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh572924a2021-07-30 22:32:12 -0700138 // TODO(austin): This triggers a dummy allocation that we don't need as part
139 // of releasing. Can we skip it?
140 writer->QueueSizedFlatbuffer(header.Release());
141 header_written_ = true;
142}
143
144void NewDataWriter::Close() {
145 CHECK(writer);
146 close_(this);
147 writer.reset();
148 header_written_ = false;
149}
150
Austin Schuh73340842021-07-30 22:32:06 -0700151aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700152 size_t node_index, const std::vector<NewDataWriter::State> &state,
Austin Schuh73340842021-07-30 22:32:06 -0700153 const UUID &parts_uuid, int parts_index) const {
Austin Schuh72211ae2021-08-05 14:02:30 -0700154 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700155 const Node *const source_node =
156 configuration::GetNode(configuration_, node_index);
Austin Schuh72211ae2021-08-05 14:02:30 -0700157 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
Austin Schuh73340842021-07-30 22:32:06 -0700158 flatbuffers::FlatBufferBuilder fbb;
159 fbb.ForceDefaults(true);
160
161 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
162 flatbuffers::Offset<aos::Configuration> configuration_offset;
163 if (header_.message().has_configuration()) {
164 CHECK(!header_.message().has_configuration_sha256());
165 configuration_offset =
166 CopyFlatBuffer(header_.message().configuration(), &fbb);
167 } else {
168 CHECK(!header_.message().has_configuration());
169 CHECK(header_.message().has_configuration_sha256());
170 config_sha256_offset = fbb.CreateString(
171 header_.message().configuration_sha256()->string_view());
172 }
173
174 CHECK(header_.message().has_name());
175 const flatbuffers::Offset<flatbuffers::String> name_offset =
176 fbb.CreateString(header_.message().name()->string_view());
177
178 CHECK(header_.message().has_log_event_uuid());
179 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
180 fbb.CreateString(header_.message().log_event_uuid()->string_view());
181
182 CHECK(header_.message().has_logger_instance_uuid());
183 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
184 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
185
186 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
187 if (header_.message().has_log_start_uuid()) {
188 log_start_uuid_offset =
189 fbb.CreateString(header_.message().log_start_uuid()->string_view());
190 }
191
192 CHECK(header_.message().has_logger_node_boot_uuid());
193 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
194 fbb.CreateString(
195 header_.message().logger_node_boot_uuid()->string_view());
196
197 CHECK_NE(source_node_boot_uuid, UUID::Zero());
198 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
199 source_node_boot_uuid.PackString(&fbb);
200
201 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
202 parts_uuid.PackString(&fbb);
203
204 flatbuffers::Offset<Node> node_offset;
205 flatbuffers::Offset<Node> logger_node_offset;
206
207 if (configuration::MultiNode(configuration_)) {
208 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
209 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
210 }
211
Austin Schuhe46492f2021-07-31 19:49:41 -0700212 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700213 boot_uuid_offsets.reserve(state.size());
214 for (const NewDataWriter::State &state : state) {
215 if (state.boot_uuid != UUID::Zero()) {
216 boot_uuid_offsets.emplace_back(state.boot_uuid.PackString(&fbb));
Austin Schuhe46492f2021-07-31 19:49:41 -0700217 } else {
218 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
219 }
220 }
221
222 flatbuffers::Offset<
223 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
224 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
225
Austin Schuh72211ae2021-08-05 14:02:30 -0700226 int64_t *oldest_remote_monotonic_timestamps;
227 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
228 oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
229 state.size(), &oldest_remote_monotonic_timestamps);
230
231 int64_t *oldest_local_monotonic_timestamps;
232 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
233 oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
234 state.size(), &oldest_local_monotonic_timestamps);
235
236 int64_t *oldest_remote_unreliable_monotonic_timestamps;
237 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
238 oldest_remote_unreliable_monotonic_timestamps_offset =
239 fbb.CreateUninitializedVector(
240 state.size(), &oldest_remote_unreliable_monotonic_timestamps);
241
242 int64_t *oldest_local_unreliable_monotonic_timestamps;
243 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
244 oldest_local_unreliable_monotonic_timestamps_offset =
245 fbb.CreateUninitializedVector(
246 state.size(), &oldest_local_unreliable_monotonic_timestamps);
247
248 for (size_t i = 0; i < state.size(); ++i) {
249 oldest_remote_monotonic_timestamps[i] =
250 state[i].oldest_remote_monotonic_timestamp.time_since_epoch().count();
251 oldest_local_monotonic_timestamps[i] =
252 state[i].oldest_local_monotonic_timestamp.time_since_epoch().count();
253 oldest_remote_unreliable_monotonic_timestamps[i] =
254 state[i]
255 .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
256 .count();
257 oldest_local_unreliable_monotonic_timestamps[i] =
258 state[i]
259 .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
260 .count();
261 }
262
Austin Schuh73340842021-07-30 22:32:06 -0700263 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
264
265 log_file_header_builder.add_name(name_offset);
266
267 // Only add the node if we are running in a multinode configuration.
268 if (!logger_node_offset.IsNull()) {
269 log_file_header_builder.add_node(node_offset);
270 log_file_header_builder.add_logger_node(logger_node_offset);
271 }
272
273 if (!configuration_offset.IsNull()) {
274 log_file_header_builder.add_configuration(configuration_offset);
275 }
276 log_file_header_builder.add_max_out_of_order_duration(
277 header_.message().max_out_of_order_duration());
278
279 log_file_header_builder.add_monotonic_start_time(
280 std::chrono::duration_cast<std::chrono::nanoseconds>(
281 node_states_[node_index].monotonic_start_time.time_since_epoch())
282 .count());
283 if (source_node == node_) {
284 log_file_header_builder.add_realtime_start_time(
285 std::chrono::duration_cast<std::chrono::nanoseconds>(
286 node_states_[node_index].realtime_start_time.time_since_epoch())
287 .count());
288 } else {
289 // Fill out the legacy start times. Since these were implemented to never
290 // change on reboot, they aren't very helpful in tracking what happened.
291 log_file_header_builder.add_logger_monotonic_start_time(
292 std::chrono::duration_cast<std::chrono::nanoseconds>(
293 node_states_[node_index]
294 .logger_monotonic_start_time.time_since_epoch())
295 .count());
296 log_file_header_builder.add_logger_realtime_start_time(
297 std::chrono::duration_cast<std::chrono::nanoseconds>(
298 node_states_[node_index]
299 .logger_realtime_start_time.time_since_epoch())
300 .count());
301 }
302
303 // TODO(austin): Add more useful times. When was this part started? What do
304 // we know about both the logger and remote then?
305
306 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
307 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
308 if (!log_start_uuid_offset.IsNull()) {
309 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
310 }
311 log_file_header_builder.add_logger_node_boot_uuid(
312 logger_node_boot_uuid_offset);
313 log_file_header_builder.add_source_node_boot_uuid(
314 source_node_boot_uuid_offset);
315
316 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
317 log_file_header_builder.add_parts_index(parts_index);
318
319 if (!config_sha256_offset.IsNull()) {
320 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
321 }
322
Austin Schuhe46492f2021-07-31 19:49:41 -0700323 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700324 log_file_header_builder.add_logger_part_monotonic_start_time(
325 std::chrono::duration_cast<std::chrono::nanoseconds>(
326 event_loop_->monotonic_now().time_since_epoch())
327 .count());
328 log_file_header_builder.add_logger_part_realtime_start_time(
329 std::chrono::duration_cast<std::chrono::nanoseconds>(
330 event_loop_->realtime_now().time_since_epoch())
331 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700332 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
333 oldest_remote_monotonic_timestamps_offset);
334 log_file_header_builder.add_oldest_local_monotonic_timestamps(
335 oldest_local_monotonic_timestamps_offset);
336 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
337 oldest_remote_unreliable_monotonic_timestamps_offset);
338 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
339 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700340 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
341 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
342 fbb.Release());
343
344 CHECK(result.Verify()) << ": Built a corrupted header.";
345
346 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700347}
348
Austin Schuhb8bca732021-07-30 22:32:00 -0700349NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700350 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
351 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700352 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700353}
354
Austin Schuh73340842021-07-30 22:32:06 -0700355void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700356 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700357 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700358}
Austin Schuh8c399962020-12-25 21:51:45 -0800359
360void LocalLogNamer::WriteConfiguration(
361 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
362 std::string_view config_sha256) {
363 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
364
365 std::unique_ptr<DetachedBufferWriter> writer =
366 std::make_unique<DetachedBufferWriter>(
367 filename, std::make_unique<aos::logger::DummyEncoder>());
368 writer->QueueSizedFlatbuffer(header->Release());
369}
370
Austin Schuhb8bca732021-07-30 22:32:00 -0700371NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700372 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
373 << ": Message is not delivered to this node.";
374 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
375 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
376 node_))
377 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700378 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700379}
380
Austin Schuhb8bca732021-07-30 22:32:00 -0700381NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700382 const Channel * /*channel*/, const Node * /*node*/) {
383 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
384 return nullptr;
385}
386
387MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
Austin Schuha499cea2021-07-31 19:49:53 -0700388 EventLoop *event_loop)
389 : LogNamer(event_loop), base_name_(base_name), old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700390
Brian Silverman48deab12020-09-30 18:39:28 -0700391MultiNodeLogNamer::~MultiNodeLogNamer() {
392 if (!ran_out_of_space_) {
393 // This handles renaming temporary files etc.
394 Close();
395 }
396}
397
Austin Schuh572924a2021-07-30 22:32:12 -0700398void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700399 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700400 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700401 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700402 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700403 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700404 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700405 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700406 if (node == data_writer.second.node()) {
407 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700408 }
409 }
410 }
411}
412
Austin Schuh8c399962020-12-25 21:51:45 -0800413void MultiNodeLogNamer::WriteConfiguration(
414 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
415 std::string_view config_sha256) {
416 if (ran_out_of_space_) {
417 return;
418 }
419
420 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
421 const std::string filename = absl::StrCat(
422 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
423
424 std::unique_ptr<DetachedBufferWriter> writer =
425 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
426
427 writer->QueueSizedFlatbuffer(header->Release());
428
429 if (!writer->ran_out_of_space()) {
430 all_filenames_.emplace_back(filename);
431 }
432 CloseWriter(&writer);
433}
434
Austin Schuhb8bca732021-07-30 22:32:00 -0700435NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700436 // See if we can read the data on this node at all.
437 const bool is_readable =
438 configuration::ChannelIsReadableOnNode(channel, this->node());
439 if (!is_readable) {
440 return nullptr;
441 }
442
443 // Then, see if we are supposed to log the data here.
444 const bool log_message =
445 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
446
447 if (!log_message) {
448 return nullptr;
449 }
450
451 // Now, sort out if this is data generated on this node, or not. It is
452 // generated if it is sendable on this node.
453 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700454 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700455 OpenDataWriter();
456 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700457 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700458 }
459
460 // Ok, we have data that is being forwarded to us that we are supposed to
461 // log. It needs to be logged with send timestamps, but be sorted enough
462 // to be able to be processed.
463 CHECK(data_writers_.find(channel) == data_writers_.end());
464
465 // Track that this node is being logged.
466 const Node *source_node = configuration::GetNode(
467 configuration_, channel->source_node()->string_view());
468
469 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
470 nodes_.emplace_back(source_node);
471 }
472
Austin Schuh572924a2021-07-30 22:32:12 -0700473 NewDataWriter data_writer(this, source_node,
474 [this, channel](NewDataWriter *data_writer) {
475 OpenWriter(channel, data_writer);
476 },
477 [this](NewDataWriter *data_writer) {
478 CloseWriter(&data_writer->writer);
479 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700480 return &(
481 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700482}
483
Austin Schuhb8bca732021-07-30 22:32:00 -0700484NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700485 const Channel *channel, const Node *node) {
486 // See if we can read the data on this node at all.
487 const bool is_readable =
488 configuration::ChannelIsReadableOnNode(channel, this->node());
489 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
490
491 CHECK(data_writers_.find(channel) == data_writers_.end());
492
493 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
494 nodes_.emplace_back(node);
495 }
496
Austin Schuh572924a2021-07-30 22:32:12 -0700497 NewDataWriter data_writer(this, node,
498 [this, channel](NewDataWriter *data_writer) {
499 OpenForwardedTimestampWriter(channel,
500 data_writer);
501 },
502 [this](NewDataWriter *data_writer) {
503 CloseWriter(&data_writer->writer);
504 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700505 return &(
506 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700507}
508
Austin Schuhb8bca732021-07-30 22:32:00 -0700509NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700510 bool log_delivery_times = false;
511 if (this->node() != nullptr) {
512 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
513 channel, this->node(), this->node());
514 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700515 if (!log_delivery_times) {
516 return nullptr;
517 }
518
Austin Schuhb8bca732021-07-30 22:32:00 -0700519 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700520 OpenDataWriter();
521 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700522 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700523}
524
Brian Silverman0465fcf2020-09-24 00:29:18 -0700525void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700526 data_writers_.clear();
527 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700528}
529
530void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700531 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700532 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800533 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700534 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700535 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700536 if (data_writer_) {
537 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700538 }
539 max_write_time_ = std::chrono::nanoseconds::zero();
540 max_write_time_bytes_ = -1;
541 max_write_time_messages_ = -1;
542 total_write_time_ = std::chrono::nanoseconds::zero();
543 total_write_count_ = 0;
544 total_write_messages_ = 0;
545 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700546}
547
Austin Schuhb8bca732021-07-30 22:32:00 -0700548void MultiNodeLogNamer::OpenForwardedTimestampWriter(
549 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700550 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700551 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700552 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700553 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700554 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700555}
556
557void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700558 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700559 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700560 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700561 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700562 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700563 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700564}
565
Brian Silvermana621f522020-09-30 16:52:43 -0700566void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700567 if (!data_writer_) {
568 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700569 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700570 [this](NewDataWriter *writer) {
571 std::string name;
572 if (node() != nullptr) {
573 name = absl::StrCat(name, node()->name()->string_view(), "_");
574 }
Austin Schuh572924a2021-07-30 22:32:12 -0700575 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700576 extension_);
577 CreateBufferWriter(name, &writer->writer);
578 },
579 [this](NewDataWriter *data_writer) {
580 CloseWriter(&data_writer->writer);
581 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700582 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700583}
584
Brian Silverman0465fcf2020-09-24 00:29:18 -0700585void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700586 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700587 if (ran_out_of_space_) {
588 // Refuse to open any new files, which might skip data. Any existing files
589 // are in the same folder, which means they're on the same filesystem, which
590 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700591 if (!destination->get()) {
592 // But avoid leaving a nullptr writer if we're out of space when
593 // attempting to open the first file.
594 *destination = std::make_unique<DetachedBufferWriter>(
595 DetachedBufferWriter::already_out_of_space_t());
596 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700597 return;
598 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700599 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
600 const std::string filename =
601 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700602 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700603 if (ran_out_of_space_) {
604 *destination = std::make_unique<DetachedBufferWriter>(
605 DetachedBufferWriter::already_out_of_space_t());
606 return;
607 }
Brian Silvermancb805822020-10-06 17:43:35 -0700608 *destination =
609 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700610 if (!destination->get()->ran_out_of_space()) {
611 all_filenames_.emplace_back(path);
612 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700613 return;
614 }
Brian Silvermancb805822020-10-06 17:43:35 -0700615
616 CloseWriter(destination);
617 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700618 *destination->get() =
619 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700620 return;
621 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700622
Brian Silvermancb805822020-10-06 17:43:35 -0700623 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700624 if (!destination->get()->ran_out_of_space()) {
625 all_filenames_.emplace_back(path);
626 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700627}
628
Brian Silverman48deab12020-09-30 18:39:28 -0700629void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
630 if (temp_suffix_.empty()) {
631 return;
632 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700633 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700634 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700635 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700636 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700637 int result = rename(current_filename.c_str(), final_filename.c_str());
638
639 // When changing the base name, we rename the log folder while there active
640 // buffer writers. Therefore, the name of that active buffer may still refer
641 // to the old file location rather than the new one. This minimized changes to
642 // existing code.
643 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
644 auto offset = current_filename.find(old_base_name_);
645 if (offset != std::string::npos) {
646 current_filename.replace(offset, old_base_name_.length(), base_name_);
647 }
648 offset = final_filename.find(old_base_name_);
649 if (offset != std::string::npos) {
650 final_filename.replace(offset, old_base_name_.length(), base_name_);
651 }
652 result = rename(current_filename.c_str(), final_filename.c_str());
653 }
654
Brian Silverman48deab12020-09-30 18:39:28 -0700655 if (result != 0) {
656 if (errno == ENOSPC) {
657 ran_out_of_space_ = true;
658 return;
659 } else {
660 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
661 << " failed";
662 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700663 } else {
664 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700665 }
666}
667
Brian Silvermancb805822020-10-06 17:43:35 -0700668void MultiNodeLogNamer::CloseWriter(
669 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
670 DetachedBufferWriter *const writer = writer_pointer->get();
671 if (!writer) {
672 return;
673 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700674 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700675 writer->Close();
676
677 if (writer->max_write_time() > max_write_time_) {
678 max_write_time_ = writer->max_write_time();
679 max_write_time_bytes_ = writer->max_write_time_bytes();
680 max_write_time_messages_ = writer->max_write_time_messages();
681 }
682 total_write_time_ += writer->total_write_time();
683 total_write_count_ += writer->total_write_count();
684 total_write_messages_ += writer->total_write_messages();
685 total_write_bytes_ += writer->total_write_bytes();
686
687 if (writer->ran_out_of_space()) {
688 ran_out_of_space_ = true;
689 writer->acknowledge_out_of_space();
690 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700691 if (was_open) {
692 RenameTempFile(writer);
693 } else {
694 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
695 << ": File should not exist: " << writer->filename();
696 }
Brian Silvermancb805822020-10-06 17:43:35 -0700697}
698
Austin Schuhcb5601b2020-09-10 15:29:59 -0700699} // namespace logger
700} // namespace aos