#include "aos/events/logging/log_writer.h"

#include <dirent.h>

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
namespace chrono = std::chrono;
}  // namespace

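// Walks the configuration and decides, for every channel, whether this node
// needs a data writer, a timestamp writer, and/or a remote-timestamp (contents)
// writer, building a FetcherStruct for each channel that will be logged.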
Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());

  std::map<const Channel *, const Node *> timestamp_logger_channels;

  message_bridge::ChannelTimestampFinder finder(event_loop_);
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
              connection, event_loop_->node())) {
        const Node *other_node = configuration::GetNode(
            event_loop_->configuration(), connection->name()->string_view());

        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_channels.insert(
            std::make_pair(finder.ForChannel(channel, connection), other_node));
      }
    }
  }

  const size_t our_node_index =
      configuration::GetNodeIndex(configuration_, event_loop_->node());

  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object. Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
        channel, event_loop_->node());
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (event_loop_->node() != nullptr) {
      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          channel, event_loop_->node(), event_loop_->node());
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log the
    // contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = our_node_index;
      }
      // Both the timestamp and data writers want data_node_index so they know
      // what the source node is.
      if (log_message || log_delivery_times) {
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
        }
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = our_node_index;
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }

  // When we are logging remote timestamps, we need to be able to translate from
  // the channel index that the event loop uses to the channel index in the
  // config in the log file.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "",
        configuration::GetNode(configuration_, event_loop_->node()));

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file, or in simulation, we want to force the
    // last bit of data to be logged. The easiest way to deal with this is to
    // poll everything as we go to destroy the class, ie, shut down the logger,
    // and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

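// Renames the directory part of the log base name on disk, handling the case
// where the directory has already been moved. Changing the file part of the
// base name is not supported.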
bool Logger::RenameLogBase(std::string new_base_name) {
  if (new_base_name == log_namer_->base_name()) {
    return true;
  }
  std::string current_directory = std::string(log_namer_->base_name());
  std::string new_directory = new_base_name;

  auto current_path_split = current_directory.rfind("/");
  auto new_path_split = new_directory.rfind("/");

  CHECK(new_base_name.substr(new_path_split) ==
        current_directory.substr(current_path_split))
      << "Rename of file base from " << current_directory << " to "
      << new_directory << " is not supported.";

  current_directory.resize(current_path_split);
  new_directory.resize(new_path_split);
  DIR *dir = opendir(current_directory.c_str());
  if (dir) {
    closedir(dir);
    const int result = rename(current_directory.c_str(), new_directory.c_str());
    if (result != 0) {
      PLOG(ERROR) << "Unable to rename " << current_directory << " to "
                  << new_directory;
      return false;
    }
  } else {
    // Handle if directory was already renamed.
    dir = opendir(new_directory.c_str());
    if (!dir) {
      LOG(ERROR) << "Old directory " << current_directory
                 << " missing and new directory " << new_directory
                 << " not present.";
      return false;
    }
    closedir(dir);
  }

  log_namer_->set_base_name(new_base_name);
  Rotate();
  return true;
}

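// Starts logging with the provided namer: writes the config (if split out),
// opens the per-channel writers, fetches the latest message on every channel,
// and logs everything up to the declared start time so the beginning of the
// log stays ordered.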
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::optional<UUID> log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());

  // We want to do as much work as possible before the initial Fetch. Time
  // between that and actually starting to log opens up the possibility of
  // falling off the end of the queue during that time.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));

  const aos::monotonic_clock::time_point beginning_time =
      event_loop_->monotonic_now();

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets
  // non-periodic messages carrying configuration get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, we want to write it.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
    log_namer_->SetStartTimes(
        i, monotonic_clock::min_time, realtime_clock::min_time,
        monotonic_clock::min_time, realtime_clock::min_time);
  }

  const aos::monotonic_clock::time_point fetch_time =
      event_loop_->monotonic_now();
  WriteHeader();
  const aos::monotonic_clock::time_point header_time =
      event_loop_->monotonic_now();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
            << " start_time " << last_synchronized_time_ << ", took "
            << chrono::duration<double>(fetch_time - beginning_time).count()
            << " to fetch, "
            << chrono::duration<double>(header_time - fetch_time).count()
            << " to write headers, boot uuid " << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ as the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

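// Stops logging after flushing data through end_time and returns the LogNamer
// so the caller can reuse or destroy it.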
std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    // Folks like to use the on_logged_period_ callback to trigger stop and
    // start events. We can't have those then recurse and try to stop again.
    // Rather than making everything reentrant, let's just instead block the
    // callback here.
    DoLogData(end_time, false);
  }
  timer_handler_->Disable();

  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::nullopt;

  return std::move(log_namer_);
}

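// Declares the log start time and records per-node start times for the header.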
void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  const aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  const aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started". This
  // starts here. It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
  }
}

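// Fills in start times for remote nodes which weren't connected when logging
// started, rotating their files once a start time becomes available.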
void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      VLOG(1) << "Rotating because timestamps changed";
      log_namer_->Rotate(node);
    }
  }
}

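// Sets the start times for a node if they aren't already set, compensating for
// the measured clock offset on remote nodes. Returns true if times were set.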
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (log_namer_->monotonic_start_time(node_index) !=
      monotonic_clock::min_time) {
    return false;
  }
  if (event_loop_->node() == node ||
      !configuration::MultiNode(configuration_)) {
    // There are no offsets to compute for ourself, so always succeed.
    log_namer_->SetStartTimes(node_index, monotonic_start_time,
                              realtime_start_time, monotonic_start_time,
                              realtime_start_time);
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now. Look for the connection and see if it is
    // connected.

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      // Found it and it is connected. Compensate and go.
      log_namer_->SetStartTimes(
          node_index,
          monotonic_start_time +
              std::chrono::nanoseconds(connection->monotonic_offset()),
          realtime_start_time, monotonic_start_time, realtime_start_time);
      return true;
    }
  }
  return false;
}

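// Builds the LogFileHeader flatbuffer used as the template for every file this
// logger produces.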
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    std::string_view config_sha256) {
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (log_start_uuid_) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (configuration::MultiNode(configuration_)) {
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary. And the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

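// Clears the fetch and copy statistics accumulated by RecordFetchResult and
// RecordCreateMessageTime.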
void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

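// Starts a new log file part for every node.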
void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    log_namer_->Rotate(node);
  }
}

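// Writes all data, timestamps, and remote timestamp contents with event times
// before t, then advances last_synchronized_time_ to t.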
void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message. This will always have the
  // opportunity to be >= the current time, so it will always represent any
  // reboots which may have happened.
  WriteMissingTimestamps();

  int our_node_index = aos::configuration::GetNodeIndex(
      event_loop_->configuration(), event_loop_->node());

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        const UUID source_node_boot_uuid =
            our_node_index != f.data_node_index
                ? f.fetcher->context().source_boot_uuid
                : event_loop_->boot_uuid();
        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(event_loop_->node()) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        // Tell our writer that we know something about the remote boot.
        f.timestamp_writer->UpdateRemote(f.data_node_index,
                                         f.fetcher->context().source_boot_uuid);
        f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel. Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk. We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage. We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        message_header_builder.add_monotonic_timestamp_time(
            f.fetcher->context()
                .monotonic_event_time.time_since_epoch()
                .count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        f.contents_writer->QueueMessage(
            &fbb, UUID::FromVector(msg->boot_uuid()), end);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

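// Logs up to end_time in polling_period_ sized steps, optionally running the
// on_logged_period_ callback after each step.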
void Logger::DoLogData(const monotonic_clock::time_point end_time,
                       bool run_on_logged) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    if (run_on_logged) {
      on_logged_period_();
    }

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

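// Accumulates timing statistics for a single Fetch/FetchNext call.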
void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

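// Accumulates timing statistics for packing a message into a flatbuffer.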
void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos