blob: 6682ae3ee84edf67f76d8539a52d2ab1aaf47f7e [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "flatbuffers/flatbuffers.h"
11#include "glog/logging.h"
12
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070013#include "aos/containers/error_list.h"
14#include "aos/containers/sized_array.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070015#include "aos/events/logging/logfile_utils.h"
16#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070017#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070018#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070019
Austin Schuh8bdfc492023-02-11 12:53:13 -080020DECLARE_int32(flush_size);
21
Austin Schuhcb5601b2020-09-10 15:29:59 -070022namespace aos {
23namespace logger {
24
Austin Schuh572924a2021-07-30 22:32:12 -070025NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
Austin Schuhf5f99f32022-02-07 20:05:37 -080026 const Node *logger_node,
Austin Schuh572924a2021-07-30 22:32:12 -070027 std::function<void(NewDataWriter *)> reopen,
Austin Schuh48d10d62022-10-16 22:19:23 -070028 std::function<void(NewDataWriter *)> close,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070029 size_t max_message_size,
30 std::initializer_list<StoredDataType> types)
Austin Schuh572924a2021-07-30 22:32:12 -070031 : node_(node),
32 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
Austin Schuhf5f99f32022-02-07 20:05:37 -080033 logger_node_index_(
34 configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
Austin Schuh572924a2021-07-30 22:32:12 -070035 log_namer_(log_namer),
36 reopen_(std::move(reopen)),
Austin Schuh48d10d62022-10-16 22:19:23 -070037 close_(std::move(close)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070038 max_message_size_(max_message_size),
39 max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070040 allowed_data_types_.fill(false);
41
Austin Schuh72211ae2021-08-05 14:02:30 -070042 state_.resize(configuration::NodesCount(log_namer->configuration_));
43 CHECK_LT(node_index_, state_.size());
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070044 for (StoredDataType type : types) {
45 CHECK_LT(static_cast<size_t>(type), allowed_data_types_.size());
46 allowed_data_types_[static_cast<size_t>(type)] = true;
47 }
Austin Schuh572924a2021-07-30 22:32:12 -070048}
49
50NewDataWriter::~NewDataWriter() {
51 if (writer) {
52 Close();
53 }
54}
55
56void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070057 // No need to rotate if nothing has been written.
58 if (header_written_) {
Alexei Strotsbc082d82023-05-03 08:43:42 -070059 VLOG(1) << "Rotated " << name();
Austin Schuhe46492f2021-07-31 19:49:41 -070060 ++parts_index_;
Austin Schuh1124c512023-08-01 15:20:44 -070061
62 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header =
63 MakeHeader();
64
65 if (header.span().size() > max_message_size_) {
66 max_message_size_ = header.span().size();
67 }
68
Austin Schuhe46492f2021-07-31 19:49:41 -070069 reopen_(this);
70 header_written_ = false;
Austin Schuh1124c512023-08-01 15:20:44 -070071 QueueHeader(std::move(header));
Austin Schuhe46492f2021-07-31 19:49:41 -070072 }
Austin Schuh572924a2021-07-30 22:32:12 -070073}
74
Austin Schuh5e14d842022-01-21 12:02:15 -080075void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070076 parts_uuid_ = UUID::Random();
77 ++parts_index_;
78 reopen_(this);
79 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080080 for (State &state : state_) {
81 state.boot_uuid = UUID::Zero();
82 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
83 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
84 state.oldest_remote_unreliable_monotonic_timestamp =
85 monotonic_clock::max_time;
86 state.oldest_local_unreliable_monotonic_timestamp =
87 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -080088 state.oldest_remote_reliable_monotonic_timestamp =
89 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -080090 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
91 state.oldest_logger_remote_unreliable_monotonic_timestamp =
92 monotonic_clock::max_time;
93 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -080094 monotonic_clock::max_time;
Austin Schuh5e14d842022-01-21 12:02:15 -080095 }
96
97 state_[node_index_].boot_uuid = source_node_boot_uuid;
98
Alexei Strotsbc082d82023-05-03 08:43:42 -070099 VLOG(1) << "Rebooted " << name();
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700100 newest_message_time_ = monotonic_clock::min_time;
101 // When a node reboots, parts_uuid changes but the same writer continues to
102 // write the data, so we can reset the max out of order duration. If we don't
103 // do this, the max out of order duration can grow to an unreasonable value.
104 max_out_of_order_duration_ = log_namer_->base_max_out_of_order_duration();
Austin Schuh5e14d842022-01-21 12:02:15 -0800105}
106
107void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
108 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
109 state_[node_index_].boot_uuid = source_node_boot_uuid;
110 if (header_written_) {
111 Reboot(source_node_boot_uuid);
112 }
113 }
Austin Schuh572924a2021-07-30 22:32:12 -0700114}
115
Austin Schuh72211ae2021-08-05 14:02:30 -0700116void NewDataWriter::UpdateRemote(
117 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
118 const monotonic_clock::time_point monotonic_remote_time,
Austin Schuhf5f99f32022-02-07 20:05:37 -0800119 const monotonic_clock::time_point monotonic_event_time, const bool reliable,
120 monotonic_clock::time_point monotonic_timestamp_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700121 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -0700122 bool rotate = false;
123 CHECK_LT(remote_node_index, state_.size());
124 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -0700125
126 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700127 if (state.boot_uuid != remote_node_boot_uuid) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700128 VLOG(1) << name() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -0700129 << remote_node_boot_uuid << " from " << state.boot_uuid;
130 state.boot_uuid = remote_node_boot_uuid;
131 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
132 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
133 state.oldest_remote_unreliable_monotonic_timestamp =
134 monotonic_clock::max_time;
135 state.oldest_local_unreliable_monotonic_timestamp =
136 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -0800137 state.oldest_remote_reliable_monotonic_timestamp =
138 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -0800139 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
140 state.oldest_logger_remote_unreliable_monotonic_timestamp =
141 monotonic_clock::max_time;
142 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -0800143 monotonic_clock::max_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700144 rotate = true;
145 }
146
Austin Schuh58646e22021-08-23 23:51:46 -0700147 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700148 if (!reliable) {
149 if (state.oldest_remote_unreliable_monotonic_timestamp >
150 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700151 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700152 << " oldest_remote_unreliable_monotonic_timestamp updated from "
153 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
154 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700155 state.oldest_remote_unreliable_monotonic_timestamp =
156 monotonic_remote_time;
157 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
158 rotate = true;
159 }
Austin Schuhbfe6c572022-01-27 20:48:20 -0800160 } else {
161 if (state.oldest_remote_reliable_monotonic_timestamp >
162 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700163 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuhbfe6c572022-01-27 20:48:20 -0800164 << " oldest_remote_reliable_monotonic_timestamp updated from "
165 << state.oldest_remote_reliable_monotonic_timestamp << " to "
166 << monotonic_remote_time;
167 state.oldest_remote_reliable_monotonic_timestamp = monotonic_remote_time;
168 state.oldest_local_reliable_monotonic_timestamp = monotonic_event_time;
169 rotate = true;
170 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700171 }
172
Austin Schuhf5f99f32022-02-07 20:05:37 -0800173 // Track the logger timestamps too.
174 if (monotonic_timestamp_time != monotonic_clock::min_time) {
175 State &logger_state = state_[node_index_];
176 CHECK_EQ(remote_node_index, logger_node_index_);
177 if (monotonic_event_time <
178 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp) {
179 VLOG(1)
Alexei Strotsbc082d82023-05-03 08:43:42 -0700180 << name() << " Remote " << node_index_
Austin Schuhf5f99f32022-02-07 20:05:37 -0800181 << " oldest_logger_remote_unreliable_monotonic_timestamp updated "
182 "from "
183 << logger_state.oldest_logger_remote_unreliable_monotonic_timestamp
184 << " to " << monotonic_event_time;
185 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp =
186 monotonic_event_time;
187 logger_state.oldest_logger_local_unreliable_monotonic_timestamp =
188 monotonic_timestamp_time;
189
190 rotate = true;
191 }
192 }
193
Austin Schuh58646e22021-08-23 23:51:46 -0700194 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700195 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700196 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700197 << " oldest_remote_monotonic_timestamp updated from "
198 << state.oldest_remote_monotonic_timestamp << " to "
199 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700200 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
201 state.oldest_local_monotonic_timestamp = monotonic_event_time;
202 rotate = true;
203 }
204
205 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700206 Rotate();
207 }
208}
209
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700210void NewDataWriter::CopyDataMessage(
211 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
212 aos::monotonic_clock::time_point now,
213 aos::monotonic_clock::time_point message_time) {
214 CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::DATA)])
215 << ": Tried to write data on non-data writer.";
216 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
217}
218
219void NewDataWriter::CopyTimestampMessage(
220 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
221 aos::monotonic_clock::time_point now,
222 aos::monotonic_clock::time_point message_time) {
223 CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::TIMESTAMPS)])
224 << ": Tried to write timestamps on non-timestamp writer.";
225 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
226}
227
228void NewDataWriter::CopyRemoteTimestampMessage(
229 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
230 aos::monotonic_clock::time_point now,
231 aos::monotonic_clock::time_point message_time) {
232 CHECK(allowed_data_types_[static_cast<size_t>(
233 StoredDataType::REMOTE_TIMESTAMPS)])
234 << ": Tried to write remote timestamps on non-remote timestamp writer.";
235 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
236}
237
Austin Schuh48d10d62022-10-16 22:19:23 -0700238void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
239 const UUID &source_node_boot_uuid,
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700240 aos::monotonic_clock::time_point now,
241 aos::monotonic_clock::time_point message_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700242 // Trigger a reboot if we detect the boot UUID change.
Austin Schuh5e14d842022-01-21 12:02:15 -0800243 UpdateBoot(source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700244
Austin Schuh5e14d842022-01-21 12:02:15 -0800245 if (!header_written_) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700246 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700247 }
Austin Schuh58646e22021-08-23 23:51:46 -0700248
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700249 bool max_out_of_order_duration_exceeded = false;
250 // Enforce max out of duration contract.
251 //
252 // Updates the newest message time.
253 // Rotate the part file if current message is more than
254 // max_out_of_order_duration behind the newest message we've logged so far.
255 if (message_time > newest_message_time_) {
256 newest_message_time_ = message_time;
257 }
258
259 // Don't consider messages before start up when checking for max out of order
260 // duration.
261 monotonic_clock::time_point monotonic_start_time =
262 log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid);
263
264 if (std::chrono::nanoseconds((newest_message_time_ -
265 std::max(monotonic_start_time, message_time))) >
266 max_out_of_order_duration_) {
267 // If the new message is older than 2 * max_out_order_duration, doubling it
268 // won't be sufficient.
269 //
270 // Example: newest_message_time = 10, logged_message_time = 5,
271 // max_ooo_duration = 2
272 //
273 // In this case actual max_ooo_duration = 10 - 5 = 5, but we double the
274 // existing max_ooo_duration we get 4 which is not sufficient.
275 //
276 // Take the max of the two values.
277 max_out_of_order_duration_ =
278 2 * std::max(max_out_of_order_duration_,
279 std::chrono::nanoseconds(
280 (newest_message_time_ - message_time)));
281 max_out_of_order_duration_exceeded = true;
282 }
283
Austin Schuh58646e22021-08-23 23:51:46 -0700284 // If the start time has changed for this node, trigger a rotation.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700285 if ((monotonic_start_time != monotonic_start_time_) ||
286 max_out_of_order_duration_exceeded) {
287 // If we just received a start time now, we will rotate parts shortly. Use a
288 // reasonable max out of order durationin the new header based on start time
289 // information available now.
290 if ((monotonic_start_time_ == monotonic_clock::min_time) &&
291 (monotonic_start_time != monotonic_clock::min_time)) {
292 // If we're writing current messages but we receive an older start time,
293 // we can pick a reasonable max ooo duration number for the next part.
294 //
295 // For example - Our current max ooo duration is 0.3 seconds. We're
296 // writing messages at 20 seconds and recieve a start time of 1 second. We
297 // don't need max ooo duration to be (20 - 1) = 19 seconds although that
298 // would still work.
299 //
300 // Pick the minimum max out of duration value that satisifies the
301 // requirement but bound the minimum at the base value we started with.
302 max_out_of_order_duration_ =
303 std::max(log_namer_->base_max_out_of_order_duration(),
304 std::min(max_out_of_order_duration_,
305 std::chrono::nanoseconds(newest_message_time_ -
306 monotonic_start_time)));
307 }
Austin Schuh58646e22021-08-23 23:51:46 -0700308 CHECK(header_written_);
309 Rotate();
310 }
311
312 CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
313 monotonic_start_time_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700314 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
milind-ua50344f2021-08-25 18:22:20 -0700315 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700316 CHECK(header_written_) << ": Attempting to write message before header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700317 << writer->name();
Austin Schuh1124c512023-08-01 15:20:44 -0700318 CHECK_LE(coppier->size(), max_message_size_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700319 writer->CopyMessage(coppier, now);
Austin Schuh572924a2021-07-30 22:32:12 -0700320}
321
Austin Schuhe46492f2021-07-31 19:49:41 -0700322aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
323NewDataWriter::MakeHeader() {
324 const size_t logger_node_index = log_namer_->logger_node_index();
325 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700326 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700327 VLOG(1) << name() << " Logger node is " << logger_node_index
Austin Schuhe46492f2021-07-31 19:49:41 -0700328 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700329 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700330 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700331 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700332 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700333 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700334 max_out_of_order_duration_,
335 allowed_data_types_);
Austin Schuhe46492f2021-07-31 19:49:41 -0700336}
337
Austin Schuh572924a2021-07-30 22:32:12 -0700338void NewDataWriter::QueueHeader(
339 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
340 CHECK(!header_written_) << ": Attempting to write duplicate header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700341 << writer->name();
Austin Schuh572924a2021-07-30 22:32:12 -0700342 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700343 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700344 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700345 if (!writer) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700346 // Since we haven't opened the first time, it's still not too late to update
347 // the max message size. Make sure the header fits.
348 //
349 // This won't work well on reboots, but the structure of the header is fixed
350 // by that point in time, so it's size is fixed too.
351 //
352 // Most of the time, the minimum buffer size inside the encoder of around
353 // 128k will make this a non-issue.
354 UpdateMaxMessageSize(header.span().size());
355
Austin Schuh510dc622021-08-06 18:47:30 -0700356 reopen_(this);
357 }
358
Alexei Strotsbc082d82023-05-03 08:43:42 -0700359 VLOG(1) << "Writing to " << name() << " "
Austin Schuh58646e22021-08-23 23:51:46 -0700360 << aos::FlatbufferToJson(
361 header, {.multi_line = false, .max_vector_size = 100});
362
Austin Schuh510dc622021-08-06 18:47:30 -0700363 CHECK(writer);
Austin Schuh7ef11a42023-02-04 17:15:12 -0800364 DataEncoder::SpanCopier coppier(header.span());
Austin Schuh1124c512023-08-01 15:20:44 -0700365 CHECK_LE(coppier.size(), max_message_size_);
Austin Schuh7ef11a42023-02-04 17:15:12 -0800366 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh572924a2021-07-30 22:32:12 -0700367 header_written_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700368 monotonic_start_time_ = log_namer_->monotonic_start_time(
369 node_index_, state_[node_index_].boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700370}
371
372void NewDataWriter::Close() {
373 CHECK(writer);
374 close_(this);
375 writer.reset();
376 header_written_ = false;
377}
378
Austin Schuh58646e22021-08-23 23:51:46 -0700379LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
380 const UUID &boot_uuid) {
381 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
382 if (it == node_states_.end()) {
383 it =
384 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
385 .first;
386 }
387 return &it->second;
388}
389
Austin Schuh73340842021-07-30 22:32:06 -0700390aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700391 size_t node_index, const std::vector<NewDataWriter::State> &state,
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700392 const UUID &parts_uuid, int parts_index,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700393 std::chrono::nanoseconds max_out_of_order_duration,
394 const std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
395 &allowed_data_types) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700396 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700397 const Node *const source_node =
398 configuration::GetNode(configuration_, node_index);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700399 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 35u);
Austin Schuh73340842021-07-30 22:32:06 -0700400 flatbuffers::FlatBufferBuilder fbb;
401 fbb.ForceDefaults(true);
402
403 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
404 flatbuffers::Offset<aos::Configuration> configuration_offset;
405 if (header_.message().has_configuration()) {
406 CHECK(!header_.message().has_configuration_sha256());
407 configuration_offset =
408 CopyFlatBuffer(header_.message().configuration(), &fbb);
409 } else {
410 CHECK(!header_.message().has_configuration());
411 CHECK(header_.message().has_configuration_sha256());
412 config_sha256_offset = fbb.CreateString(
413 header_.message().configuration_sha256()->string_view());
414 }
415
416 CHECK(header_.message().has_name());
417 const flatbuffers::Offset<flatbuffers::String> name_offset =
418 fbb.CreateString(header_.message().name()->string_view());
Austin Schuhfa712682022-05-11 16:43:42 -0700419 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
420 header_.message().has_logger_sha1()
421 ? fbb.CreateString(header_.message().logger_sha1()->string_view())
422 : 0;
423 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
424 header_.message().has_logger_version()
425 ? fbb.CreateString(header_.message().logger_version()->string_view())
426 : 0;
Austin Schuh73340842021-07-30 22:32:06 -0700427
428 CHECK(header_.message().has_log_event_uuid());
429 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
430 fbb.CreateString(header_.message().log_event_uuid()->string_view());
431
432 CHECK(header_.message().has_logger_instance_uuid());
433 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
434 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
435
436 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
437 if (header_.message().has_log_start_uuid()) {
438 log_start_uuid_offset =
439 fbb.CreateString(header_.message().log_start_uuid()->string_view());
440 }
441
442 CHECK(header_.message().has_logger_node_boot_uuid());
443 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
444 fbb.CreateString(
445 header_.message().logger_node_boot_uuid()->string_view());
446
447 CHECK_NE(source_node_boot_uuid, UUID::Zero());
448 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
449 source_node_boot_uuid.PackString(&fbb);
450
451 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
452 parts_uuid.PackString(&fbb);
453
454 flatbuffers::Offset<Node> node_offset;
455 flatbuffers::Offset<Node> logger_node_offset;
456
457 if (configuration::MultiNode(configuration_)) {
458 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
459 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
460 }
461
Austin Schuhe46492f2021-07-31 19:49:41 -0700462 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700463 boot_uuid_offsets.reserve(state.size());
Austin Schuhe46492f2021-07-31 19:49:41 -0700464
Austin Schuh4db9ec92021-09-22 13:11:12 -0700465 int64_t *unused;
Austin Schuh72211ae2021-08-05 14:02:30 -0700466 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800467 oldest_remote_monotonic_timestamps_offset =
468 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700469
Austin Schuh72211ae2021-08-05 14:02:30 -0700470 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800471 oldest_local_monotonic_timestamps_offset =
472 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700473
Austin Schuh72211ae2021-08-05 14:02:30 -0700474 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
475 oldest_remote_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800476 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700477
Austin Schuh72211ae2021-08-05 14:02:30 -0700478 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
479 oldest_local_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800480 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700481
Austin Schuhbfe6c572022-01-27 20:48:20 -0800482 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
483 oldest_remote_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800484 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800485
486 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
487 oldest_local_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800488 fbb.CreateUninitializedVector(state.size(), &unused);
489
490 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
491 oldest_logger_remote_unreliable_monotonic_timestamps_offset =
492 fbb.CreateUninitializedVector(state.size(), &unused);
493
494 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
495 oldest_logger_local_unreliable_monotonic_timestamps_offset =
496 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800497
Austin Schuh72211ae2021-08-05 14:02:30 -0700498 for (size_t i = 0; i < state.size(); ++i) {
Austin Schuh4db9ec92021-09-22 13:11:12 -0700499 if (state[i].boot_uuid != UUID::Zero()) {
500 boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
501 } else {
502 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
503 }
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700504 if (state[i].boot_uuid == UUID::Zero()) {
505 CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
506 monotonic_clock::max_time);
507 CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
508 monotonic_clock::max_time);
509 CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
510 monotonic_clock::max_time);
511 CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
512 monotonic_clock::max_time);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800513 CHECK_EQ(state[i].oldest_remote_reliable_monotonic_timestamp,
514 monotonic_clock::max_time);
515 CHECK_EQ(state[i].oldest_local_reliable_monotonic_timestamp,
516 monotonic_clock::max_time);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800517 CHECK_EQ(state[i].oldest_logger_remote_unreliable_monotonic_timestamp,
518 monotonic_clock::max_time);
519 CHECK_EQ(state[i].oldest_logger_local_unreliable_monotonic_timestamp,
520 monotonic_clock::max_time);
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700521 }
522
Austin Schuh4db9ec92021-09-22 13:11:12 -0700523 flatbuffers::GetMutableTemporaryPointer(
524 fbb, oldest_remote_monotonic_timestamps_offset)
525 ->Mutate(i, state[i]
526 .oldest_remote_monotonic_timestamp.time_since_epoch()
527 .count());
528 flatbuffers::GetMutableTemporaryPointer(
529 fbb, oldest_local_monotonic_timestamps_offset)
530 ->Mutate(i, state[i]
531 .oldest_local_monotonic_timestamp.time_since_epoch()
532 .count());
533 flatbuffers::GetMutableTemporaryPointer(
534 fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
535 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800536 .oldest_remote_unreliable_monotonic_timestamp
537 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700538 .count());
539 flatbuffers::GetMutableTemporaryPointer(
540 fbb, oldest_local_unreliable_monotonic_timestamps_offset)
541 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800542 .oldest_local_unreliable_monotonic_timestamp
543 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700544 .count());
Austin Schuhbfe6c572022-01-27 20:48:20 -0800545
546 flatbuffers::GetMutableTemporaryPointer(
547 fbb, oldest_remote_reliable_monotonic_timestamps_offset)
548 ->Mutate(i, state[i]
549 .oldest_remote_reliable_monotonic_timestamp
550 .time_since_epoch()
551 .count());
552 flatbuffers::GetMutableTemporaryPointer(
553 fbb, oldest_local_reliable_monotonic_timestamps_offset)
554 ->Mutate(
555 i, state[i]
556 .oldest_local_reliable_monotonic_timestamp.time_since_epoch()
557 .count());
Austin Schuhf5f99f32022-02-07 20:05:37 -0800558
559 flatbuffers::GetMutableTemporaryPointer(
560 fbb, oldest_logger_remote_unreliable_monotonic_timestamps_offset)
561 ->Mutate(i, state[i]
562 .oldest_logger_remote_unreliable_monotonic_timestamp
563 .time_since_epoch()
564 .count());
565 flatbuffers::GetMutableTemporaryPointer(
566 fbb, oldest_logger_local_unreliable_monotonic_timestamps_offset)
567 ->Mutate(i, state[i]
568 .oldest_logger_local_unreliable_monotonic_timestamp
569 .time_since_epoch()
570 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700571 }
572
Austin Schuh4db9ec92021-09-22 13:11:12 -0700573 flatbuffers::Offset<
574 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
575 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
576
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700577 aos::ErrorList<StoredDataType> allowed_data_types_vector;
578 for (size_t type = static_cast<size_t>(StoredDataType::MIN);
579 type <= static_cast<size_t>(StoredDataType::MAX); ++type) {
580 if (allowed_data_types[type]) {
581 allowed_data_types_vector.Set(static_cast<StoredDataType>(type));
582 }
583 }
584
585 flatbuffers::Offset<flatbuffers::Vector<StoredDataType>> data_stored_offset =
586 fbb.CreateVector(allowed_data_types_vector.data(),
587 allowed_data_types_vector.size());
588
Austin Schuh73340842021-07-30 22:32:06 -0700589 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
590
591 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700592 if (!logger_sha1_offset.IsNull()) {
593 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
594 }
595 if (!logger_version_offset.IsNull()) {
596 log_file_header_builder.add_logger_version(logger_version_offset);
597 }
Austin Schuh73340842021-07-30 22:32:06 -0700598
599 // Only add the node if we are running in a multinode configuration.
600 if (!logger_node_offset.IsNull()) {
601 log_file_header_builder.add_node(node_offset);
602 log_file_header_builder.add_logger_node(logger_node_offset);
603 }
604
605 if (!configuration_offset.IsNull()) {
606 log_file_header_builder.add_configuration(configuration_offset);
607 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700608
Austin Schuh73340842021-07-30 22:32:06 -0700609 log_file_header_builder.add_max_out_of_order_duration(
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700610 max_out_of_order_duration.count());
Austin Schuh73340842021-07-30 22:32:06 -0700611
Austin Schuh58646e22021-08-23 23:51:46 -0700612 NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
Austin Schuh73340842021-07-30 22:32:06 -0700613 log_file_header_builder.add_monotonic_start_time(
614 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700615 node_state->monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700616 .count());
617 if (source_node == node_) {
618 log_file_header_builder.add_realtime_start_time(
619 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700620 node_state->realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700621 .count());
622 } else {
623 // Fill out the legacy start times. Since these were implemented to never
624 // change on reboot, they aren't very helpful in tracking what happened.
625 log_file_header_builder.add_logger_monotonic_start_time(
626 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700627 node_state->logger_monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700628 .count());
629 log_file_header_builder.add_logger_realtime_start_time(
630 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700631 node_state->logger_realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700632 .count());
633 }
634
635 // TODO(austin): Add more useful times. When was this part started? What do
636 // we know about both the logger and remote then?
637
638 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
639 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
640 if (!log_start_uuid_offset.IsNull()) {
641 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
642 }
643 log_file_header_builder.add_logger_node_boot_uuid(
644 logger_node_boot_uuid_offset);
645 log_file_header_builder.add_source_node_boot_uuid(
646 source_node_boot_uuid_offset);
647
648 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
649 log_file_header_builder.add_parts_index(parts_index);
650
651 if (!config_sha256_offset.IsNull()) {
652 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
653 }
654
Austin Schuhe46492f2021-07-31 19:49:41 -0700655 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700656 log_file_header_builder.add_logger_part_monotonic_start_time(
657 std::chrono::duration_cast<std::chrono::nanoseconds>(
658 event_loop_->monotonic_now().time_since_epoch())
659 .count());
660 log_file_header_builder.add_logger_part_realtime_start_time(
661 std::chrono::duration_cast<std::chrono::nanoseconds>(
662 event_loop_->realtime_now().time_since_epoch())
663 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700664 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
665 oldest_remote_monotonic_timestamps_offset);
666 log_file_header_builder.add_oldest_local_monotonic_timestamps(
667 oldest_local_monotonic_timestamps_offset);
668 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
669 oldest_remote_unreliable_monotonic_timestamps_offset);
670 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
671 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800672 log_file_header_builder.add_oldest_remote_reliable_monotonic_timestamps(
673 oldest_remote_reliable_monotonic_timestamps_offset);
674 log_file_header_builder.add_oldest_local_reliable_monotonic_timestamps(
675 oldest_local_reliable_monotonic_timestamps_offset);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800676 log_file_header_builder
677 .add_oldest_logger_remote_unreliable_monotonic_timestamps(
678 oldest_logger_remote_unreliable_monotonic_timestamps_offset);
679 log_file_header_builder
680 .add_oldest_logger_local_unreliable_monotonic_timestamps(
681 oldest_logger_local_unreliable_monotonic_timestamps_offset);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700682
683 log_file_header_builder.add_data_stored(data_stored_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700684 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
685 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
686 fbb.Release());
687
688 CHECK(result.Verify()) << ": Built a corrupted header.";
689
690 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700691}
692
Alexei Strotsbc082d82023-05-03 08:43:42 -0700693MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
694 EventLoop *event_loop)
Alexei Strots01395492023-03-20 13:59:56 -0700695 : MultiNodeLogNamer(std::move(log_backend), event_loop->configuration(),
696 event_loop, event_loop->node()) {}
Austin Schuh5b728b72021-06-16 14:57:15 -0700697
Alexei Strotsbc082d82023-05-03 08:43:42 -0700698MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
699 const Configuration *configuration,
700 EventLoop *event_loop, const Node *node)
Austin Schuh5b728b72021-06-16 14:57:15 -0700701 : LogNamer(configuration, event_loop, node),
Alexei Strots01395492023-03-20 13:59:56 -0700702 log_backend_(std::move(log_backend)),
Austin Schuh8bdfc492023-02-11 12:53:13 -0800703 encoder_factory_([](size_t max_message_size) {
704 // TODO(austin): For slow channels, can we allocate less memory?
705 return std::make_unique<DummyEncoder>(max_message_size,
706 FLAGS_flush_size);
707 }) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700708
Brian Silverman48deab12020-09-30 18:39:28 -0700709MultiNodeLogNamer::~MultiNodeLogNamer() {
710 if (!ran_out_of_space_) {
711 // This handles renaming temporary files etc.
712 Close();
713 }
714}
715
Austin Schuh572924a2021-07-30 22:32:12 -0700716void MultiNodeLogNamer::Rotate(const Node *node) {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700717 for (auto &data_map : {&node_data_writers_, &node_timestamp_writers_}) {
718 auto it = data_map->find(node);
719 if (it != data_map->end()) {
720 it->second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700721 }
722 }
723}
724
Austin Schuh8c399962020-12-25 21:51:45 -0800725void MultiNodeLogNamer::WriteConfiguration(
726 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
727 std::string_view config_sha256) {
728 if (ran_out_of_space_) {
729 return;
730 }
731
Alexei Strots01395492023-03-20 13:59:56 -0700732 const std::string filename = absl::StrCat(config_sha256, ".bfbs", extension_);
733 auto file_handle = log_backend_->RequestFile(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800734 std::unique_ptr<DetachedBufferWriter> writer =
Austin Schuh48d10d62022-10-16 22:19:23 -0700735 std::make_unique<DetachedBufferWriter>(
Alexei Strots01395492023-03-20 13:59:56 -0700736 std::move(file_handle), encoder_factory_(header->span().size()));
Austin Schuh8c399962020-12-25 21:51:45 -0800737
Austin Schuh7ef11a42023-02-04 17:15:12 -0800738 DataEncoder::SpanCopier coppier(header->span());
739 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh8c399962020-12-25 21:51:45 -0800740
741 if (!writer->ran_out_of_space()) {
Alexei Strots01395492023-03-20 13:59:56 -0700742 all_filenames_.emplace_back(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800743 }
Alexei Strots01395492023-03-20 13:59:56 -0700744 // Close the file and maybe rename it too.
Austin Schuh8c399962020-12-25 21:51:45 -0800745 CloseWriter(&writer);
746}
747
Austin Schuh6ecfe902023-08-04 22:44:37 -0700748void MultiNodeLogNamer::NoticeNode(const Node *source_node) {
749 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
750 nodes_.emplace_back(source_node);
751 }
752}
753
754NewDataWriter *MultiNodeLogNamer::FindNodeDataWriter(const Node *source_node,
755 size_t max_message_size) {
756 NoticeNode(source_node);
757
758 auto it = node_data_writers_.find(source_node);
759 if (it != node_data_writers_.end()) {
760 it->second.UpdateMaxMessageSize(max_message_size);
761 return &(it->second);
762 }
763 return nullptr;
764}
765
766NewDataWriter *MultiNodeLogNamer::FindNodeTimestampWriter(
767 const Node *source_node, size_t max_message_size) {
768 NoticeNode(source_node);
769
770 auto it = node_timestamp_writers_.find(source_node);
771 if (it != node_timestamp_writers_.end()) {
772 it->second.UpdateMaxMessageSize(max_message_size);
773 return &(it->second);
774 }
775 return nullptr;
776}
777
778NewDataWriter *MultiNodeLogNamer::AddNodeDataWriter(const Node *source_node,
779 NewDataWriter &&writer) {
780 auto result = node_data_writers_.emplace(source_node, std::move(writer));
781 CHECK(result.second);
782 return &(result.first->second);
783}
784
785NewDataWriter *MultiNodeLogNamer::AddNodeTimestampWriter(
786 const Node *source_node, NewDataWriter &&writer) {
787 auto result = node_timestamp_writers_.emplace(source_node, std::move(writer));
788 CHECK(result.second);
789 return &(result.first->second);
790}
791
Austin Schuhb8bca732021-07-30 22:32:00 -0700792NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700793 // See if we can read the data on this node at all.
794 const bool is_readable =
795 configuration::ChannelIsReadableOnNode(channel, this->node());
796 if (!is_readable) {
797 return nullptr;
798 }
799
800 // Then, see if we are supposed to log the data here.
801 const bool log_message =
802 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
803
804 if (!log_message) {
805 return nullptr;
806 }
807
Austin Schuhcb5601b2020-09-10 15:29:59 -0700808 // Ok, we have data that is being forwarded to us that we are supposed to
809 // log. It needs to be logged with send timestamps, but be sorted enough
810 // to be able to be processed.
Austin Schuhcb5601b2020-09-10 15:29:59 -0700811
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700812 const Node *source_node =
813 configuration::MultiNode(configuration_)
814 ? configuration::GetNode(configuration_,
815 channel->source_node()->string_view())
816 : nullptr;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700817
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700818 // If we already have a data writer for the node, then use the same writer for
819 // all channels of that node.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700820 NewDataWriter *result = FindNodeDataWriter(
821 source_node,
822 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
823 if (result != nullptr) {
824 return result;
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700825 }
826
827 // If we don't have a data writer for the node, create one.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700828 return AddNodeDataWriter(
829 source_node,
830 NewDataWriter{
831 this,
832 source_node,
833 node_,
834 [this, source_node](NewDataWriter *data_writer) {
835 OpenDataWriter(source_node, data_writer);
836 },
837 [this](NewDataWriter *data_writer) {
838 CloseWriter(&data_writer->writer);
839 },
840 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
841 {StoredDataType::DATA}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700842}
843
Austin Schuhb8bca732021-07-30 22:32:00 -0700844NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700845 const Channel *channel, const Node *node) {
846 // See if we can read the data on this node at all.
847 const bool is_readable =
848 configuration::ChannelIsReadableOnNode(channel, this->node());
849 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
850
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700851 CHECK_NE(node, this->node());
852
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700853 // If we have a remote timestamp writer for a particular node, use the same
854 // writer for all remote timestamp channels of that node.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700855 NewDataWriter *result =
856 FindNodeTimestampWriter(node, PackRemoteMessageSize());
857 if (result != nullptr) {
858 return result;
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700859 }
860
861 // If there are no remote timestamp writers for the node, create one.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700862 return AddNodeTimestampWriter(
863 node, NewDataWriter{this,
864 configuration::GetNode(configuration_, node),
865 node_,
866 [this](NewDataWriter *data_writer) {
867 OpenForwardedTimestampWriter(node_, data_writer);
868 },
869 [this](NewDataWriter *data_writer) {
870 CloseWriter(&data_writer->writer);
871 },
872 PackRemoteMessageSize(),
873 {StoredDataType::REMOTE_TIMESTAMPS}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700874}
875
Austin Schuhb8bca732021-07-30 22:32:00 -0700876NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700877 bool log_delivery_times = false;
878 if (this->node() != nullptr) {
879 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
880 channel, this->node(), this->node());
881 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700882 if (!log_delivery_times) {
883 return nullptr;
884 }
885
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700886 // There is only one of these.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700887 NewDataWriter *result = FindNodeTimestampWriter(
888 this->node(), PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
889 if (result != nullptr) {
890 return result;
Brian Silvermancb805822020-10-06 17:43:35 -0700891 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700892
Austin Schuh6ecfe902023-08-04 22:44:37 -0700893 return AddNodeTimestampWriter(
894 node_, NewDataWriter{this,
895 node_,
896 node_,
897 [this](NewDataWriter *data_writer) {
898 OpenTimestampWriter(data_writer);
899 },
900 [this](NewDataWriter *data_writer) {
901 CloseWriter(&data_writer->writer);
902 },
903 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
904 {StoredDataType::TIMESTAMPS}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700905}
906
Austin Schuh08dba8f2023-05-01 08:29:30 -0700907WriteCode MultiNodeLogNamer::Close() {
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700908 node_data_writers_.clear();
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700909 node_timestamp_writers_.clear();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700910 if (ran_out_of_space_) {
911 return WriteCode::kOutOfSpace;
912 }
913 return WriteCode::kOk;
Brian Silvermancb805822020-10-06 17:43:35 -0700914}
915
916void MultiNodeLogNamer::ResetStatistics() {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700917 for (std::pair<const Node *const, NewDataWriter> &data_writer :
918 node_data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800919 if (!data_writer.second.writer) continue;
Alexei Strots01395492023-03-20 13:59:56 -0700920 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700921 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700922 for (std::pair<const Node *const, NewDataWriter> &data_writer :
923 node_timestamp_writers_) {
924 if (!data_writer.second.writer) continue;
925 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silvermancb805822020-10-06 17:43:35 -0700926 }
927 max_write_time_ = std::chrono::nanoseconds::zero();
928 max_write_time_bytes_ = -1;
929 max_write_time_messages_ = -1;
930 total_write_time_ = std::chrono::nanoseconds::zero();
931 total_write_count_ = 0;
932 total_write_messages_ = 0;
933 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700934}
935
Austin Schuhb8bca732021-07-30 22:32:00 -0700936void MultiNodeLogNamer::OpenForwardedTimestampWriter(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700937 const Node * /*source_node*/, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700938 const std::string filename = absl::StrCat(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700939 "timestamps/remote_", data_writer->node()->name()->string_view(), ".part",
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700940 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700941 CreateBufferWriter(filename, data_writer->max_message_size(),
942 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700943}
944
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700945void MultiNodeLogNamer::OpenDataWriter(const Node *source_node,
946 NewDataWriter *data_writer) {
947 std::string filename;
948
949 if (source_node != nullptr) {
950 if (source_node == node_) {
951 filename = absl::StrCat(source_node->name()->string_view(), "_");
952 } else {
953 filename = absl::StrCat("data/", source_node->name()->string_view(), "_");
954 }
Brian Silverman7af8c902020-09-29 16:14:04 -0700955 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700956
957 absl::StrAppend(&filename, "data.part", data_writer->parts_index(), ".bfbs",
958 extension_);
959 CreateBufferWriter(filename, data_writer->max_message_size(),
960 &data_writer->writer);
961}
962
963void MultiNodeLogNamer::OpenTimestampWriter(NewDataWriter *data_writer) {
964 std::string filename =
965 absl::StrCat(node()->name()->string_view(), "_timestamps.part",
966 data_writer->parts_index(), ".bfbs", extension_);
967 CreateBufferWriter(filename, data_writer->max_message_size(),
968 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700969}
970
Brian Silverman0465fcf2020-09-24 00:29:18 -0700971void MultiNodeLogNamer::CreateBufferWriter(
Austin Schuh48d10d62022-10-16 22:19:23 -0700972 std::string_view path, size_t max_message_size,
973 std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700974 if (ran_out_of_space_) {
975 // Refuse to open any new files, which might skip data. Any existing files
976 // are in the same folder, which means they're on the same filesystem, which
977 // means they're probably going to run out of space and get stuck too.
Alexei Strots01395492023-03-20 13:59:56 -0700978 if (!(*destination)) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700979 // But avoid leaving a nullptr writer if we're out of space when
980 // attempting to open the first file.
981 *destination = std::make_unique<DetachedBufferWriter>(
982 DetachedBufferWriter::already_out_of_space_t());
983 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700984 return;
985 }
Alexei Strots01395492023-03-20 13:59:56 -0700986
987 // Let's check that we need to close and replace current driver.
988 if (*destination) {
989 // Let's close the current writer.
990 CloseWriter(destination);
991 // Are we out of space now?
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700992 if (ran_out_of_space_) {
993 *destination = std::make_unique<DetachedBufferWriter>(
994 DetachedBufferWriter::already_out_of_space_t());
995 return;
996 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700997 }
Brian Silvermancb805822020-10-06 17:43:35 -0700998
Alexei Strots01395492023-03-20 13:59:56 -0700999 const std::string filename(path);
1000 *destination = std::make_unique<DetachedBufferWriter>(
1001 log_backend_->RequestFile(filename), encoder_factory_(max_message_size));
1002 if (!(*destination)->ran_out_of_space()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -07001003 all_filenames_.emplace_back(path);
1004 }
Brian Silverman0465fcf2020-09-24 00:29:18 -07001005}
1006
Brian Silvermancb805822020-10-06 17:43:35 -07001007void MultiNodeLogNamer::CloseWriter(
1008 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
Alexei Strots01395492023-03-20 13:59:56 -07001009 CHECK_NOTNULL(writer_pointer);
1010 if (!(*writer_pointer)) {
Brian Silvermancb805822020-10-06 17:43:35 -07001011 return;
1012 }
Alexei Strots01395492023-03-20 13:59:56 -07001013 DetachedBufferWriter *const writer = writer_pointer->get();
Brian Silvermancb805822020-10-06 17:43:35 -07001014 writer->Close();
1015
Alexei Strots01395492023-03-20 13:59:56 -07001016 const auto *stats = writer->WriteStatistics();
1017 if (stats->max_write_time() > max_write_time_) {
1018 max_write_time_ = stats->max_write_time();
1019 max_write_time_bytes_ = stats->max_write_time_bytes();
1020 max_write_time_messages_ = stats->max_write_time_messages();
Brian Silvermancb805822020-10-06 17:43:35 -07001021 }
Alexei Strots01395492023-03-20 13:59:56 -07001022 total_write_time_ += stats->total_write_time();
1023 total_write_count_ += stats->total_write_count();
1024 total_write_messages_ += stats->total_write_messages();
1025 total_write_bytes_ += stats->total_write_bytes();
Brian Silvermancb805822020-10-06 17:43:35 -07001026
1027 if (writer->ran_out_of_space()) {
1028 ran_out_of_space_ = true;
1029 writer->acknowledge_out_of_space();
1030 }
Brian Silvermancb805822020-10-06 17:43:35 -07001031}
1032
Austin Schuh6ecfe902023-08-04 22:44:37 -07001033NewDataWriter *MinimalFileMultiNodeLogNamer::MakeWriter(
1034 const Channel *channel) {
1035 // See if we can read the data on this node at all.
1036 const bool is_readable =
1037 configuration::ChannelIsReadableOnNode(channel, this->node());
1038 if (!is_readable) {
1039 return nullptr;
1040 }
1041
1042 // Then, see if we are supposed to log the data here.
1043 const bool log_message =
1044 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
1045
1046 if (!log_message) {
1047 return nullptr;
1048 }
1049
1050 // Ok, we have data that is being forwarded to us that we are supposed to
1051 // log. It needs to be logged with send timestamps, but be sorted enough
1052 // to be able to be processed.
1053
1054 const Node *source_node =
1055 configuration::MultiNode(configuration_)
1056 ? configuration::GetNode(configuration_,
1057 channel->source_node()->string_view())
1058 : nullptr;
1059
1060 // If we don't have a data writer for the node, create one.
1061 if (this->node() == source_node) {
1062 // If we already have a data writer for the node, then use the same writer
1063 // for all channels of that node.
1064 NewDataWriter *result = FindNodeDataWriter(
1065 source_node,
1066 PackMessageSize(LogType::kLogMessage, channel->max_size()));
1067 if (result != nullptr) {
1068 return result;
1069 }
1070
1071 return AddNodeDataWriter(
1072 source_node,
1073 NewDataWriter{
1074 this,
1075 source_node,
1076 node_,
1077 [this, source_node](NewDataWriter *data_writer) {
1078 OpenNodeWriter(source_node, data_writer);
1079 },
1080 [this](NewDataWriter *data_writer) {
1081 CloseWriter(&data_writer->writer);
1082 },
1083 PackMessageSize(LogType::kLogMessage, channel->max_size()),
1084 {StoredDataType::DATA, StoredDataType::TIMESTAMPS}});
1085 } else {
1086 // If we already have a data writer for the node, then use the same writer
1087 // for all channels of that node.
1088 NewDataWriter *result = FindNodeDataWriter(
1089 source_node,
1090 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
1091 if (result != nullptr) {
1092 return result;
1093 }
1094
1095 return AddNodeDataWriter(
1096 source_node,
1097 NewDataWriter{
1098 this,
1099 source_node,
1100 node_,
1101 [this, source_node](NewDataWriter *data_writer) {
1102 OpenNodeWriter(source_node, data_writer);
1103 },
1104 [this](NewDataWriter *data_writer) {
1105 CloseWriter(&data_writer->writer);
1106 },
1107 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
1108 {StoredDataType::DATA, StoredDataType::REMOTE_TIMESTAMPS}});
1109 }
1110}
1111
1112NewDataWriter *MinimalFileMultiNodeLogNamer::MakeTimestampWriter(
1113 const Channel *channel) {
1114 bool log_delivery_times = false;
1115 if (this->node() != nullptr) {
1116 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
1117 channel, this->node(), this->node());
1118 }
1119 if (!log_delivery_times) {
1120 return nullptr;
1121 }
1122
1123 // There is only one of these.
1124 NewDataWriter *result = FindNodeDataWriter(
1125 this->node(), PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
1126 if (result != nullptr) {
1127 return result;
1128 }
1129
1130 return AddNodeDataWriter(
1131 node_, NewDataWriter{this,
1132 node_,
1133 node_,
1134 [this](NewDataWriter *data_writer) {
1135 OpenNodeWriter(node_, data_writer);
1136 },
1137 [this](NewDataWriter *data_writer) {
1138 CloseWriter(&data_writer->writer);
1139 },
1140 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
1141 {StoredDataType::DATA, StoredDataType::TIMESTAMPS}});
1142}
1143
1144NewDataWriter *MinimalFileMultiNodeLogNamer::MakeForwardedTimestampWriter(
1145 const Channel *channel, const Node *node) {
1146 // See if we can read the data on this node at all.
1147 const bool is_readable =
1148 configuration::ChannelIsReadableOnNode(channel, this->node());
1149 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
1150
1151 CHECK_NE(node, this->node());
1152
1153 // If we have a remote timestamp writer for a particular node, use the same
1154 // writer for all remote timestamp channels of that node.
1155 NewDataWriter *result = FindNodeDataWriter(node, PackRemoteMessageSize());
1156 if (result != nullptr) {
1157 return result;
1158 }
1159
1160 // If there are no remote timestamp writers for the node, create one.
1161 return AddNodeDataWriter(
1162 node,
1163 NewDataWriter{this,
1164 configuration::GetNode(configuration_, node),
1165 node_,
1166 [this, node](NewDataWriter *data_writer) {
1167 OpenNodeWriter(node, data_writer);
1168 },
1169 [this](NewDataWriter *data_writer) {
1170 CloseWriter(&data_writer->writer);
1171 },
1172 PackRemoteMessageSize(),
1173 {StoredDataType::DATA, StoredDataType::REMOTE_TIMESTAMPS}});
1174}
1175
1176void MinimalFileMultiNodeLogNamer::OpenNodeWriter(const Node *source_node,
1177 NewDataWriter *data_writer) {
1178 std::string filename;
1179
1180 if (node() != nullptr) {
1181 filename = absl::StrCat(node()->name()->string_view(), "_");
1182 }
1183
1184 if (source_node != nullptr) {
1185 absl::StrAppend(&filename, source_node->name()->string_view(), "_");
1186 }
1187
1188 absl::StrAppend(&filename, "all.part", data_writer->parts_index(), ".bfbs",
1189 extension_);
1190 VLOG(1) << "Going to open " << filename;
1191 CreateBufferWriter(filename, data_writer->max_message_size(),
1192 &data_writer->writer);
1193}
1194
Austin Schuhcb5601b2020-09-10 15:29:59 -07001195} // namespace logger
1196} // namespace aos