blob: 679d364b4f7991cdfcd8a0c5a2e6cb25b506bc87 [file] [log] [blame]
#include "aos/events/logging/log_namer.h"

#include <algorithm>
#include <array>
#include <chrono>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"

#include "aos/containers/error_list.h"
#include "aos/containers/sized_array.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/uuid.h"

DECLARE_int32(flush_size);

Austin Schuhcb5601b2020-09-10 15:29:59 -070022namespace aos {
23namespace logger {
24
Austin Schuh572924a2021-07-30 22:32:12 -070025NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
Austin Schuhf5f99f32022-02-07 20:05:37 -080026 const Node *logger_node,
Austin Schuh572924a2021-07-30 22:32:12 -070027 std::function<void(NewDataWriter *)> reopen,
Austin Schuh48d10d62022-10-16 22:19:23 -070028 std::function<void(NewDataWriter *)> close,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070029 size_t max_message_size,
30 std::initializer_list<StoredDataType> types)
Austin Schuh572924a2021-07-30 22:32:12 -070031 : node_(node),
32 node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
Austin Schuhf5f99f32022-02-07 20:05:37 -080033 logger_node_index_(
34 configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
Austin Schuh572924a2021-07-30 22:32:12 -070035 log_namer_(log_namer),
36 reopen_(std::move(reopen)),
Austin Schuh48d10d62022-10-16 22:19:23 -070037 close_(std::move(close)),
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070038 max_message_size_(max_message_size),
39 max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070040 allowed_data_types_.fill(false);
41
Austin Schuh72211ae2021-08-05 14:02:30 -070042 state_.resize(configuration::NodesCount(log_namer->configuration_));
43 CHECK_LT(node_index_, state_.size());
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -070044 for (StoredDataType type : types) {
45 CHECK_LT(static_cast<size_t>(type), allowed_data_types_.size());
46 allowed_data_types_[static_cast<size_t>(type)] = true;
47 }
Austin Schuh572924a2021-07-30 22:32:12 -070048}
49
50NewDataWriter::~NewDataWriter() {
51 if (writer) {
52 Close();
53 }
54}
55
56void NewDataWriter::Rotate() {
Austin Schuhe46492f2021-07-31 19:49:41 -070057 // No need to rotate if nothing has been written.
58 if (header_written_) {
Alexei Strotsbc082d82023-05-03 08:43:42 -070059 VLOG(1) << "Rotated " << name();
Austin Schuhe46492f2021-07-31 19:49:41 -070060 ++parts_index_;
61 reopen_(this);
62 header_written_ = false;
63 QueueHeader(MakeHeader());
64 }
Austin Schuh572924a2021-07-30 22:32:12 -070065}
66
Austin Schuh5e14d842022-01-21 12:02:15 -080067void NewDataWriter::Reboot(const UUID &source_node_boot_uuid) {
Austin Schuh572924a2021-07-30 22:32:12 -070068 parts_uuid_ = UUID::Random();
69 ++parts_index_;
70 reopen_(this);
71 header_written_ = false;
Austin Schuh5e14d842022-01-21 12:02:15 -080072 for (State &state : state_) {
73 state.boot_uuid = UUID::Zero();
74 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
75 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
76 state.oldest_remote_unreliable_monotonic_timestamp =
77 monotonic_clock::max_time;
78 state.oldest_local_unreliable_monotonic_timestamp =
79 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -080080 state.oldest_remote_reliable_monotonic_timestamp =
81 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -080082 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
83 state.oldest_logger_remote_unreliable_monotonic_timestamp =
84 monotonic_clock::max_time;
85 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -080086 monotonic_clock::max_time;
Austin Schuh5e14d842022-01-21 12:02:15 -080087 }
88
89 state_[node_index_].boot_uuid = source_node_boot_uuid;
90
Alexei Strotsbc082d82023-05-03 08:43:42 -070091 VLOG(1) << "Rebooted " << name();
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -070092 newest_message_time_ = monotonic_clock::min_time;
93 // When a node reboots, parts_uuid changes but the same writer continues to
94 // write the data, so we can reset the max out of order duration. If we don't
95 // do this, the max out of order duration can grow to an unreasonable value.
96 max_out_of_order_duration_ = log_namer_->base_max_out_of_order_duration();
Austin Schuh5e14d842022-01-21 12:02:15 -080097}
98
99void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
100 if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
101 state_[node_index_].boot_uuid = source_node_boot_uuid;
102 if (header_written_) {
103 Reboot(source_node_boot_uuid);
104 }
105 }
Austin Schuh572924a2021-07-30 22:32:12 -0700106}
107
Austin Schuh72211ae2021-08-05 14:02:30 -0700108void NewDataWriter::UpdateRemote(
109 const size_t remote_node_index, const UUID &remote_node_boot_uuid,
110 const monotonic_clock::time_point monotonic_remote_time,
Austin Schuhf5f99f32022-02-07 20:05:37 -0800111 const monotonic_clock::time_point monotonic_event_time, const bool reliable,
112 monotonic_clock::time_point monotonic_timestamp_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700113 // Trigger rotation if anything in the header changes.
Austin Schuh72211ae2021-08-05 14:02:30 -0700114 bool rotate = false;
115 CHECK_LT(remote_node_index, state_.size());
116 State &state = state_[remote_node_index];
Austin Schuh58646e22021-08-23 23:51:46 -0700117
118 // Did the remote boot UUID change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700119 if (state.boot_uuid != remote_node_boot_uuid) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700120 VLOG(1) << name() << " Remote " << remote_node_index << " updated to "
Austin Schuh72211ae2021-08-05 14:02:30 -0700121 << remote_node_boot_uuid << " from " << state.boot_uuid;
122 state.boot_uuid = remote_node_boot_uuid;
123 state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
124 state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
125 state.oldest_remote_unreliable_monotonic_timestamp =
126 monotonic_clock::max_time;
127 state.oldest_local_unreliable_monotonic_timestamp =
128 monotonic_clock::max_time;
Austin Schuhbfe6c572022-01-27 20:48:20 -0800129 state.oldest_remote_reliable_monotonic_timestamp =
130 monotonic_clock::max_time;
Austin Schuhf5f99f32022-02-07 20:05:37 -0800131 state.oldest_local_reliable_monotonic_timestamp = monotonic_clock::max_time;
132 state.oldest_logger_remote_unreliable_monotonic_timestamp =
133 monotonic_clock::max_time;
134 state.oldest_logger_local_unreliable_monotonic_timestamp =
Austin Schuhbfe6c572022-01-27 20:48:20 -0800135 monotonic_clock::max_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700136 rotate = true;
137 }
138
Austin Schuh58646e22021-08-23 23:51:46 -0700139 // Did the unreliable timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700140 if (!reliable) {
141 if (state.oldest_remote_unreliable_monotonic_timestamp >
142 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700143 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700144 << " oldest_remote_unreliable_monotonic_timestamp updated from "
145 << state.oldest_remote_unreliable_monotonic_timestamp << " to "
146 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700147 state.oldest_remote_unreliable_monotonic_timestamp =
148 monotonic_remote_time;
149 state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
150 rotate = true;
151 }
Austin Schuhbfe6c572022-01-27 20:48:20 -0800152 } else {
153 if (state.oldest_remote_reliable_monotonic_timestamp >
154 monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700155 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuhbfe6c572022-01-27 20:48:20 -0800156 << " oldest_remote_reliable_monotonic_timestamp updated from "
157 << state.oldest_remote_reliable_monotonic_timestamp << " to "
158 << monotonic_remote_time;
159 state.oldest_remote_reliable_monotonic_timestamp = monotonic_remote_time;
160 state.oldest_local_reliable_monotonic_timestamp = monotonic_event_time;
161 rotate = true;
162 }
Austin Schuh72211ae2021-08-05 14:02:30 -0700163 }
164
Austin Schuhf5f99f32022-02-07 20:05:37 -0800165 // Track the logger timestamps too.
166 if (monotonic_timestamp_time != monotonic_clock::min_time) {
167 State &logger_state = state_[node_index_];
168 CHECK_EQ(remote_node_index, logger_node_index_);
169 if (monotonic_event_time <
170 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp) {
171 VLOG(1)
Alexei Strotsbc082d82023-05-03 08:43:42 -0700172 << name() << " Remote " << node_index_
Austin Schuhf5f99f32022-02-07 20:05:37 -0800173 << " oldest_logger_remote_unreliable_monotonic_timestamp updated "
174 "from "
175 << logger_state.oldest_logger_remote_unreliable_monotonic_timestamp
176 << " to " << monotonic_event_time;
177 logger_state.oldest_logger_remote_unreliable_monotonic_timestamp =
178 monotonic_event_time;
179 logger_state.oldest_logger_local_unreliable_monotonic_timestamp =
180 monotonic_timestamp_time;
181
182 rotate = true;
183 }
184 }
185
Austin Schuh58646e22021-08-23 23:51:46 -0700186 // Did any of the timestamps change?
Austin Schuh72211ae2021-08-05 14:02:30 -0700187 if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700188 VLOG(1) << name() << " Remote " << remote_node_index
Austin Schuh58646e22021-08-23 23:51:46 -0700189 << " oldest_remote_monotonic_timestamp updated from "
190 << state.oldest_remote_monotonic_timestamp << " to "
191 << monotonic_remote_time;
Austin Schuh72211ae2021-08-05 14:02:30 -0700192 state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
193 state.oldest_local_monotonic_timestamp = monotonic_event_time;
194 rotate = true;
195 }
196
197 if (rotate) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700198 Rotate();
199 }
200}
201
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700202void NewDataWriter::CopyDataMessage(
203 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
204 aos::monotonic_clock::time_point now,
205 aos::monotonic_clock::time_point message_time) {
206 CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::DATA)])
207 << ": Tried to write data on non-data writer.";
208 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
209}
210
211void NewDataWriter::CopyTimestampMessage(
212 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
213 aos::monotonic_clock::time_point now,
214 aos::monotonic_clock::time_point message_time) {
215 CHECK(allowed_data_types_[static_cast<size_t>(StoredDataType::TIMESTAMPS)])
216 << ": Tried to write timestamps on non-timestamp writer.";
217 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
218}
219
220void NewDataWriter::CopyRemoteTimestampMessage(
221 DataEncoder::Copier *coppier, const UUID &source_node_boot_uuid,
222 aos::monotonic_clock::time_point now,
223 aos::monotonic_clock::time_point message_time) {
224 CHECK(allowed_data_types_[static_cast<size_t>(
225 StoredDataType::REMOTE_TIMESTAMPS)])
226 << ": Tried to write remote timestamps on non-remote timestamp writer.";
227 CopyMessage(coppier, source_node_boot_uuid, now, message_time);
228}
229
Austin Schuh48d10d62022-10-16 22:19:23 -0700230void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
231 const UUID &source_node_boot_uuid,
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700232 aos::monotonic_clock::time_point now,
233 aos::monotonic_clock::time_point message_time) {
Austin Schuh58646e22021-08-23 23:51:46 -0700234 // Trigger a reboot if we detect the boot UUID change.
Austin Schuh5e14d842022-01-21 12:02:15 -0800235 UpdateBoot(source_node_boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700236
Austin Schuh5e14d842022-01-21 12:02:15 -0800237 if (!header_written_) {
Austin Schuhe46492f2021-07-31 19:49:41 -0700238 QueueHeader(MakeHeader());
Austin Schuh572924a2021-07-30 22:32:12 -0700239 }
Austin Schuh58646e22021-08-23 23:51:46 -0700240
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700241 bool max_out_of_order_duration_exceeded = false;
242 // Enforce max out of duration contract.
243 //
244 // Updates the newest message time.
245 // Rotate the part file if current message is more than
246 // max_out_of_order_duration behind the newest message we've logged so far.
247 if (message_time > newest_message_time_) {
248 newest_message_time_ = message_time;
249 }
250
251 // Don't consider messages before start up when checking for max out of order
252 // duration.
253 monotonic_clock::time_point monotonic_start_time =
254 log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid);
255
256 if (std::chrono::nanoseconds((newest_message_time_ -
257 std::max(monotonic_start_time, message_time))) >
258 max_out_of_order_duration_) {
259 // If the new message is older than 2 * max_out_order_duration, doubling it
260 // won't be sufficient.
261 //
262 // Example: newest_message_time = 10, logged_message_time = 5,
263 // max_ooo_duration = 2
264 //
265 // In this case actual max_ooo_duration = 10 - 5 = 5, but we double the
266 // existing max_ooo_duration we get 4 which is not sufficient.
267 //
268 // Take the max of the two values.
269 max_out_of_order_duration_ =
270 2 * std::max(max_out_of_order_duration_,
271 std::chrono::nanoseconds(
272 (newest_message_time_ - message_time)));
273 max_out_of_order_duration_exceeded = true;
274 }
275
Austin Schuh58646e22021-08-23 23:51:46 -0700276 // If the start time has changed for this node, trigger a rotation.
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700277 if ((monotonic_start_time != monotonic_start_time_) ||
278 max_out_of_order_duration_exceeded) {
279 // If we just received a start time now, we will rotate parts shortly. Use a
280 // reasonable max out of order durationin the new header based on start time
281 // information available now.
282 if ((monotonic_start_time_ == monotonic_clock::min_time) &&
283 (monotonic_start_time != monotonic_clock::min_time)) {
284 // If we're writing current messages but we receive an older start time,
285 // we can pick a reasonable max ooo duration number for the next part.
286 //
287 // For example - Our current max ooo duration is 0.3 seconds. We're
288 // writing messages at 20 seconds and recieve a start time of 1 second. We
289 // don't need max ooo duration to be (20 - 1) = 19 seconds although that
290 // would still work.
291 //
292 // Pick the minimum max out of duration value that satisifies the
293 // requirement but bound the minimum at the base value we started with.
294 max_out_of_order_duration_ =
295 std::max(log_namer_->base_max_out_of_order_duration(),
296 std::min(max_out_of_order_duration_,
297 std::chrono::nanoseconds(newest_message_time_ -
298 monotonic_start_time)));
299 }
Austin Schuh58646e22021-08-23 23:51:46 -0700300 CHECK(header_written_);
301 Rotate();
302 }
303
304 CHECK_EQ(log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid),
305 monotonic_start_time_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700306 CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
milind-ua50344f2021-08-25 18:22:20 -0700307 CHECK(writer);
Austin Schuh572924a2021-07-30 22:32:12 -0700308 CHECK(header_written_) << ": Attempting to write message before header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700309 << writer->name();
Austin Schuh48d10d62022-10-16 22:19:23 -0700310 writer->CopyMessage(coppier, now);
Austin Schuh572924a2021-07-30 22:32:12 -0700311}
312
Austin Schuhe46492f2021-07-31 19:49:41 -0700313aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
314NewDataWriter::MakeHeader() {
315 const size_t logger_node_index = log_namer_->logger_node_index();
316 const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
Austin Schuh72211ae2021-08-05 14:02:30 -0700317 if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
Alexei Strotsbc082d82023-05-03 08:43:42 -0700318 VLOG(1) << name() << " Logger node is " << logger_node_index
Austin Schuhe46492f2021-07-31 19:49:41 -0700319 << " and uuid is " << logger_node_boot_uuid;
Austin Schuh72211ae2021-08-05 14:02:30 -0700320 state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
Austin Schuhe46492f2021-07-31 19:49:41 -0700321 } else {
Austin Schuh72211ae2021-08-05 14:02:30 -0700322 CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
Austin Schuhe46492f2021-07-31 19:49:41 -0700323 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700324 return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700325 max_out_of_order_duration_,
326 allowed_data_types_);
Austin Schuhe46492f2021-07-31 19:49:41 -0700327}
328
Austin Schuh572924a2021-07-30 22:32:12 -0700329void NewDataWriter::QueueHeader(
330 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
331 CHECK(!header_written_) << ": Attempting to write duplicate header to "
Alexei Strotsbc082d82023-05-03 08:43:42 -0700332 << writer->name();
Austin Schuh572924a2021-07-30 22:32:12 -0700333 CHECK(header.message().has_source_node_boot_uuid());
Austin Schuh72211ae2021-08-05 14:02:30 -0700334 CHECK_EQ(state_[node_index_].boot_uuid,
Austin Schuhe46492f2021-07-31 19:49:41 -0700335 UUID::FromString(header.message().source_node_boot_uuid()));
Austin Schuh510dc622021-08-06 18:47:30 -0700336 if (!writer) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700337 // Since we haven't opened the first time, it's still not too late to update
338 // the max message size. Make sure the header fits.
339 //
340 // This won't work well on reboots, but the structure of the header is fixed
341 // by that point in time, so it's size is fixed too.
342 //
343 // Most of the time, the minimum buffer size inside the encoder of around
344 // 128k will make this a non-issue.
345 UpdateMaxMessageSize(header.span().size());
346
Austin Schuh510dc622021-08-06 18:47:30 -0700347 reopen_(this);
348 }
349
Alexei Strotsbc082d82023-05-03 08:43:42 -0700350 VLOG(1) << "Writing to " << name() << " "
Austin Schuh58646e22021-08-23 23:51:46 -0700351 << aos::FlatbufferToJson(
352 header, {.multi_line = false, .max_vector_size = 100});
353
Austin Schuh510dc622021-08-06 18:47:30 -0700354 CHECK(writer);
Austin Schuh7ef11a42023-02-04 17:15:12 -0800355 DataEncoder::SpanCopier coppier(header.span());
356 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh572924a2021-07-30 22:32:12 -0700357 header_written_ = true;
Austin Schuh58646e22021-08-23 23:51:46 -0700358 monotonic_start_time_ = log_namer_->monotonic_start_time(
359 node_index_, state_[node_index_].boot_uuid);
Austin Schuh572924a2021-07-30 22:32:12 -0700360}
361
362void NewDataWriter::Close() {
363 CHECK(writer);
364 close_(this);
365 writer.reset();
366 header_written_ = false;
367}
368
Austin Schuh58646e22021-08-23 23:51:46 -0700369LogNamer::NodeState *LogNamer::GetNodeState(size_t node_index,
370 const UUID &boot_uuid) {
371 auto it = node_states_.find(std::make_pair(node_index, boot_uuid));
372 if (it == node_states_.end()) {
373 it =
374 node_states_.emplace(std::make_pair(node_index, boot_uuid), NodeState())
375 .first;
376 }
377 return &it->second;
378}
379
Austin Schuh73340842021-07-30 22:32:06 -0700380aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
Austin Schuh72211ae2021-08-05 14:02:30 -0700381 size_t node_index, const std::vector<NewDataWriter::State> &state,
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700382 const UUID &parts_uuid, int parts_index,
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700383 std::chrono::nanoseconds max_out_of_order_duration,
384 const std::array<bool, static_cast<size_t>(StoredDataType::MAX) + 1>
385 &allowed_data_types) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700386 const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
Austin Schuh73340842021-07-30 22:32:06 -0700387 const Node *const source_node =
388 configuration::GetNode(configuration_, node_index);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700389 CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 35u);
Austin Schuh73340842021-07-30 22:32:06 -0700390 flatbuffers::FlatBufferBuilder fbb;
391 fbb.ForceDefaults(true);
392
393 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
394 flatbuffers::Offset<aos::Configuration> configuration_offset;
395 if (header_.message().has_configuration()) {
396 CHECK(!header_.message().has_configuration_sha256());
397 configuration_offset =
398 CopyFlatBuffer(header_.message().configuration(), &fbb);
399 } else {
400 CHECK(!header_.message().has_configuration());
401 CHECK(header_.message().has_configuration_sha256());
402 config_sha256_offset = fbb.CreateString(
403 header_.message().configuration_sha256()->string_view());
404 }
405
406 CHECK(header_.message().has_name());
407 const flatbuffers::Offset<flatbuffers::String> name_offset =
408 fbb.CreateString(header_.message().name()->string_view());
Austin Schuhfa712682022-05-11 16:43:42 -0700409 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
410 header_.message().has_logger_sha1()
411 ? fbb.CreateString(header_.message().logger_sha1()->string_view())
412 : 0;
413 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
414 header_.message().has_logger_version()
415 ? fbb.CreateString(header_.message().logger_version()->string_view())
416 : 0;
Austin Schuh73340842021-07-30 22:32:06 -0700417
418 CHECK(header_.message().has_log_event_uuid());
419 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
420 fbb.CreateString(header_.message().log_event_uuid()->string_view());
421
422 CHECK(header_.message().has_logger_instance_uuid());
423 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
424 fbb.CreateString(header_.message().logger_instance_uuid()->string_view());
425
426 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
427 if (header_.message().has_log_start_uuid()) {
428 log_start_uuid_offset =
429 fbb.CreateString(header_.message().log_start_uuid()->string_view());
430 }
431
432 CHECK(header_.message().has_logger_node_boot_uuid());
433 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
434 fbb.CreateString(
435 header_.message().logger_node_boot_uuid()->string_view());
436
437 CHECK_NE(source_node_boot_uuid, UUID::Zero());
438 const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
439 source_node_boot_uuid.PackString(&fbb);
440
441 const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
442 parts_uuid.PackString(&fbb);
443
444 flatbuffers::Offset<Node> node_offset;
445 flatbuffers::Offset<Node> logger_node_offset;
446
447 if (configuration::MultiNode(configuration_)) {
448 node_offset = RecursiveCopyFlatBuffer(source_node, &fbb);
449 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
450 }
451
Austin Schuhe46492f2021-07-31 19:49:41 -0700452 std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
Austin Schuh72211ae2021-08-05 14:02:30 -0700453 boot_uuid_offsets.reserve(state.size());
Austin Schuhe46492f2021-07-31 19:49:41 -0700454
Austin Schuh4db9ec92021-09-22 13:11:12 -0700455 int64_t *unused;
Austin Schuh72211ae2021-08-05 14:02:30 -0700456 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800457 oldest_remote_monotonic_timestamps_offset =
458 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700459
Austin Schuh72211ae2021-08-05 14:02:30 -0700460 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
Austin Schuhf5f99f32022-02-07 20:05:37 -0800461 oldest_local_monotonic_timestamps_offset =
462 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700463
Austin Schuh72211ae2021-08-05 14:02:30 -0700464 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
465 oldest_remote_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800466 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700467
Austin Schuh72211ae2021-08-05 14:02:30 -0700468 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
469 oldest_local_unreliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800470 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuh72211ae2021-08-05 14:02:30 -0700471
Austin Schuhbfe6c572022-01-27 20:48:20 -0800472 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
473 oldest_remote_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800474 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800475
476 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
477 oldest_local_reliable_monotonic_timestamps_offset =
Austin Schuhf5f99f32022-02-07 20:05:37 -0800478 fbb.CreateUninitializedVector(state.size(), &unused);
479
480 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
481 oldest_logger_remote_unreliable_monotonic_timestamps_offset =
482 fbb.CreateUninitializedVector(state.size(), &unused);
483
484 flatbuffers::Offset<flatbuffers::Vector<int64_t>>
485 oldest_logger_local_unreliable_monotonic_timestamps_offset =
486 fbb.CreateUninitializedVector(state.size(), &unused);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800487
Austin Schuh72211ae2021-08-05 14:02:30 -0700488 for (size_t i = 0; i < state.size(); ++i) {
Austin Schuh4db9ec92021-09-22 13:11:12 -0700489 if (state[i].boot_uuid != UUID::Zero()) {
490 boot_uuid_offsets.emplace_back(state[i].boot_uuid.PackString(&fbb));
491 } else {
492 boot_uuid_offsets.emplace_back(fbb.CreateString(""));
493 }
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700494 if (state[i].boot_uuid == UUID::Zero()) {
495 CHECK_EQ(state[i].oldest_remote_monotonic_timestamp,
496 monotonic_clock::max_time);
497 CHECK_EQ(state[i].oldest_local_monotonic_timestamp,
498 monotonic_clock::max_time);
499 CHECK_EQ(state[i].oldest_remote_unreliable_monotonic_timestamp,
500 monotonic_clock::max_time);
501 CHECK_EQ(state[i].oldest_local_unreliable_monotonic_timestamp,
502 monotonic_clock::max_time);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800503 CHECK_EQ(state[i].oldest_remote_reliable_monotonic_timestamp,
504 monotonic_clock::max_time);
505 CHECK_EQ(state[i].oldest_local_reliable_monotonic_timestamp,
506 monotonic_clock::max_time);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800507 CHECK_EQ(state[i].oldest_logger_remote_unreliable_monotonic_timestamp,
508 monotonic_clock::max_time);
509 CHECK_EQ(state[i].oldest_logger_local_unreliable_monotonic_timestamp,
510 monotonic_clock::max_time);
Austin Schuh5ae8f4a2021-09-11 19:09:50 -0700511 }
512
Austin Schuh4db9ec92021-09-22 13:11:12 -0700513 flatbuffers::GetMutableTemporaryPointer(
514 fbb, oldest_remote_monotonic_timestamps_offset)
515 ->Mutate(i, state[i]
516 .oldest_remote_monotonic_timestamp.time_since_epoch()
517 .count());
518 flatbuffers::GetMutableTemporaryPointer(
519 fbb, oldest_local_monotonic_timestamps_offset)
520 ->Mutate(i, state[i]
521 .oldest_local_monotonic_timestamp.time_since_epoch()
522 .count());
523 flatbuffers::GetMutableTemporaryPointer(
524 fbb, oldest_remote_unreliable_monotonic_timestamps_offset)
525 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800526 .oldest_remote_unreliable_monotonic_timestamp
527 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700528 .count());
529 flatbuffers::GetMutableTemporaryPointer(
530 fbb, oldest_local_unreliable_monotonic_timestamps_offset)
531 ->Mutate(i, state[i]
Austin Schuhbfe6c572022-01-27 20:48:20 -0800532 .oldest_local_unreliable_monotonic_timestamp
533 .time_since_epoch()
Austin Schuh4db9ec92021-09-22 13:11:12 -0700534 .count());
Austin Schuhbfe6c572022-01-27 20:48:20 -0800535
536 flatbuffers::GetMutableTemporaryPointer(
537 fbb, oldest_remote_reliable_monotonic_timestamps_offset)
538 ->Mutate(i, state[i]
539 .oldest_remote_reliable_monotonic_timestamp
540 .time_since_epoch()
541 .count());
542 flatbuffers::GetMutableTemporaryPointer(
543 fbb, oldest_local_reliable_monotonic_timestamps_offset)
544 ->Mutate(
545 i, state[i]
546 .oldest_local_reliable_monotonic_timestamp.time_since_epoch()
547 .count());
Austin Schuhf5f99f32022-02-07 20:05:37 -0800548
549 flatbuffers::GetMutableTemporaryPointer(
550 fbb, oldest_logger_remote_unreliable_monotonic_timestamps_offset)
551 ->Mutate(i, state[i]
552 .oldest_logger_remote_unreliable_monotonic_timestamp
553 .time_since_epoch()
554 .count());
555 flatbuffers::GetMutableTemporaryPointer(
556 fbb, oldest_logger_local_unreliable_monotonic_timestamps_offset)
557 ->Mutate(i, state[i]
558 .oldest_logger_local_unreliable_monotonic_timestamp
559 .time_since_epoch()
560 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700561 }
562
Austin Schuh4db9ec92021-09-22 13:11:12 -0700563 flatbuffers::Offset<
564 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
565 boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
566
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700567 aos::ErrorList<StoredDataType> allowed_data_types_vector;
568 for (size_t type = static_cast<size_t>(StoredDataType::MIN);
569 type <= static_cast<size_t>(StoredDataType::MAX); ++type) {
570 if (allowed_data_types[type]) {
571 allowed_data_types_vector.Set(static_cast<StoredDataType>(type));
572 }
573 }
574
575 flatbuffers::Offset<flatbuffers::Vector<StoredDataType>> data_stored_offset =
576 fbb.CreateVector(allowed_data_types_vector.data(),
577 allowed_data_types_vector.size());
578
Austin Schuh73340842021-07-30 22:32:06 -0700579 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
580
581 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700582 if (!logger_sha1_offset.IsNull()) {
583 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
584 }
585 if (!logger_version_offset.IsNull()) {
586 log_file_header_builder.add_logger_version(logger_version_offset);
587 }
Austin Schuh73340842021-07-30 22:32:06 -0700588
589 // Only add the node if we are running in a multinode configuration.
590 if (!logger_node_offset.IsNull()) {
591 log_file_header_builder.add_node(node_offset);
592 log_file_header_builder.add_logger_node(logger_node_offset);
593 }
594
595 if (!configuration_offset.IsNull()) {
596 log_file_header_builder.add_configuration(configuration_offset);
597 }
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700598
Austin Schuh73340842021-07-30 22:32:06 -0700599 log_file_header_builder.add_max_out_of_order_duration(
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700600 max_out_of_order_duration.count());
Austin Schuh73340842021-07-30 22:32:06 -0700601
Austin Schuh58646e22021-08-23 23:51:46 -0700602 NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
Austin Schuh73340842021-07-30 22:32:06 -0700603 log_file_header_builder.add_monotonic_start_time(
604 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700605 node_state->monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700606 .count());
607 if (source_node == node_) {
608 log_file_header_builder.add_realtime_start_time(
609 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700610 node_state->realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700611 .count());
612 } else {
613 // Fill out the legacy start times. Since these were implemented to never
614 // change on reboot, they aren't very helpful in tracking what happened.
615 log_file_header_builder.add_logger_monotonic_start_time(
616 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700617 node_state->logger_monotonic_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700618 .count());
619 log_file_header_builder.add_logger_realtime_start_time(
620 std::chrono::duration_cast<std::chrono::nanoseconds>(
Austin Schuh58646e22021-08-23 23:51:46 -0700621 node_state->logger_realtime_start_time.time_since_epoch())
Austin Schuh73340842021-07-30 22:32:06 -0700622 .count());
623 }
624
625 // TODO(austin): Add more useful times. When was this part started? What do
626 // we know about both the logger and remote then?
627
628 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
629 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
630 if (!log_start_uuid_offset.IsNull()) {
631 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
632 }
633 log_file_header_builder.add_logger_node_boot_uuid(
634 logger_node_boot_uuid_offset);
635 log_file_header_builder.add_source_node_boot_uuid(
636 source_node_boot_uuid_offset);
637
638 log_file_header_builder.add_parts_uuid(parts_uuid_offset);
639 log_file_header_builder.add_parts_index(parts_index);
640
641 if (!config_sha256_offset.IsNull()) {
642 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
643 }
644
Austin Schuhe46492f2021-07-31 19:49:41 -0700645 log_file_header_builder.add_boot_uuids(boot_uuids_offset);
Austin Schuha499cea2021-07-31 19:49:53 -0700646 log_file_header_builder.add_logger_part_monotonic_start_time(
647 std::chrono::duration_cast<std::chrono::nanoseconds>(
648 event_loop_->monotonic_now().time_since_epoch())
649 .count());
650 log_file_header_builder.add_logger_part_realtime_start_time(
651 std::chrono::duration_cast<std::chrono::nanoseconds>(
652 event_loop_->realtime_now().time_since_epoch())
653 .count());
Austin Schuh72211ae2021-08-05 14:02:30 -0700654 log_file_header_builder.add_oldest_remote_monotonic_timestamps(
655 oldest_remote_monotonic_timestamps_offset);
656 log_file_header_builder.add_oldest_local_monotonic_timestamps(
657 oldest_local_monotonic_timestamps_offset);
658 log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
659 oldest_remote_unreliable_monotonic_timestamps_offset);
660 log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
661 oldest_local_unreliable_monotonic_timestamps_offset);
Austin Schuhbfe6c572022-01-27 20:48:20 -0800662 log_file_header_builder.add_oldest_remote_reliable_monotonic_timestamps(
663 oldest_remote_reliable_monotonic_timestamps_offset);
664 log_file_header_builder.add_oldest_local_reliable_monotonic_timestamps(
665 oldest_local_reliable_monotonic_timestamps_offset);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800666 log_file_header_builder
667 .add_oldest_logger_remote_unreliable_monotonic_timestamps(
668 oldest_logger_remote_unreliable_monotonic_timestamps_offset);
669 log_file_header_builder
670 .add_oldest_logger_local_unreliable_monotonic_timestamps(
671 oldest_logger_local_unreliable_monotonic_timestamps_offset);
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700672
673 log_file_header_builder.add_data_stored(data_stored_offset);
Austin Schuh73340842021-07-30 22:32:06 -0700674 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
675 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
676 fbb.Release());
677
678 CHECK(result.Verify()) << ": Built a corrupted header.";
679
680 return result;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700681}
682
Alexei Strotsbc082d82023-05-03 08:43:42 -0700683MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
684 EventLoop *event_loop)
Alexei Strots01395492023-03-20 13:59:56 -0700685 : MultiNodeLogNamer(std::move(log_backend), event_loop->configuration(),
686 event_loop, event_loop->node()) {}
Austin Schuh5b728b72021-06-16 14:57:15 -0700687
Alexei Strotsbc082d82023-05-03 08:43:42 -0700688MultiNodeLogNamer::MultiNodeLogNamer(std::unique_ptr<LogBackend> log_backend,
689 const Configuration *configuration,
690 EventLoop *event_loop, const Node *node)
Austin Schuh5b728b72021-06-16 14:57:15 -0700691 : LogNamer(configuration, event_loop, node),
Alexei Strots01395492023-03-20 13:59:56 -0700692 log_backend_(std::move(log_backend)),
Austin Schuh8bdfc492023-02-11 12:53:13 -0800693 encoder_factory_([](size_t max_message_size) {
694 // TODO(austin): For slow channels, can we allocate less memory?
695 return std::make_unique<DummyEncoder>(max_message_size,
696 FLAGS_flush_size);
697 }) {}
Austin Schuhcb5601b2020-09-10 15:29:59 -0700698
Brian Silverman48deab12020-09-30 18:39:28 -0700699MultiNodeLogNamer::~MultiNodeLogNamer() {
700 if (!ran_out_of_space_) {
701 // This handles renaming temporary files etc.
702 Close();
703 }
704}
705
Austin Schuh572924a2021-07-30 22:32:12 -0700706void MultiNodeLogNamer::Rotate(const Node *node) {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700707 for (auto &data_map : {&node_data_writers_, &node_timestamp_writers_}) {
708 auto it = data_map->find(node);
709 if (it != data_map->end()) {
710 it->second.Rotate();
Austin Schuhcb5601b2020-09-10 15:29:59 -0700711 }
712 }
713}
714
Austin Schuh8c399962020-12-25 21:51:45 -0800715void MultiNodeLogNamer::WriteConfiguration(
716 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
717 std::string_view config_sha256) {
718 if (ran_out_of_space_) {
719 return;
720 }
721
Alexei Strots01395492023-03-20 13:59:56 -0700722 const std::string filename = absl::StrCat(config_sha256, ".bfbs", extension_);
723 auto file_handle = log_backend_->RequestFile(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800724 std::unique_ptr<DetachedBufferWriter> writer =
Austin Schuh48d10d62022-10-16 22:19:23 -0700725 std::make_unique<DetachedBufferWriter>(
Alexei Strots01395492023-03-20 13:59:56 -0700726 std::move(file_handle), encoder_factory_(header->span().size()));
Austin Schuh8c399962020-12-25 21:51:45 -0800727
Austin Schuh7ef11a42023-02-04 17:15:12 -0800728 DataEncoder::SpanCopier coppier(header->span());
729 writer->CopyMessage(&coppier, aos::monotonic_clock::now());
Austin Schuh8c399962020-12-25 21:51:45 -0800730
731 if (!writer->ran_out_of_space()) {
Alexei Strots01395492023-03-20 13:59:56 -0700732 all_filenames_.emplace_back(filename);
Austin Schuh8c399962020-12-25 21:51:45 -0800733 }
Alexei Strots01395492023-03-20 13:59:56 -0700734 // Close the file and maybe rename it too.
Austin Schuh8c399962020-12-25 21:51:45 -0800735 CloseWriter(&writer);
736}
737
Austin Schuhb8bca732021-07-30 22:32:00 -0700738NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700739 // See if we can read the data on this node at all.
740 const bool is_readable =
741 configuration::ChannelIsReadableOnNode(channel, this->node());
742 if (!is_readable) {
743 return nullptr;
744 }
745
746 // Then, see if we are supposed to log the data here.
747 const bool log_message =
748 configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
749
750 if (!log_message) {
751 return nullptr;
752 }
753
Austin Schuhcb5601b2020-09-10 15:29:59 -0700754 // Ok, we have data that is being forwarded to us that we are supposed to
755 // log. It needs to be logged with send timestamps, but be sorted enough
756 // to be able to be processed.
Austin Schuhcb5601b2020-09-10 15:29:59 -0700757
758 // Track that this node is being logged.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700759 const Node *source_node =
760 configuration::MultiNode(configuration_)
761 ? configuration::GetNode(configuration_,
762 channel->source_node()->string_view())
763 : nullptr;
Austin Schuhcb5601b2020-09-10 15:29:59 -0700764
765 if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
766 nodes_.emplace_back(source_node);
767 }
768
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700769 // If we already have a data writer for the node, then use the same writer for
770 // all channels of that node.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700771 auto it = node_data_writers_.find(source_node);
772 if (it != node_data_writers_.end()) {
773 it->second.UpdateMaxMessageSize(
774 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
775 return &(it->second);
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700776 }
777
778 // If we don't have a data writer for the node, create one.
Austin Schuhf5f99f32022-02-07 20:05:37 -0800779 NewDataWriter data_writer(
780 this, source_node, node_,
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700781 [this, source_node](NewDataWriter *data_writer) {
782 OpenDataWriter(source_node, data_writer);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800783 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700784 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700785 PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()),
786 {StoredDataType::DATA});
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700787
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700788 auto result = node_data_writers_.emplace(source_node, std::move(data_writer));
789 CHECK(result.second);
790 return &(result.first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700791}
792
Austin Schuhb8bca732021-07-30 22:32:00 -0700793NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
Austin Schuhcb5601b2020-09-10 15:29:59 -0700794 const Channel *channel, const Node *node) {
795 // See if we can read the data on this node at all.
796 const bool is_readable =
797 configuration::ChannelIsReadableOnNode(channel, this->node());
798 CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
799
Austin Schuhcb5601b2020-09-10 15:29:59 -0700800 if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
801 nodes_.emplace_back(node);
802 }
803
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700804 CHECK_NE(node, this->node());
805
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700806 // If we have a remote timestamp writer for a particular node, use the same
807 // writer for all remote timestamp channels of that node.
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700808 auto it = node_timestamp_writers_.find(node);
809 if (it != node_timestamp_writers_.end()) {
810 return &(it->second);
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700811 }
812
813 // If there are no remote timestamp writers for the node, create one.
Austin Schuhf5f99f32022-02-07 20:05:37 -0800814 NewDataWriter data_writer(
815 this, configuration::GetNode(configuration_, node), node_,
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700816 [this](NewDataWriter *data_writer) {
817 OpenForwardedTimestampWriter(node_, data_writer);
Austin Schuhf5f99f32022-02-07 20:05:37 -0800818 },
Austin Schuh48d10d62022-10-16 22:19:23 -0700819 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700820 PackRemoteMessageSize(), {StoredDataType::REMOTE_TIMESTAMPS});
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700821
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700822 auto result = node_timestamp_writers_.emplace(node, std::move(data_writer));
823 CHECK(result.second);
824 return &(result.first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700825}
826
Austin Schuhb8bca732021-07-30 22:32:00 -0700827NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700828 bool log_delivery_times = false;
829 if (this->node() != nullptr) {
830 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
831 channel, this->node(), this->node());
832 }
Austin Schuhcb5601b2020-09-10 15:29:59 -0700833 if (!log_delivery_times) {
834 return nullptr;
835 }
836
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700837 // There is only one of these.
838 auto it = node_timestamp_writers_.find(this->node());
839 if (it != node_timestamp_writers_.end()) {
840 it->second.UpdateMaxMessageSize(
841 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
842 return &(it->second);
Brian Silvermancb805822020-10-06 17:43:35 -0700843 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700844
845 NewDataWriter data_writer(
846 this, node_, node_,
847 [this](NewDataWriter *data_writer) { OpenTimestampWriter(data_writer); },
848 [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
849 PackMessageSize(LogType::kLogDeliveryTimeOnly, 0),
850 {StoredDataType::TIMESTAMPS});
851
852 auto result = node_timestamp_writers_.emplace(node_, std::move(data_writer));
853 CHECK(result.second);
854 return &(result.first->second);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700855}
856
Austin Schuh08dba8f2023-05-01 08:29:30 -0700857WriteCode MultiNodeLogNamer::Close() {
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700858 node_data_writers_.clear();
Mithun Bharadwaj99aec9e2023-08-02 16:10:40 -0700859 node_timestamp_writers_.clear();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700860 if (ran_out_of_space_) {
861 return WriteCode::kOutOfSpace;
862 }
863 return WriteCode::kOk;
Brian Silvermancb805822020-10-06 17:43:35 -0700864}
865
866void MultiNodeLogNamer::ResetStatistics() {
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700867 for (std::pair<const Node *const, NewDataWriter> &data_writer :
868 node_data_writers_) {
Austin Schuhad0cfc32020-12-21 12:34:26 -0800869 if (!data_writer.second.writer) continue;
Alexei Strots01395492023-03-20 13:59:56 -0700870 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silverman0465fcf2020-09-24 00:29:18 -0700871 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700872 for (std::pair<const Node *const, NewDataWriter> &data_writer :
873 node_timestamp_writers_) {
874 if (!data_writer.second.writer) continue;
875 data_writer.second.writer->WriteStatistics()->ResetStats();
Brian Silvermancb805822020-10-06 17:43:35 -0700876 }
877 max_write_time_ = std::chrono::nanoseconds::zero();
878 max_write_time_bytes_ = -1;
879 max_write_time_messages_ = -1;
880 total_write_time_ = std::chrono::nanoseconds::zero();
881 total_write_count_ = 0;
882 total_write_messages_ = 0;
883 total_write_bytes_ = 0;
Brian Silverman0465fcf2020-09-24 00:29:18 -0700884}
885
Austin Schuhb8bca732021-07-30 22:32:00 -0700886void MultiNodeLogNamer::OpenForwardedTimestampWriter(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700887 const Node * /*source_node*/, NewDataWriter *data_writer) {
Austin Schuhcb5601b2020-09-10 15:29:59 -0700888 const std::string filename = absl::StrCat(
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700889 "timestamps/remote_", data_writer->node()->name()->string_view(), ".part",
Mithun Bharadwaj0c629932023-08-02 16:10:40 -0700890 data_writer->parts_index(), ".bfbs", extension_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700891 CreateBufferWriter(filename, data_writer->max_message_size(),
892 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700893}
894
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700895void MultiNodeLogNamer::OpenDataWriter(const Node *source_node,
896 NewDataWriter *data_writer) {
897 std::string filename;
898
899 if (source_node != nullptr) {
900 if (source_node == node_) {
901 filename = absl::StrCat(source_node->name()->string_view(), "_");
902 } else {
903 filename = absl::StrCat("data/", source_node->name()->string_view(), "_");
904 }
Brian Silverman7af8c902020-09-29 16:14:04 -0700905 }
Mithun Bharadwajc54aa022023-08-02 16:10:41 -0700906
907 absl::StrAppend(&filename, "data.part", data_writer->parts_index(), ".bfbs",
908 extension_);
909 CreateBufferWriter(filename, data_writer->max_message_size(),
910 &data_writer->writer);
911}
912
913void MultiNodeLogNamer::OpenTimestampWriter(NewDataWriter *data_writer) {
914 std::string filename =
915 absl::StrCat(node()->name()->string_view(), "_timestamps.part",
916 data_writer->parts_index(), ".bfbs", extension_);
917 CreateBufferWriter(filename, data_writer->max_message_size(),
918 &data_writer->writer);
Austin Schuhcb5601b2020-09-10 15:29:59 -0700919}
920
Brian Silverman0465fcf2020-09-24 00:29:18 -0700921void MultiNodeLogNamer::CreateBufferWriter(
Austin Schuh48d10d62022-10-16 22:19:23 -0700922 std::string_view path, size_t max_message_size,
923 std::unique_ptr<DetachedBufferWriter> *destination) {
Brian Silverman0465fcf2020-09-24 00:29:18 -0700924 if (ran_out_of_space_) {
925 // Refuse to open any new files, which might skip data. Any existing files
926 // are in the same folder, which means they're on the same filesystem, which
927 // means they're probably going to run out of space and get stuck too.
Alexei Strots01395492023-03-20 13:59:56 -0700928 if (!(*destination)) {
Austin Schuha426f1f2021-03-31 22:27:41 -0700929 // But avoid leaving a nullptr writer if we're out of space when
930 // attempting to open the first file.
931 *destination = std::make_unique<DetachedBufferWriter>(
932 DetachedBufferWriter::already_out_of_space_t());
933 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700934 return;
935 }
Alexei Strots01395492023-03-20 13:59:56 -0700936
937 // Let's check that we need to close and replace current driver.
938 if (*destination) {
939 // Let's close the current writer.
940 CloseWriter(destination);
941 // Are we out of space now?
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700942 if (ran_out_of_space_) {
943 *destination = std::make_unique<DetachedBufferWriter>(
944 DetachedBufferWriter::already_out_of_space_t());
945 return;
946 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700947 }
Brian Silvermancb805822020-10-06 17:43:35 -0700948
Alexei Strots01395492023-03-20 13:59:56 -0700949 const std::string filename(path);
950 *destination = std::make_unique<DetachedBufferWriter>(
951 log_backend_->RequestFile(filename), encoder_factory_(max_message_size));
952 if (!(*destination)->ran_out_of_space()) {
Brian Silvermana9f2ec92020-10-06 18:00:53 -0700953 all_filenames_.emplace_back(path);
954 }
Brian Silverman0465fcf2020-09-24 00:29:18 -0700955}
956
Brian Silvermancb805822020-10-06 17:43:35 -0700957void MultiNodeLogNamer::CloseWriter(
958 std::unique_ptr<DetachedBufferWriter> *writer_pointer) {
Alexei Strots01395492023-03-20 13:59:56 -0700959 CHECK_NOTNULL(writer_pointer);
960 if (!(*writer_pointer)) {
Brian Silvermancb805822020-10-06 17:43:35 -0700961 return;
962 }
Alexei Strots01395492023-03-20 13:59:56 -0700963 DetachedBufferWriter *const writer = writer_pointer->get();
Brian Silvermancb805822020-10-06 17:43:35 -0700964 writer->Close();
965
Alexei Strots01395492023-03-20 13:59:56 -0700966 const auto *stats = writer->WriteStatistics();
967 if (stats->max_write_time() > max_write_time_) {
968 max_write_time_ = stats->max_write_time();
969 max_write_time_bytes_ = stats->max_write_time_bytes();
970 max_write_time_messages_ = stats->max_write_time_messages();
Brian Silvermancb805822020-10-06 17:43:35 -0700971 }
Alexei Strots01395492023-03-20 13:59:56 -0700972 total_write_time_ += stats->total_write_time();
973 total_write_count_ += stats->total_write_count();
974 total_write_messages_ += stats->total_write_messages();
975 total_write_bytes_ += stats->total_write_bytes();
Brian Silvermancb805822020-10-06 17:43:35 -0700976
977 if (writer->ran_out_of_space()) {
978 ran_out_of_space_ = true;
979 writer->acknowledge_out_of_space();
980 }
Brian Silvermancb805822020-10-06 17:43:35 -0700981}
982
Austin Schuhcb5601b2020-09-10 15:29:59 -0700983} // namespace logger
984} // namespace aos