blob: 9830a61eb9ed20f5f7ba999248b2b7a0f3663809 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070028 state_.resize(configuration::NodesCount(log_namer->configuration_));
29 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070030}
31
32NewDataWriter::~NewDataWriter() {
33 if (writer) {
34 Close();
35 }
36}
37
38void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070039 // No need to rotate if nothing has been written.
40 if (header_written_) {
41 ++parts_index_;
42 reopen_(this);
43 header_written_ = false;
44 QueueHeader(MakeHeader());
45 }
Austin Schuh572924a2021-07-30 22:32:12 -070046}
47
48void NewDataWriter::Reboot() {
49 parts_uuid_ = UUID::Random();
50 ++parts_index_;
51 reopen_(this);
52 header_written_ = false;
53}
54
Austin Schuh72211ae2021-08-05 14:02:30 -070055void NewDataWriter::UpdateRemote(
56 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
57 const monotonic_clock::time_point monotonic_remote_time,
58 const monotonic_clock::time_point monotonic_event_time,
59 const bool reliable) {
60 bool rotate = false;
61 CHECK_LT(remote_node_index, state_.size());
62 State &state = state_[remote_node_index];
63 if (state.boot_uuid != remote_node_boot_uuid) {
Austin Schuhe46492f2021-07-31 19:49:41 -070064 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -070065 << remote_node_boot_uuid << " from " << state.boot_uuid;
66 state.boot_uuid = remote_node_boot_uuid;
67 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
68 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
69 state.oldest_remote_unreliable_monotonic_timestamp =
70 monotonic_clock::max_time;
71 state.oldest_local_unreliable_monotonic_timestamp =
72 monotonic_clock::max_time;
73 rotate = true;
74 }
75
76 if (!reliable) {
77 if (state.oldest_remote_unreliable_monotonic_timestamp >
78 monotonic_remote_time) {
79 state.oldest_remote_unreliable_monotonic_timestamp =
80 monotonic_remote_time;
81 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
82 rotate = true;
83 }
84 }
85
86 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
87 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
88 state.oldest_local_monotonic_timestamp = monotonic_event_time;
89 rotate = true;
90 }
91
92 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -070093 Rotate();
94 }
95}
96
97void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
98 const UUID &source_node_boot_uuid,
99 aos::monotonic_clock::time_point now) {
Austin Schuh572924a2021-07-30 22:32:12 -0700100 // TODO(austin): Handle remote nodes changing too, not just the source node.
Austin Schuh72211ae2021-08-05 14:02:30 -0700101 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
102 state_[node_index_].boot_uuid = source_node_boot_uuid;
Austin Schuh572924a2021-07-30 22:32:12 -0700103 if (header_written_) {
104 Reboot();
105 }
106
Austin Schuhe46492f2021-07-31 19:49:41 -0700107 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700108 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700109 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
milind-ua50344f2021-08-25 18:22:20 -0700110 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700111 CHECK(header_written_) << ": Attempting to write message before header to "
112 << writer->filename();
113 writer->QueueSizedFlatbuffer(fbb, now);
114}
115
Austin Schuhe46492f2021-07-31 19:49:41 -0700116aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
117NewDataWriter::MakeHeader() {
118 const size_t logger_node_index = log_namer_->logger_node_index();
119 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700120 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700121 VLOG(1) << filename() << " Logger node is " << logger_node_index
122 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700123 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700124 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700125 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700126 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700127 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
Austin Schuhe46492f2021-07-31 19:49:41 -0700128 parts_index_);
129}
130
Austin Schuh572924a2021-07-30 22:32:12 -0700131void NewDataWriter::QueueHeader(
132 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
133 CHECK(!header_written_) << ": Attempting to write duplicate header to "
134 << writer->filename();
135 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700136 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700137 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700138 if (!writer) {
139 reopen_(this);
140 }
141
Austin Schuh572924a2021-07-30 22:32:12 -0700142 // TODO(austin): This triggers a dummy allocation that we don't need as part
143 // of releasing. Can we skip it?
Austin Schuh510dc622021-08-06 18:47:30 -0700144 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700145 writer->QueueSizedFlatbuffer(header.Release());
146 header_written_ = true;
147}
148
149void NewDataWriter::Close() {
150 CHECK(writer);
151 close_(this);
152 writer.reset();
153 header_written_ = false;
154}
155
Austin Schuh73340842021-07-30 22:32:06 -0700156aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700157 size_t node_index, const std::vector<NewDataWriter::State> &state,
Austin Schuh73340842021-07-30 22:32:06 -0700158 const UUID &parts_uuid, int parts_index) const {
Austin Schuh72211ae2021-08-05 14:02:30 -0700159 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700160 const Node *const source_node =
161 configuration::GetNode(configuration_, node_index);
Austin Schuh72211ae2021-08-05 14:02:30 -0700162 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
Austin Schuh73340842021-07-30 22:32:06 -0700163 flatbuffers::FlatBufferBuilder fbb;
164 fbb.ForceDefaults(true);
165
166 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
167 flatbuffers::Offset<aos::Configuration> configuration_offset;
168 if (header_.message().has_configuration()) {
169 CHECK(!header_.message().has_configuration_sha256());
170 configuration_offset =
171 CopyFlatBuffer(header_.message().configuration(), &fbb);
172 } else {
173 CHECK(!header_.message().has_configuration());
174 CHECK(header_.message().has_configuration_sha256());
175 config_sha256_offset = fbb.CreateString(
176 header_.message().configuration_sha256()->string_view());
177 }
178
179 CHECK(header_.message().has_name());
180 const flatbuffers::Offset<flatbuffers::String> name_offset =
181 fbb.CreateString(header_.message().name()->string_view());
182
183 CHECK(header_.message().has_log_event_uuid());
184 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
185 fbb.CreateString(header_.message().log_event_uuid()->string_view());
186
187 CHECK(header_.message().has_logger_instance_uuid());
188 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
189 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
190
191 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
192 if (header_.message().has_log_start_uuid()) {
193 log_start_uuid_offset =
194 fbb.CreateString(header_.message().log_start_uuid()->string_view());
195 }
196
197 CHECK(header_.message().has_logger_node_boot_uuid());
198 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
199 fbb.CreateString(
200 header_.message().logger_node_boot_uuid()->string_view());
201
202 CHECK_NE(source_node_boot_uuid, UUID::Zero());
203 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
204 source_node_boot_uuid.PackString(&fbb);
205
206 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
207 parts_uuid.PackString(&fbb);
208
209 flatbuffers::Offset<Node> node_offset;
210 flatbuffers::Offset<Node> logger_node_offset;
211
212 if (configuration::MultiNode(configuration_)) {
213 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
214 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
215 }
216
Austin Schuhe46492f2021-07-31 19:49:41 -0700217 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700218 boot_uuid_offsets.reserve(state.size());
219 for (const NewDataWriter::State &state : state) {
220 if (state.boot_uuid != UUID::Zero()) {
221 boot_uuid_offsets.emplace_back(state.boot_uuid.PackString(&fbb));
Austin Schuhe46492f2021-07-31 19:49:41 -0700222 } else {
223 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
224 }
225 }
226
227 flatbuffers::Offset<
228 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
229 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
230
Austin Schuh72211ae2021-08-05 14:02:30 -0700231 int64_t *oldest_remote_monotonic_timestamps;
232 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
233 oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
234 state.size(), &oldest_remote_monotonic_timestamps);
235
236 int64_t *oldest_local_monotonic_timestamps;
237 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
238 oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
239 state.size(), &oldest_local_monotonic_timestamps);
240
241 int64_t *oldest_remote_unreliable_monotonic_timestamps;
242 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
243 oldest_remote_unreliable_monotonic_timestamps_offset =
244 fbb.CreateUninitializedVector(
245 state.size(), &oldest_remote_unreliable_monotonic_timestamps);
246
247 int64_t *oldest_local_unreliable_monotonic_timestamps;
248 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
249 oldest_local_unreliable_monotonic_timestamps_offset =
250 fbb.CreateUninitializedVector(
251 state.size(), &oldest_local_unreliable_monotonic_timestamps);
252
253 for (size_t i = 0; i < state.size(); ++i) {
254 oldest_remote_monotonic_timestamps[i] =
255 state[i].oldest_remote_monotonic_timestamp.time_since_epoch().count();
256 oldest_local_monotonic_timestamps[i] =
257 state[i].oldest_local_monotonic_timestamp.time_since_epoch().count();
258 oldest_remote_unreliable_monotonic_timestamps[i] =
259 state[i]
260 .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
261 .count();
262 oldest_local_unreliable_monotonic_timestamps[i] =
263 state[i]
264 .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
265 .count();
266 }
267
Austin Schuh73340842021-07-30 22:32:06 -0700268 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
269
270 log_file_header_builder.add_name(name_offset);
271
272 // Only add the node if we are running in a multinode configuration.
273 if (!logger_node_offset.IsNull()) {
274 log_file_header_builder.add_node(node_offset);
275 log_file_header_builder.add_logger_node(logger_node_offset);
276 }
277
278 if (!configuration_offset.IsNull()) {
279 log_file_header_builder.add_configuration(configuration_offset);
280 }
281 log_file_header_builder.add_max_out_of_order_duration(
282 header_.message().max_out_of_order_duration());
283
284 log_file_header_builder.add_monotonic_start_time(
285 std::chrono::duration_cast<std::chrono::nanoseconds>(
286 node_states_[node_index].monotonic_start_time.time_since_epoch())
287 .count());
288 if (source_node == node_) {
289 log_file_header_builder.add_realtime_start_time(
290 std::chrono::duration_cast<std::chrono::nanoseconds>(
291 node_states_[node_index].realtime_start_time.time_since_epoch())
292 .count());
293 } else {
294 // Fill out the legacy start times. Since these were implemented to never
295 // change on reboot, they aren't very helpful in tracking what happened.
296 log_file_header_builder.add_logger_monotonic_start_time(
297 std::chrono::duration_cast<std::chrono::nanoseconds>(
298 node_states_[node_index]
299 .logger_monotonic_start_time.time_since_epoch())
300 .count());
301 log_file_header_builder.add_logger_realtime_start_time(
302 std::chrono::duration_cast<std::chrono::nanoseconds>(
303 node_states_[node_index]
304 .logger_realtime_start_time.time_since_epoch())
305 .count());
306 }
307
308 // TODO(austin): Add more useful times. When was this part started? What do
309 // we know about both the logger and remote then?
310
311 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
312 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
313 if (!log_start_uuid_offset.IsNull()) {
314 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
315 }
316 log_file_header_builder.add_logger_node_boot_uuid(
317 logger_node_boot_uuid_offset);
318 log_file_header_builder.add_source_node_boot_uuid(
319 source_node_boot_uuid_offset);
320
321 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
322 log_file_header_builder.add_parts_index(parts_index);
323
324 if (!config_sha256_offset.IsNull()) {
325 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
326 }
327
Austin Schuhe46492f2021-07-31 19:49:41 -0700328 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700329 log_file_header_builder.add_logger_part_monotonic_start_time(
330 std::chrono::duration_cast<std::chrono::nanoseconds>(
331 event_loop_->monotonic_now().time_since_epoch())
332 .count());
333 log_file_header_builder.add_logger_part_realtime_start_time(
334 std::chrono::duration_cast<std::chrono::nanoseconds>(
335 event_loop_->realtime_now().time_since_epoch())
336 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700337 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
338 oldest_remote_monotonic_timestamps_offset);
339 log_file_header_builder.add_oldest_local_monotonic_timestamps(
340 oldest_local_monotonic_timestamps_offset);
341 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
342 oldest_remote_unreliable_monotonic_timestamps_offset);
343 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
344 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700345 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
346 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
347 fbb.Release());
348
349 CHECK(result.Verify()) << ": Built a corrupted header.";
350
351 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700352}
353
Austin Schuhb8bca732021-07-30 22:32:00 -0700354NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700355 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
356 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700357 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700358}
359
Austin Schuh73340842021-07-30 22:32:06 -0700360void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700361 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700362 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700363}
Austin Schuh8c399962020-12-25 21:51:45 -0800364
365void LocalLogNamer::WriteConfiguration(
366 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
367 std::string_view config_sha256) {
368 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
369
370 std::unique_ptr<DetachedBufferWriter> writer =
371 std::make_unique<DetachedBufferWriter>(
372 filename, std::make_unique<aos::logger::DummyEncoder>());
373 writer->QueueSizedFlatbuffer(header->Release());
374}
375
Austin Schuhb8bca732021-07-30 22:32:00 -0700376NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700377 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
378 << ": Message is not delivered to this node.";
379 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
380 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
381 node_))
382 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700383 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700384}
385
Austin Schuhb8bca732021-07-30 22:32:00 -0700386NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700387 const Channel * /*channel*/, const Node * /*node*/) {
388 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
389 return nullptr;
390}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700391MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
Austin Schuha499cea2021-07-31 19:49:53 -0700392 EventLoop *event_loop)
Austin Schuh5b728b72021-06-16 14:57:15 -0700393 : MultiNodeLogNamer(base_name, event_loop->configuration(), event_loop,
394 event_loop->node()) {}
395
396MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
397 const Configuration *configuration,
398 EventLoop *event_loop, const Node *node)
399 : LogNamer(configuration, event_loop, node),
400 base_name_(base_name),
401 old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700402
Brian Silverman48deab12020-09-30 18:39:28 -0700403MultiNodeLogNamer::~MultiNodeLogNamer() {
404 if (!ran_out_of_space_) {
405 // This handles renaming temporary files etc.
406 Close();
407 }
408}
409
Austin Schuh572924a2021-07-30 22:32:12 -0700410void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700411 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700412 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700413 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700414 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700415 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700416 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700417 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700418 if (node == data_writer.second.node()) {
419 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700420 }
421 }
422 }
423}
424
Austin Schuh8c399962020-12-25 21:51:45 -0800425void MultiNodeLogNamer::WriteConfiguration(
426 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
427 std::string_view config_sha256) {
428 if (ran_out_of_space_) {
429 return;
430 }
431
432 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
433 const std::string filename = absl::StrCat(
434 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
435
436 std::unique_ptr<DetachedBufferWriter> writer =
437 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
438
439 writer->QueueSizedFlatbuffer(header->Release());
440
441 if (!writer->ran_out_of_space()) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700442 all_filenames_.emplace_back(
443 absl::StrCat(config_sha256, ".bfbs", extension_));
Austin Schuh8c399962020-12-25 21:51:45 -0800444 }
445 CloseWriter(&writer);
446}
447
Austin Schuhb8bca732021-07-30 22:32:00 -0700448NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700449 // See if we can read the data on this node at all.
450 const bool is_readable =
451 configuration::ChannelIsReadableOnNode(channel, this->node());
452 if (!is_readable) {
453 return nullptr;
454 }
455
456 // Then, see if we are supposed to log the data here.
457 const bool log_message =
458 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
459
460 if (!log_message) {
461 return nullptr;
462 }
463
464 // Now, sort out if this is data generated on this node, or not. It is
465 // generated if it is sendable on this node.
466 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700467 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700468 OpenDataWriter();
469 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700470 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700471 }
472
473 // Ok, we have data that is being forwarded to us that we are supposed to
474 // log. It needs to be logged with send timestamps, but be sorted enough
475 // to be able to be processed.
476 CHECK(data_writers_.find(channel) == data_writers_.end());
477
478 // Track that this node is being logged.
479 const Node *source_node = configuration::GetNode(
480 configuration_, channel->source_node()->string_view());
481
482 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
483 nodes_.emplace_back(source_node);
484 }
485
Austin Schuh572924a2021-07-30 22:32:12 -0700486 NewDataWriter data_writer(this, source_node,
487 [this, channel](NewDataWriter *data_writer) {
488 OpenWriter(channel, data_writer);
489 },
490 [this](NewDataWriter *data_writer) {
491 CloseWriter(&data_writer->writer);
492 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700493 return &(
494 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700495}
496
Austin Schuhb8bca732021-07-30 22:32:00 -0700497NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700498 const Channel *channel, const Node *node) {
499 // See if we can read the data on this node at all.
500 const bool is_readable =
501 configuration::ChannelIsReadableOnNode(channel, this->node());
502 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
503
504 CHECK(data_writers_.find(channel) == data_writers_.end());
505
506 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
507 nodes_.emplace_back(node);
508 }
509
Austin Schuh5b728b72021-06-16 14:57:15 -0700510 NewDataWriter data_writer(this, configuration::GetNode(configuration_, node),
Austin Schuh572924a2021-07-30 22:32:12 -0700511 [this, channel](NewDataWriter *data_writer) {
512 OpenForwardedTimestampWriter(channel,
513 data_writer);
514 },
515 [this](NewDataWriter *data_writer) {
516 CloseWriter(&data_writer->writer);
517 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700518 return &(
519 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700520}
521
Austin Schuhb8bca732021-07-30 22:32:00 -0700522NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700523 bool log_delivery_times = false;
524 if (this->node() != nullptr) {
525 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
526 channel, this->node(), this->node());
527 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700528 if (!log_delivery_times) {
529 return nullptr;
530 }
531
Austin Schuhb8bca732021-07-30 22:32:00 -0700532 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700533 OpenDataWriter();
534 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700535 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700536}
537
Brian Silverman0465fcf2020-09-24 00:29:18 -0700538void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700539 data_writers_.clear();
540 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700541}
542
543void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700544 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700545 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800546 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700547 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700548 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700549 if (data_writer_) {
550 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700551 }
552 max_write_time_ = std::chrono::nanoseconds::zero();
553 max_write_time_bytes_ = -1;
554 max_write_time_messages_ = -1;
555 total_write_time_ = std::chrono::nanoseconds::zero();
556 total_write_count_ = 0;
557 total_write_messages_ = 0;
558 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700559}
560
Austin Schuhb8bca732021-07-30 22:32:00 -0700561void MultiNodeLogNamer::OpenForwardedTimestampWriter(
562 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700563 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700564 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700565 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700566 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700567 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700568}
569
570void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700571 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700572 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700573 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700574 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700575 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700576 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700577}
578
Brian Silvermana621f522020-09-30 16:52:43 -0700579void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700580 if (!data_writer_) {
581 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700582 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700583 [this](NewDataWriter *writer) {
584 std::string name;
585 if (node() != nullptr) {
586 name = absl::StrCat(name, node()->name()->string_view(), "_");
587 }
Austin Schuh572924a2021-07-30 22:32:12 -0700588 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700589 extension_);
590 CreateBufferWriter(name, &writer->writer);
591 },
592 [this](NewDataWriter *data_writer) {
593 CloseWriter(&data_writer->writer);
594 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700595 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700596}
597
Brian Silverman0465fcf2020-09-24 00:29:18 -0700598void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700599 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700600 if (ran_out_of_space_) {
601 // Refuse to open any new files, which might skip data. Any existing files
602 // are in the same folder, which means they're on the same filesystem, which
603 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700604 if (!destination->get()) {
605 // But avoid leaving a nullptr writer if we're out of space when
606 // attempting to open the first file.
607 *destination = std::make_unique<DetachedBufferWriter>(
608 DetachedBufferWriter::already_out_of_space_t());
609 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700610 return;
611 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700612 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
613 const std::string filename =
614 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700615 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700616 if (ran_out_of_space_) {
617 *destination = std::make_unique<DetachedBufferWriter>(
618 DetachedBufferWriter::already_out_of_space_t());
619 return;
620 }
Brian Silvermancb805822020-10-06 17:43:35 -0700621 *destination =
622 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700623 if (!destination->get()->ran_out_of_space()) {
624 all_filenames_.emplace_back(path);
625 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700626 return;
627 }
Brian Silvermancb805822020-10-06 17:43:35 -0700628
629 CloseWriter(destination);
630 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700631 *destination->get() =
632 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700633 return;
634 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700635
Brian Silvermancb805822020-10-06 17:43:35 -0700636 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700637 if (!destination->get()->ran_out_of_space()) {
638 all_filenames_.emplace_back(path);
639 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700640}
641
Brian Silverman48deab12020-09-30 18:39:28 -0700642void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
643 if (temp_suffix_.empty()) {
644 return;
645 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700646 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700647 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700648 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700649 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700650 int result = rename(current_filename.c_str(), final_filename.c_str());
651
652 // When changing the base name, we rename the log folder while there active
653 // buffer writers. Therefore, the name of that active buffer may still refer
654 // to the old file location rather than the new one. This minimized changes to
655 // existing code.
656 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
657 auto offset = current_filename.find(old_base_name_);
658 if (offset != std::string::npos) {
659 current_filename.replace(offset, old_base_name_.length(), base_name_);
660 }
661 offset = final_filename.find(old_base_name_);
662 if (offset != std::string::npos) {
663 final_filename.replace(offset, old_base_name_.length(), base_name_);
664 }
665 result = rename(current_filename.c_str(), final_filename.c_str());
666 }
667
Brian Silverman48deab12020-09-30 18:39:28 -0700668 if (result != 0) {
669 if (errno == ENOSPC) {
670 ran_out_of_space_ = true;
671 return;
672 } else {
673 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
674 << " failed";
675 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700676 } else {
677 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700678 }
679}
680
Brian Silvermancb805822020-10-06 17:43:35 -0700681void MultiNodeLogNamer::CloseWriter(
682 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
683 DetachedBufferWriter *const writer = writer_pointer->get();
684 if (!writer) {
685 return;
686 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700687 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700688 writer->Close();
689
690 if (writer->max_write_time() > max_write_time_) {
691 max_write_time_ = writer->max_write_time();
692 max_write_time_bytes_ = writer->max_write_time_bytes();
693 max_write_time_messages_ = writer->max_write_time_messages();
694 }
695 total_write_time_ += writer->total_write_time();
696 total_write_count_ += writer->total_write_count();
697 total_write_messages_ += writer->total_write_messages();
698 total_write_bytes_ += writer->total_write_bytes();
699
700 if (writer->ran_out_of_space()) {
701 ran_out_of_space_ = true;
702 writer->acknowledge_out_of_space();
703 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700704 if (was_open) {
705 RenameTempFile(writer);
706 } else {
707 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
708 << ": File should not exist: " << writer->filename();
709 }
Brian Silvermancb805822020-10-06 17:43:35 -0700710}
711
Austin Schuhcb5601b2020-09-10 15:29:59 -0700712} // namespace logger
713} // namespace aos