blob: 9a5d0388e9e0f02b0e9b8198017446201e263170 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070028 state_.resize(configuration::NodesCount(log_namer->configuration_));
29 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070030}
31
32NewDataWriter::~NewDataWriter() {
33 if (writer) {
34 Close();
35 }
36}
37
38void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070039 // No need to rotate if nothing has been written.
40 if (header_written_) {
Austin Schuh58646e22021-08-23 23:51:46 -070041 VLOG(1) << "Rotated " << filename();
Austin Schuhe46492f2021-07-31 19:49:41 -070042 ++parts_index_;
43 reopen_(this);
44 header_written_ = false;
45 QueueHeader(MakeHeader());
46 }
Austin Schuh572924a2021-07-30 22:32:12 -070047}
48
49void NewDataWriter::Reboot() {
50 parts_uuid_ = UUID::Random();
51 ++parts_index_;
52 reopen_(this);
53 header_written_ = false;
54}
55
Austin Schuh72211ae2021-08-05 14:02:30 -070056void NewDataWriter::UpdateRemote(
57 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
58 const monotonic_clock::time_point monotonic_remote_time,
59 const monotonic_clock::time_point monotonic_event_time,
60 const bool reliable) {
Austin Schuh58646e22021-08-23 23:51:46 -070061 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -070062 bool rotate = false;
63 CHECK_LT(remote_node_index, state_.size());
64 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -070065
66 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -070067 if (state.boot_uuid != remote_node_boot_uuid) {
Austin Schuhe46492f2021-07-31 19:49:41 -070068 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -070069 << remote_node_boot_uuid << " from " << state.boot_uuid;
70 state.boot_uuid = remote_node_boot_uuid;
71 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
72 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
73 state.oldest_remote_unreliable_monotonic_timestamp =
74 monotonic_clock::max_time;
75 state.oldest_local_unreliable_monotonic_timestamp =
76 monotonic_clock::max_time;
77 rotate = true;
78 }
79
Austin Schuh58646e22021-08-23 23:51:46 -070080
81 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -070082 if (!reliable) {
83 if (state.oldest_remote_unreliable_monotonic_timestamp >
84 monotonic_remote_time) {
Austin Schuh58646e22021-08-23 23:51:46 -070085 VLOG(1) << filename() << " Remote " << remote_node_index
86 << " oldest_remote_unreliable_monotonic_timestamp updated from "
87 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
88 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -070089 state.oldest_remote_unreliable_monotonic_timestamp =
90 monotonic_remote_time;
91 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
92 rotate = true;
93 }
94 }
95
Austin Schuh58646e22021-08-23 23:51:46 -070096 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -070097 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Austin Schuh58646e22021-08-23 23:51:46 -070098 VLOG(1) << filename() << " Remote " << remote_node_index
99 << " oldest_remote_monotonic_timestamp updated from "
100 << state.oldest_remote_monotonic_timestamp << " to "
101 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700102 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
103 state.oldest_local_monotonic_timestamp = monotonic_event_time;
104 rotate = true;
105 }
106
107 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700108 Rotate();
109 }
110}
111
// Queues the message built in `fbb` into the current part file.
//
// Before writing, this makes sure the part on disk still describes the
// message:
//  * If the source node's boot UUID differs from what we recorded, the new
//    boot is recorded.  If a header was already written, a new parts set is
//    started via Reboot().  Then a header for the new boot is queued.
//  * If the LogNamer's start time for (node, boot) no longer matches the one
//    captured when the header was queued, the part is rotated.
// The trailing CHECKs assert that both invariants now hold and that a writer
// with a header is available.
void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
                                 const UUID &source_node_boot_uuid,
                                 aos::monotonic_clock::time_point now) {
  // Trigger a reboot if we detect the boot UUID change.
  if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
    state_[node_index_].boot_uuid = source_node_boot_uuid;
    // Only start a new parts set if something was already written for the
    // previous boot; otherwise the current (empty) file can be reused.
    if (header_written_) {
      Reboot();
    }

    QueueHeader(MakeHeader());
  }

  // If the start time has changed for this node, trigger a rotation.
  if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
      monotonic_start_time_) {
    // Rotate() is a no-op without a written header, which would leave the
    // stale start time in place, so require one here.
    CHECK(header_written_);
    Rotate();
  }

  CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
           monotonic_start_time_);
  CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
  CHECK(writer);
  CHECK(header_written_) << ": Attempting to write message before header to "
                         << writer->filename();
  writer->QueueSizedFlatbuffer(fbb, now);
}
140
Austin Schuhe46492f2021-07-31 19:49:41 -0700141aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
142NewDataWriter::MakeHeader() {
143 const size_t logger_node_index = log_namer_->logger_node_index();
144 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700145 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700146 VLOG(1) << filename() << " Logger node is " << logger_node_index
147 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700148 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700149 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700150 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700151 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700152 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
Austin Schuhe46492f2021-07-31 19:49:41 -0700153 parts_index_);
154}
155
// Writes `header` as the first message of the current part file.
//
// The header's source_node_boot_uuid must match the boot UUID recorded for
// this writer's node.  If no DetachedBufferWriter exists yet, reopen_ is
// called to create one; this is where the file is lazily opened.  On return,
// header_written_ is set and monotonic_start_time_ caches the LogNamer's start
// time for (node, boot).  QueueMessage later compares against that cache to
// detect start-time changes.
void NewDataWriter::QueueHeader(
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
  CHECK(!header_written_) << ": Attempting to write duplicate header to "
                          << writer->filename();
  CHECK(header.message().has_source_node_boot_uuid());
  CHECK_EQ(state_[node_index_].boot_uuid,
           UUID::FromString(header.message().source_node_boot_uuid()));
  // Open the file on demand; the writer may not exist before the first header.
  if (!writer) {
    reopen_(this);
  }

  VLOG(1) << "Writing to " << filename() << " "
          << aos::FlatbufferToJson(
                 header, {.multi_line = false, .max_vector_size = 100});

  // TODO(austin): This triggers a dummy allocation that we don't need as part
  // of releasing. Can we skip it?
  CHECK(writer);
  writer->QueueSizedFlatbuffer(header.Release());
  header_written_ = true;
  // Remember which start time this header advertised.
  monotonic_start_time_ = log_namer_->monotonic_start_time(
      node_index_, state_[node_index_].boot_uuid);
}
179
180void NewDataWriter::Close() {
181 CHECK(writer);
182 close_(this);
183 writer.reset();
184 header_written_ = false;
185}
186
Austin Schuh58646e22021-08-23 23:51:46 -0700187LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
188 const UUID &boot_uuid) {
189 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
190 if (it == node_states_.end()) {
191 it =
192 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
193 .first;
194 }
195 return &it->second;
196}
197
// Builds a complete, size-prefixed LogFileHeader for a part file containing
// data from node `node_index`.
//
// Most fields are copied from the template header_.  The per-part fields come
// from the arguments:
//   state       - per-node tracking from the NewDataWriter, indexed by node.
//                 It provides boot_uuids and the four oldest_*_timestamps
//                 vectors.  state[node_index].boot_uuid must be non-zero.
//   parts_uuid  - UUID shared by all parts in this sequence.
//   parts_index - position of this part in that sequence.
// The result is verified before being returned.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
    size_t node_index, const std::vector<NewDataWriter::State> &state,
    const UUID &parts_uuid, int parts_index) {
  const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
  const Node *const source_node =
      configuration::GetNode(configuration_, node_index);
  // NOTE(review): pins the schema's field count, presumably so that adding a
  // field to LogFileHeader forces this function to be revisited.
  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 28u);
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // All strings, vectors and sub-tables are serialized first; flatbuffers
  // does not allow creating nested objects once the table Builder below is
  // under construction.

  // Exactly one of the full configuration or its sha256 is carried over.
  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (header_.message().has_configuration()) {
    CHECK(!header_.message().has_configuration_sha256());
    configuration_offset =
        CopyFlatBuffer(header_.message().configuration(), &fbb);
  } else {
    CHECK(!header_.message().has_configuration());
    CHECK(header_.message().has_configuration_sha256());
    config_sha256_offset = fbb.CreateString(
        header_.message().configuration_sha256()->string_view());
  }

  CHECK(header_.message().has_name());
  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(header_.message().name()->string_view());

  CHECK(header_.message().has_log_event_uuid());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      fbb.CreateString(header_.message().log_event_uuid()->string_view());

  CHECK(header_.message().has_logger_instance_uuid());
  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      fbb.CreateString(header_.message().logger_instance_uuid()->string_view());

  // log_start_uuid is optional in the template header.
  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (header_.message().has_log_start_uuid()) {
    log_start_uuid_offset =
        fbb.CreateString(header_.message().log_start_uuid()->string_view());
  }

  CHECK(header_.message().has_logger_node_boot_uuid());
  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      fbb.CreateString(
          header_.message().logger_node_boot_uuid()->string_view());

  CHECK_NE(source_node_boot_uuid, UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      source_node_boot_uuid.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      parts_uuid.PackString(&fbb);

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
  }

  std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
  boot_uuid_offsets.reserve(state.size());

  // The four timestamp vectors are allocated uninitialized (one entry per
  // node) and filled in by the loop below via GetMutableTemporaryPointer.
  // `unused` receives the raw data pointer, which is not needed.
  int64_t *unused;
  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
          state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
          state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_remote_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(
              state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_local_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(
              state.size(), &unused);

  for (size_t i = 0; i < state.size(); ++i) {
    // Nodes whose boot is still unknown get an empty string in boot_uuids.
    if (state[i].boot_uuid != UUID::Zero()) {
      boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
    } else {
      boot_uuid_offsets.emplace_back(fbb.CreateString(""));
    }
    // A node with no known boot must not have any recorded timestamps.
    if (state[i].boot_uuid == UUID::Zero()) {
      CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
    }

    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_remote_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_remote_monotonic_timestamp.time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_local_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_local_monotonic_timestamp.time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_local_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
                        .count());
  }

  flatbuffers::Offset<
      flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
      boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (!logger_node_offset.IsNull()) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  log_file_header_builder.add_max_out_of_order_duration(
      header_.message().max_out_of_order_duration());

  // Start times are tracked per (node, boot).
  NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          node_state->monotonic_start_time.time_since_epoch())
          .count());
  // realtime_start_time is only written when the source is the logging node
  // itself.
  if (source_node == node_) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->realtime_start_time.time_since_epoch())
            .count());
  } else {
    // Fill out the legacy start times. Since these were implemented to never
    // change on reboot, they aren't very helpful in tracking what happened.
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->logger_monotonic_start_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->logger_realtime_start_time.time_since_epoch())
            .count());
  }

  // TODO(austin): Add more useful times. When was this part started? What do
  // we know about both the logger and remote then?

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(parts_index);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  log_file_header_builder.add_boot_uuids(boot_uuids_offset);
  // Logger clock readings taken now, i.e. when this part's header is built.
  log_file_header_builder.add_logger_part_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          event_loop_->monotonic_now().time_since_epoch())
          .count());
  log_file_header_builder.add_logger_part_realtime_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          event_loop_->realtime_now().time_since_epoch())
          .count());
  log_file_header_builder.add_oldest_remote_monotonic_timestamps(
      oldest_remote_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_local_monotonic_timestamps(
      oldest_local_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
      oldest_remote_unreliable_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
      oldest_local_unreliable_monotonic_timestamps_offset);
  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}
408
Austin Schuhb8bca732021-07-30 22:32:00 -0700409NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700410 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
411 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700412 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700413}
414
Austin Schuh73340842021-07-30 22:32:06 -0700415void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700416 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700417 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700418}
Austin Schuh8c399962020-12-25 21:51:45 -0800419
420void LocalLogNamer::WriteConfiguration(
421 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
422 std::string_view config_sha256) {
423 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
424
425 std::unique_ptr<DetachedBufferWriter> writer =
426 std::make_unique<DetachedBufferWriter>(
427 filename, std::make_unique<aos::logger::DummyEncoder>());
428 writer->QueueSizedFlatbuffer(header->Release());
429}
430
Austin Schuhb8bca732021-07-30 22:32:00 -0700431NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700432 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
433 << ": Message is not delivered to this node.";
434 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
435 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
436 node_))
437 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700438 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700439}
440
Austin Schuhb8bca732021-07-30 22:32:00 -0700441NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700442 const Channel * /*channel*/, const Node * /*node*/) {
443 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
444 return nullptr;
445}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700446MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
Austin Schuha499cea2021-07-31 19:49:53 -0700447 EventLoop *event_loop)
Austin Schuh5b728b72021-06-16 14:57:15 -0700448 : MultiNodeLogNamer(base_name, event_loop->configuration(), event_loop,
449 event_loop->node()) {}
450
451MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
452 const Configuration *configuration,
453 EventLoop *event_loop, const Node *node)
454 : LogNamer(configuration, event_loop, node),
455 base_name_(base_name),
456 old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700457
Brian Silverman48deab12020-09-30 18:39:28 -0700458MultiNodeLogNamer::~MultiNodeLogNamer() {
459 if (!ran_out_of_space_) {
460 // This handles renaming temporary files etc.
461 Close();
462 }
463}
464
Austin Schuh572924a2021-07-30 22:32:12 -0700465void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700466 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700467 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700468 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700469 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700470 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700471 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700472 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700473 if (node == data_writer.second.node()) {
474 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700475 }
476 }
477 }
478}
479
Austin Schuh8c399962020-12-25 21:51:45 -0800480void MultiNodeLogNamer::WriteConfiguration(
481 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
482 std::string_view config_sha256) {
483 if (ran_out_of_space_) {
484 return;
485 }
486
487 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
488 const std::string filename = absl::StrCat(
489 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
490
491 std::unique_ptr<DetachedBufferWriter> writer =
492 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
493
494 writer->QueueSizedFlatbuffer(header->Release());
495
496 if (!writer->ran_out_of_space()) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700497 all_filenames_.emplace_back(
498 absl::StrCat(config_sha256, ".bfbs", extension_));
Austin Schuh8c399962020-12-25 21:51:45 -0800499 }
500 CloseWriter(&writer);
501}
502
Austin Schuhb8bca732021-07-30 22:32:00 -0700503NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700504 // See if we can read the data on this node at all.
505 const bool is_readable =
506 configuration::ChannelIsReadableOnNode(channel, this->node());
507 if (!is_readable) {
508 return nullptr;
509 }
510
511 // Then, see if we are supposed to log the data here.
512 const bool log_message =
513 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
514
515 if (!log_message) {
516 return nullptr;
517 }
518
519 // Now, sort out if this is data generated on this node, or not. It is
520 // generated if it is sendable on this node.
521 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700522 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700523 OpenDataWriter();
524 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700525 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700526 }
527
528 // Ok, we have data that is being forwarded to us that we are supposed to
529 // log. It needs to be logged with send timestamps, but be sorted enough
530 // to be able to be processed.
531 CHECK(data_writers_.find(channel) == data_writers_.end());
532
533 // Track that this node is being logged.
534 const Node *source_node = configuration::GetNode(
535 configuration_, channel->source_node()->string_view());
536
537 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
538 nodes_.emplace_back(source_node);
539 }
540
Austin Schuh572924a2021-07-30 22:32:12 -0700541 NewDataWriter data_writer(this, source_node,
542 [this, channel](NewDataWriter *data_writer) {
543 OpenWriter(channel, data_writer);
544 },
545 [this](NewDataWriter *data_writer) {
546 CloseWriter(&data_writer->writer);
547 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700548 return &(
549 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700550}
551
Austin Schuhb8bca732021-07-30 22:32:00 -0700552NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700553 const Channel *channel, const Node *node) {
554 // See if we can read the data on this node at all.
555 const bool is_readable =
556 configuration::ChannelIsReadableOnNode(channel, this->node());
557 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
558
559 CHECK(data_writers_.find(channel) == data_writers_.end());
560
561 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
562 nodes_.emplace_back(node);
563 }
564
Austin Schuh5b728b72021-06-16 14:57:15 -0700565 NewDataWriter data_writer(this, configuration::GetNode(configuration_, node),
Austin Schuh572924a2021-07-30 22:32:12 -0700566 [this, channel](NewDataWriter *data_writer) {
567 OpenForwardedTimestampWriter(channel,
568 data_writer);
569 },
570 [this](NewDataWriter *data_writer) {
571 CloseWriter(&data_writer->writer);
572 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700573 return &(
574 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700575}
576
Austin Schuhb8bca732021-07-30 22:32:00 -0700577NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700578 bool log_delivery_times = false;
579 if (this->node() != nullptr) {
580 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
581 channel, this->node(), this->node());
582 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700583 if (!log_delivery_times) {
584 return nullptr;
585 }
586
Austin Schuhb8bca732021-07-30 22:32:00 -0700587 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700588 OpenDataWriter();
589 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700590 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700591}
592
Brian Silverman0465fcf2020-09-24 00:29:18 -0700593void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700594 data_writers_.clear();
595 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700596}
597
598void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700599 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700600 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800601 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700602 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700603 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700604 if (data_writer_) {
605 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700606 }
607 max_write_time_ = std::chrono::nanoseconds::zero();
608 max_write_time_bytes_ = -1;
609 max_write_time_messages_ = -1;
610 total_write_time_ = std::chrono::nanoseconds::zero();
611 total_write_count_ = 0;
612 total_write_messages_ = 0;
613 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700614}
615
Austin Schuhb8bca732021-07-30 22:32:00 -0700616void MultiNodeLogNamer::OpenForwardedTimestampWriter(
617 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700618 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700619 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700620 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700621 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700622 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700623}
624
625void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700626 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700627 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700628 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700629 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700630 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700631 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700632}
633
Brian Silvermana621f522020-09-30 16:52:43 -0700634void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700635 if (!data_writer_) {
636 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700637 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700638 [this](NewDataWriter *writer) {
639 std::string name;
640 if (node() != nullptr) {
641 name = absl::StrCat(name, node()->name()->string_view(), "_");
642 }
Austin Schuh572924a2021-07-30 22:32:12 -0700643 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700644 extension_);
645 CreateBufferWriter(name, &writer->writer);
646 },
647 [this](NewDataWriter *data_writer) {
648 CloseWriter(&data_writer->writer);
649 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700650 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700651}
652
Brian Silverman0465fcf2020-09-24 00:29:18 -0700653void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700654 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700655 if (ran_out_of_space_) {
656 // Refuse to open any new files, which might skip data. Any existing files
657 // are in the same folder, which means they're on the same filesystem, which
658 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700659 if (!destination->get()) {
660 // But avoid leaving a nullptr writer if we're out of space when
661 // attempting to open the first file.
662 *destination = std::make_unique<DetachedBufferWriter>(
663 DetachedBufferWriter::already_out_of_space_t());
664 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700665 return;
666 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700667 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
668 const std::string filename =
669 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700670 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700671 if (ran_out_of_space_) {
672 *destination = std::make_unique<DetachedBufferWriter>(
673 DetachedBufferWriter::already_out_of_space_t());
674 return;
675 }
Brian Silvermancb805822020-10-06 17:43:35 -0700676 *destination =
677 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700678 if (!destination->get()->ran_out_of_space()) {
679 all_filenames_.emplace_back(path);
680 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700681 return;
682 }
Brian Silvermancb805822020-10-06 17:43:35 -0700683
684 CloseWriter(destination);
685 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700686 *destination->get() =
687 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700688 return;
689 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700690
Brian Silvermancb805822020-10-06 17:43:35 -0700691 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700692 if (!destination->get()->ran_out_of_space()) {
693 all_filenames_.emplace_back(path);
694 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700695}
696
Brian Silverman48deab12020-09-30 18:39:28 -0700697void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
698 if (temp_suffix_.empty()) {
699 return;
700 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700701 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700702 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700703 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700704 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700705 int result = rename(current_filename.c_str(), final_filename.c_str());
706
707 // When changing the base name, we rename the log folder while there active
708 // buffer writers. Therefore, the name of that active buffer may still refer
709 // to the old file location rather than the new one. This minimized changes to
710 // existing code.
711 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
712 auto offset = current_filename.find(old_base_name_);
713 if (offset != std::string::npos) {
714 current_filename.replace(offset, old_base_name_.length(), base_name_);
715 }
716 offset = final_filename.find(old_base_name_);
717 if (offset != std::string::npos) {
718 final_filename.replace(offset, old_base_name_.length(), base_name_);
719 }
720 result = rename(current_filename.c_str(), final_filename.c_str());
721 }
722
Brian Silverman48deab12020-09-30 18:39:28 -0700723 if (result != 0) {
724 if (errno == ENOSPC) {
725 ran_out_of_space_ = true;
726 return;
727 } else {
728 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
729 << " failed";
730 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700731 } else {
732 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700733 }
734}
735
Brian Silvermancb805822020-10-06 17:43:35 -0700736void MultiNodeLogNamer::CloseWriter(
737 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
738 DetachedBufferWriter *const writer = writer_pointer->get();
739 if (!writer) {
740 return;
741 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700742 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700743 writer->Close();
744
745 if (writer->max_write_time() > max_write_time_) {
746 max_write_time_ = writer->max_write_time();
747 max_write_time_bytes_ = writer->max_write_time_bytes();
748 max_write_time_messages_ = writer->max_write_time_messages();
749 }
750 total_write_time_ += writer->total_write_time();
751 total_write_count_ += writer->total_write_count();
752 total_write_messages_ += writer->total_write_messages();
753 total_write_bytes_ += writer->total_write_bytes();
754
755 if (writer->ran_out_of_space()) {
756 ran_out_of_space_ = true;
757 writer->acknowledge_out_of_space();
758 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700759 if (was_open) {
760 RenameTempFile(writer);
761 } else {
762 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
763 << ": File should not exist: " << writer->filename();
764 }
Brian Silvermancb805822020-10-06 17:43:35 -0700765}
766
Austin Schuhcb5601b2020-09-10 15:29:59 -0700767} // namespace logger
768} // namespace aos