blob: 0e914d9d717759e53b68907c094c07cbcc629bac [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "flatbuffers/flatbuffers.h"
11#include "glog/logging.h"
12
Austin Schuhcb5601b2020-09-10 15:29:59 -070013#include "aos/events/logging/logfile_utils.h"
14#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070015#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070016#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070017
Austin Schuh8bdfc492023-02-11 12:53:13 -080018DECLARE_int32(flush_size);
19
Austin Schuhcb5601b2020-09-10 15:29:59 -070020namespace aos {
21namespace logger {
22
Austin Schuh572924a2021-07-30 22:32:12 -070023NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
Austin Schuhf5f99f32022-02-07 20:05:37 -080024 const Node *logger_node,
Austin Schuh572924a2021-07-30 22:32:12 -070025 std::function<void(NewDataWriter *)> reopen,
Austin Schuh48d10d62022-10-16 22:19:23 -070026 std::function<void(NewDataWriter *)> close,
27 size_t max_message_size)
Austin Schuh572924a2021-07-30 22:32:12 -070028 : node_(node),
29 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
Austin Schuhf5f99f32022-02-07 20:05:37 -080030 logger_node_index_(
31 configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
Austin Schuh572924a2021-07-30 22:32:12 -070032 log_namer_(log_namer),
33 reopen_(std::move(reopen)),
Austin Schuh48d10d62022-10-16 22:19:23 -070034 close_(std::move(close)),
35 max_message_size_(max_message_size) {
Austin Schuh72211ae2021-08-05 14:02:30 -070036 state_.resize(configuration::NodesCount(log_namer->configuration_));
37 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070038}
39
40NewDataWriter::~NewDataWriter() {
41 if (writer) {
42 Close();
43 }
44}
45
46void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070047 // No need to rotate if nothing has been written.
48 if (header_written_) {
Alexei Strotsbc082d82023-05-03 08:43:42 -070049 VLOG(1) << "Rotated " << name();
Austin Schuhe46492f2021-07-31 19:49:41 -070050 ++parts_index_;
51 reopen_(this);
52 header_written_ = false;
53 QueueHeader(MakeHeader());
54 }
Austin Schuh572924a2021-07-30 22:32:12 -070055}
56
Austin Schuh5e14d842022-01-21 12:02:15 -080057void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070058 parts_uuid_ = UUID::Random();
59 ++parts_index_;
60 reopen_(this);
61 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080062 for (State &state : state_) {
63 state.boot_uuid = UUID::Zero();
64 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
65 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
66 state.oldest_remote_unreliable_monotonic_timestamp =
67 monotonic_clock::max_time;
68 state.oldest_local_unreliable_monotonic_timestamp =
69 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -080070 state.oldest_remote_reliable_monotonic_timestamp =
71 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -080072 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
73 state.oldest_logger_remote_unreliable_monotonic_timestamp =
74 monotonic_clock::max_time;
75 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -080076 monotonic_clock::max_time;
Austin Schuh5e14d842022-01-21 12:02:15 -080077 }
78
79 state_[node_index_].boot_uuid = source_node_boot_uuid;
80
Alexei Strotsbc082d82023-05-03 08:43:42 -070081 VLOG(1) << "Rebooted " << name();
Austin Schuh5e14d842022-01-21 12:02:15 -080082}
83
84void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
85 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
86 state_[node_index_].boot_uuid = source_node_boot_uuid;
87 if (header_written_) {
88 Reboot(source_node_boot_uuid);
89 }
90 }
Austin Schuh572924a2021-07-30 22:32:12 -070091}
92
Austin Schuh72211ae2021-08-05 14:02:30 -070093void NewDataWriter::UpdateRemote(
94 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
95 const monotonic_clock::time_point monotonic_remote_time,
Austin Schuhf5f99f32022-02-07 20:05:37 -080096 const monotonic_clock::time_point monotonic_event_time, const bool reliable,
97 monotonic_clock::time_point monotonic_timestamp_time) {
Austin Schuh58646e22021-08-23 23:51:46 -070098 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -070099 bool rotate = false;
100 CHECK_LT(remote_node_index, state_.size());
101 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -0700102
103 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700104 if (state.boot_uuid != remote_node_boot_uuid) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700105 VLOG(1) << name() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -0700106 << remote_node_boot_uuid << " from " << state.boot_uuid;
107 state.boot_uuid = remote_node_boot_uuid;
108 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
109 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
110 state.oldest_remote_unreliable_monotonic_timestamp =
111 monotonic_clock::max_time;
112 state.oldest_local_unreliable_monotonic_timestamp =
113 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -0800114 state.oldest_remote_reliable_monotonic_timestamp =
115 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -0800116 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
117 state.oldest_logger_remote_unreliable_monotonic_timestamp =
118 monotonic_clock::max_time;
119 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -0800120 monotonic_clock::max_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700121 rotate = true;
122 }
123
Austin Schuh58646e22021-08-23 23:51:46 -0700124 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700125 if (!reliable) {
126 if (state.oldest_remote_unreliable_monotonic_timestamp >
127 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700128 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700129 << " oldest_remote_unreliable_monotonic_timestamp updated from "
130 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
131 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700132 state.oldest_remote_unreliable_monotonic_timestamp =
133 monotonic_remote_time;
134 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
135 rotate = true;
136 }
Austin Schuhbfe6c572022-01-27 20:48:20 -0800137 } else {
138 if (state.oldest_remote_reliable_monotonic_timestamp >
139 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700140 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuhbfe6c572022-01-27 20:48:20 -0800141 << " oldest_remote_reliable_monotonic_timestamp updated from "
142 << state.oldest_remote_reliable_monotonic_timestamp << " to "
143 << monotonic_remote_time;
144 state.oldest_remote_reliable_monotonic_timestamp = monotonic_remote_time;
145 state.oldest_local_reliable_monotonic_timestamp = monotonic_event_time;
146 rotate = true;
147 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700148 }
149
Austin Schuhf5f99f32022-02-07 20:05:37 -0800150 // Track the logger timestamps too.
151 if (monotonic_timestamp_time != monotonic_clock::min_time) {
152 State &logger_state = state_[node_index_];
153 CHECK_EQ(remote_node_index, logger_node_index_);
154 if (monotonic_event_time <
155 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp) {
156 VLOG(1)
Alexei Strotsbc082d82023-05-03 08:43:42 -0700157 << name() << " Remote " << node_index_
Austin Schuhf5f99f32022-02-07 20:05:37 -0800158 << " oldest_logger_remote_unreliable_monotonic_timestamp updated "
159 "from "
160 << logger_state.oldest_logger_remote_unreliable_monotonic_timestamp
161 << " to " << monotonic_event_time;
162 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp =
163 monotonic_event_time;
164 logger_state.oldest_logger_local_unreliable_monotonic_timestamp =
165 monotonic_timestamp_time;
166
167 rotate = true;
168 }
169 }
170
Austin Schuh58646e22021-08-23 23:51:46 -0700171 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700172 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700173 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700174 << " oldest_remote_monotonic_timestamp updated from "
175 << state.oldest_remote_monotonic_timestamp << " to "
176 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700177 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
178 state.oldest_local_monotonic_timestamp = monotonic_event_time;
179 rotate = true;
180 }
181
182 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700183 Rotate();
184 }
185}
186
Austin Schuh48d10d62022-10-16 22:19:23 -0700187void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
188 const UUID &source_node_boot_uuid,
189 aos::monotonic_clock::time_point now) {
Austin Schuh58646e22021-08-23 23:51:46 -0700190 // Trigger a reboot if we detect the boot UUID change.
Austin Schuh5e14d842022-01-21 12:02:15 -0800191 UpdateBoot(source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700192
Austin Schuh5e14d842022-01-21 12:02:15 -0800193 if (!header_written_) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700194 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700195 }
Austin Schuh58646e22021-08-23 23:51:46 -0700196
197 // If the start time has changed for this node, trigger a rotation.
198 if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
Austin Schuh5e14d842022-01-21 12:02:15 -0800199 monotonic_start_time_) {
Austin Schuh58646e22021-08-23 23:51:46 -0700200 CHECK(header_written_);
201 Rotate();
202 }
203
204 CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
205 monotonic_start_time_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700206 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
milind-ua50344f2021-08-25 18:22:20 -0700207 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700208 CHECK(header_written_) << ": Attempting to write message before header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700209 << writer->name();
Austin Schuh48d10d62022-10-16 22:19:23 -0700210 writer->CopyMessage(coppier, now);
Austin Schuh572924a2021-07-30 22:32:12 -0700211}
212
Austin Schuhe46492f2021-07-31 19:49:41 -0700213aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
214NewDataWriter::MakeHeader() {
215 const size_t logger_node_index = log_namer_->logger_node_index();
216 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700217 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700218 VLOG(1) << name() << " Logger node is " << logger_node_index
Austin Schuhe46492f2021-07-31 19:49:41 -0700219 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700220 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700221 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700222 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700223 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700224 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
Austin Schuhe46492f2021-07-31 19:49:41 -0700225 parts_index_);
226}
227
Austin Schuh572924a2021-07-30 22:32:12 -0700228void NewDataWriter::QueueHeader(
229 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
230 CHECK(!header_written_) << ": Attempting to write duplicate header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700231 << writer->name();
Austin Schuh572924a2021-07-30 22:32:12 -0700232 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700233 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700234 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700235 if (!writer) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700236 // Since we haven't opened the first time, it's still not too late to update
237 // the max message size. Make sure the header fits.
238 //
239 // This won't work well on reboots, but the structure of the header is fixed
240 // by that point in time, so it's size is fixed too.
241 //
242 // Most of the time, the minimum buffer size inside the encoder of around
243 // 128k will make this a non-issue.
244 UpdateMaxMessageSize(header.span().size());
245
Austin Schuh510dc622021-08-06 18:47:30 -0700246 reopen_(this);
247 }
248
Alexei Strotsbc082d82023-05-03 08:43:42 -0700249 VLOG(1) << "Writing to " << name() << " "
Austin Schuh58646e22021-08-23 23:51:46 -0700250 << aos::FlatbufferToJson(
251 header, {.multi_line = false, .max_vector_size = 100});
252
Austin Schuh510dc622021-08-06 18:47:30 -0700253 CHECK(writer);
Austin Schuh7ef11a42023-02-04 17:15:12 -0800254 DataEncoder::SpanCopier coppier(header.span());
255 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh572924a2021-07-30 22:32:12 -0700256 header_written_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700257 monotonic_start_time_ = log_namer_->monotonic_start_time(
258 node_index_, state_[node_index_].boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700259}
260
261void NewDataWriter::Close() {
262 CHECK(writer);
263 close_(this);
264 writer.reset();
265 header_written_ = false;
266}
267
Austin Schuh58646e22021-08-23 23:51:46 -0700268LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
269 const UUID &boot_uuid) {
270 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
271 if (it == node_states_.end()) {
272 it =
273 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
274 .first;
275 }
276 return &it->second;
277}
278
Austin Schuh73340842021-07-30 22:32:06 -0700279aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700280 size_t node_index, const std::vector<NewDataWriter::State> &state,
Austin Schuh58646e22021-08-23 23:51:46 -0700281 const UUID &parts_uuid, int parts_index) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700282 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700283 const Node *const source_node =
284 configuration::GetNode(configuration_, node_index);
Austin Schuhfa712682022-05-11 16:43:42 -0700285 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 34u);
Austin Schuh73340842021-07-30 22:32:06 -0700286 flatbuffers::FlatBufferBuilder fbb;
287 fbb.ForceDefaults(true);
288
289 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
290 flatbuffers::Offset<aos::Configuration> configuration_offset;
291 if (header_.message().has_configuration()) {
292 CHECK(!header_.message().has_configuration_sha256());
293 configuration_offset =
294 CopyFlatBuffer(header_.message().configuration(), &fbb);
295 } else {
296 CHECK(!header_.message().has_configuration());
297 CHECK(header_.message().has_configuration_sha256());
298 config_sha256_offset = fbb.CreateString(
299 header_.message().configuration_sha256()->string_view());
300 }
301
302 CHECK(header_.message().has_name());
303 const flatbuffers::Offset<flatbuffers::String> name_offset =
304 fbb.CreateString(header_.message().name()->string_view());
Austin Schuhfa712682022-05-11 16:43:42 -0700305 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
306 header_.message().has_logger_sha1()
307 ? fbb.CreateString(header_.message().logger_sha1()->string_view())
308 : 0;
309 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
310 header_.message().has_logger_version()
311 ? fbb.CreateString(header_.message().logger_version()->string_view())
312 : 0;
Austin Schuh73340842021-07-30 22:32:06 -0700313
314 CHECK(header_.message().has_log_event_uuid());
315 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
316 fbb.CreateString(header_.message().log_event_uuid()->string_view());
317
318 CHECK(header_.message().has_logger_instance_uuid());
319 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
320 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
321
322 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
323 if (header_.message().has_log_start_uuid()) {
324 log_start_uuid_offset =
325 fbb.CreateString(header_.message().log_start_uuid()->string_view());
326 }
327
328 CHECK(header_.message().has_logger_node_boot_uuid());
329 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
330 fbb.CreateString(
331 header_.message().logger_node_boot_uuid()->string_view());
332
333 CHECK_NE(source_node_boot_uuid, UUID::Zero());
334 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
335 source_node_boot_uuid.PackString(&fbb);
336
337 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
338 parts_uuid.PackString(&fbb);
339
340 flatbuffers::Offset<Node> node_offset;
341 flatbuffers::Offset<Node> logger_node_offset;
342
343 if (configuration::MultiNode(configuration_)) {
344 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
345 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
346 }
347
Austin Schuhe46492f2021-07-31 19:49:41 -0700348 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700349 boot_uuid_offsets.reserve(state.size());
Austin Schuhe46492f2021-07-31 19:49:41 -0700350
Austin Schuh4db9ec92021-09-22 13:11:12 -0700351 int64_t *unused;
Austin Schuh72211ae2021-08-05 14:02:30 -0700352 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800353 oldest_remote_monotonic_timestamps_offset =
354 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700355
Austin Schuh72211ae2021-08-05 14:02:30 -0700356 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800357 oldest_local_monotonic_timestamps_offset =
358 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700359
Austin Schuh72211ae2021-08-05 14:02:30 -0700360 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
361 oldest_remote_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800362 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700363
Austin Schuh72211ae2021-08-05 14:02:30 -0700364 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
365 oldest_local_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800366 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700367
Austin Schuhbfe6c572022-01-27 20:48:20 -0800368 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
369 oldest_remote_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800370 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800371
372 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
373 oldest_local_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800374 fbb.CreateUninitializedVector(state.size(), &unused);
375
376 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
377 oldest_logger_remote_unreliable_monotonic_timestamps_offset =
378 fbb.CreateUninitializedVector(state.size(), &unused);
379
380 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
381 oldest_logger_local_unreliable_monotonic_timestamps_offset =
382 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800383
Austin Schuh72211ae2021-08-05 14:02:30 -0700384 for (size_t i = 0; i < state.size(); ++i) {
Austin Schuh4db9ec92021-09-22 13:11:12 -0700385 if (state[i].boot_uuid != UUID::Zero()) {
386 boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
387 } else {
388 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
389 }
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700390 if (state[i].boot_uuid == UUID::Zero()) {
391 CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
392 monotonic_clock::max_time);
393 CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
394 monotonic_clock::max_time);
395 CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
396 monotonic_clock::max_time);
397 CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
398 monotonic_clock::max_time);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800399 CHECK_EQ(state[i].oldest_remote_reliable_monotonic_timestamp,
400 monotonic_clock::max_time);
401 CHECK_EQ(state[i].oldest_local_reliable_monotonic_timestamp,
402 monotonic_clock::max_time);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800403 CHECK_EQ(state[i].oldest_logger_remote_unreliable_monotonic_timestamp,
404 monotonic_clock::max_time);
405 CHECK_EQ(state[i].oldest_logger_local_unreliable_monotonic_timestamp,
406 monotonic_clock::max_time);
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700407 }
408
Austin Schuh4db9ec92021-09-22 13:11:12 -0700409 flatbuffers::GetMutableTemporaryPointer(
410 fbb, oldest_remote_monotonic_timestamps_offset)
411 ->Mutate(i, state[i]
412 .oldest_remote_monotonic_timestamp.time_since_epoch()
413 .count());
414 flatbuffers::GetMutableTemporaryPointer(
415 fbb, oldest_local_monotonic_timestamps_offset)
416 ->Mutate(i, state[i]
417 .oldest_local_monotonic_timestamp.time_since_epoch()
418 .count());
419 flatbuffers::GetMutableTemporaryPointer(
420 fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
421 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800422 .oldest_remote_unreliable_monotonic_timestamp
423 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700424 .count());
425 flatbuffers::GetMutableTemporaryPointer(
426 fbb, oldest_local_unreliable_monotonic_timestamps_offset)
427 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800428 .oldest_local_unreliable_monotonic_timestamp
429 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700430 .count());
Austin Schuhbfe6c572022-01-27 20:48:20 -0800431
432 flatbuffers::GetMutableTemporaryPointer(
433 fbb, oldest_remote_reliable_monotonic_timestamps_offset)
434 ->Mutate(i, state[i]
435 .oldest_remote_reliable_monotonic_timestamp
436 .time_since_epoch()
437 .count());
438 flatbuffers::GetMutableTemporaryPointer(
439 fbb, oldest_local_reliable_monotonic_timestamps_offset)
440 ->Mutate(
441 i, state[i]
442 .oldest_local_reliable_monotonic_timestamp.time_since_epoch()
443 .count());
Austin Schuhf5f99f32022-02-07 20:05:37 -0800444
445 flatbuffers::GetMutableTemporaryPointer(
446 fbb, oldest_logger_remote_unreliable_monotonic_timestamps_offset)
447 ->Mutate(i, state[i]
448 .oldest_logger_remote_unreliable_monotonic_timestamp
449 .time_since_epoch()
450 .count());
451 flatbuffers::GetMutableTemporaryPointer(
452 fbb, oldest_logger_local_unreliable_monotonic_timestamps_offset)
453 ->Mutate(i, state[i]
454 .oldest_logger_local_unreliable_monotonic_timestamp
455 .time_since_epoch()
456 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700457 }
458
Austin Schuh4db9ec92021-09-22 13:11:12 -0700459 flatbuffers::Offset<
460 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
461 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
462
Austin Schuh73340842021-07-30 22:32:06 -0700463 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
464
465 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700466 if (!logger_sha1_offset.IsNull()) {
467 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
468 }
469 if (!logger_version_offset.IsNull()) {
470 log_file_header_builder.add_logger_version(logger_version_offset);
471 }
Austin Schuh73340842021-07-30 22:32:06 -0700472
473 // Only add the node if we are running in a multinode configuration.
474 if (!logger_node_offset.IsNull()) {
475 log_file_header_builder.add_node(node_offset);
476 log_file_header_builder.add_logger_node(logger_node_offset);
477 }
478
479 if (!configuration_offset.IsNull()) {
480 log_file_header_builder.add_configuration(configuration_offset);
481 }
482 log_file_header_builder.add_max_out_of_order_duration(
483 header_.message().max_out_of_order_duration());
484
Austin Schuh58646e22021-08-23 23:51:46 -0700485 NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
Austin Schuh73340842021-07-30 22:32:06 -0700486 log_file_header_builder.add_monotonic_start_time(
487 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700488 node_state->monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700489 .count());
490 if (source_node == node_) {
491 log_file_header_builder.add_realtime_start_time(
492 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700493 node_state->realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700494 .count());
495 } else {
496 // Fill out the legacy start times. Since these were implemented to never
497 // change on reboot, they aren't very helpful in tracking what happened.
498 log_file_header_builder.add_logger_monotonic_start_time(
499 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700500 node_state->logger_monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700501 .count());
502 log_file_header_builder.add_logger_realtime_start_time(
503 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700504 node_state->logger_realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700505 .count());
506 }
507
508 // TODO(austin): Add more useful times. When was this part started? What do
509 // we know about both the logger and remote then?
510
511 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
512 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
513 if (!log_start_uuid_offset.IsNull()) {
514 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
515 }
516 log_file_header_builder.add_logger_node_boot_uuid(
517 logger_node_boot_uuid_offset);
518 log_file_header_builder.add_source_node_boot_uuid(
519 source_node_boot_uuid_offset);
520
521 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
522 log_file_header_builder.add_parts_index(parts_index);
523
524 if (!config_sha256_offset.IsNull()) {
525 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
526 }
527
Austin Schuhe46492f2021-07-31 19:49:41 -0700528 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700529 log_file_header_builder.add_logger_part_monotonic_start_time(
530 std::chrono::duration_cast<std::chrono::nanoseconds>(
531 event_loop_->monotonic_now().time_since_epoch())
532 .count());
533 log_file_header_builder.add_logger_part_realtime_start_time(
534 std::chrono::duration_cast<std::chrono::nanoseconds>(
535 event_loop_->realtime_now().time_since_epoch())
536 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700537 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
538 oldest_remote_monotonic_timestamps_offset);
539 log_file_header_builder.add_oldest_local_monotonic_timestamps(
540 oldest_local_monotonic_timestamps_offset);
541 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
542 oldest_remote_unreliable_monotonic_timestamps_offset);
543 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
544 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800545 log_file_header_builder.add_oldest_remote_reliable_monotonic_timestamps(
546 oldest_remote_reliable_monotonic_timestamps_offset);
547 log_file_header_builder.add_oldest_local_reliable_monotonic_timestamps(
548 oldest_local_reliable_monotonic_timestamps_offset);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800549 log_file_header_builder
550 .add_oldest_logger_remote_unreliable_monotonic_timestamps(
551 oldest_logger_remote_unreliable_monotonic_timestamps_offset);
552 log_file_header_builder
553 .add_oldest_logger_local_unreliable_monotonic_timestamps(
554 oldest_logger_local_unreliable_monotonic_timestamps_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700555 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
556 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
557 fbb.Release());
558
559 CHECK(result.Verify()) << ": Built a corrupted header.";
560
561 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700562}
563
Alexei Strotsbc082d82023-05-03 08:43:42 -0700564MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
565 EventLoop *event_loop)
Alexei Strots01395492023-03-20 13:59:56 -0700566 : MultiNodeLogNamer(std::move(log_backend), event_loop->configuration(),
567 event_loop, event_loop->node()) {}
Austin Schuh5b728b72021-06-16 14:57:15 -0700568
Alexei Strotsbc082d82023-05-03 08:43:42 -0700569MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
570 const Configuration *configuration,
571 EventLoop *event_loop, const Node *node)
Austin Schuh5b728b72021-06-16 14:57:15 -0700572 : LogNamer(configuration, event_loop, node),
Alexei Strots01395492023-03-20 13:59:56 -0700573 log_backend_(std::move(log_backend)),
Austin Schuh8bdfc492023-02-11 12:53:13 -0800574 encoder_factory_([](size_t max_message_size) {
575 // TODO(austin): For slow channels, can we allocate less memory?
576 return std::make_unique<DummyEncoder>(max_message_size,
577 FLAGS_flush_size);
578 }) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700579
Brian Silverman48deab12020-09-30 18:39:28 -0700580MultiNodeLogNamer::~MultiNodeLogNamer() {
581 if (!ran_out_of_space_) {
582 // This handles renaming temporary files etc.
583 Close();
584 }
585}
586
Austin Schuh572924a2021-07-30 22:32:12 -0700587void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700588 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700589 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700590 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700591 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700592 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700593 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700594 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700595 if (node == data_writer.second.node()) {
596 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700597 }
598 }
599 }
600}
601
Austin Schuh8c399962020-12-25 21:51:45 -0800602void MultiNodeLogNamer::WriteConfiguration(
603 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
604 std::string_view config_sha256) {
605 if (ran_out_of_space_) {
606 return;
607 }
608
Alexei Strots01395492023-03-20 13:59:56 -0700609 const std::string filename = absl::StrCat(config_sha256, ".bfbs", extension_);
610 auto file_handle = log_backend_->RequestFile(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800611 std::unique_ptr<DetachedBufferWriter> writer =
Austin Schuh48d10d62022-10-16 22:19:23 -0700612 std::make_unique<DetachedBufferWriter>(
Alexei Strots01395492023-03-20 13:59:56 -0700613 std::move(file_handle), encoder_factory_(header->span().size()));
Austin Schuh8c399962020-12-25 21:51:45 -0800614
Austin Schuh7ef11a42023-02-04 17:15:12 -0800615 DataEncoder::SpanCopier coppier(header->span());
616 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh8c399962020-12-25 21:51:45 -0800617
618 if (!writer->ran_out_of_space()) {
Alexei Strots01395492023-03-20 13:59:56 -0700619 all_filenames_.emplace_back(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800620 }
Alexei Strots01395492023-03-20 13:59:56 -0700621 // Close the file and maybe rename it too.
Austin Schuh8c399962020-12-25 21:51:45 -0800622 CloseWriter(&writer);
623}
624
Austin Schuhb8bca732021-07-30 22:32:00 -0700625NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700626 // See if we can read the data on this node at all.
627 const bool is_readable =
628 configuration::ChannelIsReadableOnNode(channel, this->node());
629 if (!is_readable) {
630 return nullptr;
631 }
632
633 // Then, see if we are supposed to log the data here.
634 const bool log_message =
635 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
636
637 if (!log_message) {
638 return nullptr;
639 }
640
641 // Now, sort out if this is data generated on this node, or not. It is
642 // generated if it is sendable on this node.
643 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700644 if (!data_writer_) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700645 MakeDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700646 }
Alexei Strots01395492023-03-20 13:59:56 -0700647 data_writer_->UpdateMaxMessageSize(
648 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
Austin Schuhb8bca732021-07-30 22:32:00 -0700649 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700650 }
651
652 // Ok, we have data that is being forwarded to us that we are supposed to
653 // log. It needs to be logged with send timestamps, but be sorted enough
654 // to be able to be processed.
655 CHECK(data_writers_.find(channel) == data_writers_.end());
656
657 // Track that this node is being logged.
658 const Node *source_node = configuration::GetNode(
659 configuration_, channel->source_node()->string_view());
660
661 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
662 nodes_.emplace_back(source_node);
663 }
664
Austin Schuhf5f99f32022-02-07 20:05:37 -0800665 NewDataWriter data_writer(
666 this, source_node, node_,
667 [this, channel](NewDataWriter *data_writer) {
668 OpenWriter(channel, data_writer);
669 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700670 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
671 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
Austin Schuhb8bca732021-07-30 22:32:00 -0700672 return &(
673 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700674}
675
Austin Schuhb8bca732021-07-30 22:32:00 -0700676NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700677 const Channel *channel, const Node *node) {
678 // See if we can read the data on this node at all.
679 const bool is_readable =
680 configuration::ChannelIsReadableOnNode(channel, this->node());
681 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
682
683 CHECK(data_writers_.find(channel) == data_writers_.end());
684
685 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
686 nodes_.emplace_back(node);
687 }
688
Austin Schuhf5f99f32022-02-07 20:05:37 -0800689 NewDataWriter data_writer(
690 this, configuration::GetNode(configuration_, node), node_,
691 [this, channel](NewDataWriter *data_writer) {
692 OpenForwardedTimestampWriter(channel, data_writer);
693 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700694 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
695 PackRemoteMessageSize());
Austin Schuhb8bca732021-07-30 22:32:00 -0700696 return &(
697 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700698}
699
Austin Schuhb8bca732021-07-30 22:32:00 -0700700NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700701 bool log_delivery_times = false;
702 if (this->node() != nullptr) {
703 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
704 channel, this->node(), this->node());
705 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700706 if (!log_delivery_times) {
707 return nullptr;
708 }
709
Austin Schuhb8bca732021-07-30 22:32:00 -0700710 if (!data_writer_) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700711 MakeDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700712 }
Austin Schuh48d10d62022-10-16 22:19:23 -0700713 data_writer_->UpdateMaxMessageSize(
714 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
Austin Schuhb8bca732021-07-30 22:32:00 -0700715 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700716}
717
Austin Schuh08dba8f2023-05-01 08:29:30 -0700718WriteCode MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700719 data_writers_.clear();
720 data_writer_.reset();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700721 if (ran_out_of_space_) {
722 return WriteCode::kOutOfSpace;
723 }
724 return WriteCode::kOk;
Brian Silvermancb805822020-10-06 17:43:35 -0700725}
726
727void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700728 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700729 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800730 if (!data_writer.second.writer) continue;
Alexei Strots01395492023-03-20 13:59:56 -0700731 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700732 }
Austin Schuhb8bca732021-07-30 22:32:00 -0700733 if (data_writer_) {
Alexei Strots01395492023-03-20 13:59:56 -0700734 data_writer_->writer->WriteStatistics()->ResetStats();
Brian Silvermancb805822020-10-06 17:43:35 -0700735 }
736 max_write_time_ = std::chrono::nanoseconds::zero();
737 max_write_time_bytes_ = -1;
738 max_write_time_messages_ = -1;
739 total_write_time_ = std::chrono::nanoseconds::zero();
740 total_write_count_ = 0;
741 total_write_messages_ = 0;
742 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700743}
744
Austin Schuhb8bca732021-07-30 22:32:00 -0700745void MultiNodeLogNamer::OpenForwardedTimestampWriter(
746 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700747 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700748 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700749 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700750 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700751 CreateBufferWriter(filename, data_writer->max_message_size(),
752 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700753}
754
755void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700756 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700757 const std::string filename = absl::StrCat(
Austin Schuhe715eae2020-10-10 15:39:30 -0700758 CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
Brian Silvermana621f522020-09-30 16:52:43 -0700759 channel->name()->string_view(), "/", channel->type()->string_view(),
Austin Schuh572924a2021-07-30 22:32:12 -0700760 ".part", data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700761 CreateBufferWriter(filename, data_writer->max_message_size(),
762 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700763}
764
Austin Schuh48d10d62022-10-16 22:19:23 -0700765void MultiNodeLogNamer::MakeDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700766 if (!data_writer_) {
767 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuhf5f99f32022-02-07 20:05:37 -0800768 this, node_, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700769 [this](NewDataWriter *writer) {
770 std::string name;
771 if (node() != nullptr) {
772 name = absl::StrCat(name, node()->name()->string_view(), "_");
773 }
Austin Schuh572924a2021-07-30 22:32:12 -0700774 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700775 extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700776 CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
Austin Schuhb8bca732021-07-30 22:32:00 -0700777 },
778 [this](NewDataWriter *data_writer) {
779 CloseWriter(&data_writer->writer);
Austin Schuh48d10d62022-10-16 22:19:23 -0700780 },
781 // Default size is 0 so it will be obvious if something doesn't fix it
782 // afterwards.
783 0);
Brian Silverman7af8c902020-09-29 16:14:04 -0700784 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700785}
786
Brian Silverman0465fcf2020-09-24 00:29:18 -0700787void MultiNodeLogNamer::CreateBufferWriter(
Austin Schuh48d10d62022-10-16 22:19:23 -0700788 std::string_view path, size_t max_message_size,
789 std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700790 if (ran_out_of_space_) {
791 // Refuse to open any new files, which might skip data. Any existing files
792 // are in the same folder, which means they're on the same filesystem, which
793 // means they're probably going to run out of space and get stuck too.
Alexei Strots01395492023-03-20 13:59:56 -0700794 if (!(*destination)) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700795 // But avoid leaving a nullptr writer if we're out of space when
796 // attempting to open the first file.
797 *destination = std::make_unique<DetachedBufferWriter>(
798 DetachedBufferWriter::already_out_of_space_t());
799 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700800 return;
801 }
Alexei Strots01395492023-03-20 13:59:56 -0700802
803 // Let's check that we need to close and replace current driver.
804 if (*destination) {
805 // Let's close the current writer.
806 CloseWriter(destination);
807 // Are we out of space now?
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700808 if (ran_out_of_space_) {
809 *destination = std::make_unique<DetachedBufferWriter>(
810 DetachedBufferWriter::already_out_of_space_t());
811 return;
812 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700813 }
Brian Silvermancb805822020-10-06 17:43:35 -0700814
Alexei Strots01395492023-03-20 13:59:56 -0700815 const std::string filename(path);
816 *destination = std::make_unique<DetachedBufferWriter>(
817 log_backend_->RequestFile(filename), encoder_factory_(max_message_size));
818 if (!(*destination)->ran_out_of_space()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700819 all_filenames_.emplace_back(path);
820 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700821}
822
Brian Silvermancb805822020-10-06 17:43:35 -0700823void MultiNodeLogNamer::CloseWriter(
824 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
Alexei Strots01395492023-03-20 13:59:56 -0700825 CHECK_NOTNULL(writer_pointer);
826 if (!(*writer_pointer)) {
Brian Silvermancb805822020-10-06 17:43:35 -0700827 return;
828 }
Alexei Strots01395492023-03-20 13:59:56 -0700829 DetachedBufferWriter *const writer = writer_pointer->get();
Brian Silvermancb805822020-10-06 17:43:35 -0700830 writer->Close();
831
Alexei Strots01395492023-03-20 13:59:56 -0700832 const auto *stats = writer->WriteStatistics();
833 if (stats->max_write_time() > max_write_time_) {
834 max_write_time_ = stats->max_write_time();
835 max_write_time_bytes_ = stats->max_write_time_bytes();
836 max_write_time_messages_ = stats->max_write_time_messages();
Brian Silvermancb805822020-10-06 17:43:35 -0700837 }
Alexei Strots01395492023-03-20 13:59:56 -0700838 total_write_time_ += stats->total_write_time();
839 total_write_count_ += stats->total_write_count();
840 total_write_messages_ += stats->total_write_messages();
841 total_write_bytes_ += stats->total_write_bytes();
Brian Silvermancb805822020-10-06 17:43:35 -0700842
843 if (writer->ran_out_of_space()) {
844 ran_out_of_space_ = true;
845 writer->acknowledge_out_of_space();
846 }
Brian Silvermancb805822020-10-06 17:43:35 -0700847}
848
Austin Schuhcb5601b2020-09-10 15:29:59 -0700849} // namespace logger
850} // namespace aos