blob: 3bbca3b87f6c6650a4613c14a1010a0f6d97d634 [file] [log] [blame]
Austin Schuhcb5601b2020-09-10 15:29:59 -07001#include "aos/events/logging/log_namer.h"
2
3#include <functional>
4#include <map>
5#include <memory>
6#include <string_view>
7#include <vector>
8
9#include "absl/strings/str_cat.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "flatbuffers/flatbuffers.h"
11#include "glog/logging.h"
12
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070013#include "aos/containers/error_list.h"
14#include "aos/containers/sized_array.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070015#include "aos/events/logging/logfile_utils.h"
16#include "aos/events/logging/logger_generated.h"
Austin Schuh73340842021-07-30 22:32:06 -070017#include "aos/flatbuffer_merge.h"
Austin Schuh4385b142021-03-14 21:31:13 -070018#include "aos/uuid.h"
Austin Schuhcb5601b2020-09-10 15:29:59 -070019
Austin Schuh8bdfc492023-02-11 12:53:13 -080020DECLARE_int32(flush_size);
21
Stephan Pleinesf63bde82024-01-13 15:59:33 -080022namespace aos::logger {
Austin Schuhcb5601b2020-09-10 15:29:59 -070023
Austin Schuh572924a2021-07-30 22:32:12 -070024NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
Austin Schuhf5f99f32022-02-07 20:05:37 -080025 const Node *logger_node,
Austin Schuh572924a2021-07-30 22:32:12 -070026 std::function<void(NewDataWriter *)> reopen,
Austin Schuh48d10d62022-10-16 22:19:23 -070027 std::function<void(NewDataWriter *)> close,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070028 size_t max_message_size,
29 std::initializer_list<StoredDataType> types)
Austin Schuh572924a2021-07-30 22:32:12 -070030 : node_(node),
31 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
Austin Schuhf5f99f32022-02-07 20:05:37 -080032 logger_node_index_(
33 configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
Austin Schuh572924a2021-07-30 22:32:12 -070034 log_namer_(log_namer),
35 reopen_(std::move(reopen)),
Austin Schuh48d10d62022-10-16 22:19:23 -070036 close_(std::move(close)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070037 max_message_size_(max_message_size),
38 max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070039 allowed_data_types_.fill(false);
40
Austin Schuh72211ae2021-08-05 14:02:30 -070041 state_.resize(configuration::NodesCount(log_namer->configuration_));
42 CHECK_LT(node_index_, state_.size());
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070043 for (StoredDataType type : types) {
44 CHECK_LT(static_cast<size_t>(type), allowed_data_types_.size());
45 allowed_data_types_[static_cast<size_t>(type)] = true;
46 }
Austin Schuh572924a2021-07-30 22:32:12 -070047}
48
49NewDataWriter::~NewDataWriter() {
50 if (writer) {
51 Close();
52 }
53}
54
55void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070056 // No need to rotate if nothing has been written.
57 if (header_written_) {
Alexei Strotsbc082d82023-05-03 08:43:42 -070058 VLOG(1) << "Rotated " << name();
Austin Schuhe46492f2021-07-31 19:49:41 -070059 ++parts_index_;
Austin Schuh1124c512023-08-01 15:20:44 -070060
61 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> header =
62 MakeHeader();
63
64 if (header.span().size() > max_message_size_) {
65 max_message_size_ = header.span().size();
66 }
67
Austin Schuhe46492f2021-07-31 19:49:41 -070068 reopen_(this);
69 header_written_ = false;
Austin Schuh1124c512023-08-01 15:20:44 -070070 QueueHeader(std::move(header));
Austin Schuhe46492f2021-07-31 19:49:41 -070071 }
Austin Schuh572924a2021-07-30 22:32:12 -070072}
73
Austin Schuh5e14d842022-01-21 12:02:15 -080074void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070075 parts_uuid_ = UUID::Random();
76 ++parts_index_;
77 reopen_(this);
78 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080079 for (State &state : state_) {
80 state.boot_uuid = UUID::Zero();
81 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
82 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
83 state.oldest_remote_unreliable_monotonic_timestamp =
84 monotonic_clock::max_time;
85 state.oldest_local_unreliable_monotonic_timestamp =
86 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -080087 state.oldest_remote_reliable_monotonic_timestamp =
88 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -080089 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
90 state.oldest_logger_remote_unreliable_monotonic_timestamp =
91 monotonic_clock::max_time;
92 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -080093 monotonic_clock::max_time;
Austin Schuh5e14d842022-01-21 12:02:15 -080094 }
95
96 state_[node_index_].boot_uuid = source_node_boot_uuid;
97
Alexei Strotsbc082d82023-05-03 08:43:42 -070098 VLOG(1) << "Rebooted " << name();
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070099 newest_message_time_ = monotonic_clock::min_time;
100 // When a node reboots, parts_uuid changes but the same writer continues to
101 // write the data, so we can reset the max out of order duration. If we don't
102 // do this, the max out of order duration can grow to an unreasonable value.
103 max_out_of_order_duration_ = log_namer_->base_max_out_of_order_duration();
Austin Schuh5e14d842022-01-21 12:02:15 -0800104}
105
106void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
107 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
108 state_[node_index_].boot_uuid = source_node_boot_uuid;
109 if (header_written_) {
110 Reboot(source_node_boot_uuid);
111 }
112 }
Austin Schuh572924a2021-07-30 22:32:12 -0700113}
114
Austin Schuh72211ae2021-08-05 14:02:30 -0700115void NewDataWriter::UpdateRemote(
116 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
117 const monotonic_clock::time_point monotonic_remote_time,
Austin Schuhf5f99f32022-02-07 20:05:37 -0800118 const monotonic_clock::time_point monotonic_event_time, const bool reliable,
119 monotonic_clock::time_point monotonic_timestamp_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700120 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -0700121 bool rotate = false;
122 CHECK_LT(remote_node_index, state_.size());
123 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -0700124
125 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700126 if (state.boot_uuid != remote_node_boot_uuid) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700127 VLOG(1) << name() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -0700128 << remote_node_boot_uuid << " from " << state.boot_uuid;
129 state.boot_uuid = remote_node_boot_uuid;
130 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
131 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
132 state.oldest_remote_unreliable_monotonic_timestamp =
133 monotonic_clock::max_time;
134 state.oldest_local_unreliable_monotonic_timestamp =
135 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -0800136 state.oldest_remote_reliable_monotonic_timestamp =
137 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -0800138 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
139 state.oldest_logger_remote_unreliable_monotonic_timestamp =
140 monotonic_clock::max_time;
141 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -0800142 monotonic_clock::max_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700143 rotate = true;
144 }
145
Austin Schuh58646e22021-08-23 23:51:46 -0700146 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700147 if (!reliable) {
148 if (state.oldest_remote_unreliable_monotonic_timestamp >
149 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700150 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700151 << " oldest_remote_unreliable_monotonic_timestamp updated from "
152 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
153 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700154 state.oldest_remote_unreliable_monotonic_timestamp =
155 monotonic_remote_time;
156 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
157 rotate = true;
158 }
Austin Schuhbfe6c572022-01-27 20:48:20 -0800159 } else {
160 if (state.oldest_remote_reliable_monotonic_timestamp >
161 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700162 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuhbfe6c572022-01-27 20:48:20 -0800163 << " oldest_remote_reliable_monotonic_timestamp updated from "
164 << state.oldest_remote_reliable_monotonic_timestamp << " to "
165 << monotonic_remote_time;
166 state.oldest_remote_reliable_monotonic_timestamp = monotonic_remote_time;
167 state.oldest_local_reliable_monotonic_timestamp = monotonic_event_time;
168 rotate = true;
169 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700170 }
171
Austin Schuhf5f99f32022-02-07 20:05:37 -0800172 // Track the logger timestamps too.
173 if (monotonic_timestamp_time != monotonic_clock::min_time) {
174 State &logger_state = state_[node_index_];
175 CHECK_EQ(remote_node_index, logger_node_index_);
176 if (monotonic_event_time <
177 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp) {
178 VLOG(1)
Alexei Strotsbc082d82023-05-03 08:43:42 -0700179 << name() << " Remote " << node_index_
Austin Schuhf5f99f32022-02-07 20:05:37 -0800180 << " oldest_logger_remote_unreliable_monotonic_timestamp updated "
181 "from "
182 << logger_state.oldest_logger_remote_unreliable_monotonic_timestamp
183 << " to " << monotonic_event_time;
184 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp =
185 monotonic_event_time;
186 logger_state.oldest_logger_local_unreliable_monotonic_timestamp =
187 monotonic_timestamp_time;
188
189 rotate = true;
190 }
191 }
192
Austin Schuh58646e22021-08-23 23:51:46 -0700193 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700194 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700195 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700196 << " oldest_remote_monotonic_timestamp updated from "
197 << state.oldest_remote_monotonic_timestamp << " to "
198 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700199 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
200 state.oldest_local_monotonic_timestamp = monotonic_event_time;
201 rotate = true;
202 }
203
204 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700205 Rotate();
206 }
207}
208
// Writes a data message via CopyMessage().
// CHECK-fails unless StoredDataType::DATA was among the types this writer was
// constructed with.
void NewDataWriter::CopyDataMessage(
    DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
    aos::monotonic_clock::time_point now,
    aos::monotonic_clock::time_point message_time) {
  CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::DATA)])
      << ": Tried to write data on non-data writer.";
  CopyMessage(coppier, source_node_boot_uuid, now, message_time);
}
217
// Writes a timestamp message via CopyMessage().
// CHECK-fails unless StoredDataType::TIMESTAMPS was among the types this
// writer was constructed with.
void NewDataWriter::CopyTimestampMessage(
    DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
    aos::monotonic_clock::time_point now,
    aos::monotonic_clock::time_point message_time) {
  CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::TIMESTAMPS)])
      << ": Tried to write timestamps on non-timestamp writer.";
  CopyMessage(coppier, source_node_boot_uuid, now, message_time);
}
226
// Writes a remote timestamp message via CopyMessage().
// CHECK-fails unless StoredDataType::REMOTE_TIMESTAMPS was among the types
// this writer was constructed with.
void NewDataWriter::CopyRemoteTimestampMessage(
    DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
    aos::monotonic_clock::time_point now,
    aos::monotonic_clock::time_point message_time) {
  CHECK(allowed_data_types_[static_cast<size_t>(
      StoredDataType::REMOTE_TIMESTAMPS)])
      << ": Tried to write remote timestamps on non-remote timestamp writer.";
  CopyMessage(coppier, source_node_boot_uuid, now, message_time);
}
236
// Writes one message into the current part file, writing the header first if
// it has not been written yet.
//
// coppier: copies the encoded message into the writer; its size must not
//   exceed max_message_size_.
// source_node_boot_uuid: boot of the node this writer logs. A change is
//   handled by UpdateBoot(), which may Reboot().
// now: forwarded to writer->CopyMessage().
// message_time: the message's monotonic time. It is used to enforce the max
//   out of order duration recorded in the header.
//
// Rotates to a new part when the start time for this node/boot changed or
// when the message is further out of order than the current header allows.
void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
                                const UUID &source_node_boot_uuid,
                                aos::monotonic_clock::time_point now,
                                aos::monotonic_clock::time_point message_time) {
  // Trigger a reboot if we detect the boot UUID change.
  UpdateBoot(source_node_boot_uuid);

  if (!header_written_) {
    QueueHeader(MakeHeader());
  }

  bool max_out_of_order_duration_exceeded = false;
  // Enforce the max out of order duration contract.
  //
  // Update the newest message time, and rotate the part file if the current
  // message is more than max_out_of_order_duration behind the newest message
  // we've logged so far.
  if (message_time > newest_message_time_) {
    newest_message_time_ = message_time;
  }

  // Don't consider messages before start up when checking for max out of order
  // duration.
  monotonic_clock::time_point monotonic_start_time =
      log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid);

  if (std::chrono::nanoseconds((newest_message_time_ -
                                std::max(monotonic_start_time, message_time))) >
      max_out_of_order_duration_) {
    // If the new message is older than 2 * max_out_of_order_duration, doubling
    // it won't be sufficient.
    //
    // Example: newest_message_time = 10, logged_message_time = 5,
    // max_ooo_duration = 2
    //
    // In this case the actual max_ooo_duration = 10 - 5 = 5, but if we double
    // the existing max_ooo_duration we get 4, which is not sufficient.
    //
    // Take the max of the two values.
    max_out_of_order_duration_ =
        2 * std::max(max_out_of_order_duration_,
                     std::chrono::nanoseconds(
                         (newest_message_time_ - message_time)));
    max_out_of_order_duration_exceeded = true;
  }

  // If the start time has changed for this node, or the out of order limit was
  // just raised, trigger a rotation so the next header (built by MakeHeader())
  // carries the new values.
  if ((monotonic_start_time != monotonic_start_time_) ||
      max_out_of_order_duration_exceeded) {
    // If we just received a start time now, we will rotate parts shortly. Use a
    // reasonable max out of order duration in the new header based on the start
    // time information available now.
    if ((monotonic_start_time_ == monotonic_clock::min_time) &&
        (monotonic_start_time != monotonic_clock::min_time)) {
      // If we're writing current messages but we receive an older start time,
      // we can pick a reasonable max ooo duration number for the next part.
      //
      // For example - Our current max ooo duration is 0.3 seconds. We're
      // writing messages at 20 seconds and receive a start time of 1 second. We
      // don't need max ooo duration to be (20 - 1) = 19 seconds although that
      // would still work.
      //
      // Pick the minimum max out of order duration value that satisfies the
      // requirement but bound the minimum at the base value we started with.
      max_out_of_order_duration_ =
          std::max(log_namer_->base_max_out_of_order_duration(),
                   std::min(max_out_of_order_duration_,
                            std::chrono::nanoseconds(newest_message_time_ -
                                                     monotonic_start_time)));
    }
    CHECK(header_written_);
    Rotate();
  }

  // After any rotation, the part we are about to write into must describe this
  // message's start time and boot.
  CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
           monotonic_start_time_);
  CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
  CHECK(writer);
  CHECK(header_written_) << ": Attempting to write message before header to "
                         << writer->name();
  CHECK_LE(coppier->size(), max_message_size_);
  writer->CopyMessage(coppier, now);
}
320
Austin Schuhe46492f2021-07-31 19:49:41 -0700321aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
322NewDataWriter::MakeHeader() {
323 const size_t logger_node_index = log_namer_->logger_node_index();
324 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700325 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700326 VLOG(1) << name() << " Logger node is " << logger_node_index
Austin Schuhe46492f2021-07-31 19:49:41 -0700327 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700328 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700329 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700330 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700331 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700332 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700333 max_out_of_order_duration_,
334 allowed_data_types_);
Austin Schuhe46492f2021-07-31 19:49:41 -0700335}
336
// Writes `header` as the first message of the current part file.
//
// CHECKs that no header was written yet and that the header's
// source_node_boot_uuid matches this writer's node. Opens the writer via
// reopen_ if none is open. Afterwards it marks the header as written and
// caches the start time for this node/boot in monotonic_start_time_, which
// CopyMessage() compares against to detect start time changes.
void NewDataWriter::QueueHeader(
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
  // NOTE(review): the failure message dereferences writer. This relies on
  // writer being non-null whenever header_written_ is true; Close() clears
  // both together.
  CHECK(!header_written_) << ": Attempting to write duplicate header to "
                          << writer->name();
  CHECK(header.message().has_source_node_boot_uuid());
  CHECK_EQ(state_[node_index_].boot_uuid,
           UUID::FromString(header.message().source_node_boot_uuid()));
  if (!writer) {
    // Since we haven't opened the first time, it's still not too late to update
    // the max message size. Make sure the header fits.
    //
    // This won't work well on reboots, but the structure of the header is fixed
    // by that point in time, so its size is fixed too.
    //
    // Most of the time, the minimum buffer size inside the encoder of around
    // 128k will make this a non-issue.
    UpdateMaxMessageSize(header.span().size());

    reopen_(this);
  }

  VLOG(1) << "Writing to " << name() << " "
          << aos::FlatbufferToJson(
                 header, {.multi_line = false, .max_vector_size = 100});

  CHECK(writer);
  DataEncoder::SpanCopier coppier(header.span());
  CHECK_LE(coppier.size(), max_message_size_);
  writer->CopyMessage(&coppier, aos::monotonic_clock::now());
  header_written_ = true;
  monotonic_start_time_ = log_namer_->monotonic_start_time(
      node_index_, state_[node_index_].boot_uuid);
}
370
// Closes the current part file: invokes the close_ callback with this writer,
// destroys the writer, and clears header_written_ so the next write
// (CopyMessage()) queues a fresh header. CHECK-fails if no writer is open.
void NewDataWriter::Close() {
  CHECK(writer);
  close_(this);
  writer.reset();
  header_written_ = false;
}
377
Austin Schuh58646e22021-08-23 23:51:46 -0700378LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
379 const UUID &boot_uuid) {
380 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
381 if (it == node_states_.end()) {
382 it =
383 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
384 .first;
385 }
386 return &it->second;
387}
388
Austin Schuh73340842021-07-30 22:32:06 -0700389aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700390 size_t node_index, const std::vector<NewDataWriter::State> &state,
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700391 const UUID &parts_uuid, int parts_index,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700392 std::chrono::nanoseconds max_out_of_order_duration,
393 const std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
394 &allowed_data_types) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700395 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700396 const Node *const source_node =
397 configuration::GetNode(configuration_, node_index);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700398 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 35u);
Austin Schuh73340842021-07-30 22:32:06 -0700399 flatbuffers::FlatBufferBuilder fbb;
400 fbb.ForceDefaults(true);
401
402 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
403 flatbuffers::Offset<aos::Configuration> configuration_offset;
404 if (header_.message().has_configuration()) {
405 CHECK(!header_.message().has_configuration_sha256());
406 configuration_offset =
407 CopyFlatBuffer(header_.message().configuration(), &fbb);
408 } else {
409 CHECK(!header_.message().has_configuration());
410 CHECK(header_.message().has_configuration_sha256());
411 config_sha256_offset = fbb.CreateString(
412 header_.message().configuration_sha256()->string_view());
413 }
414
415 CHECK(header_.message().has_name());
416 const flatbuffers::Offset<flatbuffers::String> name_offset =
417 fbb.CreateString(header_.message().name()->string_view());
Austin Schuhfa712682022-05-11 16:43:42 -0700418 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
419 header_.message().has_logger_sha1()
420 ? fbb.CreateString(header_.message().logger_sha1()->string_view())
421 : 0;
422 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
423 header_.message().has_logger_version()
424 ? fbb.CreateString(header_.message().logger_version()->string_view())
425 : 0;
Austin Schuh73340842021-07-30 22:32:06 -0700426
427 CHECK(header_.message().has_log_event_uuid());
428 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
429 fbb.CreateString(header_.message().log_event_uuid()->string_view());
430
431 CHECK(header_.message().has_logger_instance_uuid());
432 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
433 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
434
435 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
436 if (header_.message().has_log_start_uuid()) {
437 log_start_uuid_offset =
438 fbb.CreateString(header_.message().log_start_uuid()->string_view());
439 }
440
441 CHECK(header_.message().has_logger_node_boot_uuid());
442 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
443 fbb.CreateString(
444 header_.message().logger_node_boot_uuid()->string_view());
445
446 CHECK_NE(source_node_boot_uuid, UUID::Zero());
447 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
448 source_node_boot_uuid.PackString(&fbb);
449
450 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
451 parts_uuid.PackString(&fbb);
452
453 flatbuffers::Offset<Node> node_offset;
454 flatbuffers::Offset<Node> logger_node_offset;
455
456 if (configuration::MultiNode(configuration_)) {
457 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
458 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
459 }
460
Austin Schuhe46492f2021-07-31 19:49:41 -0700461 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700462 boot_uuid_offsets.reserve(state.size());
Austin Schuhe46492f2021-07-31 19:49:41 -0700463
Austin Schuh4db9ec92021-09-22 13:11:12 -0700464 int64_t *unused;
Austin Schuh72211ae2021-08-05 14:02:30 -0700465 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800466 oldest_remote_monotonic_timestamps_offset =
467 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700468
Austin Schuh72211ae2021-08-05 14:02:30 -0700469 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800470 oldest_local_monotonic_timestamps_offset =
471 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700472
Austin Schuh72211ae2021-08-05 14:02:30 -0700473 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
474 oldest_remote_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800475 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700476
Austin Schuh72211ae2021-08-05 14:02:30 -0700477 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
478 oldest_local_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800479 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700480
Austin Schuhbfe6c572022-01-27 20:48:20 -0800481 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
482 oldest_remote_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800483 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800484
485 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
486 oldest_local_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800487 fbb.CreateUninitializedVector(state.size(), &unused);
488
489 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
490 oldest_logger_remote_unreliable_monotonic_timestamps_offset =
491 fbb.CreateUninitializedVector(state.size(), &unused);
492
493 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
494 oldest_logger_local_unreliable_monotonic_timestamps_offset =
495 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800496
Austin Schuh72211ae2021-08-05 14:02:30 -0700497 for (size_t i = 0; i < state.size(); ++i) {
Austin Schuh4db9ec92021-09-22 13:11:12 -0700498 if (state[i].boot_uuid != UUID::Zero()) {
499 boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
500 } else {
501 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
502 }
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700503 if (state[i].boot_uuid == UUID::Zero()) {
504 CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
505 monotonic_clock::max_time);
506 CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
507 monotonic_clock::max_time);
508 CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
509 monotonic_clock::max_time);
510 CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
511 monotonic_clock::max_time);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800512 CHECK_EQ(state[i].oldest_remote_reliable_monotonic_timestamp,
513 monotonic_clock::max_time);
514 CHECK_EQ(state[i].oldest_local_reliable_monotonic_timestamp,
515 monotonic_clock::max_time);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800516 CHECK_EQ(state[i].oldest_logger_remote_unreliable_monotonic_timestamp,
517 monotonic_clock::max_time);
518 CHECK_EQ(state[i].oldest_logger_local_unreliable_monotonic_timestamp,
519 monotonic_clock::max_time);
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700520 }
521
Austin Schuh4db9ec92021-09-22 13:11:12 -0700522 flatbuffers::GetMutableTemporaryPointer(
523 fbb, oldest_remote_monotonic_timestamps_offset)
524 ->Mutate(i, state[i]
525 .oldest_remote_monotonic_timestamp.time_since_epoch()
526 .count());
527 flatbuffers::GetMutableTemporaryPointer(
528 fbb, oldest_local_monotonic_timestamps_offset)
529 ->Mutate(i, state[i]
530 .oldest_local_monotonic_timestamp.time_since_epoch()
531 .count());
532 flatbuffers::GetMutableTemporaryPointer(
533 fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
534 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800535 .oldest_remote_unreliable_monotonic_timestamp
536 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700537 .count());
538 flatbuffers::GetMutableTemporaryPointer(
539 fbb, oldest_local_unreliable_monotonic_timestamps_offset)
540 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800541 .oldest_local_unreliable_monotonic_timestamp
542 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700543 .count());
Austin Schuhbfe6c572022-01-27 20:48:20 -0800544
545 flatbuffers::GetMutableTemporaryPointer(
546 fbb, oldest_remote_reliable_monotonic_timestamps_offset)
547 ->Mutate(i, state[i]
548 .oldest_remote_reliable_monotonic_timestamp
549 .time_since_epoch()
550 .count());
551 flatbuffers::GetMutableTemporaryPointer(
552 fbb, oldest_local_reliable_monotonic_timestamps_offset)
553 ->Mutate(
554 i, state[i]
555 .oldest_local_reliable_monotonic_timestamp.time_since_epoch()
556 .count());
Austin Schuhf5f99f32022-02-07 20:05:37 -0800557
558 flatbuffers::GetMutableTemporaryPointer(
559 fbb, oldest_logger_remote_unreliable_monotonic_timestamps_offset)
560 ->Mutate(i, state[i]
561 .oldest_logger_remote_unreliable_monotonic_timestamp
562 .time_since_epoch()
563 .count());
564 flatbuffers::GetMutableTemporaryPointer(
565 fbb, oldest_logger_local_unreliable_monotonic_timestamps_offset)
566 ->Mutate(i, state[i]
567 .oldest_logger_local_unreliable_monotonic_timestamp
568 .time_since_epoch()
569 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700570 }
571
Austin Schuh4db9ec92021-09-22 13:11:12 -0700572 flatbuffers::Offset<
573 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
574 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
575
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700576 aos::ErrorList<StoredDataType> allowed_data_types_vector;
577 for (size_t type = static_cast<size_t>(StoredDataType::MIN);
578 type <= static_cast<size_t>(StoredDataType::MAX); ++type) {
579 if (allowed_data_types[type]) {
580 allowed_data_types_vector.Set(static_cast<StoredDataType>(type));
581 }
582 }
583
584 flatbuffers::Offset<flatbuffers::Vector<StoredDataType>> data_stored_offset =
585 fbb.CreateVector(allowed_data_types_vector.data(),
586 allowed_data_types_vector.size());
587
Austin Schuh73340842021-07-30 22:32:06 -0700588 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
589
590 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700591 if (!logger_sha1_offset.IsNull()) {
592 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
593 }
594 if (!logger_version_offset.IsNull()) {
595 log_file_header_builder.add_logger_version(logger_version_offset);
596 }
Austin Schuh73340842021-07-30 22:32:06 -0700597
598 // Only add the node if we are running in a multinode configuration.
599 if (!logger_node_offset.IsNull()) {
600 log_file_header_builder.add_node(node_offset);
601 log_file_header_builder.add_logger_node(logger_node_offset);
602 }
603
604 if (!configuration_offset.IsNull()) {
605 log_file_header_builder.add_configuration(configuration_offset);
606 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700607
Austin Schuh73340842021-07-30 22:32:06 -0700608 log_file_header_builder.add_max_out_of_order_duration(
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700609 max_out_of_order_duration.count());
Austin Schuh73340842021-07-30 22:32:06 -0700610
Austin Schuh58646e22021-08-23 23:51:46 -0700611 NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
Austin Schuh73340842021-07-30 22:32:06 -0700612 log_file_header_builder.add_monotonic_start_time(
613 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700614 node_state->monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700615 .count());
616 if (source_node == node_) {
617 log_file_header_builder.add_realtime_start_time(
618 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700619 node_state->realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700620 .count());
621 } else {
622 // Fill out the legacy start times. Since these were implemented to never
623 // change on reboot, they aren't very helpful in tracking what happened.
624 log_file_header_builder.add_logger_monotonic_start_time(
625 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700626 node_state->logger_monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700627 .count());
628 log_file_header_builder.add_logger_realtime_start_time(
629 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700630 node_state->logger_realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700631 .count());
632 }
633
634 // TODO(austin): Add more useful times. When was this part started? What do
635 // we know about both the logger and remote then?
636
637 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
638 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
639 if (!log_start_uuid_offset.IsNull()) {
640 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
641 }
642 log_file_header_builder.add_logger_node_boot_uuid(
643 logger_node_boot_uuid_offset);
644 log_file_header_builder.add_source_node_boot_uuid(
645 source_node_boot_uuid_offset);
646
647 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
648 log_file_header_builder.add_parts_index(parts_index);
649
650 if (!config_sha256_offset.IsNull()) {
651 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
652 }
653
Austin Schuhe46492f2021-07-31 19:49:41 -0700654 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700655 log_file_header_builder.add_logger_part_monotonic_start_time(
656 std::chrono::duration_cast<std::chrono::nanoseconds>(
657 event_loop_->monotonic_now().time_since_epoch())
658 .count());
659 log_file_header_builder.add_logger_part_realtime_start_time(
660 std::chrono::duration_cast<std::chrono::nanoseconds>(
661 event_loop_->realtime_now().time_since_epoch())
662 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700663 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
664 oldest_remote_monotonic_timestamps_offset);
665 log_file_header_builder.add_oldest_local_monotonic_timestamps(
666 oldest_local_monotonic_timestamps_offset);
667 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
668 oldest_remote_unreliable_monotonic_timestamps_offset);
669 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
670 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800671 log_file_header_builder.add_oldest_remote_reliable_monotonic_timestamps(
672 oldest_remote_reliable_monotonic_timestamps_offset);
673 log_file_header_builder.add_oldest_local_reliable_monotonic_timestamps(
674 oldest_local_reliable_monotonic_timestamps_offset);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800675 log_file_header_builder
676 .add_oldest_logger_remote_unreliable_monotonic_timestamps(
677 oldest_logger_remote_unreliable_monotonic_timestamps_offset);
678 log_file_header_builder
679 .add_oldest_logger_local_unreliable_monotonic_timestamps(
680 oldest_logger_local_unreliable_monotonic_timestamps_offset);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700681
682 log_file_header_builder.add_data_stored(data_stored_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700683 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
684 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
685 fbb.Release());
686
687 CHECK(result.Verify()) << ": Built a corrupted header.";
688
689 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700690}
691
Alexei Strotsbc082d82023-05-03 08:43:42 -0700692MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
693 EventLoop *event_loop)
Alexei Strots01395492023-03-20 13:59:56 -0700694 : MultiNodeLogNamer(std::move(log_backend), event_loop->configuration(),
695 event_loop, event_loop->node()) {}
Austin Schuh5b728b72021-06-16 14:57:15 -0700696
Alexei Strotsbc082d82023-05-03 08:43:42 -0700697MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
698 const Configuration *configuration,
699 EventLoop *event_loop, const Node *node)
Austin Schuh5b728b72021-06-16 14:57:15 -0700700 : LogNamer(configuration, event_loop, node),
Alexei Strots01395492023-03-20 13:59:56 -0700701 log_backend_(std::move(log_backend)),
Austin Schuh8bdfc492023-02-11 12:53:13 -0800702 encoder_factory_([](size_t max_message_size) {
703 // TODO(austin): For slow channels, can we allocate less memory?
704 return std::make_unique<DummyEncoder>(max_message_size,
705 FLAGS_flush_size);
706 }) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700707
Brian Silverman48deab12020-09-30 18:39:28 -0700708MultiNodeLogNamer::~MultiNodeLogNamer() {
709 if (!ran_out_of_space_) {
710 // This handles renaming temporary files etc.
711 Close();
712 }
713}
714
Austin Schuh572924a2021-07-30 22:32:12 -0700715void MultiNodeLogNamer::Rotate(const Node *node) {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700716 for (auto &data_map : {&node_data_writers_, &node_timestamp_writers_}) {
717 auto it = data_map->find(node);
718 if (it != data_map->end()) {
719 it->second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700720 }
721 }
722}
723
Austin Schuh8c399962020-12-25 21:51:45 -0800724void MultiNodeLogNamer::WriteConfiguration(
725 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
726 std::string_view config_sha256) {
727 if (ran_out_of_space_) {
728 return;
729 }
730
Alexei Strots01395492023-03-20 13:59:56 -0700731 const std::string filename = absl::StrCat(config_sha256, ".bfbs", extension_);
732 auto file_handle = log_backend_->RequestFile(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800733 std::unique_ptr<DetachedBufferWriter> writer =
Austin Schuh48d10d62022-10-16 22:19:23 -0700734 std::make_unique<DetachedBufferWriter>(
Alexei Strots01395492023-03-20 13:59:56 -0700735 std::move(file_handle), encoder_factory_(header->span().size()));
Austin Schuh8c399962020-12-25 21:51:45 -0800736
Austin Schuh7ef11a42023-02-04 17:15:12 -0800737 DataEncoder::SpanCopier coppier(header->span());
738 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh8c399962020-12-25 21:51:45 -0800739
740 if (!writer->ran_out_of_space()) {
Alexei Strots01395492023-03-20 13:59:56 -0700741 all_filenames_.emplace_back(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800742 }
Alexei Strots01395492023-03-20 13:59:56 -0700743 // Close the file and maybe rename it too.
Austin Schuh8c399962020-12-25 21:51:45 -0800744 CloseWriter(&writer);
745}
746
Austin Schuh6ecfe902023-08-04 22:44:37 -0700747void MultiNodeLogNamer::NoticeNode(const Node *source_node) {
748 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
749 nodes_.emplace_back(source_node);
750 }
751}
752
753NewDataWriter *MultiNodeLogNamer::FindNodeDataWriter(const Node *source_node,
754 size_t max_message_size) {
755 NoticeNode(source_node);
756
757 auto it = node_data_writers_.find(source_node);
758 if (it != node_data_writers_.end()) {
759 it->second.UpdateMaxMessageSize(max_message_size);
760 return &(it->second);
761 }
762 return nullptr;
763}
764
765NewDataWriter *MultiNodeLogNamer::FindNodeTimestampWriter(
766 const Node *source_node, size_t max_message_size) {
767 NoticeNode(source_node);
768
769 auto it = node_timestamp_writers_.find(source_node);
770 if (it != node_timestamp_writers_.end()) {
771 it->second.UpdateMaxMessageSize(max_message_size);
772 return &(it->second);
773 }
774 return nullptr;
775}
776
777NewDataWriter *MultiNodeLogNamer::AddNodeDataWriter(const Node *source_node,
778 NewDataWriter &&writer) {
779 auto result = node_data_writers_.emplace(source_node, std::move(writer));
780 CHECK(result.second);
781 return &(result.first->second);
782}
783
784NewDataWriter *MultiNodeLogNamer::AddNodeTimestampWriter(
785 const Node *source_node, NewDataWriter &&writer) {
786 auto result = node_timestamp_writers_.emplace(source_node, std::move(writer));
787 CHECK(result.second);
788 return &(result.first->second);
789}
790
Austin Schuhb8bca732021-07-30 22:32:00 -0700791NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700792 // See if we can read the data on this node at all.
793 const bool is_readable =
794 configuration::ChannelIsReadableOnNode(channel, this->node());
795 if (!is_readable) {
796 return nullptr;
797 }
798
799 // Then, see if we are supposed to log the data here.
800 const bool log_message =
801 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
802
803 if (!log_message) {
804 return nullptr;
805 }
806
Austin Schuhcb5601b2020-09-10 15:29:59 -0700807 // Ok, we have data that is being forwarded to us that we are supposed to
808 // log. It needs to be logged with send timestamps, but be sorted enough
809 // to be able to be processed.
Austin Schuhcb5601b2020-09-10 15:29:59 -0700810
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700811 const Node *source_node =
812 configuration::MultiNode(configuration_)
813 ? configuration::GetNode(configuration_,
814 channel->source_node()->string_view())
815 : nullptr;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700816
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700817 // If we already have a data writer for the node, then use the same writer for
818 // all channels of that node.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700819 NewDataWriter *result = FindNodeDataWriter(
820 source_node,
821 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
822 if (result != nullptr) {
823 return result;
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700824 }
825
826 // If we don't have a data writer for the node, create one.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700827 return AddNodeDataWriter(
828 source_node,
829 NewDataWriter{
830 this,
831 source_node,
832 node_,
833 [this, source_node](NewDataWriter *data_writer) {
834 OpenDataWriter(source_node, data_writer);
835 },
836 [this](NewDataWriter *data_writer) {
837 CloseWriter(&data_writer->writer);
838 },
839 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
840 {StoredDataType::DATA}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700841}
842
Austin Schuhb8bca732021-07-30 22:32:00 -0700843NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700844 const Channel *channel, const Node *node) {
845 // See if we can read the data on this node at all.
846 const bool is_readable =
847 configuration::ChannelIsReadableOnNode(channel, this->node());
848 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
849
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700850 CHECK_NE(node, this->node());
851
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700852 // If we have a remote timestamp writer for a particular node, use the same
853 // writer for all remote timestamp channels of that node.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700854 NewDataWriter *result =
855 FindNodeTimestampWriter(node, PackRemoteMessageSize());
856 if (result != nullptr) {
857 return result;
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700858 }
859
860 // If there are no remote timestamp writers for the node, create one.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700861 return AddNodeTimestampWriter(
862 node, NewDataWriter{this,
863 configuration::GetNode(configuration_, node),
864 node_,
865 [this](NewDataWriter *data_writer) {
866 OpenForwardedTimestampWriter(node_, data_writer);
867 },
868 [this](NewDataWriter *data_writer) {
869 CloseWriter(&data_writer->writer);
870 },
871 PackRemoteMessageSize(),
872 {StoredDataType::REMOTE_TIMESTAMPS}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700873}
874
Austin Schuhb8bca732021-07-30 22:32:00 -0700875NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700876 bool log_delivery_times = false;
877 if (this->node() != nullptr) {
878 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
879 channel, this->node(), this->node());
880 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700881 if (!log_delivery_times) {
882 return nullptr;
883 }
884
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700885 // There is only one of these.
Austin Schuh6ecfe902023-08-04 22:44:37 -0700886 NewDataWriter *result = FindNodeTimestampWriter(
887 this->node(), PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
888 if (result != nullptr) {
889 return result;
Brian Silvermancb805822020-10-06 17:43:35 -0700890 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700891
Austin Schuh6ecfe902023-08-04 22:44:37 -0700892 return AddNodeTimestampWriter(
893 node_, NewDataWriter{this,
894 node_,
895 node_,
896 [this](NewDataWriter *data_writer) {
897 OpenTimestampWriter(data_writer);
898 },
899 [this](NewDataWriter *data_writer) {
900 CloseWriter(&data_writer->writer);
901 },
902 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
903 {StoredDataType::TIMESTAMPS}});
Austin Schuhcb5601b2020-09-10 15:29:59 -0700904}
905
Austin Schuh08dba8f2023-05-01 08:29:30 -0700906WriteCode MultiNodeLogNamer::Close() {
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700907 node_data_writers_.clear();
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700908 node_timestamp_writers_.clear();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700909 if (ran_out_of_space_) {
910 return WriteCode::kOutOfSpace;
911 }
912 return WriteCode::kOk;
Brian Silvermancb805822020-10-06 17:43:35 -0700913}
914
915void MultiNodeLogNamer::ResetStatistics() {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700916 for (std::pair<const Node *const, NewDataWriter> &data_writer :
917 node_data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800918 if (!data_writer.second.writer) continue;
Alexei Strots01395492023-03-20 13:59:56 -0700919 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700920 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700921 for (std::pair<const Node *const, NewDataWriter> &data_writer :
922 node_timestamp_writers_) {
923 if (!data_writer.second.writer) continue;
924 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silvermancb805822020-10-06 17:43:35 -0700925 }
926 max_write_time_ = std::chrono::nanoseconds::zero();
927 max_write_time_bytes_ = -1;
928 max_write_time_messages_ = -1;
929 total_write_time_ = std::chrono::nanoseconds::zero();
930 total_write_count_ = 0;
931 total_write_messages_ = 0;
932 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700933}
934
Austin Schuhb8bca732021-07-30 22:32:00 -0700935void MultiNodeLogNamer::OpenForwardedTimestampWriter(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700936 const Node * /*source_node*/, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700937 const std::string filename = absl::StrCat(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700938 "timestamps/remote_", data_writer->node()->name()->string_view(), ".part",
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700939 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700940 CreateBufferWriter(filename, data_writer->max_message_size(),
941 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700942}
943
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700944void MultiNodeLogNamer::OpenDataWriter(const Node *source_node,
945 NewDataWriter *data_writer) {
946 std::string filename;
947
948 if (source_node != nullptr) {
949 if (source_node == node_) {
950 filename = absl::StrCat(source_node->name()->string_view(), "_");
951 } else {
952 filename = absl::StrCat("data/", source_node->name()->string_view(), "_");
953 }
Brian Silverman7af8c902020-09-29 16:14:04 -0700954 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700955
956 absl::StrAppend(&filename, "data.part", data_writer->parts_index(), ".bfbs",
957 extension_);
958 CreateBufferWriter(filename, data_writer->max_message_size(),
959 &data_writer->writer);
960}
961
962void MultiNodeLogNamer::OpenTimestampWriter(NewDataWriter *data_writer) {
963 std::string filename =
964 absl::StrCat(node()->name()->string_view(), "_timestamps.part",
965 data_writer->parts_index(), ".bfbs", extension_);
966 CreateBufferWriter(filename, data_writer->max_message_size(),
967 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700968}
969
Brian Silverman0465fcf2020-09-24 00:29:18 -0700970void MultiNodeLogNamer::CreateBufferWriter(
Austin Schuh48d10d62022-10-16 22:19:23 -0700971 std::string_view path, size_t max_message_size,
972 std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700973 if (ran_out_of_space_) {
974 // Refuse to open any new files, which might skip data. Any existing files
975 // are in the same folder, which means they're on the same filesystem, which
976 // means they're probably going to run out of space and get stuck too.
Alexei Strots01395492023-03-20 13:59:56 -0700977 if (!(*destination)) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700978 // But avoid leaving a nullptr writer if we're out of space when
979 // attempting to open the first file.
980 *destination = std::make_unique<DetachedBufferWriter>(
981 DetachedBufferWriter::already_out_of_space_t());
982 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700983 return;
984 }
Alexei Strots01395492023-03-20 13:59:56 -0700985
986 // Let's check that we need to close and replace current driver.
987 if (*destination) {
988 // Let's close the current writer.
989 CloseWriter(destination);
990 // Are we out of space now?
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700991 if (ran_out_of_space_) {
992 *destination = std::make_unique<DetachedBufferWriter>(
993 DetachedBufferWriter::already_out_of_space_t());
994 return;
995 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700996 }
Brian Silvermancb805822020-10-06 17:43:35 -0700997
Alexei Strots01395492023-03-20 13:59:56 -0700998 const std::string filename(path);
999 *destination = std::make_unique<DetachedBufferWriter>(
1000 log_backend_->RequestFile(filename), encoder_factory_(max_message_size));
1001 if (!(*destination)->ran_out_of_space()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -07001002 all_filenames_.emplace_back(path);
1003 }
Brian Silverman0465fcf2020-09-24 00:29:18 -07001004}
1005
Brian Silvermancb805822020-10-06 17:43:35 -07001006void MultiNodeLogNamer::CloseWriter(
1007 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
Alexei Strots01395492023-03-20 13:59:56 -07001008 CHECK_NOTNULL(writer_pointer);
1009 if (!(*writer_pointer)) {
Brian Silvermancb805822020-10-06 17:43:35 -07001010 return;
1011 }
Alexei Strots01395492023-03-20 13:59:56 -07001012 DetachedBufferWriter *const writer = writer_pointer->get();
Brian Silvermancb805822020-10-06 17:43:35 -07001013 writer->Close();
1014
Alexei Strots01395492023-03-20 13:59:56 -07001015 const auto *stats = writer->WriteStatistics();
1016 if (stats->max_write_time() > max_write_time_) {
1017 max_write_time_ = stats->max_write_time();
1018 max_write_time_bytes_ = stats->max_write_time_bytes();
1019 max_write_time_messages_ = stats->max_write_time_messages();
Brian Silvermancb805822020-10-06 17:43:35 -07001020 }
Alexei Strots01395492023-03-20 13:59:56 -07001021 total_write_time_ += stats->total_write_time();
1022 total_write_count_ += stats->total_write_count();
1023 total_write_messages_ += stats->total_write_messages();
1024 total_write_bytes_ += stats->total_write_bytes();
Brian Silvermancb805822020-10-06 17:43:35 -07001025
1026 if (writer->ran_out_of_space()) {
1027 ran_out_of_space_ = true;
1028 writer->acknowledge_out_of_space();
1029 }
Brian Silvermancb805822020-10-06 17:43:35 -07001030}
1031
Austin Schuh6ecfe902023-08-04 22:44:37 -07001032NewDataWriter *MinimalFileMultiNodeLogNamer::MakeWriter(
1033 const Channel *channel) {
1034 // See if we can read the data on this node at all.
1035 const bool is_readable =
1036 configuration::ChannelIsReadableOnNode(channel, this->node());
1037 if (!is_readable) {
1038 return nullptr;
1039 }
1040
1041 // Then, see if we are supposed to log the data here.
1042 const bool log_message =
1043 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
1044
1045 if (!log_message) {
1046 return nullptr;
1047 }
1048
1049 // Ok, we have data that is being forwarded to us that we are supposed to
1050 // log. It needs to be logged with send timestamps, but be sorted enough
1051 // to be able to be processed.
1052
1053 const Node *source_node =
1054 configuration::MultiNode(configuration_)
1055 ? configuration::GetNode(configuration_,
1056 channel->source_node()->string_view())
1057 : nullptr;
1058
1059 // If we don't have a data writer for the node, create one.
1060 if (this->node() == source_node) {
1061 // If we already have a data writer for the node, then use the same writer
1062 // for all channels of that node.
1063 NewDataWriter *result = FindNodeDataWriter(
1064 source_node,
1065 PackMessageSize(LogType::kLogMessage, channel->max_size()));
1066 if (result != nullptr) {
1067 return result;
1068 }
1069
1070 return AddNodeDataWriter(
1071 source_node,
1072 NewDataWriter{
1073 this,
1074 source_node,
1075 node_,
1076 [this, source_node](NewDataWriter *data_writer) {
1077 OpenNodeWriter(source_node, data_writer);
1078 },
1079 [this](NewDataWriter *data_writer) {
1080 CloseWriter(&data_writer->writer);
1081 },
1082 PackMessageSize(LogType::kLogMessage, channel->max_size()),
1083 {StoredDataType::DATA, StoredDataType::TIMESTAMPS}});
1084 } else {
1085 // If we already have a data writer for the node, then use the same writer
1086 // for all channels of that node.
1087 NewDataWriter *result = FindNodeDataWriter(
1088 source_node,
1089 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
1090 if (result != nullptr) {
1091 return result;
1092 }
1093
1094 return AddNodeDataWriter(
1095 source_node,
1096 NewDataWriter{
1097 this,
1098 source_node,
1099 node_,
1100 [this, source_node](NewDataWriter *data_writer) {
1101 OpenNodeWriter(source_node, data_writer);
1102 },
1103 [this](NewDataWriter *data_writer) {
1104 CloseWriter(&data_writer->writer);
1105 },
1106 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
1107 {StoredDataType::DATA, StoredDataType::REMOTE_TIMESTAMPS}});
1108 }
1109}
1110
1111NewDataWriter *MinimalFileMultiNodeLogNamer::MakeTimestampWriter(
1112 const Channel *channel) {
1113 bool log_delivery_times = false;
1114 if (this->node() != nullptr) {
1115 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
1116 channel, this->node(), this->node());
1117 }
1118 if (!log_delivery_times) {
1119 return nullptr;
1120 }
1121
1122 // There is only one of these.
1123 NewDataWriter *result = FindNodeDataWriter(
1124 this->node(), PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
1125 if (result != nullptr) {
1126 return result;
1127 }
1128
1129 return AddNodeDataWriter(
1130 node_, NewDataWriter{this,
1131 node_,
1132 node_,
1133 [this](NewDataWriter *data_writer) {
1134 OpenNodeWriter(node_, data_writer);
1135 },
1136 [this](NewDataWriter *data_writer) {
1137 CloseWriter(&data_writer->writer);
1138 },
1139 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
1140 {StoredDataType::DATA, StoredDataType::TIMESTAMPS}});
1141}
1142
1143NewDataWriter *MinimalFileMultiNodeLogNamer::MakeForwardedTimestampWriter(
1144 const Channel *channel, const Node *node) {
1145 // See if we can read the data on this node at all.
1146 const bool is_readable =
1147 configuration::ChannelIsReadableOnNode(channel, this->node());
1148 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
1149
1150 CHECK_NE(node, this->node());
1151
1152 // If we have a remote timestamp writer for a particular node, use the same
1153 // writer for all remote timestamp channels of that node.
1154 NewDataWriter *result = FindNodeDataWriter(node, PackRemoteMessageSize());
1155 if (result != nullptr) {
1156 return result;
1157 }
1158
1159 // If there are no remote timestamp writers for the node, create one.
1160 return AddNodeDataWriter(
1161 node,
1162 NewDataWriter{this,
1163 configuration::GetNode(configuration_, node),
1164 node_,
1165 [this, node](NewDataWriter *data_writer) {
1166 OpenNodeWriter(node, data_writer);
1167 },
1168 [this](NewDataWriter *data_writer) {
1169 CloseWriter(&data_writer->writer);
1170 },
1171 PackRemoteMessageSize(),
1172 {StoredDataType::DATA, StoredDataType::REMOTE_TIMESTAMPS}});
1173}
1174
1175void MinimalFileMultiNodeLogNamer::OpenNodeWriter(const Node *source_node,
1176 NewDataWriter *data_writer) {
1177 std::string filename;
1178
1179 if (node() != nullptr) {
1180 filename = absl::StrCat(node()->name()->string_view(), "_");
1181 }
1182
1183 if (source_node != nullptr) {
1184 absl::StrAppend(&filename, source_node->name()->string_view(), "_");
1185 }
1186
1187 absl::StrAppend(&filename, "all.part", data_writer->parts_index(), ".bfbs",
1188 extension_);
1189 VLOG(1) << "Going to open " << filename;
1190 CreateBufferWriter(filename, data_writer->max_message_size(),
1191 &data_writer->writer);
1192}
1193
Stephan Pleinesf63bde82024-01-13 15:59:33 -08001194} // namespace aos::logger