blob: b41212cbbf2a633f9e3bd67d3c9fa20b4838fbd5 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
10#include "aos/events/logging/logfile_utils.h"
11#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070012#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070013#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070014#include "flatbuffers/flatbuffers.h"
15#include "glog/logging.h"
16
17namespace aos {
18namespace logger {
19
Austin Schuh572924a2021-07-30 22:32:12 -070020NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
21 std::function<void(NewDataWriter *)> reopen,
22 std::function<void(NewDataWriter *)> close)
23 : node_(node),
24 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
25 log_namer_(log_namer),
26 reopen_(std::move(reopen)),
27 close_(std::move(close)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070028 state_.resize(configuration::NodesCount(log_namer->configuration_));
29 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070030}
31
32NewDataWriter::~NewDataWriter() {
33 if (writer) {
34 Close();
35 }
36}
37
38void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070039 // No need to rotate if nothing has been written.
40 if (header_written_) {
Austin Schuh58646e22021-08-23 23:51:46 -070041 VLOG(1) << "Rotated " << filename();
Austin Schuhe46492f2021-07-31 19:49:41 -070042 ++parts_index_;
43 reopen_(this);
44 header_written_ = false;
45 QueueHeader(MakeHeader());
46 }
Austin Schuh572924a2021-07-30 22:32:12 -070047}
48
Austin Schuh5e14d842022-01-21 12:02:15 -080049void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070050 parts_uuid_ = UUID::Random();
51 ++parts_index_;
52 reopen_(this);
53 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080054 for (State &state : state_) {
55 state.boot_uuid = UUID::Zero();
56 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
57 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
58 state.oldest_remote_unreliable_monotonic_timestamp =
59 monotonic_clock::max_time;
60 state.oldest_local_unreliable_monotonic_timestamp =
61 monotonic_clock::max_time;
62 }
63
64 state_[node_index_].boot_uuid = source_node_boot_uuid;
65
66 VLOG(1) << "Rebooted " << filename();
67}
68
69void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
70 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
71 state_[node_index_].boot_uuid = source_node_boot_uuid;
72 if (header_written_) {
73 Reboot(source_node_boot_uuid);
74 }
75 }
Austin Schuh572924a2021-07-30 22:32:12 -070076}
77
Austin Schuh72211ae2021-08-05 14:02:30 -070078void NewDataWriter::UpdateRemote(
79 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
80 const monotonic_clock::time_point monotonic_remote_time,
81 const monotonic_clock::time_point monotonic_event_time,
82 const bool reliable) {
Austin Schuh58646e22021-08-23 23:51:46 -070083 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -070084 bool rotate = false;
85 CHECK_LT(remote_node_index, state_.size());
86 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -070087
88 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -070089 if (state.boot_uuid != remote_node_boot_uuid) {
Austin Schuhe46492f2021-07-31 19:49:41 -070090 VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -070091 << remote_node_boot_uuid << " from " << state.boot_uuid;
92 state.boot_uuid = remote_node_boot_uuid;
93 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
94 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
95 state.oldest_remote_unreliable_monotonic_timestamp =
96 monotonic_clock::max_time;
97 state.oldest_local_unreliable_monotonic_timestamp =
98 monotonic_clock::max_time;
99 rotate = true;
100 }
101
Austin Schuh58646e22021-08-23 23:51:46 -0700102 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700103 if (!reliable) {
104 if (state.oldest_remote_unreliable_monotonic_timestamp >
105 monotonic_remote_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700106 VLOG(1) << filename() << " Remote " << remote_node_index
107 << " oldest_remote_unreliable_monotonic_timestamp updated from "
108 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
109 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700110 state.oldest_remote_unreliable_monotonic_timestamp =
111 monotonic_remote_time;
112 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
113 rotate = true;
114 }
115 }
116
Austin Schuh58646e22021-08-23 23:51:46 -0700117 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700118 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700119 VLOG(1) << filename() << " Remote " << remote_node_index
120 << " oldest_remote_monotonic_timestamp updated from "
121 << state.oldest_remote_monotonic_timestamp << " to "
122 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700123 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
124 state.oldest_local_monotonic_timestamp = monotonic_event_time;
125 rotate = true;
126 }
127
128 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700129 Rotate();
130 }
131}
132
133void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
134 const UUID &source_node_boot_uuid,
135 aos::monotonic_clock::time_point now) {
Austin Schuh58646e22021-08-23 23:51:46 -0700136 // Trigger a reboot if we detect the boot UUID change.
Austin Schuh5e14d842022-01-21 12:02:15 -0800137 UpdateBoot(source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700138
Austin Schuh5e14d842022-01-21 12:02:15 -0800139 if (!header_written_) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700140 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700141 }
Austin Schuh58646e22021-08-23 23:51:46 -0700142
143 // If the start time has changed for this node, trigger a rotation.
144 if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
Austin Schuh5e14d842022-01-21 12:02:15 -0800145 monotonic_start_time_) {
Austin Schuh58646e22021-08-23 23:51:46 -0700146 CHECK(header_written_);
147 Rotate();
148 }
149
150 CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
151 monotonic_start_time_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700152 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
milind-ua50344f2021-08-25 18:22:20 -0700153 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700154 CHECK(header_written_) << ": Attempting to write message before header to "
155 << writer->filename();
156 writer->QueueSizedFlatbuffer(fbb, now);
157}
158
Austin Schuhe46492f2021-07-31 19:49:41 -0700159aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
160NewDataWriter::MakeHeader() {
161 const size_t logger_node_index = log_namer_->logger_node_index();
162 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700163 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700164 VLOG(1) << filename() << " Logger node is " << logger_node_index
165 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700166 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700167 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700168 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700169 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700170 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
Austin Schuhe46492f2021-07-31 19:49:41 -0700171 parts_index_);
172}
173
Austin Schuh572924a2021-07-30 22:32:12 -0700174void NewDataWriter::QueueHeader(
175 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
176 CHECK(!header_written_) << ": Attempting to write duplicate header to "
177 << writer->filename();
178 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700179 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700180 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700181 if (!writer) {
182 reopen_(this);
183 }
184
Austin Schuh58646e22021-08-23 23:51:46 -0700185 VLOG(1) << "Writing to " << filename() << " "
186 << aos::FlatbufferToJson(
187 header, {.multi_line = false, .max_vector_size = 100});
188
Austin Schuh572924a2021-07-30 22:32:12 -0700189 // TODO(austin): This triggers a dummy allocation that we don't need as part
190 // of releasing. Can we skip it?
Austin Schuh510dc622021-08-06 18:47:30 -0700191 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700192 writer->QueueSizedFlatbuffer(header.Release());
193 header_written_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700194 monotonic_start_time_ = log_namer_->monotonic_start_time(
195 node_index_, state_[node_index_].boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700196}
197
198void NewDataWriter::Close() {
199 CHECK(writer);
200 close_(this);
201 writer.reset();
202 header_written_ = false;
203}
204
Austin Schuh58646e22021-08-23 23:51:46 -0700205LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
206 const UUID &boot_uuid) {
207 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
208 if (it == node_states_.end()) {
209 it =
210 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
211 .first;
212 }
213 return &it->second;
214}
215
Austin Schuh73340842021-07-30 22:32:06 -0700216aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700217 size_t node_index, const std::vector<NewDataWriter::State> &state,
Austin Schuh58646e22021-08-23 23:51:46 -0700218 const UUID &parts_uuid, int parts_index) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700219 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700220 const Node *const source_node =
221 configuration::GetNode(configuration_, node_index);
Austin Schuh4db9ec92021-09-22 13:11:12 -0700222 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 28u);
Austin Schuh73340842021-07-30 22:32:06 -0700223 flatbuffers::FlatBufferBuilder fbb;
224 fbb.ForceDefaults(true);
225
226 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
227 flatbuffers::Offset<aos::Configuration> configuration_offset;
228 if (header_.message().has_configuration()) {
229 CHECK(!header_.message().has_configuration_sha256());
230 configuration_offset =
231 CopyFlatBuffer(header_.message().configuration(), &fbb);
232 } else {
233 CHECK(!header_.message().has_configuration());
234 CHECK(header_.message().has_configuration_sha256());
235 config_sha256_offset = fbb.CreateString(
236 header_.message().configuration_sha256()->string_view());
237 }
238
239 CHECK(header_.message().has_name());
240 const flatbuffers::Offset<flatbuffers::String> name_offset =
241 fbb.CreateString(header_.message().name()->string_view());
242
243 CHECK(header_.message().has_log_event_uuid());
244 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
245 fbb.CreateString(header_.message().log_event_uuid()->string_view());
246
247 CHECK(header_.message().has_logger_instance_uuid());
248 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
249 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
250
251 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
252 if (header_.message().has_log_start_uuid()) {
253 log_start_uuid_offset =
254 fbb.CreateString(header_.message().log_start_uuid()->string_view());
255 }
256
257 CHECK(header_.message().has_logger_node_boot_uuid());
258 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
259 fbb.CreateString(
260 header_.message().logger_node_boot_uuid()->string_view());
261
262 CHECK_NE(source_node_boot_uuid, UUID::Zero());
263 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
264 source_node_boot_uuid.PackString(&fbb);
265
266 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
267 parts_uuid.PackString(&fbb);
268
269 flatbuffers::Offset<Node> node_offset;
270 flatbuffers::Offset<Node> logger_node_offset;
271
272 if (configuration::MultiNode(configuration_)) {
273 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
274 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
275 }
276
Austin Schuhe46492f2021-07-31 19:49:41 -0700277 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700278 boot_uuid_offsets.reserve(state.size());
Austin Schuhe46492f2021-07-31 19:49:41 -0700279
Austin Schuh4db9ec92021-09-22 13:11:12 -0700280 int64_t *unused;
Austin Schuh72211ae2021-08-05 14:02:30 -0700281 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
282 oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
Austin Schuh4db9ec92021-09-22 13:11:12 -0700283 state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700284
Austin Schuh72211ae2021-08-05 14:02:30 -0700285 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
286 oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
Austin Schuh4db9ec92021-09-22 13:11:12 -0700287 state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700288
Austin Schuh72211ae2021-08-05 14:02:30 -0700289 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
290 oldest_remote_unreliable_monotonic_timestamps_offset =
291 fbb.CreateUninitializedVector(
Austin Schuh4db9ec92021-09-22 13:11:12 -0700292 state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700293
Austin Schuh72211ae2021-08-05 14:02:30 -0700294 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
295 oldest_local_unreliable_monotonic_timestamps_offset =
296 fbb.CreateUninitializedVector(
Austin Schuh4db9ec92021-09-22 13:11:12 -0700297 state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700298
299 for (size_t i = 0; i < state.size(); ++i) {
Austin Schuh4db9ec92021-09-22 13:11:12 -0700300 if (state[i].boot_uuid != UUID::Zero()) {
301 boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
302 } else {
303 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
304 }
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700305 if (state[i].boot_uuid == UUID::Zero()) {
306 CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
307 monotonic_clock::max_time);
308 CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
309 monotonic_clock::max_time);
310 CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
311 monotonic_clock::max_time);
312 CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
313 monotonic_clock::max_time);
314 }
315
Austin Schuh4db9ec92021-09-22 13:11:12 -0700316 flatbuffers::GetMutableTemporaryPointer(
317 fbb, oldest_remote_monotonic_timestamps_offset)
318 ->Mutate(i, state[i]
319 .oldest_remote_monotonic_timestamp.time_since_epoch()
320 .count());
321 flatbuffers::GetMutableTemporaryPointer(
322 fbb, oldest_local_monotonic_timestamps_offset)
323 ->Mutate(i, state[i]
324 .oldest_local_monotonic_timestamp.time_since_epoch()
325 .count());
326 flatbuffers::GetMutableTemporaryPointer(
327 fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
328 ->Mutate(i, state[i]
329 .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
330 .count());
331 flatbuffers::GetMutableTemporaryPointer(
332 fbb, oldest_local_unreliable_monotonic_timestamps_offset)
333 ->Mutate(i, state[i]
334 .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
335 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700336 }
337
Austin Schuh4db9ec92021-09-22 13:11:12 -0700338 flatbuffers::Offset<
339 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
340 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
341
Austin Schuh73340842021-07-30 22:32:06 -0700342 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
343
344 log_file_header_builder.add_name(name_offset);
345
346 // Only add the node if we are running in a multinode configuration.
347 if (!logger_node_offset.IsNull()) {
348 log_file_header_builder.add_node(node_offset);
349 log_file_header_builder.add_logger_node(logger_node_offset);
350 }
351
352 if (!configuration_offset.IsNull()) {
353 log_file_header_builder.add_configuration(configuration_offset);
354 }
355 log_file_header_builder.add_max_out_of_order_duration(
356 header_.message().max_out_of_order_duration());
357
Austin Schuh58646e22021-08-23 23:51:46 -0700358 NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
Austin Schuh73340842021-07-30 22:32:06 -0700359 log_file_header_builder.add_monotonic_start_time(
360 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700361 node_state->monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700362 .count());
363 if (source_node == node_) {
364 log_file_header_builder.add_realtime_start_time(
365 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700366 node_state->realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700367 .count());
368 } else {
369 // Fill out the legacy start times. Since these were implemented to never
370 // change on reboot, they aren't very helpful in tracking what happened.
371 log_file_header_builder.add_logger_monotonic_start_time(
372 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700373 node_state->logger_monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700374 .count());
375 log_file_header_builder.add_logger_realtime_start_time(
376 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700377 node_state->logger_realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700378 .count());
379 }
380
381 // TODO(austin): Add more useful times. When was this part started? What do
382 // we know about both the logger and remote then?
383
384 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
385 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
386 if (!log_start_uuid_offset.IsNull()) {
387 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
388 }
389 log_file_header_builder.add_logger_node_boot_uuid(
390 logger_node_boot_uuid_offset);
391 log_file_header_builder.add_source_node_boot_uuid(
392 source_node_boot_uuid_offset);
393
394 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
395 log_file_header_builder.add_parts_index(parts_index);
396
397 if (!config_sha256_offset.IsNull()) {
398 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
399 }
400
Austin Schuhe46492f2021-07-31 19:49:41 -0700401 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700402 log_file_header_builder.add_logger_part_monotonic_start_time(
403 std::chrono::duration_cast<std::chrono::nanoseconds>(
404 event_loop_->monotonic_now().time_since_epoch())
405 .count());
406 log_file_header_builder.add_logger_part_realtime_start_time(
407 std::chrono::duration_cast<std::chrono::nanoseconds>(
408 event_loop_->realtime_now().time_since_epoch())
409 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700410 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
411 oldest_remote_monotonic_timestamps_offset);
412 log_file_header_builder.add_oldest_local_monotonic_timestamps(
413 oldest_local_monotonic_timestamps_offset);
414 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
415 oldest_remote_unreliable_monotonic_timestamps_offset);
416 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
417 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700418 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
419 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
420 fbb.Release());
421
422 CHECK(result.Verify()) << ": Built a corrupted header.";
423
424 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700425}
426
Austin Schuhb8bca732021-07-30 22:32:00 -0700427NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhdf576472020-10-19 09:39:37 -0700428 CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
429 << ": " << configuration::CleanedChannelToString(channel);
Austin Schuhb8bca732021-07-30 22:32:00 -0700430 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700431}
432
Austin Schuh73340842021-07-30 22:32:06 -0700433void LocalLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700434 CHECK(node == this->node());
Austin Schuhb8bca732021-07-30 22:32:00 -0700435 data_writer_.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700436}
Austin Schuh8c399962020-12-25 21:51:45 -0800437
438void LocalLogNamer::WriteConfiguration(
439 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
440 std::string_view config_sha256) {
441 const std::string filename = absl::StrCat(base_name_, config_sha256, ".bfbs");
442
443 std::unique_ptr<DetachedBufferWriter> writer =
444 std::make_unique<DetachedBufferWriter>(
445 filename, std::make_unique<aos::logger::DummyEncoder>());
446 writer->QueueSizedFlatbuffer(header->Release());
447}
448
Austin Schuhb8bca732021-07-30 22:32:00 -0700449NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700450 CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
451 << ": Message is not delivered to this node.";
452 CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
453 CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
454 node_))
455 << ": Delivery times aren't logged for this channel on this node.";
Austin Schuhb8bca732021-07-30 22:32:00 -0700456 return &data_writer_;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700457}
458
Austin Schuhb8bca732021-07-30 22:32:00 -0700459NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700460 const Channel * /*channel*/, const Node * /*node*/) {
461 LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
462 return nullptr;
463}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700464MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
Austin Schuha499cea2021-07-31 19:49:53 -0700465 EventLoop *event_loop)
Austin Schuh5b728b72021-06-16 14:57:15 -0700466 : MultiNodeLogNamer(base_name, event_loop->configuration(), event_loop,
467 event_loop->node()) {}
468
469MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
470 const Configuration *configuration,
471 EventLoop *event_loop, const Node *node)
472 : LogNamer(configuration, event_loop, node),
473 base_name_(base_name),
474 old_base_name_() {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700475
Brian Silverman48deab12020-09-30 18:39:28 -0700476MultiNodeLogNamer::~MultiNodeLogNamer() {
477 if (!ran_out_of_space_) {
478 // This handles renaming temporary files etc.
479 Close();
480 }
481}
482
Austin Schuh572924a2021-07-30 22:32:12 -0700483void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700484 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700485 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700486 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700487 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700488 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700489 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700490 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700491 if (node == data_writer.second.node()) {
492 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700493 }
494 }
495 }
496}
497
Austin Schuh8c399962020-12-25 21:51:45 -0800498void MultiNodeLogNamer::WriteConfiguration(
499 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
500 std::string_view config_sha256) {
501 if (ran_out_of_space_) {
502 return;
503 }
504
505 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
506 const std::string filename = absl::StrCat(
507 base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
508
509 std::unique_ptr<DetachedBufferWriter> writer =
510 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
511
512 writer->QueueSizedFlatbuffer(header->Release());
513
514 if (!writer->ran_out_of_space()) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700515 all_filenames_.emplace_back(
516 absl::StrCat(config_sha256, ".bfbs", extension_));
Austin Schuh8c399962020-12-25 21:51:45 -0800517 }
518 CloseWriter(&writer);
519}
520
Austin Schuhb8bca732021-07-30 22:32:00 -0700521NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700522 // See if we can read the data on this node at all.
523 const bool is_readable =
524 configuration::ChannelIsReadableOnNode(channel, this->node());
525 if (!is_readable) {
526 return nullptr;
527 }
528
529 // Then, see if we are supposed to log the data here.
530 const bool log_message =
531 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
532
533 if (!log_message) {
534 return nullptr;
535 }
536
537 // Now, sort out if this is data generated on this node, or not. It is
538 // generated if it is sendable on this node.
539 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700540 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700541 OpenDataWriter();
542 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700543 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700544 }
545
546 // Ok, we have data that is being forwarded to us that we are supposed to
547 // log. It needs to be logged with send timestamps, but be sorted enough
548 // to be able to be processed.
549 CHECK(data_writers_.find(channel) == data_writers_.end());
550
551 // Track that this node is being logged.
552 const Node *source_node = configuration::GetNode(
553 configuration_, channel->source_node()->string_view());
554
555 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
556 nodes_.emplace_back(source_node);
557 }
558
Austin Schuh572924a2021-07-30 22:32:12 -0700559 NewDataWriter data_writer(this, source_node,
560 [this, channel](NewDataWriter *data_writer) {
561 OpenWriter(channel, data_writer);
562 },
563 [this](NewDataWriter *data_writer) {
564 CloseWriter(&data_writer->writer);
565 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700566 return &(
567 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700568}
569
Austin Schuhb8bca732021-07-30 22:32:00 -0700570NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700571 const Channel *channel, const Node *node) {
572 // See if we can read the data on this node at all.
573 const bool is_readable =
574 configuration::ChannelIsReadableOnNode(channel, this->node());
575 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
576
577 CHECK(data_writers_.find(channel) == data_writers_.end());
578
579 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
580 nodes_.emplace_back(node);
581 }
582
Austin Schuh5b728b72021-06-16 14:57:15 -0700583 NewDataWriter data_writer(this, configuration::GetNode(configuration_, node),
Austin Schuh572924a2021-07-30 22:32:12 -0700584 [this, channel](NewDataWriter *data_writer) {
585 OpenForwardedTimestampWriter(channel,
586 data_writer);
587 },
588 [this](NewDataWriter *data_writer) {
589 CloseWriter(&data_writer->writer);
590 });
Austin Schuhb8bca732021-07-30 22:32:00 -0700591 return &(
592 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700593}
594
Austin Schuhb8bca732021-07-30 22:32:00 -0700595NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700596 bool log_delivery_times = false;
597 if (this->node() != nullptr) {
598 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
599 channel, this->node(), this->node());
600 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700601 if (!log_delivery_times) {
602 return nullptr;
603 }
604
Austin Schuhb8bca732021-07-30 22:32:00 -0700605 if (!data_writer_) {
Brian Silvermancb805822020-10-06 17:43:35 -0700606 OpenDataWriter();
607 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700608 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700609}
610
Brian Silverman0465fcf2020-09-24 00:29:18 -0700611void MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700612 data_writers_.clear();
613 data_writer_.reset();
Brian Silvermancb805822020-10-06 17:43:35 -0700614}
615
616void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700617 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700618 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800619 if (!data_writer.second.writer) continue;
Brian Silvermancb805822020-10-06 17:43:35 -0700620 data_writer.second.writer->ResetStatistics();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700621 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700622 if (data_writer_) {
623 data_writer_->writer->ResetStatistics();
Brian Silvermancb805822020-10-06 17:43:35 -0700624 }
625 max_write_time_ = std::chrono::nanoseconds::zero();
626 max_write_time_bytes_ = -1;
627 max_write_time_messages_ = -1;
628 total_write_time_ = std::chrono::nanoseconds::zero();
629 total_write_count_ = 0;
630 total_write_messages_ = 0;
631 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700632}
633
Austin Schuhb8bca732021-07-30 22:32:00 -0700634void MultiNodeLogNamer::OpenForwardedTimestampWriter(
635 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700636 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700637 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700638 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700639 data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700640 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700641}
642
643void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700644 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700645 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700646 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700647 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700648 ".part", data_writer->parts_index(), ".bfbs", extension_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700649 CreateBufferWriter(filename, &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700650}
651
Brian Silvermana621f522020-09-30 16:52:43 -0700652void MultiNodeLogNamer::OpenDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700653 if (!data_writer_) {
654 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuh572924a2021-07-30 22:32:12 -0700655 this, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700656 [this](NewDataWriter *writer) {
657 std::string name;
658 if (node() != nullptr) {
659 name = absl::StrCat(name, node()->name()->string_view(), "_");
660 }
Austin Schuh572924a2021-07-30 22:32:12 -0700661 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700662 extension_);
663 CreateBufferWriter(name, &writer->writer);
664 },
665 [this](NewDataWriter *data_writer) {
666 CloseWriter(&data_writer->writer);
667 });
Brian Silverman7af8c902020-09-29 16:14:04 -0700668 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700669}
670
Brian Silverman0465fcf2020-09-24 00:29:18 -0700671void MultiNodeLogNamer::CreateBufferWriter(
Brian Silvermana621f522020-09-30 16:52:43 -0700672 std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700673 if (ran_out_of_space_) {
674 // Refuse to open any new files, which might skip data. Any existing files
675 // are in the same folder, which means they're on the same filesystem, which
676 // means they're probably going to run out of space and get stuck too.
Austin Schuha426f1f2021-03-31 22:27:41 -0700677 if (!destination->get()) {
678 // But avoid leaving a nullptr writer if we're out of space when
679 // attempting to open the first file.
680 *destination = std::make_unique<DetachedBufferWriter>(
681 DetachedBufferWriter::already_out_of_space_t());
682 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700683 return;
684 }
Austin Schuhe715eae2020-10-10 15:39:30 -0700685 const std::string_view separator = base_name_.back() == '/' ? "" : "_";
686 const std::string filename =
687 absl::StrCat(base_name_, separator, path, temp_suffix_);
Brian Silverman0465fcf2020-09-24 00:29:18 -0700688 if (!destination->get()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700689 if (ran_out_of_space_) {
690 *destination = std::make_unique<DetachedBufferWriter>(
691 DetachedBufferWriter::already_out_of_space_t());
692 return;
693 }
Brian Silvermancb805822020-10-06 17:43:35 -0700694 *destination =
695 std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700696 if (!destination->get()->ran_out_of_space()) {
697 all_filenames_.emplace_back(path);
698 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700699 return;
700 }
Brian Silvermancb805822020-10-06 17:43:35 -0700701
702 CloseWriter(destination);
703 if (ran_out_of_space_) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700704 *destination->get() =
705 DetachedBufferWriter(DetachedBufferWriter::already_out_of_space_t());
Brian Silverman0465fcf2020-09-24 00:29:18 -0700706 return;
707 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700708
Brian Silvermancb805822020-10-06 17:43:35 -0700709 *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700710 if (!destination->get()->ran_out_of_space()) {
711 all_filenames_.emplace_back(path);
712 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700713}
714
Brian Silverman48deab12020-09-30 18:39:28 -0700715void MultiNodeLogNamer::RenameTempFile(DetachedBufferWriter *destination) {
716 if (temp_suffix_.empty()) {
717 return;
718 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700719 std::string current_filename = std::string(destination->filename());
Brian Silverman48deab12020-09-30 18:39:28 -0700720 CHECK(current_filename.size() > temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700721 std::string final_filename =
Brian Silverman48deab12020-09-30 18:39:28 -0700722 current_filename.substr(0, current_filename.size() - temp_suffix_.size());
Austin Schuh6bb8a822021-03-31 23:04:39 -0700723 int result = rename(current_filename.c_str(), final_filename.c_str());
724
725 // When changing the base name, we rename the log folder while there active
726 // buffer writers. Therefore, the name of that active buffer may still refer
727 // to the old file location rather than the new one. This minimized changes to
728 // existing code.
729 if (result != 0 && errno != ENOSPC && !old_base_name_.empty()) {
730 auto offset = current_filename.find(old_base_name_);
731 if (offset != std::string::npos) {
732 current_filename.replace(offset, old_base_name_.length(), base_name_);
733 }
734 offset = final_filename.find(old_base_name_);
735 if (offset != std::string::npos) {
736 final_filename.replace(offset, old_base_name_.length(), base_name_);
737 }
738 result = rename(current_filename.c_str(), final_filename.c_str());
739 }
740
Brian Silverman48deab12020-09-30 18:39:28 -0700741 if (result != 0) {
742 if (errno == ENOSPC) {
743 ran_out_of_space_ = true;
744 return;
745 } else {
746 PLOG(FATAL) << "Renaming " << current_filename << " to " << final_filename
747 << " failed";
748 }
Austin Schuh6bb8a822021-03-31 23:04:39 -0700749 } else {
750 VLOG(1) << "Renamed " << current_filename << " -> " << final_filename;
Brian Silverman48deab12020-09-30 18:39:28 -0700751 }
752}
753
Brian Silvermancb805822020-10-06 17:43:35 -0700754void MultiNodeLogNamer::CloseWriter(
755 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
756 DetachedBufferWriter *const writer = writer_pointer->get();
757 if (!writer) {
758 return;
759 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700760 const bool was_open = writer->is_open();
Brian Silvermancb805822020-10-06 17:43:35 -0700761 writer->Close();
762
763 if (writer->max_write_time() > max_write_time_) {
764 max_write_time_ = writer->max_write_time();
765 max_write_time_bytes_ = writer->max_write_time_bytes();
766 max_write_time_messages_ = writer->max_write_time_messages();
767 }
768 total_write_time_ += writer->total_write_time();
769 total_write_count_ += writer->total_write_count();
770 total_write_messages_ += writer->total_write_messages();
771 total_write_bytes_ += writer->total_write_bytes();
772
773 if (writer->ran_out_of_space()) {
774 ran_out_of_space_ = true;
775 writer->acknowledge_out_of_space();
776 }
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700777 if (was_open) {
778 RenameTempFile(writer);
779 } else {
780 CHECK(access(std::string(writer->filename()).c_str(), F_OK) == -1)
781 << ": File should not exist: " << writer->filename();
782 }
Brian Silvermancb805822020-10-06 17:43:35 -0700783}
784
Austin Schuhcb5601b2020-09-10 15:29:59 -0700785} // namespace logger
786} // namespace aos