blob: 16bd609e06cfff24660a42b255a06e553f6930af [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "flatbuffers/flatbuffers.h"
11#include "glog/logging.h"
12
Austin Schuhcb5601b2020-09-10 15:29:59 -070013#include "aos/events/logging/logfile_utils.h"
14#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070015#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070016#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070017
Austin Schuh8bdfc492023-02-11 12:53:13 -080018DECLARE_int32(flush_size);
19
Austin Schuhcb5601b2020-09-10 15:29:59 -070020namespace aos {
21namespace logger {
22
Austin Schuh572924a2021-07-30 22:32:12 -070023NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
Austin Schuhf5f99f32022-02-07 20:05:37 -080024 const Node *logger_node,
Austin Schuh572924a2021-07-30 22:32:12 -070025 std::function<void(NewDataWriter *)> reopen,
Austin Schuh48d10d62022-10-16 22:19:23 -070026 std::function<void(NewDataWriter *)> close,
27 size_t max_message_size)
Austin Schuh572924a2021-07-30 22:32:12 -070028 : node_(node),
29 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
Austin Schuhf5f99f32022-02-07 20:05:37 -080030 logger_node_index_(
31 configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
Austin Schuh572924a2021-07-30 22:32:12 -070032 log_namer_(log_namer),
33 reopen_(std::move(reopen)),
Austin Schuh48d10d62022-10-16 22:19:23 -070034 close_(std::move(close)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070035 max_message_size_(max_message_size),
36 max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
Austin Schuh72211ae2021-08-05 14:02:30 -070037 state_.resize(configuration::NodesCount(log_namer->configuration_));
38 CHECK_LT(node_index_, state_.size());
Austin Schuh572924a2021-07-30 22:32:12 -070039}
40
41NewDataWriter::~NewDataWriter() {
42 if (writer) {
43 Close();
44 }
45}
46
47void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070048 // No need to rotate if nothing has been written.
49 if (header_written_) {
Alexei Strotsbc082d82023-05-03 08:43:42 -070050 VLOG(1) << "Rotated " << name();
Austin Schuhe46492f2021-07-31 19:49:41 -070051 ++parts_index_;
52 reopen_(this);
53 header_written_ = false;
54 QueueHeader(MakeHeader());
55 }
Austin Schuh572924a2021-07-30 22:32:12 -070056}
57
Austin Schuh5e14d842022-01-21 12:02:15 -080058void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070059 parts_uuid_ = UUID::Random();
60 ++parts_index_;
61 reopen_(this);
62 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080063 for (State &state : state_) {
64 state.boot_uuid = UUID::Zero();
65 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
66 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
67 state.oldest_remote_unreliable_monotonic_timestamp =
68 monotonic_clock::max_time;
69 state.oldest_local_unreliable_monotonic_timestamp =
70 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -080071 state.oldest_remote_reliable_monotonic_timestamp =
72 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -080073 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
74 state.oldest_logger_remote_unreliable_monotonic_timestamp =
75 monotonic_clock::max_time;
76 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -080077 monotonic_clock::max_time;
Austin Schuh5e14d842022-01-21 12:02:15 -080078 }
79
80 state_[node_index_].boot_uuid = source_node_boot_uuid;
81
Alexei Strotsbc082d82023-05-03 08:43:42 -070082 VLOG(1) << "Rebooted " << name();
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070083 newest_message_time_ = monotonic_clock::min_time;
84 // When a node reboots, parts_uuid changes but the same writer continues to
85 // write the data, so we can reset the max out of order duration. If we don't
86 // do this, the max out of order duration can grow to an unreasonable value.
87 max_out_of_order_duration_ = log_namer_->base_max_out_of_order_duration();
Austin Schuh5e14d842022-01-21 12:02:15 -080088}
89
90void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
91 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
92 state_[node_index_].boot_uuid = source_node_boot_uuid;
93 if (header_written_) {
94 Reboot(source_node_boot_uuid);
95 }
96 }
Austin Schuh572924a2021-07-30 22:32:12 -070097}
98
Austin Schuh72211ae2021-08-05 14:02:30 -070099void NewDataWriter::UpdateRemote(
100 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
101 const monotonic_clock::time_point monotonic_remote_time,
Austin Schuhf5f99f32022-02-07 20:05:37 -0800102 const monotonic_clock::time_point monotonic_event_time, const bool reliable,
103 monotonic_clock::time_point monotonic_timestamp_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700104 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -0700105 bool rotate = false;
106 CHECK_LT(remote_node_index, state_.size());
107 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -0700108
109 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700110 if (state.boot_uuid != remote_node_boot_uuid) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700111 VLOG(1) << name() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -0700112 << remote_node_boot_uuid << " from " << state.boot_uuid;
113 state.boot_uuid = remote_node_boot_uuid;
114 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
115 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
116 state.oldest_remote_unreliable_monotonic_timestamp =
117 monotonic_clock::max_time;
118 state.oldest_local_unreliable_monotonic_timestamp =
119 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -0800120 state.oldest_remote_reliable_monotonic_timestamp =
121 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -0800122 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
123 state.oldest_logger_remote_unreliable_monotonic_timestamp =
124 monotonic_clock::max_time;
125 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -0800126 monotonic_clock::max_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700127 rotate = true;
128 }
129
Austin Schuh58646e22021-08-23 23:51:46 -0700130 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700131 if (!reliable) {
132 if (state.oldest_remote_unreliable_monotonic_timestamp >
133 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700134 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700135 << " oldest_remote_unreliable_monotonic_timestamp updated from "
136 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
137 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700138 state.oldest_remote_unreliable_monotonic_timestamp =
139 monotonic_remote_time;
140 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
141 rotate = true;
142 }
Austin Schuhbfe6c572022-01-27 20:48:20 -0800143 } else {
144 if (state.oldest_remote_reliable_monotonic_timestamp >
145 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700146 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuhbfe6c572022-01-27 20:48:20 -0800147 << " oldest_remote_reliable_monotonic_timestamp updated from "
148 << state.oldest_remote_reliable_monotonic_timestamp << " to "
149 << monotonic_remote_time;
150 state.oldest_remote_reliable_monotonic_timestamp = monotonic_remote_time;
151 state.oldest_local_reliable_monotonic_timestamp = monotonic_event_time;
152 rotate = true;
153 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700154 }
155
Austin Schuhf5f99f32022-02-07 20:05:37 -0800156 // Track the logger timestamps too.
157 if (monotonic_timestamp_time != monotonic_clock::min_time) {
158 State &logger_state = state_[node_index_];
159 CHECK_EQ(remote_node_index, logger_node_index_);
160 if (monotonic_event_time <
161 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp) {
162 VLOG(1)
Alexei Strotsbc082d82023-05-03 08:43:42 -0700163 << name() << " Remote " << node_index_
Austin Schuhf5f99f32022-02-07 20:05:37 -0800164 << " oldest_logger_remote_unreliable_monotonic_timestamp updated "
165 "from "
166 << logger_state.oldest_logger_remote_unreliable_monotonic_timestamp
167 << " to " << monotonic_event_time;
168 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp =
169 monotonic_event_time;
170 logger_state.oldest_logger_local_unreliable_monotonic_timestamp =
171 monotonic_timestamp_time;
172
173 rotate = true;
174 }
175 }
176
Austin Schuh58646e22021-08-23 23:51:46 -0700177 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700178 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700179 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700180 << " oldest_remote_monotonic_timestamp updated from "
181 << state.oldest_remote_monotonic_timestamp << " to "
182 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700183 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
184 state.oldest_local_monotonic_timestamp = monotonic_event_time;
185 rotate = true;
186 }
187
188 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700189 Rotate();
190 }
191}
192
// Writes one message of the source node into the current part.
//
// Before copying, this:
//   1. starts a new parts_uuid if the source node's boot UUID changed after a
//      header was already written (UpdateBoot -> Reboot),
//   2. writes a header if the current part does not have one yet, and
//   3. rotates to a new part if the source node's start time changed, or if
//      this message is further behind the newest message than
//      max_out_of_order_duration_ allows.  In the second case the bound is
//      grown first, so the new part's header advertises the larger value.
//
// coppier: produces the message bytes for the underlying writer.
// source_node_boot_uuid: boot of the node this data belongs to.
// now: passed through to the underlying writer's CopyMessage.
// message_time: time used for the out-of-order bookkeeping.
void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
                                const UUID &source_node_boot_uuid,
                                aos::monotonic_clock::time_point now,
                                aos::monotonic_clock::time_point message_time) {
  // Trigger a reboot if we detect the boot UUID change.
  UpdateBoot(source_node_boot_uuid);

  if (!header_written_) {
    QueueHeader(MakeHeader());
  }

  bool max_out_of_order_duration_exceeded = false;
  // Enforce the max out of order duration contract.
  //
  // Updates the newest message time.
  // Rotate the part file if the current message is more than
  // max_out_of_order_duration behind the newest message we've logged so far.
  if (message_time > newest_message_time_) {
    newest_message_time_ = message_time;
  }

  // Don't consider messages before start up when checking for max out of order
  // duration.
  monotonic_clock::time_point monotonic_start_time =
      log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid);

  // Messages older than the start time are clamped to it here, so they count
  // as at most (newest - start) out of order.
  if (std::chrono::nanoseconds((newest_message_time_ -
                                std::max(monotonic_start_time, message_time))) >
      max_out_of_order_duration_) {
    // If the new message is older than 2 * max_out_order_duration, doubling it
    // won't be sufficient.
    //
    // Example: newest_message_time = 10, logged_message_time = 5,
    // max_ooo_duration = 2
    //
    // In this case actual max_ooo_duration = 10 - 5 = 5, but we double the
    // existing max_ooo_duration we get 4 which is not sufficient.
    //
    // Take the max of the two values.
    //
    // NOTE(review): the check above clamps message_time to the start time,
    // but this growth uses the unclamped difference.  A pre-start message can
    // therefore grow the bound by more than the check required.  A larger
    // bound is presumably still valid; confirm this is intended.
    max_out_of_order_duration_ =
        2 * std::max(max_out_of_order_duration_,
                     std::chrono::nanoseconds(
                         (newest_message_time_ - message_time)));
    max_out_of_order_duration_exceeded = true;
  }

  // If the start time has changed for this node, trigger a rotation.
  if ((monotonic_start_time != monotonic_start_time_) ||
      max_out_of_order_duration_exceeded) {
    // If we just received a start time now, we will rotate parts shortly. Use a
    // reasonable max out of order duration in the new header based on start
    // time information available now.
    if ((monotonic_start_time_ == monotonic_clock::min_time) &&
        (monotonic_start_time != monotonic_clock::min_time)) {
      // If we're writing current messages but we receive an older start time,
      // we can pick a reasonable max ooo duration number for the next part.
      //
      // For example - Our current max ooo duration is 0.3 seconds. We're
      // writing messages at 20 seconds and receive a start time of 1 second. We
      // don't need max ooo duration to be (20 - 1) = 19 seconds although that
      // would still work.
      //
      // Pick the minimum max out of order duration value that satisfies the
      // requirement but bound the minimum at the base value we started with.
      max_out_of_order_duration_ =
          std::max(log_namer_->base_max_out_of_order_duration(),
                   std::min(max_out_of_order_duration_,
                            std::chrono::nanoseconds(newest_message_time_ -
                                                     monotonic_start_time)));
    }
    // A header was queued above, so Rotate() will actually start a new part.
    CHECK(header_written_);
    Rotate();
  }

  // QueueHeader refreshes monotonic_start_time_ from the namer, so after any
  // rotation above the cached start time must match the namer's again.
  CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
           monotonic_start_time_);
  CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
  CHECK(writer);
  CHECK(header_written_) << ": Attempting to write message before header to "
                         << writer->name();
  writer->CopyMessage(coppier, now);
}
Austin Schuhe46492f2021-07-31 19:49:41 -0700276aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
277NewDataWriter::MakeHeader() {
278 const size_t logger_node_index = log_namer_->logger_node_index();
279 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700280 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700281 VLOG(1) << name() << " Logger node is " << logger_node_index
Austin Schuhe46492f2021-07-31 19:49:41 -0700282 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700283 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700284 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700285 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700286 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700287 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
288 max_out_of_order_duration_);
Austin Schuhe46492f2021-07-31 19:49:41 -0700289}
290
Austin Schuh572924a2021-07-30 22:32:12 -0700291void NewDataWriter::QueueHeader(
292 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
293 CHECK(!header_written_) << ": Attempting to write duplicate header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700294 << writer->name();
Austin Schuh572924a2021-07-30 22:32:12 -0700295 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700296 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700297 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700298 if (!writer) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700299 // Since we haven't opened the first time, it's still not too late to update
300 // the max message size. Make sure the header fits.
301 //
302 // This won't work well on reboots, but the structure of the header is fixed
303 // by that point in time, so it's size is fixed too.
304 //
305 // Most of the time, the minimum buffer size inside the encoder of around
306 // 128k will make this a non-issue.
307 UpdateMaxMessageSize(header.span().size());
308
Austin Schuh510dc622021-08-06 18:47:30 -0700309 reopen_(this);
310 }
311
Alexei Strotsbc082d82023-05-03 08:43:42 -0700312 VLOG(1) << "Writing to " << name() << " "
Austin Schuh58646e22021-08-23 23:51:46 -0700313 << aos::FlatbufferToJson(
314 header, {.multi_line = false, .max_vector_size = 100});
315
Austin Schuh510dc622021-08-06 18:47:30 -0700316 CHECK(writer);
Austin Schuh7ef11a42023-02-04 17:15:12 -0800317 DataEncoder::SpanCopier coppier(header.span());
318 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh572924a2021-07-30 22:32:12 -0700319 header_written_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700320 monotonic_start_time_ = log_namer_->monotonic_start_time(
321 node_index_, state_[node_index_].boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700322}
323
324void NewDataWriter::Close() {
325 CHECK(writer);
326 close_(this);
327 writer.reset();
328 header_written_ = false;
329}
330
Austin Schuh58646e22021-08-23 23:51:46 -0700331LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
332 const UUID &boot_uuid) {
333 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
334 if (it == node_states_.end()) {
335 it =
336 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
337 .first;
338 }
339 return &it->second;
340}
341
// Builds a size-prefixed LogFileHeader for one part file of `node_index`.
//
// name, configuration (or configuration_sha256), logger_sha1,
// logger_version, log_event_uuid, logger_instance_uuid, log_start_uuid and
// logger_node_boot_uuid are copied from header_.  The per-part fields come
// from the arguments:
//   state: indexed by node index.  Supplies boot_uuids and every oldest_*
//     timestamp vector.  state[node_index].boot_uuid is the source node's
//     boot and must not be UUID::Zero().
//   parts_uuid / parts_index: identify this part.
//   max_out_of_order_duration: written to max_out_of_order_duration.
// The start times come from the NodeState for (node_index, source boot).
// The logger_part_*_start_time fields are read from event_loop_ at the time
// this is called.  CHECK-fails if the finished buffer does not verify.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
    size_t node_index, const std::vector<NewDataWriter::State> &state,
    const UUID &parts_uuid, int parts_index,
    std::chrono::nanoseconds max_out_of_order_duration) {
  const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
  const Node *const source_node =
      configuration::GetNode(configuration_, node_index);
  // Presumably a tripwire: adding a field to LogFileHeader breaks this CHECK
  // so this function gets revisited.
  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 34u);
  flatbuffers::FlatBufferBuilder fbb;
  // Serialize scalar fields even when they equal the schema default.
  fbb.ForceDefaults(true);

  // Exactly one of configuration / configuration_sha256 is carried over.
  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (header_.message().has_configuration()) {
    CHECK(!header_.message().has_configuration_sha256());
    configuration_offset =
        CopyFlatBuffer(header_.message().configuration(), &fbb);
  } else {
    CHECK(!header_.message().has_configuration());
    CHECK(header_.message().has_configuration_sha256());
    config_sha256_offset = fbb.CreateString(
        header_.message().configuration_sha256()->string_view());
  }

  CHECK(header_.message().has_name());
  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(header_.message().name()->string_view());
  // Optional strings: a null (0) offset means "leave the field out".
  const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
      header_.message().has_logger_sha1()
          ? fbb.CreateString(header_.message().logger_sha1()->string_view())
          : 0;
  const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
      header_.message().has_logger_version()
          ? fbb.CreateString(header_.message().logger_version()->string_view())
          : 0;

  CHECK(header_.message().has_log_event_uuid());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      fbb.CreateString(header_.message().log_event_uuid()->string_view());

  CHECK(header_.message().has_logger_instance_uuid());
  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      fbb.CreateString(header_.message().logger_instance_uuid()->string_view());

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (header_.message().has_log_start_uuid()) {
    log_start_uuid_offset =
        fbb.CreateString(header_.message().log_start_uuid()->string_view());
  }

  CHECK(header_.message().has_logger_node_boot_uuid());
  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      fbb.CreateString(
          header_.message().logger_node_boot_uuid()->string_view());

  CHECK_NE(source_node_boot_uuid, UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
      source_node_boot_uuid.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
      parts_uuid.PackString(&fbb);

  flatbuffers::Offset<Node> node_offset;
  flatbuffers::Offset<Node> logger_node_offset;

  // node / logger_node are only filled in for multi-node configurations.
  if (configuration::MultiNode(configuration_)) {
    node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
    logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
  }

  std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
  boot_uuid_offsets.reserve(state.size());

  // Out-parameter required by CreateUninitializedVector.  It is not used:
  // the loop below creates boot UUID strings, which can reallocate the
  // builder's buffer and invalidate this pointer.  The vectors are filled
  // through GetMutableTemporaryPointer instead, which is recomputed on each
  // use.
  int64_t *unused;
  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_remote_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_local_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_remote_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_local_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_remote_reliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_local_reliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_logger_remote_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
      oldest_logger_local_unreliable_monotonic_timestamps_offset =
          fbb.CreateUninitializedVector(state.size(), &unused);

  for (size_t i = 0; i < state.size(); ++i) {
    // An unknown boot is recorded as an empty string in boot_uuids.
    if (state[i].boot_uuid != UUID::Zero()) {
      boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
    } else {
      boot_uuid_offsets.emplace_back(fbb.CreateString(""));
    }
    // A node whose boot is unknown must not have any timestamps recorded.
    if (state[i].boot_uuid == UUID::Zero()) {
      CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_remote_reliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_local_reliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_logger_remote_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
      CHECK_EQ(state[i].oldest_logger_local_unreliable_monotonic_timestamp,
               monotonic_clock::max_time);
    }

    // Fill element i of each timestamp vector as nanoseconds since epoch.
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_remote_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_remote_monotonic_timestamp.time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_local_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_local_monotonic_timestamp.time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_remote_unreliable_monotonic_timestamp
                        .time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_local_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_local_unreliable_monotonic_timestamp
                        .time_since_epoch()
                        .count());

    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_remote_reliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_remote_reliable_monotonic_timestamp
                        .time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_local_reliable_monotonic_timestamps_offset)
        ->Mutate(
            i, state[i]
                   .oldest_local_reliable_monotonic_timestamp.time_since_epoch()
                   .count());

    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_logger_remote_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_logger_remote_unreliable_monotonic_timestamp
                        .time_since_epoch()
                        .count());
    flatbuffers::GetMutableTemporaryPointer(
        fbb, oldest_logger_local_unreliable_monotonic_timestamps_offset)
        ->Mutate(i, state[i]
                        .oldest_logger_local_unreliable_monotonic_timestamp
                        .time_since_epoch()
                        .count());
  }

  flatbuffers::Offset<
      flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
      boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);

  // All sub-objects are built; only the table itself is assembled below.
  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);
  if (!logger_sha1_offset.IsNull()) {
    log_file_header_builder.add_logger_sha1(logger_sha1_offset);
  }
  if (!logger_version_offset.IsNull()) {
    log_file_header_builder.add_logger_version(logger_version_offset);
  }

  // Only add the node if we are running in a multinode configuration.
  if (!logger_node_offset.IsNull()) {
    log_file_header_builder.add_node(node_offset);
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }

  log_file_header_builder.add_max_out_of_order_duration(
      max_out_of_order_duration.count());

  NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          node_state->monotonic_start_time.time_since_epoch())
          .count());
  // realtime_start_time is only written for the node doing the logging.
  if (source_node == node_) {
    log_file_header_builder.add_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->realtime_start_time.time_since_epoch())
            .count());
  } else {
    // Fill out the legacy start times. Since these were implemented to never
    // change on reboot, they aren't very helpful in tracking what happened.
    log_file_header_builder.add_logger_monotonic_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->logger_monotonic_start_time.time_since_epoch())
            .count());
    log_file_header_builder.add_logger_realtime_start_time(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            node_state->logger_realtime_start_time.time_since_epoch())
            .count());
  }

  // TODO(austin): Add more useful times. When was this part started? What do
  // we know about both the logger and remote then?

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);
  log_file_header_builder.add_source_node_boot_uuid(
      source_node_boot_uuid_offset);

  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
  log_file_header_builder.add_parts_index(parts_index);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  log_file_header_builder.add_boot_uuids(boot_uuids_offset);
  // Wall and monotonic time on the logger when this header was built.
  log_file_header_builder.add_logger_part_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          event_loop_->monotonic_now().time_since_epoch())
          .count());
  log_file_header_builder.add_logger_part_realtime_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          event_loop_->realtime_now().time_since_epoch())
          .count());
  log_file_header_builder.add_oldest_remote_monotonic_timestamps(
      oldest_remote_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_local_monotonic_timestamps(
      oldest_local_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
      oldest_remote_unreliable_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
      oldest_local_unreliable_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_remote_reliable_monotonic_timestamps(
      oldest_remote_reliable_monotonic_timestamps_offset);
  log_file_header_builder.add_oldest_local_reliable_monotonic_timestamps(
      oldest_local_reliable_monotonic_timestamps_offset);
  log_file_header_builder
      .add_oldest_logger_remote_unreliable_monotonic_timestamps(
          oldest_logger_remote_unreliable_monotonic_timestamps_offset);
  log_file_header_builder
      .add_oldest_logger_local_unreliable_monotonic_timestamps(
          oldest_logger_local_unreliable_monotonic_timestamps_offset);
  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}
628
Alexei Strotsbc082d82023-05-03 08:43:42 -0700629MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
630 EventLoop *event_loop)
Alexei Strots01395492023-03-20 13:59:56 -0700631 : MultiNodeLogNamer(std::move(log_backend), event_loop->configuration(),
632 event_loop, event_loop->node()) {}
Austin Schuh5b728b72021-06-16 14:57:15 -0700633
Alexei Strotsbc082d82023-05-03 08:43:42 -0700634MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
635 const Configuration *configuration,
636 EventLoop *event_loop, const Node *node)
Austin Schuh5b728b72021-06-16 14:57:15 -0700637 : LogNamer(configuration, event_loop, node),
Alexei Strots01395492023-03-20 13:59:56 -0700638 log_backend_(std::move(log_backend)),
Austin Schuh8bdfc492023-02-11 12:53:13 -0800639 encoder_factory_([](size_t max_message_size) {
640 // TODO(austin): For slow channels, can we allocate less memory?
641 return std::make_unique<DummyEncoder>(max_message_size,
642 FLAGS_flush_size);
643 }) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700644
Brian Silverman48deab12020-09-30 18:39:28 -0700645MultiNodeLogNamer::~MultiNodeLogNamer() {
646 if (!ran_out_of_space_) {
647 // This handles renaming temporary files etc.
648 Close();
649 }
650}
651
Austin Schuh572924a2021-07-30 22:32:12 -0700652void MultiNodeLogNamer::Rotate(const Node *node) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700653 if (node == this->node()) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700654 if (data_writer_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700655 data_writer_->Rotate();
Brian Silvermancb805822020-10-06 17:43:35 -0700656 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700657 } else {
Austin Schuhb8bca732021-07-30 22:32:00 -0700658 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Austin Schuhcb5601b2020-09-10 15:29:59 -0700659 data_writers_) {
Austin Schuh572924a2021-07-30 22:32:12 -0700660 if (node == data_writer.second.node()) {
661 data_writer.second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700662 }
663 }
664 }
665}
666
Austin Schuh8c399962020-12-25 21:51:45 -0800667void MultiNodeLogNamer::WriteConfiguration(
668 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
669 std::string_view config_sha256) {
670 if (ran_out_of_space_) {
671 return;
672 }
673
Alexei Strots01395492023-03-20 13:59:56 -0700674 const std::string filename = absl::StrCat(config_sha256, ".bfbs", extension_);
675 auto file_handle = log_backend_->RequestFile(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800676 std::unique_ptr<DetachedBufferWriter> writer =
Austin Schuh48d10d62022-10-16 22:19:23 -0700677 std::make_unique<DetachedBufferWriter>(
Alexei Strots01395492023-03-20 13:59:56 -0700678 std::move(file_handle), encoder_factory_(header->span().size()));
Austin Schuh8c399962020-12-25 21:51:45 -0800679
Austin Schuh7ef11a42023-02-04 17:15:12 -0800680 DataEncoder::SpanCopier coppier(header->span());
681 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh8c399962020-12-25 21:51:45 -0800682
683 if (!writer->ran_out_of_space()) {
Alexei Strots01395492023-03-20 13:59:56 -0700684 all_filenames_.emplace_back(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800685 }
Alexei Strots01395492023-03-20 13:59:56 -0700686 // Close the file and maybe rename it too.
Austin Schuh8c399962020-12-25 21:51:45 -0800687 CloseWriter(&writer);
688}
689
Austin Schuhb8bca732021-07-30 22:32:00 -0700690NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700691 // See if we can read the data on this node at all.
692 const bool is_readable =
693 configuration::ChannelIsReadableOnNode(channel, this->node());
694 if (!is_readable) {
695 return nullptr;
696 }
697
698 // Then, see if we are supposed to log the data here.
699 const bool log_message =
700 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
701
702 if (!log_message) {
703 return nullptr;
704 }
705
706 // Now, sort out if this is data generated on this node, or not. It is
707 // generated if it is sendable on this node.
708 if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
Austin Schuhb8bca732021-07-30 22:32:00 -0700709 if (!data_writer_) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700710 MakeDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700711 }
Alexei Strots01395492023-03-20 13:59:56 -0700712 data_writer_->UpdateMaxMessageSize(
713 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
Austin Schuhb8bca732021-07-30 22:32:00 -0700714 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700715 }
716
717 // Ok, we have data that is being forwarded to us that we are supposed to
718 // log. It needs to be logged with send timestamps, but be sorted enough
719 // to be able to be processed.
720 CHECK(data_writers_.find(channel) == data_writers_.end());
721
722 // Track that this node is being logged.
723 const Node *source_node = configuration::GetNode(
724 configuration_, channel->source_node()->string_view());
725
726 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
727 nodes_.emplace_back(source_node);
728 }
729
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700730 // If we already have a data writer for the node, then use the same writer for
731 // all channels of that node.
732 if (node_data_writers_.find(source_node) != node_data_writers_.end()) {
733 node_data_writers_.find(source_node)
734 ->second->UpdateMaxMessageSize(
735 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
736 return node_data_writers_.find(source_node)->second;
737 }
738
739 // If we don't have a data writer for the node, create one.
Austin Schuhf5f99f32022-02-07 20:05:37 -0800740 NewDataWriter data_writer(
741 this, source_node, node_,
742 [this, channel](NewDataWriter *data_writer) {
743 OpenWriter(channel, data_writer);
744 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700745 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700746 0);
747 data_writer.UpdateMaxMessageSize(
Austin Schuh48d10d62022-10-16 22:19:23 -0700748 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700749
750 data_writers_.emplace(channel, std::move(data_writer));
751 node_data_writers_.emplace(source_node, &data_writers_.find(channel)->second);
752 return &(data_writers_.find(channel)->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700753}
754
Austin Schuhb8bca732021-07-30 22:32:00 -0700755NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700756 const Channel *channel, const Node *node) {
757 // See if we can read the data on this node at all.
758 const bool is_readable =
759 configuration::ChannelIsReadableOnNode(channel, this->node());
760 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
761
762 CHECK(data_writers_.find(channel) == data_writers_.end());
763
764 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
765 nodes_.emplace_back(node);
766 }
767
Austin Schuhf5f99f32022-02-07 20:05:37 -0800768 NewDataWriter data_writer(
769 this, configuration::GetNode(configuration_, node), node_,
770 [this, channel](NewDataWriter *data_writer) {
771 OpenForwardedTimestampWriter(channel, data_writer);
772 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700773 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
774 PackRemoteMessageSize());
Austin Schuhb8bca732021-07-30 22:32:00 -0700775 return &(
776 data_writers_.emplace(channel, std::move(data_writer)).first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700777}
778
Austin Schuhb8bca732021-07-30 22:32:00 -0700779NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700780 bool log_delivery_times = false;
781 if (this->node() != nullptr) {
782 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
783 channel, this->node(), this->node());
784 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700785 if (!log_delivery_times) {
786 return nullptr;
787 }
788
Austin Schuhb8bca732021-07-30 22:32:00 -0700789 if (!data_writer_) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700790 MakeDataWriter();
Brian Silvermancb805822020-10-06 17:43:35 -0700791 }
Austin Schuh48d10d62022-10-16 22:19:23 -0700792 data_writer_->UpdateMaxMessageSize(
793 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
Austin Schuhb8bca732021-07-30 22:32:00 -0700794 return data_writer_.get();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700795}
796
Austin Schuh08dba8f2023-05-01 08:29:30 -0700797WriteCode MultiNodeLogNamer::Close() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700798 data_writers_.clear();
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700799 node_data_writers_.clear();
Austin Schuhb8bca732021-07-30 22:32:00 -0700800 data_writer_.reset();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700801 if (ran_out_of_space_) {
802 return WriteCode::kOutOfSpace;
803 }
804 return WriteCode::kOk;
Brian Silvermancb805822020-10-06 17:43:35 -0700805}
806
807void MultiNodeLogNamer::ResetStatistics() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700808 for (std::pair<const Channel *const, NewDataWriter> &data_writer :
Brian Silvermancb805822020-10-06 17:43:35 -0700809 data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800810 if (!data_writer.second.writer) continue;
Alexei Strots01395492023-03-20 13:59:56 -0700811 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700812 }
Maxwell Gumley8ad77782023-07-11 13:27:03 -0600813 if (data_writer_ != nullptr && data_writer_->writer != nullptr) {
Alexei Strots01395492023-03-20 13:59:56 -0700814 data_writer_->writer->WriteStatistics()->ResetStats();
Brian Silvermancb805822020-10-06 17:43:35 -0700815 }
816 max_write_time_ = std::chrono::nanoseconds::zero();
817 max_write_time_bytes_ = -1;
818 max_write_time_messages_ = -1;
819 total_write_time_ = std::chrono::nanoseconds::zero();
820 total_write_count_ = 0;
821 total_write_messages_ = 0;
822 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700823}
824
Austin Schuhb8bca732021-07-30 22:32:00 -0700825void MultiNodeLogNamer::OpenForwardedTimestampWriter(
826 const Channel *channel, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700827 std::string filename =
Austin Schuhe715eae2020-10-10 15:39:30 -0700828 absl::StrCat("timestamps", channel->name()->string_view(), "/",
Brian Silvermana621f522020-09-30 16:52:43 -0700829 channel->type()->string_view(), ".part",
Austin Schuh572924a2021-07-30 22:32:12 -0700830 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700831 CreateBufferWriter(filename, data_writer->max_message_size(),
832 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700833}
834
835void MultiNodeLogNamer::OpenWriter(const Channel *channel,
Austin Schuhb8bca732021-07-30 22:32:00 -0700836 NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700837 const std::string filename = absl::StrCat(
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700838 CHECK_NOTNULL(channel->source_node())->string_view(), "_data/",
839 channel->source_node()->string_view(), "_data.part",
840 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700841 CreateBufferWriter(filename, data_writer->max_message_size(),
842 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700843}
844
Austin Schuh48d10d62022-10-16 22:19:23 -0700845void MultiNodeLogNamer::MakeDataWriter() {
Austin Schuhb8bca732021-07-30 22:32:00 -0700846 if (!data_writer_) {
847 data_writer_ = std::make_unique<NewDataWriter>(
Austin Schuhf5f99f32022-02-07 20:05:37 -0800848 this, node_, node_,
Austin Schuhb8bca732021-07-30 22:32:00 -0700849 [this](NewDataWriter *writer) {
850 std::string name;
851 if (node() != nullptr) {
852 name = absl::StrCat(name, node()->name()->string_view(), "_");
853 }
Austin Schuh572924a2021-07-30 22:32:12 -0700854 absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
Austin Schuhb8bca732021-07-30 22:32:00 -0700855 extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700856 CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
Austin Schuhb8bca732021-07-30 22:32:00 -0700857 },
858 [this](NewDataWriter *data_writer) {
859 CloseWriter(&data_writer->writer);
Austin Schuh48d10d62022-10-16 22:19:23 -0700860 },
861 // Default size is 0 so it will be obvious if something doesn't fix it
862 // afterwards.
863 0);
Brian Silverman7af8c902020-09-29 16:14:04 -0700864 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700865}
866
Brian Silverman0465fcf2020-09-24 00:29:18 -0700867void MultiNodeLogNamer::CreateBufferWriter(
Austin Schuh48d10d62022-10-16 22:19:23 -0700868 std::string_view path, size_t max_message_size,
869 std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700870 if (ran_out_of_space_) {
871 // Refuse to open any new files, which might skip data. Any existing files
872 // are in the same folder, which means they're on the same filesystem, which
873 // means they're probably going to run out of space and get stuck too.
Alexei Strots01395492023-03-20 13:59:56 -0700874 if (!(*destination)) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700875 // But avoid leaving a nullptr writer if we're out of space when
876 // attempting to open the first file.
877 *destination = std::make_unique<DetachedBufferWriter>(
878 DetachedBufferWriter::already_out_of_space_t());
879 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700880 return;
881 }
Alexei Strots01395492023-03-20 13:59:56 -0700882
883 // Let's check that we need to close and replace current driver.
884 if (*destination) {
885 // Let's close the current writer.
886 CloseWriter(destination);
887 // Are we out of space now?
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700888 if (ran_out_of_space_) {
889 *destination = std::make_unique<DetachedBufferWriter>(
890 DetachedBufferWriter::already_out_of_space_t());
891 return;
892 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700893 }
Brian Silvermancb805822020-10-06 17:43:35 -0700894
Alexei Strots01395492023-03-20 13:59:56 -0700895 const std::string filename(path);
896 *destination = std::make_unique<DetachedBufferWriter>(
897 log_backend_->RequestFile(filename), encoder_factory_(max_message_size));
898 if (!(*destination)->ran_out_of_space()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700899 all_filenames_.emplace_back(path);
900 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700901}
902
Brian Silvermancb805822020-10-06 17:43:35 -0700903void MultiNodeLogNamer::CloseWriter(
904 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
Alexei Strots01395492023-03-20 13:59:56 -0700905 CHECK_NOTNULL(writer_pointer);
906 if (!(*writer_pointer)) {
Brian Silvermancb805822020-10-06 17:43:35 -0700907 return;
908 }
Alexei Strots01395492023-03-20 13:59:56 -0700909 DetachedBufferWriter *const writer = writer_pointer->get();
Brian Silvermancb805822020-10-06 17:43:35 -0700910 writer->Close();
911
Alexei Strots01395492023-03-20 13:59:56 -0700912 const auto *stats = writer->WriteStatistics();
913 if (stats->max_write_time() > max_write_time_) {
914 max_write_time_ = stats->max_write_time();
915 max_write_time_bytes_ = stats->max_write_time_bytes();
916 max_write_time_messages_ = stats->max_write_time_messages();
Brian Silvermancb805822020-10-06 17:43:35 -0700917 }
Alexei Strots01395492023-03-20 13:59:56 -0700918 total_write_time_ += stats->total_write_time();
919 total_write_count_ += stats->total_write_count();
920 total_write_messages_ += stats->total_write_messages();
921 total_write_bytes_ += stats->total_write_bytes();
Brian Silvermancb805822020-10-06 17:43:35 -0700922
923 if (writer->ran_out_of_space()) {
924 ran_out_of_space_ = true;
925 writer->acknowledge_out_of_space();
926 }
Brian Silvermancb805822020-10-06 17:43:35 -0700927}
928
Austin Schuhcb5601b2020-09-10 15:29:59 -0700929} // namespace logger
930} // namespace aos