blob: 71c5750a4d00f885b5392cbf0f92ed3d05f98fe5 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
26 name_(network::GetHostname()),
27 timer_handler_(event_loop_->AddTimer(
Austin Schuh30586902021-03-30 22:54:08 -070028 [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080029 server_statistics_fetcher_(
30 configuration::MultiNode(event_loop_->configuration())
31 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
32 "/aos")
33 : aos::Fetcher<message_bridge::ServerStatistics>()) {
34 VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
35
Austin Schuhb06f03b2021-02-17 22:00:37 -080036 std::map<const Channel *, const Node *> timestamp_logger_channels;
37
Austin Schuh61e973f2021-02-21 21:43:56 -080038 message_bridge::ChannelTimestampFinder finder(event_loop_);
39 for (const Channel *channel : *event_loop_->configuration()->channels()) {
40 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080041 continue;
42 }
Austin Schuh61e973f2021-02-21 21:43:56 -080043 if (!channel->has_destination_nodes()) {
44 continue;
45 }
46 for (const Connection *connection : *channel->destination_nodes()) {
47 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
48 connection, event_loop_->node())) {
49 const Node *other_node = configuration::GetNode(
50 event_loop_->configuration(), connection->name()->string_view());
51
52 VLOG(1) << "Timestamps are logged from "
53 << FlatbufferToJson(other_node);
54 timestamp_logger_channels.insert(
55 std::make_pair(finder.ForChannel(channel, connection), other_node));
56 }
57 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080058 }
59
60 const size_t our_node_index =
61 configuration::GetNodeIndex(configuration_, event_loop_->node());
62
63 for (size_t channel_index = 0;
64 channel_index < configuration_->channels()->size(); ++channel_index) {
65 const Channel *const config_channel =
66 configuration_->channels()->Get(channel_index);
67 // The MakeRawFetcher method needs a channel which is in the event loop
68 // configuration() object, not the configuration_ object. Go look that up
69 // from the config.
70 const Channel *channel = aos::configuration::GetChannel(
71 event_loop_->configuration(), config_channel->name()->string_view(),
72 config_channel->type()->string_view(), "", event_loop_->node());
73 CHECK(channel != nullptr)
74 << ": Failed to look up channel "
75 << aos::configuration::CleanedChannelToString(config_channel);
76 if (!should_log(channel)) {
77 continue;
78 }
79
80 FetcherStruct fs;
81 fs.channel_index = channel_index;
82 fs.channel = channel;
83
84 const bool is_local =
85 configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
86
87 const bool is_readable =
88 configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
89 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
90 channel, event_loop_->node());
91 const bool log_message = is_logged && is_readable;
92
93 bool log_delivery_times = false;
94 if (event_loop_->node() != nullptr) {
95 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
96 channel, event_loop_->node(), event_loop_->node());
97 }
98
99 // Now, detect a RemoteMessage timestamp logger where we should just log the
100 // contents to a file directly.
101 const bool log_contents = timestamp_logger_channels.find(channel) !=
102 timestamp_logger_channels.end();
103
104 if (log_message || log_delivery_times || log_contents) {
105 fs.fetcher = event_loop->MakeRawFetcher(channel);
106 VLOG(1) << "Logging channel "
107 << configuration::CleanedChannelToString(channel);
108
109 if (log_delivery_times) {
110 VLOG(1) << " Delivery times";
111 fs.wants_timestamp_writer = true;
112 fs.timestamp_node_index = our_node_index;
113 }
114 if (log_message) {
115 VLOG(1) << " Data";
116 fs.wants_writer = true;
117 if (!is_local) {
118 const Node *source_node = configuration::GetNode(
119 configuration_, channel->source_node()->string_view());
120 fs.data_node_index =
121 configuration::GetNodeIndex(configuration_, source_node);
122 fs.log_type = LogType::kLogRemoteMessage;
123 } else {
124 fs.data_node_index = our_node_index;
125 }
126 }
127 if (log_contents) {
128 VLOG(1) << "Timestamp logger channel "
129 << configuration::CleanedChannelToString(channel);
130 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
131 fs.wants_contents_writer = true;
132 fs.contents_node_index =
133 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
134 }
135 fetchers_.emplace_back(std::move(fs));
136 }
137 }
138
139 // When we are logging remote timestamps, we need to be able to translate from
140 // the channel index that the event loop uses to the channel index in the
141 // config in the log file.
142 event_loop_to_logged_channel_index_.resize(
143 event_loop->configuration()->channels()->size(), -1);
144 for (size_t event_loop_channel_index = 0;
145 event_loop_channel_index <
146 event_loop->configuration()->channels()->size();
147 ++event_loop_channel_index) {
148 const Channel *event_loop_channel =
149 event_loop->configuration()->channels()->Get(event_loop_channel_index);
150
151 const Channel *logged_channel = aos::configuration::GetChannel(
152 configuration_, event_loop_channel->name()->string_view(),
153 event_loop_channel->type()->string_view(), "",
154 configuration::GetNode(configuration_, event_loop_->node()));
155
156 if (logged_channel != nullptr) {
157 event_loop_to_logged_channel_index_[event_loop_channel_index] =
158 configuration::ChannelIndex(configuration_, logged_channel);
159 }
160 }
161}
162
163Logger::~Logger() {
164 if (log_namer_) {
165 // If we are replaying a log file, or in simulation, we want to force the
166 // last bit of data to be logged. The easiest way to deal with this is to
167 // poll everything as we go to destroy the class, ie, shut down the logger,
168 // and write it to disk.
169 StopLogging(event_loop_->monotonic_now());
170 }
171}
172
Austin Schuh6bb8a822021-03-31 23:04:39 -0700173bool Logger::RenameLogBase(std::string new_base_name) {
174 if (new_base_name == log_namer_->base_name()) {
175 return true;
176 }
177 std::string current_directory = std::string(log_namer_->base_name());
178 std::string new_directory = new_base_name;
179
180 auto current_path_split = current_directory.rfind("/");
181 auto new_path_split = new_directory.rfind("/");
182
183 CHECK(new_base_name.substr(new_path_split) ==
184 current_directory.substr(current_path_split))
185 << "Rename of file base from " << current_directory << " to "
186 << new_directory << " is not supported.";
187
188 current_directory.resize(current_path_split);
189 new_directory.resize(new_path_split);
190 DIR *dir = opendir(current_directory.c_str());
191 if (dir) {
192 closedir(dir);
193 const int result = rename(current_directory.c_str(), new_directory.c_str());
194 if (result != 0) {
195 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
196 << new_directory;
197 return false;
198 }
199 } else {
200 // Handle if directory was already renamed.
201 dir = opendir(new_directory.c_str());
202 if (!dir) {
203 LOG(ERROR) << "Old directory " << current_directory
204 << " missing and new directory " << new_directory
205 << " not present.";
206 return false;
207 }
208 closedir(dir);
209 }
210
211 log_namer_->set_base_name(new_base_name);
212 Rotate();
213 return true;
214}
215
Austin Schuhb06f03b2021-02-17 22:00:37 -0800216void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700217 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800218 CHECK(!log_namer_) << ": Already logging";
219 log_namer_ = std::move(log_namer);
220
221 std::string config_sha256;
222 if (separate_config_) {
223 flatbuffers::FlatBufferBuilder fbb;
224 flatbuffers::Offset<aos::Configuration> configuration_offset =
225 CopyFlatBuffer(configuration_, &fbb);
226 LogFileHeader::Builder log_file_header_builder(fbb);
227 log_file_header_builder.add_configuration(configuration_offset);
228 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
229 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
230 fbb.Release());
231 config_sha256 = Sha256(config_header.span());
232 LOG(INFO) << "Config sha256 of " << config_sha256;
233 log_namer_->WriteConfiguration(&config_header, config_sha256);
234 }
235
236 log_event_uuid_ = UUID::Random();
237 log_start_uuid_ = log_start_uuid;
238 VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
239
240 // We want to do as much work as possible before the initial Fetch. Time
241 // between that and actually starting to log opens up the possibility of
242 // falling off the end of the queue during that time.
243
244 for (FetcherStruct &f : fetchers_) {
245 if (f.wants_writer) {
246 f.writer = log_namer_->MakeWriter(f.channel);
247 }
248 if (f.wants_timestamp_writer) {
249 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
250 }
251 if (f.wants_contents_writer) {
252 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
253 f.channel, CHECK_NOTNULL(f.timestamp_node));
254 }
255 }
256
257 CHECK(node_state_.empty());
258 node_state_.resize(configuration::MultiNode(configuration_)
259 ? configuration_->nodes()->size()
260 : 1u);
261
Austin Schuh73340842021-07-30 22:32:06 -0700262 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800263
Austin Schuha42ee962021-03-31 22:49:30 -0700264 const aos::monotonic_clock::time_point beginning_time =
265 event_loop_->monotonic_now();
266
Austin Schuhb06f03b2021-02-17 22:00:37 -0800267 // Grab data from each channel right before we declare the log file started
268 // so we can capture the latest message on each channel. This lets us have
269 // non periodic messages with configuration that now get logged.
270 for (FetcherStruct &f : fetchers_) {
271 const auto start = event_loop_->monotonic_now();
272 const bool got_new = f.fetcher->Fetch();
273 const auto end = event_loop_->monotonic_now();
274 RecordFetchResult(start, end, got_new, &f);
275
276 // If there is a message, we want to write it.
277 f.written = f.fetcher->context().data == nullptr;
278 }
279
280 // Clear out any old timestamps in case we are re-starting logging.
281 for (size_t i = 0; i < node_state_.size(); ++i) {
Austin Schuh73340842021-07-30 22:32:06 -0700282 log_namer_->SetStartTimes(
283 i, monotonic_clock::min_time, realtime_clock::min_time,
284 monotonic_clock::min_time, realtime_clock::min_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800285 }
286
Austin Schuha42ee962021-03-31 22:49:30 -0700287 const aos::monotonic_clock::time_point fetch_time =
288 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800289 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700290 const aos::monotonic_clock::time_point header_time =
291 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800292
293 LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
Austin Schuha42ee962021-03-31 22:49:30 -0700294 << " start_time " << last_synchronized_time_ << ", took "
295 << chrono::duration<double>(fetch_time - beginning_time).count()
296 << " to fetch, "
297 << chrono::duration<double>(header_time - fetch_time).count()
298 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800299
300 // Force logging up until the start of the log file now, so the messages at
301 // the start are always ordered before the rest of the messages.
302 // Note: this ship may have already sailed, but we don't have to make it
303 // worse.
304 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700305 //
306 // This is safe to call here since we have set last_synchronized_time_ as the
307 // same time as in the header, and all the data before it should be logged
308 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800309 LogUntil(last_synchronized_time_);
310
311 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
312 polling_period_);
313}
314
315std::unique_ptr<LogNamer> Logger::StopLogging(
316 aos::monotonic_clock::time_point end_time) {
317 CHECK(log_namer_) << ": Not logging right now";
318
319 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700320 // Folks like to use the on_logged_period_ callback to trigger stop and
321 // start events. We can't have those then recurse and try to stop again.
322 // Rather than making everything reentrant, let's just instead block the
323 // callback here.
324 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800325 }
326 timer_handler_->Disable();
327
328 for (FetcherStruct &f : fetchers_) {
329 f.writer = nullptr;
330 f.timestamp_writer = nullptr;
331 f.contents_writer = nullptr;
332 }
333 node_state_.clear();
334
335 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700336 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800337
338 return std::move(log_namer_);
339}
340
341void Logger::WriteHeader() {
342 if (configuration::MultiNode(configuration_)) {
343 server_statistics_fetcher_.Fetch();
344 }
345
Austin Schuh73340842021-07-30 22:32:06 -0700346 const aos::monotonic_clock::time_point monotonic_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800347 event_loop_->monotonic_now();
Austin Schuh73340842021-07-30 22:32:06 -0700348 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800349 event_loop_->realtime_now();
350
351 // We need to pick a point in time to declare the log file "started". This
352 // starts here. It needs to be after everything is fetched so that the
353 // fetchers are all pointed at the most recent message before the start
354 // time.
355 last_synchronized_time_ = monotonic_start_time;
356
357 for (const Node *node : log_namer_->nodes()) {
358 const int node_index = configuration::GetNodeIndex(configuration_, node);
359 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
360 realtime_start_time);
361 MaybeWriteHeader(node_index, node);
362 }
363}
364
365void Logger::MaybeWriteHeader(int node_index) {
366 if (configuration::MultiNode(configuration_)) {
367 return MaybeWriteHeader(node_index,
368 configuration_->nodes()->Get(node_index));
369 } else {
370 return MaybeWriteHeader(node_index, nullptr);
371 }
372}
373
374void Logger::MaybeWriteHeader(int node_index, const Node *node) {
375 // This function is responsible for writing the header when the header both
376 // has valid data, and when it needs to be written.
377 if (node_state_[node_index].header_written &&
378 node_state_[node_index].header_valid) {
379 // The header has been written and is valid, nothing to do.
380 return;
381 }
382 if (!node_state_[node_index].has_source_node_boot_uuid) {
383 // Can't write a header if we don't have the boot UUID.
384 return;
385 }
386
387 // WriteHeader writes the first header in a log file. We want to do this only
388 // once.
389 //
390 // Rotate rewrites the same header with a new part ID, but keeps the same part
391 // UUID. We don't want that when things reboot, because that implies that
392 // parts go together across a reboot.
393 //
394 // Reboot resets the parts UUID. So, once we've written a header the first
395 // time, we want to use Reboot to rotate the log and reset the parts UUID.
396 //
397 // header_valid is cleared whenever the remote reboots.
398 if (node_state_[node_index].header_written) {
Austin Schuh73340842021-07-30 22:32:06 -0700399 VLOG(1) << "Rebooting";
400 log_namer_->Reboot(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800401 } else {
Austin Schuh73340842021-07-30 22:32:06 -0700402 log_namer_->WriteHeader(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800403
404 node_state_[node_index].header_written = true;
405 }
406 node_state_[node_index].header_valid = true;
407}
408
409void Logger::WriteMissingTimestamps() {
410 if (configuration::MultiNode(configuration_)) {
411 server_statistics_fetcher_.Fetch();
412 } else {
413 return;
414 }
415
416 if (server_statistics_fetcher_.get() == nullptr) {
417 return;
418 }
419
420 for (const Node *node : log_namer_->nodes()) {
421 const int node_index = configuration::GetNodeIndex(configuration_, node);
422 if (MaybeUpdateTimestamp(
423 node, node_index,
424 server_statistics_fetcher_.context().monotonic_event_time,
425 server_statistics_fetcher_.context().realtime_event_time)) {
426 CHECK(node_state_[node_index].header_written);
427 CHECK(node_state_[node_index].header_valid);
Austin Schuh73340842021-07-30 22:32:06 -0700428 VLOG(1) << "Rotating because timestamps changed";
429 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800430 } else {
431 MaybeWriteHeader(node_index, node);
432 }
433 }
434}
435
Austin Schuhb06f03b2021-02-17 22:00:37 -0800436bool Logger::MaybeUpdateTimestamp(
437 const Node *node, int node_index,
438 aos::monotonic_clock::time_point monotonic_start_time,
439 aos::realtime_clock::time_point realtime_start_time) {
440 // Bail early if the start times are already set.
Austin Schuh73340842021-07-30 22:32:06 -0700441 if (log_namer_->monotonic_start_time(node_index) !=
Austin Schuhb06f03b2021-02-17 22:00:37 -0800442 monotonic_clock::min_time) {
443 return false;
444 }
445 if (event_loop_->node() == node ||
446 !configuration::MultiNode(configuration_)) {
447 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh73340842021-07-30 22:32:06 -0700448 log_namer_->SetStartTimes(node_index, monotonic_start_time,
449 realtime_start_time, monotonic_start_time,
450 realtime_start_time);
451 log_namer_->SetBootUUID(node_index, event_loop_->boot_uuid());
452 node_state_[node_index].header_valid = false;
453 node_state_[node_index].has_source_node_boot_uuid = true;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800454 return true;
455 } else if (server_statistics_fetcher_.get() != nullptr) {
456 // We must be a remote node now. Look for the connection and see if it is
457 // connected.
458
459 for (const message_bridge::ServerConnection *connection :
460 *server_statistics_fetcher_->connections()) {
461 if (connection->node()->name()->string_view() !=
462 node->name()->string_view()) {
463 continue;
464 }
465
466 if (connection->state() != message_bridge::State::CONNECTED) {
467 VLOG(1) << node->name()->string_view()
468 << " is not connected, can't start it yet.";
469 break;
470 }
471
Austin Schuhb06f03b2021-02-17 22:00:37 -0800472 if (!connection->has_monotonic_offset()) {
473 VLOG(1) << "Missing monotonic offset for setting start time for node "
474 << aos::FlatbufferToJson(node);
475 break;
476 }
477
478 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700479 log_namer_->SetStartTimes(
480 node_index,
481 monotonic_start_time +
482 std::chrono::nanoseconds(connection->monotonic_offset()),
483 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800484 return true;
485 }
486 }
487 return false;
488}
489
490aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700491 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800492 flatbuffers::FlatBufferBuilder fbb;
493 fbb.ForceDefaults(true);
494
495 flatbuffers::Offset<aos::Configuration> configuration_offset;
496 if (!separate_config_) {
497 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
498 } else {
499 CHECK(!config_sha256.empty());
500 }
501
502 const flatbuffers::Offset<flatbuffers::String> name_offset =
503 fbb.CreateString(name_);
504
505 CHECK(log_event_uuid_ != UUID::Zero());
506 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800507 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800508
509 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800510 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800511
512 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700513 if (log_start_uuid_) {
514 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800515 }
516
517 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
518 if (!config_sha256.empty()) {
519 config_sha256_offset = fbb.CreateString(config_sha256);
520 }
521
522 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800523 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800524
Austin Schuhb06f03b2021-02-17 22:00:37 -0800525 flatbuffers::Offset<Node> logger_node_offset;
526
527 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800528 logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
529 }
530
531 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
532
533 log_file_header_builder.add_name(name_offset);
534
535 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700536 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800537 log_file_header_builder.add_logger_node(logger_node_offset);
538 }
539
540 if (!configuration_offset.IsNull()) {
541 log_file_header_builder.add_configuration(configuration_offset);
542 }
543 // The worst case theoretical out of order is the polling period times 2.
544 // One message could get logged right after the boundary, but be for right
545 // before the next boundary. And the reverse could happen for another
546 // message. Report back 3x to be extra safe, and because the cost isn't
547 // huge on the read side.
548 log_file_header_builder.add_max_out_of_order_duration(
549 std::chrono::nanoseconds(3 * polling_period_).count());
550
Austin Schuhb06f03b2021-02-17 22:00:37 -0800551 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
552 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
553 if (!log_start_uuid_offset.IsNull()) {
554 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
555 }
556 log_file_header_builder.add_logger_node_boot_uuid(
557 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800558
559 if (!config_sha256_offset.IsNull()) {
560 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
561 }
562
563 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
564 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
565 fbb.Release());
566
567 CHECK(result.Verify()) << ": Built a corrupted header.";
568
569 return result;
570}
571
572void Logger::ResetStatisics() {
573 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
574 max_message_fetch_time_channel_ = -1;
575 max_message_fetch_time_size_ = -1;
576 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
577 total_message_fetch_count_ = 0;
578 total_message_fetch_bytes_ = 0;
579 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
580 total_nop_fetch_count_ = 0;
581 max_copy_time_ = std::chrono::nanoseconds::zero();
582 max_copy_time_channel_ = -1;
583 max_copy_time_size_ = -1;
584 total_copy_time_ = std::chrono::nanoseconds::zero();
585 total_copy_count_ = 0;
586 total_copy_bytes_ = 0;
587}
588
589void Logger::Rotate() {
590 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700591 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800592 }
593}
594
595void Logger::LogUntil(monotonic_clock::time_point t) {
596 // Grab the latest ServerStatistics message. This will always have the
597 // oppertunity to be >= to the current time, so it will always represent any
598 // reboots which may have happened.
599 WriteMissingTimestamps();
600
Austin Schuhcdd90272021-03-15 12:46:16 -0700601 int our_node_index = aos::configuration::GetNodeIndex(
602 event_loop_->configuration(), event_loop_->node());
603
Austin Schuhb06f03b2021-02-17 22:00:37 -0800604 // Write each channel to disk, one at a time.
605 for (FetcherStruct &f : fetchers_) {
606 while (true) {
607 if (f.written) {
608 const auto start = event_loop_->monotonic_now();
609 const bool got_new = f.fetcher->FetchNext();
610 const auto end = event_loop_->monotonic_now();
611 RecordFetchResult(start, end, got_new, &f);
612 if (!got_new) {
613 VLOG(2) << "No new data on "
614 << configuration::CleanedChannelToString(
615 f.fetcher->channel());
616 break;
617 }
618 f.written = false;
619 }
620
621 // TODO(james): Write tests to exercise this logic.
622 if (f.fetcher->context().monotonic_event_time >= t) {
623 break;
624 }
625 if (f.writer != nullptr) {
Austin Schuhcdd90272021-03-15 12:46:16 -0700626 // Only check if the boot UUID has changed if this is data from another
627 // node. Our UUID can't change without restarting the application.
628 if (our_node_index != f.data_node_index) {
629 // And update our boot UUID if the UUID has changed.
Austin Schuh73340842021-07-30 22:32:06 -0700630 if (log_namer_->SetBootUUID(f.data_node_index,
631 f.fetcher->context().source_boot_uuid)) {
632 node_state_[f.data_node_index].header_valid = false;
633 node_state_[f.data_node_index].has_source_node_boot_uuid = true;
Austin Schuhcdd90272021-03-15 12:46:16 -0700634 MaybeWriteHeader(f.data_node_index);
635 }
636 }
637
Austin Schuhb06f03b2021-02-17 22:00:37 -0800638 // Write!
639 const auto start = event_loop_->monotonic_now();
640 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
641 max_header_size_);
642 fbb.ForceDefaults(true);
643
644 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
645 f.channel_index, f.log_type));
646 const auto end = event_loop_->monotonic_now();
647 RecordCreateMessageTime(start, end, &f);
648
649 VLOG(2) << "Writing data as node "
650 << FlatbufferToJson(event_loop_->node()) << " for channel "
651 << configuration::CleanedChannelToString(f.fetcher->channel())
652 << " to " << f.writer->filename() << " data "
653 << FlatbufferToJson(
654 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
655 fbb.GetBufferPointer()));
656
657 max_header_size_ = std::max(max_header_size_,
658 fbb.GetSize() - f.fetcher->context().size);
659 CHECK(node_state_[f.data_node_index].header_valid)
660 << ": Can't write data before the header on channel "
661 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700662 f.writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800663 }
664
665 if (f.timestamp_writer != nullptr) {
666 // And now handle timestamps.
667 const auto start = event_loop_->monotonic_now();
668 flatbuffers::FlatBufferBuilder fbb;
669 fbb.ForceDefaults(true);
670
671 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
672 f.channel_index,
673 LogType::kLogDeliveryTimeOnly));
674 const auto end = event_loop_->monotonic_now();
675 RecordCreateMessageTime(start, end, &f);
676
677 VLOG(2) << "Writing timestamps as node "
678 << FlatbufferToJson(event_loop_->node()) << " for channel "
679 << configuration::CleanedChannelToString(f.fetcher->channel())
680 << " to " << f.timestamp_writer->filename() << " timestamp "
681 << FlatbufferToJson(
682 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
683 fbb.GetBufferPointer()));
684
685 CHECK(node_state_[f.timestamp_node_index].header_valid)
686 << ": Can't write data before the header on channel "
687 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700688 f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800689 }
690
691 if (f.contents_writer != nullptr) {
692 const auto start = event_loop_->monotonic_now();
693 // And now handle the special message contents channel. Copy the
694 // message into a FlatBufferBuilder and save it to disk.
695 // TODO(austin): We can be more efficient here when we start to
696 // care...
697 flatbuffers::FlatBufferBuilder fbb;
698 fbb.ForceDefaults(true);
699
700 const RemoteMessage *msg =
701 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
702
703 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuh73340842021-07-30 22:32:06 -0700704 if (log_namer_->SetBootUUID(f.contents_node_index,
705 UUID::FromVector(msg->boot_uuid()))) {
706 node_state_[f.contents_node_index].header_valid = false;
707 node_state_[f.contents_node_index].has_source_node_boot_uuid = true;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800708 MaybeWriteHeader(f.contents_node_index);
709 }
710
711 logger::MessageHeader::Builder message_header_builder(fbb);
712
713 // TODO(austin): This needs to check the channel_index and confirm
714 // that it should be logged before squirreling away the timestamp to
715 // disk. We don't want to log irrelevant timestamps.
716
717 // Note: this must match the same order as MessageBridgeServer and
718 // PackMessage. We want identical headers to have identical
719 // on-the-wire formats to make comparing them easier.
720
721 // Translate from the channel index that the event loop uses to the
722 // channel index in the log file.
723 message_header_builder.add_channel_index(
724 event_loop_to_logged_channel_index_[msg->channel_index()]);
725
726 message_header_builder.add_queue_index(msg->queue_index());
727 message_header_builder.add_monotonic_sent_time(
728 msg->monotonic_sent_time());
729 message_header_builder.add_realtime_sent_time(
730 msg->realtime_sent_time());
731
732 message_header_builder.add_monotonic_remote_time(
733 msg->monotonic_remote_time());
734 message_header_builder.add_realtime_remote_time(
735 msg->realtime_remote_time());
736 message_header_builder.add_remote_queue_index(
737 msg->remote_queue_index());
738
739 message_header_builder.add_monotonic_timestamp_time(
740 f.fetcher->context()
741 .monotonic_event_time.time_since_epoch()
742 .count());
743
744 fbb.FinishSizePrefixed(message_header_builder.Finish());
745 const auto end = event_loop_->monotonic_now();
746 RecordCreateMessageTime(start, end, &f);
747
748 CHECK(node_state_[f.contents_node_index].header_valid)
749 << ": Can't write data before the header on channel "
750 << configuration::CleanedChannelToString(f.fetcher->channel());
Austin Schuhbd06ae42021-03-31 22:48:21 -0700751 f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800752 }
753
754 f.written = true;
755 }
756 }
757 last_synchronized_time_ = t;
758}
759
Austin Schuh30586902021-03-30 22:54:08 -0700760void Logger::DoLogData(const monotonic_clock::time_point end_time,
761 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800762 // We want to guarantee that messages aren't out of order by more than
763 // max_out_of_order_duration. To do this, we need sync points. Every write
764 // cycle should be a sync point.
765
766 do {
767 // Move the sync point up by at most polling_period. This forces one sync
768 // per iteration, even if it is small.
769 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
770
Austin Schuh30586902021-03-30 22:54:08 -0700771 if (run_on_logged) {
772 on_logged_period_();
773 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800774
775 // If we missed cycles, we could be pretty far behind. Spin until we are
776 // caught up.
777 } while (last_synchronized_time_ + polling_period_ < end_time);
778}
779
780void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
781 aos::monotonic_clock::time_point end,
782 bool got_new, FetcherStruct *fetcher) {
783 const auto duration = end - start;
784 if (!got_new) {
785 ++total_nop_fetch_count_;
786 total_nop_fetch_time_ += duration;
787 return;
788 }
789 ++total_message_fetch_count_;
790 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
791 total_message_fetch_time_ += duration;
792 if (duration > max_message_fetch_time_) {
793 max_message_fetch_time_ = duration;
794 max_message_fetch_time_channel_ = fetcher->channel_index;
795 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
796 }
797}
798
799void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
800 aos::monotonic_clock::time_point end,
801 FetcherStruct *fetcher) {
802 const auto duration = end - start;
803 total_copy_time_ += duration;
804 ++total_copy_count_;
805 total_copy_bytes_ += fetcher->fetcher->context().size;
806 if (duration > max_copy_time_) {
807 max_copy_time_ = duration;
808 max_copy_time_channel_ = fetcher->channel_index;
809 max_copy_time_size_ = fetcher->fetcher->context().size;
810 }
811}
812
813} // namespace logger
814} // namespace aos